構造化ストリーミングのトリガー間隔を構成する
Apache Spark 構造化ストリーミングは、データを段階的に処理します。バッチ処理のトリガー間隔を制御すると、準リアルタイムの処理、5 分ごとまたは 1 時間に 1 回のデータベースの更新、1 日または 1 週間のすべての新しいデータのバッチ処理など、ワークロードに対して構造化ストリーミングを使用できます。
Databricks の自動ローダーは構造化ストリーミングを使用してデータを読み込むため、トリガーのしくみを理解することで、必要な頻度でデータを取り込むときにコストを制御する柔軟性を最大限に高めることができます。
時間ベースのトリガー間隔の指定
構造化ストリーミングでは、時間ベースのトリガー間隔を "固定間隔マイクロバッチ" と表します。 processingTime
キーワードを使用して、.trigger(processingTime='10 seconds')
のように、期間を文字列として指定します。
小さすぎる trigger
間隔 (数十秒未満) を指定すると、新しいデータが到着した場合、システムが不要なチェックを実行する可能性があります。 待機時間の要件とデータがソースに到達する速度のバランスを取るために処理時間を構成します。
増分バッチ処理の構成
重要
Databricks Runtime 11.3 LTS 以降では、この Trigger.Once
設定は非推奨です。 Databricks では、すべての増分バッチ処理ワークロードに Trigger.AvailableNow
を使用することをお勧めします。
現在利用可能トリガー オプションでは、使用可能なすべてのレコードが増分バッチとして使用され、maxBytesPerTrigger
などのオプションを使用してバッチ サイズを構成できます (サイズ設定オプションはデータ ソースによって異なります)。
Azure Databricks では、多くの構造化ストリーミング ソースからの増分バッチ処理に Trigger.AvailableNow
を使用できます。 次の表に、各データ ソースに必要な Databricks Runtime の最小サポート バージョンを示します。
source | Databricks の最低ランタイム バージョン |
---|---|
ファイル ソース (JSON、Parquet など) | 9.1 LTS |
Delta Lake | 10.4 LTS |
自動ローダー | 10.4 LTS |
Apache Kafka | 10.4 LTS |
Kinesis | 13.1 |
既定のトリガー間隔はどのくらいですか?
構造化ストリーミングは、既定で 500 ms の固定間隔マイクロバッチに設定されます。 Databricks では、新しいデータが到着したかどうかの確認と、普通より小さいバッチの処理に関連するコストを最小限に抑えるために、常に調整済みの trigger
を指定することをお勧めしています。
実行間のトリガー間隔の変更
同じチェックポイントを使用しながら、実行間のトリガー間隔を変更できます。
マイクロバッチの処理中に構造化ストリーミング ジョブが停止した場合、そのマイクロバッチは、新しいトリガー間隔が適用される前に完了する必要があります。 そのため、トリガー間隔を変更した後、以前に指定した設定でマイクロバッチ処理が行われる場合があります。
時間ベースの間隔から AvailableNow
の使用に移行すると、使用可能なすべてのレコードを増分バッチとして処理する前にマイクロバッチ処理が行われる可能性があります。
AvailableNow
から時間ベースの間隔に移動すると、最後の AvailableNow
ジョブがトリガーされたときに使用可能だったすべてのレコードの処理が続行される可能性があります。 これは想定どおりの動作です。
Note
増分バッチに関連付けられているクエリ エラーから復旧しようとしている場合、バッチを完了する必要があるため、トリガー間隔を変更してもこの問題は解決されません。 Databricks は、バッチの処理に使用されるコンピューティング容量をスケールアップして、問題の解決を試みることを推奨しています。 まれに、新しいチェックポイントを使用してストリームを再起動することが必要になる場合があります。
連続処理モードとは何ですか?
Apache Spark では、連続処理と呼ばれる追加のトリガー間隔がサポートされています。 このモードは Spark 2.3 以降、試験段階として分類されています。この処理モデルのトレードオフの詳細については、Azure Databricks のアカウント チームにお問い合わせください。
この連続処理モードは、Delta Live テーブルで適用される連続処理とはまったく関係がないことにご注意ください。