ジョブを使って構造化ストリーミングのクエリ エラーから復旧する
構造化ストリーミングは、ストリーミング クエリにおけるフォールト トレランスとデータ整合性を備えています。失敗時に自動的に再開するように、Azure Databricks ジョブを使用して、構造化ストリーミング クエリを簡単に構成することができます。 ストリーミング クエリに対してチェックポイントを有効にすると、失敗後にクエリを再起動できます。 再起動されたクエリは、クエリが失敗したところから続行します。
構造化ストリーミング クエリに対してチェックポイントを有効にする
Databricks では、クエリを開始する前に、常に checkpointLocation
オプションをクラウド ストレージ パスに指定することをお勧めしています。 次に例を示します。
streamingDataFrame.writeStream
.format("parquet")
.option("path", "/path/to/table")
.option("checkpointLocation", "/path/to/table/_checkpoint")
.start()
このチェックポイントの場所には、クエリを識別する重要な情報がすべて保持されます。 クエリごとに異なるチェックポイントの場所が必要です。 複数のクエリに同じ場所を指定しないでください。 詳細については、「構造化ストリーミング プログラミング ガイド」を参照してください。
注意
checkpointLocation
はほとんどの種類の出力シンクで必須ですが、メモリ シンクなどの一部のシンクでは、checkpointLocation
を指定しない場合、一時的なチェックポイントの場所が自動的に生成される可能性があります。 これらの一時的なチェックポイントの場所によってフォールト トレランスやデータ整合性の保証が確証されることはなく、適切にクリーンアップされない場合があります。 checkpointLocation
を常に指定することで、潜在的な落とし穴を回避します。
障害時にストリーミング クエリを再起動するように、構造化ストリーミング ジョブを構成する
ストリーミング クエリを含むノートブックまたは JAR を使用して、Azure Databricks ジョブを作成し、次の目的のために構成できます。
- 常に新しいクラスターを使用します。
- 障害が発生した場合は常に再試行します。
ジョブの失敗時に自動的に再起動することは、スキーマの進化によるストリーミング ワークロードを構成する場合に特に重要です。 Azure Databricks 上でスキーマの進化を機能させるには、スキーマの変更が検出されたときに予期されるエラーを発生させ、ジョブの再開時に新しいスキーマを使ってデータを適切に処理します。 Databricks では、スキーマの進化を伴うクエリを含むストリーミング タスクを、Databricks ジョブで自動的に再起動するように常に構成することをお勧めします。
ジョブは、構造化ストリーム API と緊密に統合され、実行中のアクティブなすべてのストリーミング クエリの監視が可能です。 この構成により、クエリの一部が失敗した場合、ジョブは (他のすべてのクエリと共に) 実行を自動的に終了し、新しいクラスターで新しい実行を開始します。 これによって、ノートブックまたは JAR コードが再実行され、すべてのクエリがもう一度再起動されます。 これは、適切な状態に戻るための最も安全な方法です。
Note
- アクティブなストリーミング クエリでエラーが発生すると、アクティブな実行が失敗し、他のすべてのストリーミング クエリが終了します。
- ノートブックの最後に
streamingQuery.awaitTermination()
またはspark.streams.awaitAnyTermination()
を使用する必要はありません。 ストリーミング クエリがアクティブな場合、ジョブによって、実行が自動的に完了を防止します。 - Databricks では、構造化ストリーミング ノートブックをオーケストレーションする際に、
%run
とdbutils.notebook.run()
の代わりにジョブを使用することを推奨しています。 「ノートブックでコードをモジュール化またはリンクする」を参照してください。
推奨されるジョブ構成の例を次に示します。
- クラスター: これは、常に新しいクラスターを使用し、最新の Spark バージョン (またはバージョン 2.1 以上) を使用する場合に設定します。 Spark 2.1 以降で開始されたクエリは、クエリと Spark バージョンのアップグレード後に復旧できます。
- 通知: エラーに関するメール通知が必要な場合は、これを設定します。
- スケジュール: スケジュールは設定しないでください。
- タイムアウト: タイムアウトは設定しないでください。ストリーミング クエリは無期限に実行されます。
- 同時実行の最大数: 1 に設定します。 各クエリのインスタンスが同時にアクティブである必要があります。
- 再試行: [無制限] に設定します。
これらの構成を理解するには、「Azure Databricks ジョブを作成して実行する」をご覧ください。
構造化ストリーミング クエリの変更後に復旧する
同じチェックポイントの場所からの再起動の間に、ストリーミング クエリで許可される変更には制限があります。 許可されていない変更、または変更の影響が十分に定義されていない変更の一部を次に示します。 それらすべてについて、次のとおりです。
- 許可される用語は、指定された変更を実行できるが、その効果のセマンティクスが十分に定義されているかどうかを意味します。これは、クエリと変更によって異なります。
- "許可されない" という用語は、再起動されたクエリが予期しないエラーで失敗する可能性が高いため、指定された変更を行うべきでないことを意味します。
sdf
は、sparkSession.readStream
で生成されたストリーミング DataFrame/Dataset を表します。
構造化ストリーミング クエリでの変更の種類
- 入力ソースの数または種類 (つまり、異なるソース) の変更: これは許可されません。
- 入力ソースのパラメーターの変更: これが許可されるかどうかと、変更のセマンティクスが適切に定義されているかどうかは、ソースとクエリによって異なります。 次に例をいくつか示します。
レート制限の追加、削除、変更は許可されます。
spark.readStream.format("kafka").option("subscribe", "article")
to
spark.readStream.format("kafka").option("subscribe", "article").option("maxOffsetsPerTrigger", ...)
サブスクライブされた記事とファイルに対する変更は、結果が予測不可能であるために、一般に許可されません。
spark.readStream.format("kafka").option("subscribe", "article")
からspark.readStream.format("kafka").option("subscribe", "newarticle")
へ
- トリガー間隔の変更: 増分バッチと時間間隔の間でトリガーを変更できます。 「実行間のトリガー間隔の変更」を参照してください。
- 出力シンクの種類の変更: シンクのいくつかの特定の組み合わせ間の変更が許可されます。 これは、ケースバイケースで検証する必要があります。 次に例をいくつか示します。
- Kafka シンクへのファイル シンクが許可されます。 Kafka には新しいデータだけが表示されます。
- ファイル シンクへの Kafka シンクは許可されません。
- Kafka シンクを foreach に変更するか、またはその逆を許可します。
- 出力シンクのパラメーターの変更: これが許可されるかどうかと、変更のセマンティクスが十分に定義されているかどうかは、シンクとクエリによって異なります。 次に例をいくつか示します。
- ファイル シンクの出力ディレクトリに対する変更は許可されません。
sdf.writeStream.format("parquet").option("path", "/somePath")
からsdf.writeStream.format("parquet").option("path", "/anotherPath")
へ - 出力トピックに対する変更は許可されます。
sdf.writeStream.format("kafka").option("topic", "topic1")
からsdf.writeStream.format("kafka").option("topic", "topic2")
へ - ユーザー定義の foreach シンク (つまり
ForeachWriter
コード) に対する変更は許可されますが、変更のセマンティクスはコードによって異なります。
- ファイル シンクの出力ディレクトリに対する変更は許可されません。
- プロジェクション/フィルター/マップのような操作での変更: 一部の場合が許可されます。 例:
- フィルターの追加または削除は、
sdf.selectExpr("a")
からsdf.where(...).selectExpr("a").filter(...)
に許可されます。 - 同じ出力スキーマを持つプロジェクションの変更は、
sdf.selectExpr("stringColumn AS json").writeStream
からsdf.select(to_json(...).as("json")).writeStream
に許可されます。 - 異なる出力スキーマを持つプロジェクションの変更は、条件付きで
sdf.selectExpr("a").writeStream
からsdf.selectExpr("b").writeStream
に許可されます。これは、出力シンクでスキーマを"a"
から"b"
に変更できる場合にのみです。
- フィルターの追加または削除は、
- ステートフル操作の変更: ストリーミング クエリの一部の操作では、結果を継続的に更新するために状態データを維持する必要があります。 構造化ストリームは、状態データをフォールト トレラント ストレージ (DBFS、Azure Blob Storage など) に自動的にチェックポイント処理し、再起動後に復元します。 ただし、これは、状態データのスキーマが再起動間で同じままである必要があります。 つまり、ストリーミング クエリのステートフル操作に対する変更 (追加、削除、またはスキーマの変更) は、再起動の間は許可されません。 状態回復を確保するために再起動の間にスキーマを変更すべきでないステートフル操作の一覧を次に示します。
- ストリーミング集計: たとえば、
sdf.groupBy("a").agg(...)
です。 グループ化キーまたは集計の数または種類の変更は許可されません。 - ストリーミング重複排除: たとえば、
sdf.dropDuplicates("a")
です。 グループ化キーまたは集計の数または種類の変更は許可されません。 - ストリーム同士の結合: たとえば
sdf1.join(sdf2, ...)
(つまり、両方の入力がsparkSession.readStream
と一緒に生成される) です。 スキーマ列または等結合列の変更は許可されません。 結合の種類 (外部または内部) の変更は許可されません。 結合条件のその他の変更は、不正に定義されています。 - 任意のステートフル操作: たとえば、
sdf.groupByKey(...).mapGroupsWithState(...)
やsdf.groupByKey(...).flatMapGroupsWithState(...)
です。 ユーザー定義状態のスキーマとタイムアウトの種類に対する変更は許可されません。 ユーザー定義の状態マッピング関数内での変更は許可されますが、変更のセマンティック効果はユーザー定義ロジックによって異なります。 状態スキーマの変更を本当にサポートする場合は、スキーマの移行をサポートするエンコード/デコード スキームを使用して、複雑な状態データ構造を明示的にバイトにエンコード/デコードできます。 たとえば、Avro でエンコードされたバイトとして状態を保存した場合、これによってバイナリ状態が復元されるため、クエリの再起動間で Avro-state-schema を自由に変更できます。
- ストリーミング集計: たとえば、