Stream Analytics のノーコード エディターを使用して Azure Cosmos DB にデータを具体化する

この記事では、ノーコード エディターを使用して Stream Analytics ジョブを簡単に作成する方法について説明します。 このジョブは、Event Hubs から継続的に読み取り、カウント、合計、平均などの集計を実行します。 ある期間にグループ化するフィールドを選択すると、ジョブによって結果が Azure Cosmos DB に継続的に書き込まれます。

前提条件

  • Azure Event Hubs と Azure Cosmos DB リソースはパブリックにアクセスできる必要があり、ファイアウォールの内側に置いたり、Azure Virtual Network でセキュリティ保護したりすることはできません。
  • Event Hubs のデータは、JSON、CSV、または Avro 形式でシリアル化される必要があります。

Stream Analytics ジョブを開発する

Azure Cosmos DB にデータを具体化する Stream Analytics ジョブを開発するには、次の手順を使用します。

  1. Azure portal で、Azure Event Hubs インスタンスを見つけて選択します。
  2. [機能][データの処理] を選択します。 次に、[Materialize Data in Azure Cosmos DB] (Azure Cosmos DB にデータを具体化) というタイトルのカードの [開始] を選択します。
    データの具体化フローの開始を示すスクリーンショット。
  3. ジョブの名前を入力し、[作成] を選択します。
  4. イベント ハブでのデータの種類として [シリアル化] を指定し、ジョブが Event Hubs に接続するのに使用する [認証方法] を指定します。 次に、 [接続](Connect) を選択します。
  5. 接続が成功し、Event Hubs のインスタンスにデータ ストリームが流れるようになると、次の 2 つのことがすぐに表示されます。
    • 入力ペイロードに存在するフィールド。 必要に応じて、フィールドの横にある 3 つのドット記号を選択して、フィールドのデータ型を削除、名前変更、または変更します。
      確認用に入力のイベント ハブ フィールドを示すスクリーンショット。
    • [データ プレビュー] の下部のペインに表示される、定期的に自動的に更新される入力データのサンプル。 サンプル入力データの静的ビューが必要な場合は、[ストリーミング プレビューの一時停止] を選択できます。
      サンプル入力データを示すスクリーンショット。
  6. 次のステップでは、平均やカウントなど、計算するフィールドと集計を指定します。 また、グループ化するフィールドと、時間枠を指定することもできます。 その後、[データ プレビュー] セクションでステップの結果を検証できます。
    グループ化領域を示すスクリーンショット。
  7. 結果を書き込む Cosmos DB データベースコンテナーを選択します。
  8. [開始] を選択して Stream Analytics ジョブを開始します。
    [開始] の選択場所となる定義を示すスクリーンショット。
    ジョブを開始するには、次のことを指定する必要があります。
    • ジョブを実行するストリーミング ユニット (SU) の数。 SU は、ジョブに割り当てられるコンピューティングとメモリの量を表します。 3 から始めて、必要に応じて調整することをお勧めします。
    • [出力データのエラー処理] を使用すると、データ エラーが原因でジョブの送信先への出力が失敗した場合に必要な動作を指定できます。 既定では、ジョブは書き込み操作が成功するまで再試行します。 出力イベントを削除することもできます。
  9. [開始] を選択すると、ジョブは 2 分以内に実行を開始します。 [Stream Analytics ジョブ] タブの [データの処理] セクションでジョブを確認します。ジョブ メトリックを調べて、必要があれば停止して再起動できます。

Event Hubs の geo レプリケーション機能を使用する場合の考慮事項

最近、Azure Event Hubs では geo レプリケーション機能のプレビューがローンチされました。 この機能は、Azure Event Hubs の geo ディザスター リカバリー 機能とは異なります。

フェールオーバーの種類が [強制] で、レプリケーションの整合性が [非同期] の場合、Stream Analytics ジョブでは、Azure Event Hubs 出力への出力が 1 回のみ行われることは保証されません。

フェールオーバー期間中および Event Hubs によるスロットリング中に、プライマリとセカンダリの間のレプリケーションのラグが最大構成ラグに達すると、Event Hubs を出力とするプロデューサーである Azure Stream Analytics がジョブでウォーターマーク遅延を検出する場合があります。

Event Hubs を入力とするコンシューマーである Azure Stream Analytics が、フェールオーバー期間中にウォーターマーク遅延を検出し、フェールオーバーの完了後、データをスキップするか、重複データを検出する場合があります。

これらの注意事項により、Event Hubs のフェールオーバーが完了した直後に、適切な開始時刻で Stream Analytics ジョブを再起動することをお勧めします。 また、Event Hubs の geo レプリケーション機能はパブリック プレビュー段階であるため、現時点で運用環境の Stream Analytics ジョブにこのパターンを使用することはお勧めしません。 現在の Stream Analytics の動作は、Event Hubs の geo レプリケーション機能が一般公開される前に改善され、Stream Analytics の運用ジョブで使用できるようになります。

次のステップ

これで、Event Hubs から読み取り、カウントや平均などの集計を計算して、Azure Cosmos DB リソースに書き込むジョブを、Stream Analytics のノー コード エディターを使用して開発する方法がわかりました。