ワークフローのスケジュールとオーケストレーション

Databricks ワークフローには、Azure Databricks でデータ処理タスクをスケジュールおよび調整できるようにするツールがあります。 Databricks ワークフローを使用して、Databricks ジョブを構成します。

この記事では、Databricks ジョブを使用した運用ワークロードの管理に関連する概念および選択について紹介します。

Databricks ジョブとは

ジョブとは、Azure Databricks で運用ワークロードをスケジュールおよび調整するための主要な単位です。 ジョブには 1 つ以上のタスクが含まれます。 タスクとジョブにより、次の構成とデプロイを行うことができます。

  • Spark、SQL、OSS Python、ML、任意のコードなどのカスタム ロジック。
  • カスタム環境とライブラリを使用したコンピューティング リソース。
  • ワークロードを実行するためのスケジュールとトリガー。
  • タスク間の制御フローの条件付きロジック。

ジョブにより、タスク間のリレーションシップを定義するための手続き型アプローチが提供されます。 Delta Live Tables パイプラインでは、データセット変換の間のリレーションシップを定義するための宣言型アプローチが提供されます。 Delta Live Tables パイプラインは、タスクとしてジョブに含めることができます。 「ジョブの Delta Live Tables パイプライン タスク」を参照してください。

ジョブには、Databricks ノートブックを実行する 1 つのタスクから、条件付きロジックと依存関係を使用して実行される何千ものタスクまで、複雑さの異なるさまざまなものがあります。

ジョブを構成および実行する方法

ジョブを作成および実行するには、ジョブ UI、Databricks CLI を使用するか、Jobs API を呼び出します。 UI または API を使用して、失敗したジョブまたは取り消されたジョブを修復して再実行できます。 UI、CLI、API、通知 (メール、Webhook の宛先、Slack 通知など) を使用して、ジョブの実行結果を監視できます。

ジョブの構成と調整にコードとしてのインフラストラクチャ (IaC) アプローチを使用する場合は、Databricks アセット バンドル (DAB) を使用します。 バンドルには、ジョブとタスクの YAML 定義を含め、Databricks CLI を使用して管理できます。また、さまざまなターゲット ワークスペース (開発、ステージング、運用など) で共有および実行できます。 DAB を使用してジョブを構成および調整する方法については、「Databricks アセット バンドル」を参照してください。

Databricks CLI の使用については、「Databricks CLI とは」を参照してください。 Jobs API の使用方法については、「Jobs API」をご覧ください。

ジョブに必要な最小構成

Azure Databricks でのすべてのジョブには、以下が必要です。

  • 実行するロジックを含むソース コード (Databricks ノートブックなど)。
  • ロジックを実行するためのコンピューティング リソース。 コンピューティング リソースには、サーバーレス コンピューティング、クラシック ジョブ コンピューティング、または All-Purpose Compute があります。 「ジョブ用のコンピューティングを構成する」を参照してください。
  • ジョブをいつ実行するかを示す指定されたスケジュール。 必要に応じて、スケジュールの設定を省略し、ジョブを手動でトリガーできます。
  • 一意の名前

Note

Databricks ノートブックでコードを作成する場合、[スケジュール] ボタンを使用して、そのノートブックをジョブとして構成できます。 「スケジュールされたノートブック ジョブの作成と管理」をご覧ください。

タスクとは

タスクとは、ジョブ内の手順として実行されるロジックの 1 単位を表します。 タスクには複雑さの異なるさまざまなものがあり、次のものが含まれます。

  • ノートブック
  • JAR
  • SQL クエリ
  • DLT パイプライン
  • 別のジョブ
  • 制御フロー タスク

タスク間の依存関係を指定することにより、それらの実行順を制御できます。 シーケンスまたは並列で実行するようにタスクを構成できます。

ジョブはタスクの状態情報およびメタデータと相互作用しますが、タスクのスコープは分離されています。 タスク値を使用すると、スケジュールされたタスク間でコンテキストを共有できます。 「タスク値を使用してタスク間で情報を渡す」を参照してください。

ジョブで使用できる制御フロー オプション

ジョブおよびジョブのタスクを構成する際に、ジョブ全体および個々のタスクの実行方法を制御する設定をカスタマイズできます。 オプションは次のとおりです。

トリガーの種類

ジョブを構成するときは、トリガーの種類を指定する必要があります。 次の種類のトリガーから選択できます。

手動でジョブをトリガーすることもできますが、ほとんどの場合、これは次のような特定のユース ケースのためのものです。

  • 外部のオーケストレーション ツールを使用して、REST API 呼び出しを使用してジョブをトリガーする場合。
  • 検証またはデータ品質の問題の解決に手動操作が必要な、まれにしか実行されないジョブがある場合。
  • 移行など、実行する必要があるのが 1 回または数回だけのワークロードを実行している場合。

Databricks ジョブのトリガーの種類」を参照してください。

再試行

再試行では、エラー メッセージが出されてジョブが失敗した場合に、特定のタスクを再実行する回数が指定されます。 多くの場合、エラーは一時的であり、再起動によって解決されます。 構造化ストリーミングでのスキーマの進化など、Azure Databricks の一部の機能では、環境をリセットしてワークフローを続行できるようにするために、再試行してジョブを実行することが前提となっています。

再試行を 1 つのタスクに対して指定する場合、エラーが発生すると、指定された回数までタスクが再起動されます。 すべてのジョブ構成でタスクの再試行がサポートされているわけではありません。 「再試行ポリシーを設定する」を参照してください。

連続トリガー モードで実行すると、Databricks ではエクスポネンシャル バックオフを使用して自動的に再試行します。 「継続的ジョブのエラーはどのように処理されますか?」を参照してください。

Run if 条件付きタスク

Run if というタスクの種類を使用すると、他のタスクの結果に基づいて後のタスクの条件を指定できます。 ジョブにタスクを追加し、アップストリームに依存するタスクを指定します。 それらのタスクの状態に基づいて、1 つ以上のダウンストリーム タスクを実行するように構成できます。 ジョブでサポートされている依存関係は次のとおりです。

  • すべてが成功した
  • 少なくとも 1 つが成功した
  • いずれも失敗しなかった
  • すべてが完了した
  • 少なくとも 1 つが失敗した
  • すべて失敗した

タスクの依存関係を設定する」を参照してください。

If/else 条件付きタスク

If/else というタスクの種類を使用すると、何らかの値に基づいて条件を指定できます。 「If/else タスク を使用して分岐ロジックをジョブに追加する」をご覧ください。

ジョブでは、ロジックで定義する taskValues がサポートされており、タスクからジョブ環境に何らかの計算または状態の結果を返すことができます。 if/else 条件は、taskValues、ジョブ パラメーター、または動的な値に対して定義できます。

Azure Databricks では、条件で次のオペランドをサポートしています。

  • ==
  • !=
  • >
  • >=
  • <
  • <=

関連項目:

For each タスク

For each タスクを使用して、別のタスクをループで実行し、タスクの各イテレーションに異なるパラメーター セットを渡します。

ジョブに For each タスクを追加するには、For each タスクとネストされたタスクの 2 つのタスクを定義する必要があります。 入れ子になったタスクは、For each タスクのイテレーションごとに実行するタスクであり、標準的な Databricks ジョブ タスクの種類の 1 つです。 入れ子になったタスクにパラメーターを渡すには、複数のメソッドがサポートされています。

パラメーター化された Azure Databricks ジョブ タスクをループで実行する」を参照してください。

期間のしきい値

期間のしきい値を指定すると、指定した期間を超えた場合に警告を送信したり、タスクやジョブを停止できます。 この設定を構成する場合の例として、以下が挙げられます。

  • タスクが、反応しなくなった状態で固まってしまうことが多い。
  • ワークフローの SLA を超過した場合に、エンジニアに警告する必要がある。
  • 想定外のコストが発生しないように、大規模なクラスターを使用して構成されたジョブを失敗させる必要がある。

ジョブの予想される完了時間またはタイムアウトを構成する」および「タスクの予想される完了時間またはタイムアウトを構成する」を参照してください。

コンカレンシー

ほとんどのジョブは、既定のコンカレンシーである、1 つのコンカレント ジョブを使用して構成されています。 つまり、新しいジョブをトリガーするまでに前のジョブの実行が完了していない場合、次のジョブの実行はスキップされます。

コンカレンシーを増やすユース ケースもありますが、ほとんどのワークロードではこの設定を変更する必要はありません。

コンカレンシーの構成の詳細については、「Databricks ジョブのキューイングとコンカレンシーの設定」を参照してください。

ジョブを監視する方法

ジョブ UI を使用すると、実行中の実行を含むジョブの実行を確認できます。 「Databricksジョブのモニタリングと可観測性」を参照してください。

ジョブまたはタスクの開始、完了、または失敗時に通知を受け取ることができます。 通知は、1 つ以上のメール アドレスまたはシステムの宛先に送信できます。 「ジョブ イベントのメール通知とシステム通知を追加する」を参照してください。

システム テーブルには、アカウント内のジョブ アクティビティに関連するレコードを表示できる lakeflow スキーマが含まれています。 「ジョブ システム テーブル リファレンス」を参照してください。

ジョブ システム テーブルを課金テーブルと結合し、アカウント全体のジョブ コストを監視することもできます。 「システム テーブルを使用してジョブ コストを監視する」を参照してください。

制限事項

次の制限があります。

  • 1 つのワークスペースでのタスクの同時実行は、2000 に制限されています。 すぐに開始できない実行を要求した場合は、429 Too Many Requests 応答が返されます。
  • 1 時間に 1 つのワークスペースで作成できるジョブの数は、10000 に制限されます ("実行の送信" を含む)。 この制限は、REST API およびノートブック ワークフローによって作成されるジョブにも影響します。
  • ワークスペースには、最大 12,000 個の保存済みジョブを含めることができます。
  • ジョブには、最大 100 個のタスクを含めることができます。

ワークフローをプログラムで管理するには

Databricks には、ワークフローをプログラムでスケジュールおよび調整できるようにする、次のようなツールと API があります。

開発者ツールの詳細については、「開発者ツール」を参照してください

Apache AirFlow を使用したワークフロー オーケストレーション

Apache Airflow を使用すると、データ ワークフローを管理およびスケジュールできます。 Airflow では、Python ファイルにワークフローを定義すると、Airflow でワークフローのスケジュールと実行が管理されます。 「Apache Airflow を使用して Azure Databricks ジョブを調整する」を参照してください。

Azure Data Factory を使用したワークフロー オーケストレーション

Azure Data Factory (ADF) は、データの保管、移行、処理のサービスを自動化されたデータ パイプラインにまとめることができるクラウド データ統合サービスです。 ADF を使用すると、ADF パイプラインの一部として Azure Databricks ジョブを調整できます。

ADF から Azure Databricks に対して認証する方法など、ADF Web アクティビティを使用してジョブを実行する方法については、「Azure Data Factory から Azure Databricks ジョブ オーケストレーションを活用する」を参照してください。

ADF にはまた、ADF パイプラインで Databricks ノートブック、Python スクリプト、または JAR にパッケージ化されたコードを実行するためのビルトインのサポートがあります。

ADF パイプラインで Databricks ノートブックを実行する方法については、「Azure Data Factory で Databricks Notebook アクティビティを使用して Databricks ノートブックを実行する」と、次に「Databricks Notebook を実行してデータを変換する」を参照してください。

ADF パイプラインで Python スクリプトを実行する方法については、「Azure Databricks で Python アクティビティを実行してデータを変換する」を参照してください。

ADF パイプラインで JAR にパッケージ化されたコードを実行する方法については、「Azure Databricks で JAR アクティビティを実行してデータを変換する」を参照してください。