Delta Lake 形式で Event Hubs からデータをキャプチャする

この記事では、ノー コード エディターを使用して、Azure Data Lake Storage Gen2 アカウントの Event Hubs のストリーミング データを Delta Lake 形式で自動的にキャプチャする方法について説明します。

前提条件

  • お使いの Azure Event Hubs と Azure Data Lake Storage Gen2 リソースはパブリックにアクセスできる必要があり、ファイアウォールの内側に置いたり、Azure Virtual Network でセキュリティ保護したりすることはできません。
  • Event Hubs のデータは、JSON、CSV、または Avro 形式でシリアル化される必要があります。

データをキャプチャするようにジョブを構成する

Azure Data Lake Storage Gen2 内のデータをキャプチャするように Stream Analytics ジョブを構成するには、次の手順に従います。

  1. Azure portal で、イベント ハブに移動します。

  2. [機能] >[データの処理] の順に選択し、[ADLS Gen2 へのデータを Delta Lake 形式でキャプチャする] カードの [開始] を選択します。
    Event Hubs のデータの処理でカードの開始を示すスクリーンショット。

    または、[機能]>[キャプチャ] を選択し、[出力イベントのシリアル化形式] で [Delta Lake] オプションを選択し、[データ キャプチャ構成の開始] を選択します。 キャプチャ データ作成のエントリ ポイントを示すスクリーンショット。

  3. Stream Analytics ジョブを識別するための [名前] を入力します。 [作成] を選択します
    ジョブ名を入力する [New Stream Analytics job] (新しい Stream Analytics ジョブ) ウィンドウを示すスクリーンショット。

  4. Event Hubs でのデータの種類として [シリアル化] を指定し、ジョブが Event Hubs に接続するのに使用する [認証方法] を指定します。 次に、 [接続](Connect) を選択します。 Event Hubs 接続構成を示すスクリーンショット。

  5. 接続が正常に確立されると、次の情報が表示されます。

    • 入力データに存在するフィールド。 [フィールドの追加] を選択するか、フィールドの横にある 3 つのドット記号を選択して削除または名前の変更を行うことができます。
    • ダイアグラム ビューの [データ プレビュー] テーブルでの受信データのライブ サンプル。 定期的に更新されます。 [ストリーミング プレビューの一時停止] を選択すると、サンプル入力の静的ビューを見ることができます。
      [Data Preview] (データのプレビュー) の下にサンプル データが示されているスクリーンショット。
  6. [Azure Data Lake Storage Gen2] タイルを選択して構成を編集します。

  7. [Azure Data Lake Storage Gen2] 構成ページで、次の手順を行います。

    1. ドロップダウン メニューから [サブスクリプション]、[ストレージ アカウント名]、[コンテナー] を選択します。

    2. [サブスクリプション] が選択されると、[認証方法] と [ストレージ アカウント キー] が自動的に入力されます。

    3. Delta テーブル パスは、Azure Data Lake Storage Gen2 に格納されている Delta Lake テーブルの場所と名前を指定するために使用されます。 1 つ以上のパス セグメントを使用して、Delta テーブルへのパスと Delta テーブル名を定義するよう選択できます。 詳細については、Delta Lake テーブルへの書き込みに関するページを参照してください。

    4. [接続] を選択します。

      BLOB の接続構成を編集する [BLOB] ウィンドウを示す最初のスクリーンショット。

  8. 接続が確立されると、出力データに存在するフィールドが表示されます。

  9. コマンド バーで 「保存」 を選択して、構成を保存します。

  10. コマンド バーで [開始] を選択して、データをキャプチャするストリーミング フローを開始します。 次に、[Stream Analytics ジョブの開始] ウィンドウで次の手順を行います。

    1. 出力開始時刻を選択します。
    2. ジョブを実行するストリーミング ユニット (SU) の数を選択します。 SU は、Stream Analytics ジョブを実行するために割り当てられているコンピューティング リソースを表しています。 詳細については、Azure Stream Analytics のストリーミング ユニットに関するページを参照してください。
      出力の開始時刻、ストリーミング ユニット、エラー処理を設定する [Stream Analytics ジョブの開始] ウィンドウを示すスクリーンショット。
  11. [開始] を選択すると、2 分以内にジョブの実行が開始され、次の図に示すように、タブ セクションにメトリックが表示されます。 メトリックのグラフを示すスクリーンショット。

  12. 新しいジョブは、[Stream Analytics ジョブ] タブに表示されます。 [メトリックを開く] リンクが選択されているスクリーンショット。

出力の確認

Delta lake 形式の Parquet ファイルが Azure Data Lake Storage コンテナーに生成されていることを確認します。

Azure Data Lake Storage (ADLS) コンテナーに生成された Parquet ファイルを示すスクリーンショット。

Event Hubs の geo レプリケーション機能を使用する場合の考慮事項

最近、Azure Event Hubs では geo レプリケーション機能のプレビューがローンチされました。 この機能は、Azure Event Hubs の geo ディザスター リカバリー 機能とは異なります。

フェールオーバーの種類が [強制] で、レプリケーションの整合性が [非同期] の場合、Stream Analytics ジョブでは、Azure Event Hubs 出力への出力が 1 回のみ行われることは保証されません。

フェールオーバー期間中および Event Hubs によるスロットリング中に、プライマリとセカンダリの間のレプリケーションのラグが最大構成ラグに達すると、Event Hubs を出力とするプロデューサーである Azure Stream Analytics がジョブでウォーターマーク遅延を検出する場合があります。

Event Hubs を入力とするコンシューマーである Azure Stream Analytics が、フェールオーバー期間中にウォーターマーク遅延を検出し、フェールオーバーの完了後、データをスキップするか、重複データを検出する場合があります。

これらの注意事項により、Event Hubs のフェールオーバーが完了した直後に、適切な開始時刻で Stream Analytics ジョブを再起動することをお勧めします。 また、Event Hubs の geo レプリケーション機能はパブリック プレビュー段階であるため、現時点で運用環境の Stream Analytics ジョブにこのパターンを使用することはお勧めしません。 現在の Stream Analytics の動作は、Event Hubs の geo レプリケーション機能が一般公開される前に改善され、Stream Analytics の運用ジョブで使用できるようになります。

次のステップ

これで、Stream Analytics のノー コード エディターを使用して、Event Hubs のデータを Delta lake 形式で Azure Data Lake Storage Gen2 にキャプチャするジョブを作成する方法を確認しました。 次は、Azure Stream Analytics の詳細と、作成したジョブを監視する方法について学習します。