Parquet 形式で Event Hubs からデータをキャプチャする
この記事では、ノー コード エディターを使用して、Azure Data Lake Storage Gen2 アカウントの Event Hubs のストリーミング データを Parquet 形式で自動的にキャプチャする方法について説明します。
前提条件
イベント ハブを持つ Azure Event Hubs 名前空間と、キャプチャされたデータを保存するコンテナーを持つ Azure Data Lake Storage Gen2 アカウント。 これらのリソースはパブリックにアクセスできる必要があり、ファイアウォールの内側に置いたり、Azure 仮想ネットワークでセキュリティ保護したりすることはできません。
イベント ハブがない場合は、クイック スタート: イベント ハブの作成の手順に従って作成します。
Data Lake Storage Gen2 アカウントがない場合は、ストレージ アカウントの作成の手順に従ってアカウントを作成します
Event Hubs のデータは、JSON、CSV、または Avro 形式でシリアル化される必要があります。 テスト目的の場合は、左側のメニューで [データの生成 (プレビュー)] を選択し、データセットに [Stocks data] を選択して、[送信] を選択します。
データをキャプチャするようにジョブを構成する
Azure Data Lake Storage Gen2 内のデータをキャプチャするように Stream Analytics ジョブを構成するには、次の手順に従います。
Azure portal で、イベント ハブに移動します。
左側のメニューの [機能] で [データの処理] を選択します。 次に、[データを Parquet 形式で ADLS Gen2 にキャプチャする] カードの [開始] を選択します。
Stream Analytics ジョブの名前を入力して、[作成] を選択します。
Event Hubs でのデータの種類として [シリアル化] を指定し、ジョブが Event Hubs に接続するのに使用する [認証方法] を指定します。 次に、 [接続](Connect) を選択します。
接続が正常に確立されると、次の情報が表示されます。
[Azure Data Lake Storage Gen2] タイルを選択して構成を編集します。
[Azure Data Lake Storage Gen2] 構成ページで、次の手順を行います。
ドロップダウン メニューから [サブスクリプション]、[ストレージ アカウント名]、[コンテナー] を選択します。
[サブスクリプション] が選択されると、[認証方法] と [ストレージ アカウント キー] が自動的に入力されます。
[シリアル化] 形式に [Parquet] を選択します。
ストリーミング BLOB の場合、ディレクトリのパス パターンは動的な値であると想定されます。 日付を、BLOB のファイルパスの一部にする必要があります。これは、
{date}
として参照されます。 カスタム パス パターンの詳細については、「Azure Stream Analytics でのカスタム BLOB 出力のパーティション分割」を参照してください。[接続] を選択します
接続が確立されると、出力データに存在するフィールドが表示されます。
コマンド バーで 「保存」 を選択して、構成を保存します。
コマンド バーで [開始] を選択して、データをキャプチャするストリーミング フローを開始します。 次に、[Stream Analytics ジョブの開始] ウィンドウで次の手順を行います。
出力開始時刻を選択します。
価格プランを選択します。
ジョブを実行するストリーミング ユニット (SU) の数を選択します。 SU は、Stream Analytics ジョブを実行するために割り当てられているコンピューティング リソースを表しています。 詳細については、Azure Stream Analytics のストリーミング ユニットに関するページを参照してください。
イベント ハブの [データの処理] ページの [Stream Analytics ジョブ] タブに Stream Analytic ジョブが表示されます。
出力の確認
イベント ハブの [Event Hubs インスタンス] ページで、[データの生成] を選択し、データセットに [Stock data] を選択してから、[送信] を選択してサンプル データをイベント ハブに送信します。
Parquet ファイルが Azure Data Lake Storage コンテナーに生成されていることを確認します。
左側のメニューで [データの処理] を選択します。 [Stream Analytics ジョブ] タブに切り替えます。[メトリックを開く] を選択して監視します。
入力イベントと出力イベントを示すメトリックのスクリーンショットの例を次に示します。
Event Hubs の geo レプリケーション機能を使用する場合の考慮事項
Azure Event Hubs では最近、geo レプリケーション機能のプレビューがローンチされました。 この機能は、Azure Event Hubs の Geo ディザスター リカバリー 機能とは異なります。
フェールオーバーの種類が [適用]、レプリケーションの整合性が [非同期]である場合、Stream Analytics ジョブでは、Azure Event Hubs 出力への出力が 1 回のみ行われることは保証されません。
Event Hubs を出力とするプロデューサーである Azure Stream Analytics が、フェールオーバー期間中および Event Hubs によるスロットリング中に、プライマリとセカンダリの間のレプリケーションのラグが最大構成ラグに達すると、ジョブでウォーターマーク遅延を検出する場合があります。
Event Hubs を入力とするコンシューマーである Azure Stream Analytics が、フェールオーバー期間中にウォーターマーク遅延を検出し、フェールオーバーの完了後、データをスキップするか、重複データを見つけようとする場合があります。
これらの注意事項により、Event Hubs のフェールオーバーが完了した直後に、適切な開始時刻で Stream Analytics ジョブを再起動することをお勧めします。 また、Event Hubs の geo レプリケーション機能はパブリック プレビュー段階であるため、現時点で運用環境の Stream Analytics ジョブにこのパターンを使用することはお勧めしません。 現在の Stream Analytics の動作は、Event Hubs の geo レプリケーション機能が一般公開される前に改善され、Stream Analytics の運用ジョブで使用できるようになります。
次のステップ
これで、Stream Analytics のノー コード エディターを使用して、Event Hubs のデータを Parquet 形式で Azure Data Lake Storage Gen2 にキャプチャするジョブを作成する方法を確認しました。 次は、Azure Stream Analytics の詳細と、作成したジョブを監視する方法について学習します。