Stream Analytics のノーコード エディターを使用してフィルター処理と Azure Data Lake Storage Gen2 への取り込みを行う
この記事では、ノーコード エディターを使用して Stream Analytics ジョブを簡単に作成する方法について説明します。 Event Hubs から継続的に読み取り、受信データをフィルター処理してから、Azure Data Lake Storage Gen2 に結果を継続的に書き込みます。
前提条件
- Azure Event Hubs リソースはパブリックにアクセスできる必要があり、ファイアウォールの内側に置いたり、Azure Virtual Network でセキュリティ保護したりすることはできません
- Event Hubs のデータは、JSON、CSV、または Avro 形式でシリアル化される必要があります。
リアルタイム データをフィルター処理して取り込む Stream Analytics ジョブを開発する
Azure portal で、Azure Event Hubs インスタンスを見つけて選択します。
[機能]>[データの処理] を選択し、[フィルター処理と ADLS Gen2 への取り込み] カードで [開始] を選択します。
[Event Hubs] ウィンドウでデータの種類として [シリアル化] を指定し、ジョブが Event Hubs に接続するのに使用する [認証方法] を指定します。 次に、 [接続](Connect) を選択します。
接続が正常に確立されて、Event Hubs のインスタンスにデータ ストリームが流れるようになると、次の 2 つのことがすぐに表示されます。
Azure Data Lake Storage Gen2 タイルを選択します。 フィルター処理されたデータを送信する Azure Data Lake Gen2 アカウントを選択します。
- ドロップダウン メニューから [サブスクリプション]、[ストレージ アカウント名]、[コンテナー] を選択します。
- [サブスクリプション] が選択されると、[認証方法] と [ストレージ アカウント キー] が自動的に入力されます。 [接続] を選択します。
フィールドの詳細とパス パターンの例については、「Azure Stream Analytics からの BLOB ストレージと Azure Data Lake Gen2 出力」を参照してください。
必要に応じて、[静的プレビューの取得]/[静的プレビューの更新] を選択して、Azure Data Lake Storage Gen2 から取り込まれるデータ プレビューを表示します。
ジョブを開始するには、ジョブを実行するストリーミング ユニット (SU) の数を指定します。 SU は、ジョブに割り当てられるコンピューティングとメモリの量を表します。 3 から始めて、必要に応じて調整することをお勧めします。
[開始] を選ぶと、2 分以内にジョブの実行が開始され、下のタブ セクションにメトリックが表示されます。
[Stream Analytics ジョブ] タブの [データの処理] セクションにジョブが表示されます。ジョブの状態が [実行中] と表示されるまで、[更新] を選択します。 必要に応じて、[メトリックを開く] を選択して監視するか、停止して再起動します。
メトリックのサンプル ページを次に示します。
Data Lake Storage のデータを確認する
指定したコンテナーにファイルが作成されているはずです。
そのファイルをダウンロードして開き、フィルター処理されたデータのみが表示されることを確認します。 次の例では、SwitchNum が US に設定されたデータが表示されます。
{"RecordType":"MO","SystemIdentity":"d0","FileNum":"548","SwitchNum":"US","CallingNum":"345697969","CallingIMSI":"466921402416657","CalledNum":"012332886","CalledIMSI":"466923101048691","DateS":"20220524","TimeType":0,"CallPeriod":0,"ServiceType":"S","Transfer":0,"OutgoingTrunk":"419","MSRN":"1416960750071","callrecTime":"2022-05-25T02:07:10Z","EventProcessedUtcTime":"2022-05-25T02:07:50.5478116Z","PartitionId":0,"EventEnqueuedUtcTime":"2022-05-25T02:07:09.5140000Z", "TimeS":null,"CallingCellID":null,"CalledCellID":null,"IncomingTrunk":null,"CalledNum2":null,"FCIFlag":null} {"RecordType":"MO","SystemIdentity":"d0","FileNum":"552","SwitchNum":"US","CallingNum":"012351287","CallingIMSI":"262021390056324","CalledNum":"012301973","CalledIMSI":"466922202613463","DateS":"20220524","TimeType":3,"CallPeriod":0,"ServiceType":"V","Transfer":0,"OutgoingTrunk":"442","MSRN":"886932428242","callrecTime":"2022-05-25T02:07:13Z","EventProcessedUtcTime":"2022-05-25T02:07:50.5478116Z","PartitionId":0,"EventEnqueuedUtcTime":"2022-05-25T02:07:12.7350000Z", "TimeS":null,"CallingCellID":null,"CalledCellID":null,"IncomingTrunk":null,"CalledNum2":null,"FCIFlag":null} {"RecordType":"MO","SystemIdentity":"d0","FileNum":"559","SwitchNum":"US","CallingNum":"456757102","CallingIMSI":"466920401237309","CalledNum":"345617823","CalledIMSI":"466923000886460","DateS":"20220524","TimeType":1,"CallPeriod":696,"ServiceType":"V","Transfer":1,"OutgoingTrunk":"419","MSRN":"886932429155","callrecTime":"2022-05-25T02:07:22Z","EventProcessedUtcTime":"2022-05-25T02:07:50.5478116Z","PartitionId":0,"EventEnqueuedUtcTime":"2022-05-25T02:07:21.9190000Z", "TimeS":null,"CallingCellID":null,"CalledCellID":null,"IncomingTrunk":null,"CalledNum2":null,"FCIFlag":null}
Event Hubs の geo レプリケーション機能を使用する場合の考慮事項
Azure Event Hubs では最近、geo レプリケーション機能のプレビューがローンチされました。 この機能は、Azure Event Hubs の Geo ディザスター リカバリー 機能とは異なります。
フェールオーバーの種類が [適用]、レプリケーションの整合性が [非同期]である場合、Stream Analytics ジョブでは、Azure Event Hubs 出力への出力が 1 回のみ行われることは保証されません。
Event Hubs を出力とするプロデューサーである Azure Stream Analytics が、フェールオーバー期間中および Event Hubs によるスロットリング中に、プライマリとセカンダリの間のレプリケーションのラグが最大構成ラグに達すると、ジョブでウォーターマーク遅延を検出する場合があります。
Event Hubs を入力とするコンシューマーである Azure Stream Analytics が、フェールオーバー期間中にウォーターマーク遅延を検出し、フェールオーバーの完了後、データをスキップするか、重複データを見つけようとする場合があります。
これらの注意事項により、Event Hubs のフェールオーバーが完了した直後に、適切な開始時刻で Stream Analytics ジョブを再起動することをお勧めします。 また、Event Hubs の geo レプリケーション機能はパブリック プレビュー段階であるため、現時点で運用環境の Stream Analytics ジョブにこのパターンを使用することはお勧めしません。 現在の Stream Analytics の動作は、Event Hubs の geo レプリケーション機能が一般公開される前に改善され、Stream Analytics の運用ジョブで使用できるようになります。
次のステップ
Azure Stream Analytics の詳細と、作成したジョブを監視する方法を理解してください。