Stream Analytics のノーコード エディターを使用してフィルター処理と Azure Data Explorer への取り込みを行う

この記事では、ノーコード エディターを使用して Stream Analytics ジョブを簡単に作成する方法について説明します。 Event Hubs から継続的に読み取り、受信データをフィルター処理してから、Azure Data Explorer に結果を継続的に書き込みます。

前提条件

  • お使いの Azure Event Hubs と Azure Data Explorer リソースが、パブリックにアクセス可能であり、かつファイアウォールの内側に配置されたり、Azure Virtual Network でセキュリティ保護されたりしていてはいけません
  • Event Hubs のデータは、JSON、CSV、または Avro 形式でシリアル化される必要があります。

リアルタイム データをフィルター処理して取り込む Stream Analytics ジョブを開発する

  1. Azure portal で、Azure Event Hubs インスタンスを見つけて選択します。

  2. [機能]>[データの処理] を選択してから、[データのフィルター処理および Azure Data Explorer への保存] カードで [開始] を選択します。

    Azure Data Lake Storage Gen2 カードへのフィルター処理と取り込みを行う、[開始] を選択する場所を示すスクリーンショット。

  3. Stream Analytics の名前を入力して、[作成] を選択します。

    ジョブ名の入力場所を示すスクリーンショット。

  4. [Event Hubs] ウィンドウでデータの種類として [シリアル化] を指定し、ジョブが Event Hubs に接続するのに使用する [認証方法] を指定します。 次に、 [接続](Connect) を選択します。
    Event Hubs 接続構成を示すスクリーンショット。

  5. 接続が正常に確立されて、Event Hubs のインスタンスにデータ ストリームが流れるようになると、次の 2 つのことがすぐに表示されます。

    • 入力データに存在するフィールド。 [フィールドの追加] を選択するか、フィールドの横にある 3 つのドット記号を選択して削除、名前の変更、または型の変更を行うことができます。
      フィールドの種類を削除、名前変更、または変更できる Event Hubs のフィールドの一覧を示すスクリーンショット。
    • ダイアグラム ビューの [データ プレビュー] テーブルでの受信データのライブ サンプル。 それは定期的に自動更新されます。 [ストリーミング プレビューの一時停止] を選択すると、サンプル入力データの静的ビューを見ることができます。
      [Data Preview] (データのプレビュー) の下にサンプル データが示されているスクリーンショット。
  6. [フィルター] タイルを選択して、データを集計します。 [フィルター] 領域で、入力データを条件でフィルター処理するフィールドを選択します。

    フィルターのオペレーター構成を示すスクリーンショット。

  7. [管理] タイルを選択します。 [フィールドの管理] 構成パネルで、イベント ハブに出力するフィールドを選択します。 すべてのフィールドを追加する場合は、[すべてのフィールドを追加] を選択します。

    フィールド管理のオペレーター構成を示すスクリーンショット。

  8. [Azure Data Explorer] タイルを選択します。 構成パネルで、必要なパラメーターを入力して接続します。

    Note

    テーブルは、選択したデータベースに存在している必要があり、テーブル スキーマは、データのプレビューで生成されるフィールドとその型の数に正確に一致している必要があります。

    Kusto の出力構成を示すスクリーンショット。

  9. 必要に応じて、[静的プレビューの取得] または [静的プレビューの更新] を選択し、イベント ハブに取り込まれるデータのプレビューを表示します。
    [静的プレビューの取得] と [静的プレビューの更新] のオプションを示すスクリーンショット。

  10. [保存] を選択してから、Stream Analytics ジョブの [開始] を選択します。
    [保存] と [開始] のオプションを示すスクリーンショット。

  11. ジョブを開始するには、次のものを指定します。

    • ジョブを実行するストリーミング ユニット (SU) の数。 SU は、ジョブに割り当てられるコンピューティングとメモリの量を表します。 3 から始めて、必要に応じて調整することをお勧めします。
    • [出力データのエラー処理] – データ エラーが原因でジョブの送信先への出力が失敗した場合に必要な動作を指定できます。 既定では、ジョブは書き込み操作が成功するまで再試行します。 出力イベントを削除することもできます。
      [Stream Analytics ジョブの開始] のオプションを示すスクリーンショット。このオプションでは、出力時刻を変更し、ストリーミング ユニットの数を設定し、[出力データのエラー処理] のオプションを選択できます。
  12. [開始] を選択すると、2 分以内にジョブの実行が開始して、下のタブ セクションでメトリックが開きます。

    ジョブの開始後のメトリック データを示すスクリーンショット。

    [Stream Analytics ジョブ] タブの [データの処理] セクションでジョブを確認することもできます。必要に応じて、[メトリックを開く] を選択して監視するか、停止して再起動します。

    実行中のジョブの状態が表示されている [Stream Analytics ジョブ] タブのスクリーンショット。

Event Hubs の geo レプリケーション機能を使用する場合の考慮事項

Azure Event Hubs では最近、geo レプリケーション機能のプレビューがローンチされました。 この機能は、Azure Event Hubs の Geo ディザスター リカバリー 機能とは異なります。

フェールオーバーの種類が [適用]、レプリケーションの整合性が [非同期]である場合、Stream Analytics ジョブでは、Azure Event Hubs 出力への出力が 1 回のみ行われることは保証されません。

Event Hubs を出力とするプロデューサーである Azure Stream Analytics が、フェールオーバー期間中および Event Hubs によるスロットリング中に、プライマリとセカンダリの間のレプリケーションのラグが最大構成ラグに達すると、ジョブでウォーターマーク遅延を検出する場合があります。

Event Hubs を入力とするコンシューマーである Azure Stream Analytics が、フェールオーバー期間中にウォーターマーク遅延を検出し、フェールオーバーの完了後、データをスキップするか、重複データを見つけようとする場合があります。

これらの注意事項により、Event Hubs のフェールオーバーが完了した直後に、適切な開始時刻で Stream Analytics ジョブを再起動することをお勧めします。 また、Event Hubs の geo レプリケーション機能はパブリック プレビュー段階であるため、現時点で運用環境の Stream Analytics ジョブにこのパターンを使用することはお勧めしません。 現在の Stream Analytics の動作は、Event Hubs の geo レプリケーション機能が一般公開される前に改善され、Stream Analytics の運用ジョブで使用できるようになります。

次のステップ

Azure Stream Analytics の詳細と、作成したジョブを監視する方法を理解してください。