ワークフローのスケジュールと調整

Databricks ワークフローには、Azure Databricks でデータ処理タスクをスケジュールおよび調整できるようにするツールのコレクションが用意されています。 Databricks ワークフローを使用して、Databricks ジョブを構成します。

この記事では、Databricks ジョブを使用した運用ワークロードの管理に関連する概念を紹介します。

Note

Delta Live Tables には、データ処理パイプラインを作成するための宣言構文が用意されています。 「Delta Live Tables とは」を参照してください。

Databricks ジョブとは

Databricks ジョブを使用すると、指定したコンピューティング環境で指定したスケジュールに基づいて実行するようにタスクを構成できます。 Delta Live Tables パイプラインに加え、ジョブは、Azure Databricks でデータ処理と ML ロジックを運用環境にデプロイするために使用される主要なツールです。

ジョブには、Databricks ノートブックを実行する 1 つのタスクから、条件付きロジックと依存関係を使用して実行される何千ものタスクまで、複雑さの異なるさまざまなものがあります。

ジョブを構成および実行する方法

ジョブを作成および実行するには、ジョブ UI、Databricks CLI を使用するか、Jobs API を呼び出します。 UI または API を使用して、失敗したジョブまたは取り消されたジョブを修復して再実行できます。 UI、CLI、API、通知 (メール、Webhook の宛先、Slack 通知など) を使用して、ジョブの実行結果を監視できます。

Databricks CLI の使用については、「Databricks CLI とは」を参照してください。 Jobs API の使用方法については、「Jobs API」をご覧ください。

ジョブに必要な最小構成

Azure Databricks でのすべてのジョブには、以下が必要です。

  • 実行するロジックを含むソース コード。
  • ロジックを実行するためのコンピューティング リソース。 コンピューティング リソースには、サーバーレス コンピューティング、クラシック ジョブ コンピューティング、または All-Purpose Compute があります。 「ジョブで Azure Databricks コンピューティングを使用する」を参照してください。
  • ジョブをいつ実行するかを示す指定されたスケジュール、または手動トリガー。
  • 一意の名前

Note

Databricks ノートブックでコードを作成する場合、[スケジュール] ボタンを使用して、そのノートブックをジョブとして構成できます。 「スケジュールされたノートブック ジョブの作成と管理」をご覧ください。

タスクとは

タスクは、ジョブ内のロジックの 1 単位を表します。 タスクには複雑さの異なるさまざまなものがあり、次のものが含まれます。

  • ノートブック
  • JAR
  • SQL クエリ
  • DLT パイプライン
  • 別のジョブ
  • 制御フロー タスク

タスク間の依存関係を指定することにより、それらの実行順を制御できます。 シーケンスまたは並列で実行するようにタスクを構成できます。

ジョブはタスクの状態情報およびメタデータと相互作用しますが、タスクのスコープは分離されています。 タスク値を使用すると、スケジュールされたタスク間でコンテキストを共有できます。 「Azure Databricks ジョブのタスク間で情報を共有する」を参照してください。

ジョブで使用できる制御フロー オプション

ジョブおよびジョブ内のタスクを構成するときに、ジョブ全体および個々のタスクの実行方法を制御する設定をカスタマイズできます。

トリガーの種類

ジョブを構成するときは、トリガーの種類を指定する必要があります。 次の種類のトリガーから選択できます。

ジョブを手動でトリガーすることもできますが、ほとんどの場合、これは次のような特定のユース ケースのためのものです。

  • REST API 呼び出しを使用してジョブをトリガーするために外部のオーケストレーション ツールを使用する場合。
  • 検証またはデータ品質の問題の解決に人間の関与が必要な、まれにしか実行されないジョブがある場合。
  • 移行など、実行する必要があるのが 1 回または数回だけのワークロードを実行している場合。

新しいファイルが到着したときにジョブをトリガーする」を参照してください。

再試行

再試行では、エラー メッセージが出されてジョブが失敗した場合に、特定のジョブまたはタスクを再実行する回数を指定します。 多くの場合、エラーは一時的で、再起動によって解決されます。また、構造化ストリーミングでのスキーマの進化など、Azure Databricks の一部の機能では、環境をリセットしてワークフローを続行できるようにするために、再試行してジョブを実行することが前提となっています。

再試行を構成するためのオプションは、サポートされているコンテキストの UI に表示されます。 これらには次のものが含まれます。

  • 再試行をジョブ全体に対して指定できます。つまり、いずれかのタスクが失敗すると、ジョブ全体が再起動されます。
  • 再試行を 1 つのタスクに対して指定できます。この場合、タスクは、エラーが発生すると、指定された回数まで再起動されます。

連続トリガー モードで実行すると、Databricks ではエクスポネンシャル バックオフを使用して自動的に再試行します。 「継続的ジョブのエラーはどのように処理されますか?」を参照してください。

Run if 条件付きタスク

Run if というタスクの種類を使用すると、他のタスクの結果に基づいて後のタスクの条件を指定できます。 ジョブにタスクを追加し、アップストリームに依存するタスクを指定します。 それらのタスクの状態に基づいて、1 つ以上のダウンストリーム タスクを実行するように構成できます。 ジョブでサポートされている依存関係は次のとおりです。

  • すべてが成功した
  • 少なくとも 1 つが成功した
  • いずれも失敗しなかった
  • すべてが完了した
  • 少なくとも 1 つが失敗した
  • すべて失敗した

Azure Databricks ジョブでタスクを条件付きで実行する」を参照してください

If/else 条件付きタスク

If/else というタスクの種類を使用すると、何らかの値に基づいて条件を指定できます。 「If/else 条件タスク を使用して分岐ロジックをジョブに追加する」を参照してください

ジョブでは、ロジック内で定義する taskValues をサポートし、タスクからジョブ環境に何らかの計算または状態の結果を返すことができます。 if/else 条件は、taskValues、ジョブ パラメーター、または動的な値に対して定義できます。

Azure Databricks では、条件で次のオペランドをサポートしています。

  • ==
  • !=
  • >
  • >=
  • <
  • <=

関連項目:

For each タスク

For each タスクを使用して、別のタスクをループで実行し、タスクの各イテレーションに異なるパラメーター セットを渡します。

ジョブに For each タスクを追加するには、For each タスクとネストされたタスクの 2 つのタスクを定義する必要があります。 入れ子になったタスクは、For each タスクのイテレーションごとに実行するタスクであり、標準的な Azure Databricks ジョブ タスクの種類の 1 つです。 入れ子になったタスクにパラメーターを渡すには、複数のメソッドがサポートされています。

パラメーター化された Azure Databricks ジョブ タスクをループで実行する」を参照してください。

期間のしきい値

期間のしきい値を指定すると、指定した期間を超えた場合に警告を送信するか、タスクまたはジョブを停止できます。 この設定を構成する場合の例として、以下が挙げられます。

  • タスクが、反応しなくなった状態で固まってしまうことが多い。
  • ワークフローの SLA を超過した場合に、エンジニアに警告する必要がある。
  • 想定外のコストが発生しないように、大規模なクラスターを使用して構成されたジョブを失敗させる必要がある。

コンカレンシー

ほとんどのジョブは、既定のコンカレンシーである、1 つのコンカレント ジョブを使用して構成されています。 つまり、新しいジョブをトリガーするまでに前のジョブの実行が完了していない場合、次のジョブの実行はスキップされます。

コンカレンシーを増やすユース ケースもありますが、ほとんどのワークロードではこの設定を変更する必要はありません。

コンカレンシーの構成の詳細については、「同時実行の最大数を構成する」を参照してください。

ジョブを監視する方法

ジョブまたはタスクの開始、完了、または失敗時に通知を受け取ることができます。 通知は、1 つ以上のメール アドレスまたはシステムの宛先に送信できます。 「ジョブ イベントのメール通知とシステム通知を追加する」を参照してください。

システム テーブルには、アカウント内のジョブ アクティビティに関連するレコードを表示できる lakeflow スキーマが含まれています。 「ジョブ システム テーブル リファレンス」を参照してください。

ジョブ システム テーブルを課金テーブルと結合し、アカウント全体のジョブ コストを監視することもできます。 「システム テーブルを使用してジョブ コストを監視する」を参照してください。

制限事項

次の制限があります。

  • 1 つのワークスペースでのタスクの同時実行は、1000 に制限されています。 すぐに開始できない実行を要求した場合は、429 Too Many Requests 応答が返されます。
  • 1 時間に 1 つのワークスペースで作成できるジョブの数は、10000 に制限されます ("実行の送信" を含む)。 この制限は、REST API およびノートブック ワークフローによって作成されるジョブにも影響します。
  • ワークスペースには、最大 12,000 個の保存済みジョブを含めることができます。
  • ジョブには、最大 100 個のタスクを含めることができます。

ワークフローをプログラムで管理するには

Databricks には、ワークフローをプログラムでスケジュールおよび調整できるようにする、次のようなツールと API が用意されています。

開発者ツールの詳細については、「開発者ツールとガイダンス」を参照してください。

Apache AirFlow を使用したワークフロー オーケストレーション

Apache Airflow を使用すると、データ ワークフローを管理およびスケジュールできます。 Airflow では、Python ファイルにワークフローを定義すると、Airflow でワークフローのスケジュールと実行が管理されます。 「Apache Airflow を使用して Azure Databricks ジョブを調整する」を参照してください。

Azure Data Factory を使用したワークフロー オーケストレーション

Azure Data Factory (ADF) は、データの保管、移行、処理のサービスを自動化されたデータ パイプラインにまとめることができるクラウド データ統合サービスです。 ADF を使用すると、ADF パイプラインの一部として Azure Databricks ジョブを調整できます。

ADF から Azure Databricks に対して認証する方法など、ADF Web アクティビティを使用してジョブを実行する方法については、「Azure Data Factory から Azure Databricks ジョブ オーケストレーションを活用する」を参照してください。

ADF には、ADF パイプラインで Databricks ノートブック、Python スクリプト、または JAR にパッケージ化されたコードを実行するための組み込みのサポートも用意されています。

ADF パイプラインで Databricks ノートブックを実行する方法については、「Azure Data Factory で Databricks Notebook アクティビティを使用して Databricks ノートブックを実行する」と、次に「Databricks Notebook を実行してデータを変換する」を参照してください。

ADF パイプラインで Python スクリプトを実行する方法については、「Azure Databricks で Python アクティビティを実行してデータを変換する」を参照してください。

ADF パイプラインで JAR にパッケージ化されたコードを実行する方法については、「Azure Databricks で JAR アクティビティを実行してデータを変換する」を参照してください。