MongoDB Atlas のデータ変更を Azure Synapse Analytics にリアルタイムで同期するようにする

Azure Synapse Analytics

リアルタイム分析は、現在の分析情報に基づいて意思決定を迅速に行い、自動化されたアクションを実行するのに役立ちます。 また、強化されたカスタマー エクスペリエンスを提供するのにも役立ちます。 このソリューションでは、MongoDB のオペレーショナル データの変更と、Azure Synapse Analytics のデータ プールの同期を維持する方法について説明します。

アーキテクチャ

次の図は、Atlas から Azure Synapse Analytics にリアルタイムでの同期を実装する方法を示しています。 この単純なフローにより、MongoDB Atlas コレクションで発生するすべての変更が、Azure Synapse Analytics ワークスペースの既定の Azure Data Lake Storage リポジトリにレプリケートされます。 データが Data Lake Storage に格納されたら、Azure Synapse Analytics パイプラインを使用して、分析要件に応じ、専用の SQL プール、Spark プール、またはその他のソリューションにデータをプッシュできます。

MongoDB Atlas から Azure Synapse Analytics へのリアルタイム同期を実装するためのアーキテクチャを示す図。

このアーキテクチャの PowerPoint ファイル をダウンロードします。

データフロー

MongoDB Atlas オペレーショナル データ ストア (ODS) のリアルタイムでの変更はキャプチャされ、リアルタイム分析のユース ケース、ライブ レポート、ダッシュボードで使用できるよう Azure Synapse Analytics ワークスペースの Data Lake Storage で利用可能になります。

  1. MongoDB Atlas のオペレーショナル/トランザクション データストアのデータ変更は、 Atlas トリガーによってキャプチャされます。

  2. Atlas データベース トリガーによってイベントが観察されると、変更の種類と変更されたドキュメント (完全または差分) が Atlas 関数に渡されます。

  3. Atlas 関数によって Azure 関数がトリガーされ、変更イベントと JSON ドキュメントが渡されます。

  4. Azure Functions は、Azure Storage Files Data Lake クライアント ライブラリを使用して、変更されたドキュメントを Azure Synapse Analytics ワークスペースの構成済みの Data Lake Storage に書き込みます。

  5. Data Lake Storage にデータが書き込まれると、専用 SQL プール、Spark プール、およびその他のソリューションに送信できるようになります。 または、現在のデータに対して追加の BI レポートを作成したり AI/機械学習を実行したりするために、Azure Synapse Analytics データ フローまたは Copy パイプラインを使用して、JSON から Parquet または Delta 形式にデータを変換することができます。

コンポーネント

  • MongoDB Atlas 変更ストリーム では、コレクション、データベース、デプロイ クラスターに対する変更をアプリケーションに通知できます。 変更ストリームを使用すると、アプリケーションはリアルタイムのデータ変更にアクセスでき、変更に素早く対応できます。 この機能は、アラームを発生させ、即座に対応する必要のある、IoT イベントの追跡や財務データの変更などのユース ケースで重要です。 Atlas トリガーは、変更ストリームを使用してコレクションの変更を監視し、トリガー イベントに対応する関連する Atlas 関数を自動的に呼び出します。
  • Atlas トリガー は、特定のコレクションのドキュメントの挿入、更新、削除に応答するもので、変更イベントに応答して Atlas 関数を自動的に呼び出すことができます。
  • Atlas 関数 は、Atlas トリガーを呼び出す、イベントに基づいてアクションを実行するサーバーレスのサーバー側 JavaScript コード実装です。 Atlas トリガーと Atlas 関数を組み合わせると、イベントドリブン アーキテクチャの実装を簡略化できます。
  • Azure Functions は、希望のプログラミング言語でアプリケーションを効率的に開発できる、イベントドリブンのサーバーレス コンピューティング プラットフォームです。 これを使用すると、他の Azure サービスとシームレスに接続することもできます。 このシナリオでは、Azure 関数によって変更イベントをキャプチャし、それを使用して、変更されたデータを含む BLOB を Azure Storage Files Data Lake クライアント ライブラリを使用して Data Lake Storage に書き込みます。
  • Data Lake Storage は、Azure Synapse Analytics の既定のストレージ ソリューションです。 データに直接クエリを実行するには、サーバーレス プールを使用します。
  • Azure Synapse Analyticsパイプライン および データ フロー を使用すると、さらなる分析用に MongoDB の変更データを含む BLOB を専用 SQL プールまたは Spark プールにプッシュできます。 パイプラインを使用すると、 ストレージ イベント トリガースケジュールされたトリガー の両方を使用し、リアルタイムとほぼリアルタイムの両方のユース ケース用のソリューションを構築して Data Lake Storage 内の変更されたデータセットに対処できます。 この統合により、変更データセットのダウンストリームでの利用が高速化されます。

Azure Synapse Analytics パイプラインがデータをプールにプッシュする方法を示す図。

代替

このソリューションでは、Atlas トリガーを使用し、Atlas の変更ストリームをリッスンして変更イベントに応答して Azure Functions をトリガーするコードをラップします。 したがって、以前に提供されていた 代替ソリューションよりもはるかに簡単に実装できます。 このソリューションでは、 Azure App Services Web アプリで変更ストリームをリッスンするコードを作成する必要があります。

MongoDB Spark コネクタ を使用して MongoDB ストリーム データを読み取り、Delta テーブルに書き込む、別の方法もあります。 このコードは、Azure Synapse Analytics のパイプラインの一部である Spark Notebook で継続的に実行されます。 このソリューションの実装の詳細については、 Spark ストリーミングを使用した Atlas から Azure Synapse Analytics への同期に関する説明を参照してください。

ただし、Azure Functions で Atlas トリガーを使用した場合、完全にサーバーレスなソリューションとなります。 サーバーレスであるため、堅牢なスケーラビリティとコストの最適化が実現されます。 価格は従量課金制のコスト モデルに基づいています。 Atlas 関数を使用して、Azure Functions エンドポイントを呼び出す前にいくつかの変更イベントを組み合わせれば、さらにコストを節約できます。 この戦略は、トラフィックの多いシナリオで役立ちます。

また、 Microsoft Fabric ではデータ資産を統合し、データに対する分析と AI の実行を容易にでき、分析情報をすばやく取得できます。 Fabric の Azure Synapse Analytics Data Engineering、データ サイエンス、データ ウェアハウス、リアルタイム分析で、OneLake にプッシュされる MongoDB データがより適切に利用されるようになりました。 Atlas 用の Dataflow Gen2 およびデータ パイプライン コネクタを両方使用して、Atlas データを OneLake に直接読み込むことができます。 このコードなしのメカニズムは、Atlas から OneLake にデータを取り込む強力な方法を提供します。

Microsoft Fabric がデータを OneLake にプッシュする方法を示す図。

Fabric では、Extract/Transform/Load (ETL) なしで、OneLake ショートカットを使用して、Data Lake Storage にプッシュされるデータを直接参照できます。

データを Power BI にプッシュして、BI レポート用のレポートと視覚化を作成できます。

シナリオの詳細

多くのエンタープライズ アプリケーションのオペレーショナル データ層である MongoDB Atlas には、内部アプリケーション、顧客向けサービス、および複数のチャネルのサードパーティ API からのデータが保存されます。 Azure Synapse Analytics のデータ パイプラインを使用すると、このデータと、他の従来のアプリケーションのリレーショナル データや、ログ、オブジェクト ストアおよびクリックストリームなどのソースからの非構造化データとを結合できます。

企業は、 集計分析ノードAtlas Searchベクトル検索Atlas Data LakeAtlas SQL インターフェイスデータ フェデレーションおよび グラフ などの MongoDB 機能を使用し、アプリケーション駆動型インテリジェンスを可能にしています。 ただし、MongoDB のトランザクション データは、バッチ、AI/機械学習、データ ウェアハウスの BI 分析とインテリジェンスのために、Azure Synapse Analytics 専用の SQL プールまたは Spark プールに、抽出、変換、読み込まれます。

Atlas と Azure Synapse Analytics 間のデータ移動には、バッチ統合とリアルタイム同期の 2 つのシナリオがあります。

バッチ統合

Atlas から Azure Synapse Analytics の Data Lake Storage にデータを移動するには、バッチとマイクロバッチの統合を使用します。 履歴データを一度に全部フェッチすることも、フィルター条件に基づいて増分データをフェッチすることもできます。

Azure Synapse Analytics では、MongoDB オンプレミス インスタンスと MongoDB Atlas をソースまたはシンク リソースとして統合できます。 コネクタの詳細については、 MongoDB との間での双方向のデータ コピー または MongoDB Atlas との間でのデータ コピーに関する説明を参照してください。

オンプレミスの MongoDB または Atlas に保存されているオペレーショナル データに対し Azure Synapse Analytics を実行するには、ソース コネクタが便利です。 ソース コネクタを使用して Atlas からデータをフェッチし、Parquet、Avro、JSON、テキスト形式、または CSV BLOB ストレージとして Data Lake Storage にデータを読み込むことができます。 その後に、これらのファイルを変換したり、マルチデータベース、マルチクラウド、またはハイブリッド クラウド環境の他のデータ ソースの他のファイルに変換または結合することができます。 このユース ケースは、エンタープライズ データ ウェアハウス (EDW) と大規模な分析シナリオで一般的です。 また、シンク コネクタを使用し、分析結果を Atlas に戻し、格納することもできます。 バッチ統合の詳細については、「Azure Synapse Analytics を使用して MongoDB Atlas のオペレーショナル データを分析する」を参照してください。

リアルタイムの同期

この記事で説明するアーキテクチャでは、リアルタイム同期を実装し、Azure Synapse Analytics ストレージを MongoDB のオペレーショナル データで最新の状態に保つために役立ちます。

このソリューションは、主に次の 2 つの機能で構成されます。

  • Atlas の変更をキャプチャする
  • 変更を Azure Synapse Analytics に反映するように Azure 関数をトリガーする

Atlas の変更をキャプチャする

変更をキャプチャするには、Atlas トリガーを使用します。これは、 [トリガーの追加] UI で構成するか、 Atlas App Services Admin APIを使用して構成できます。 トリガーは、挿入、更新、削除などのデータベース イベントで発生するデータベースの変更をリッスンします。 Atlas トリガーは、変更イベントが検出されると Atlas 関数もトリガーします。 関数は、 [トリガーの追加] UI を使用し追加します。 Atlas 関数を作成し、 Atlas Admin API を使用して、それをトリガー呼び出しエンドポイントとして関連付けることもできます。

次のスクリーンショットは、Atlas トリガーの作成と編集に使用するフォームを示しています。 [トリガー ソースの詳細] セクションには、トリガーが変更イベントを監視するコレクションと、監視するデータベース イベント (挿入、更新、削除、および/または置換) を指定します。

Atlasトリガーを作成するためのフォームを示すスクリーンショット。

トリガーは、有効になっているイベントに応答して Atlas 関数を呼び出すことができます。 次のスクリーンショットは、データベース トリガーに応答して呼び出す Atlas 関数として追加された単純な JavaScript コードを示しています。 Atlas 関数は Azure 関数を呼び出し、トリガーが有効になっている内容に応じて、挿入、更新、削除、または置換されたドキュメントと共に変更イベントのメタデータを渡します。

トリガーに追加された JavaScript コードを示すスクリーンショット。

Atlas 関数のコード

Atlas 関数のコードは、要求の本文の changeEvent 全体を Azure 関数に渡すことによって、Azure 関数エンドポイントに関連付けられている Azure 関数をトリガーします。

<Azure function URL endpoint> プレースホルダーは、実際の Azure 関数 URL エンドポイントに置き換える必要があります。

exports =  function(changeEvent) {

    // Invoke Azure function that inserts the change stream into Data Lake Storage.
    console.log(typeof fullDocument);
    const response =  context.http.post({
        url: "<Azure function URL endpoint>",
        body: changeEvent,
        encodeBodyAsJSON: true
    });
    return response;
};

変更を Azure Synapse Analytics に反映するように Azure 関数をトリガーする

Atlas 関数は、Azure Synapse Analytics の Data Lake Storage に変更ドキュメントを書き込む Azure 関数を呼び出すようにコーディングされます。 Azure 関数は、 Python 用 Azure Data Lake Storage クライアント ライブラリ SDK を使用して、ストレージ アカウントを表す DataLakeServiceClient クラスのインスタンスを作成します。

Azure 関数は、認証にストレージ キーを使用します。 Microsoft Entra ID OAuth の実装を使用することもできます。 Dake Lake Storage に関連する storage_account_key およびその他の属性は、構成された OS の環境変数からフェッチされます。 要求本文がデコードされると、 fullDocument (挿入または更新されたドキュメント全体) が要求本文から解析され、Data Lake クライアント関数 append_dataflush_data によって Data Lake Storage に書き込まれます。

削除操作では、 fullDocumentの代わりに fullDocumentBeforeChange が使用されます。 fullDocument には削除操作では値がないため、コードは削除されたドキュメントをフェッチします。これは fullDocumentBeforeChange にキャプチャされます。 なお、 fullDocumentBeforeChange は、前のスクリーンショットに示すように、 Document Preimage の設定がオンに設定されている場合にのみ設定されます。

import json
import logging
import os
import azure.functions as func
from azure.storage.filedatalake import DataLakeServiceClient

def main(req: func.HttpRequest) -> func.HttpResponse:
    logging.info('Python HTTP trigger function processed a new request.')
    logging.info(req)
    storage_account_name = os.environ["storage_account_name"]
    storage_account_key = os.environ["storage_account_key"]
    storage_container = os.environ["storage_container"]
    storage_directory = os.environ["storage_directory"]
    storage_file_name = os.environ["storage_file_name"]
    service_client = DataLakeServiceClient(account_url="{}://{}.dfs.core.windows.net".format(
            "https", storage_account_name), credential=storage_account_key)
    json_data = req.get_body()
    logging.info(json_data)
    object_id = "test"
    try:
        json_string = json_data.decode("utf-8")
        json_object = json.loads(json_string)

        if json_object["operationType"] == "delete":
            object_id = json_object["fullDocumentBeforeChange"]["_id"]["$oid"]
            data = {"operationType": json_object["operationType"], "data":json_object["fullDocumentBeforeChange"]}
        else:
            object_id = json_object["fullDocument"]["_id"]["$oid"]
            data = {"operationType": json_object["operationType"], "data":json_object["fullDocument"]}
        
        logging.info(object_id)
        encoded_data = json.dumps(data)
    except Exception as e:
        logging.info("Exception occurred : "+ str(e)) 
        
    file_system_client = service_client.get_file_system_client(file_system=storage_container)
    directory_client = file_system_client.get_directory_client(storage_directory)
    file_client = directory_client.create_file(storage_file_name + "-" + str(object_id) + ".txt")
    file_client.append_data(data=encoded_data, offset=0, length=len(encoded_data))
    file_client.flush_data(len(encoded_data))
    return func.HttpResponse(f"This HTTP triggered function executed successfully.")

ここまでで、Atlas トリガーが発生した変更をキャプチャして Atlas 関数を介して Azure 関数に渡す方法と、Azure 関数が Azure Synapse Analytics ワークスペースの Data Lake Storage に新しいファイルとして変更ドキュメントを書き込む方法を確認しました。

ファイルが Data Lake Storage に追加されたら、 ストレージ イベント トリガー を設定して、専用 SQL プールまたは Spark プール テーブルに変更ドキュメントを書き込むことができるパイプラインをトリガーできます。 パイプラインで、 Copy アクティビティを使用し、データ フローを使用してデータを変換できます。 または、最終的なターゲットが専用 SQL プールである場合は、Azure Synapse Analytics の専用 SQL プールに直接書き込むよう Azure 関数を変更できます。 SQL プールの場合は、SQL プール接続用に ODBC 接続文字列 を取得します。 接続文字列を使用した SQL プール テーブルにクエリを実行するために使用できる Python コードの例については、「Python を使用してデータベースを照会する」を参照してください。 このコードを変更し、Insert クエリを使用して専用 SQL プールに書き込むことができます。 関数で専用 SQL プールに書き込むためには、構成設定とロールを割り当てる必要があります。 これらの設定とロールに関する情報は、この記事では扱いません。

ほぼリアルタイムのソリューションが必要で、データをリアルタイムで同期する必要がない場合は、スケジュールされたパイプラインを実行する選択肢がよい場合があります。 スケジュールされたトリガーを設定して、ビジネスで許容できるほぼリアルタイムの頻度で、Copy アクティビティまたはデータ フローを使用してパイプラインをトリガーし、 MongoDB コネクタ を使用して、前回のスケジュールされた実行と現在の実行の間に挿入、更新、または削除されたデータを MongoDB からフェッチできます。 このパイプラインは、MongoDB コネクタをソース コネクタとして使用して MongoDB Atlas から差分データをフェッチし、シンク接続として使用して Data Lake Storage または Azure Synapse Analytics の専用 SQL プールにプッシュします。 このソリューションでは、Atlas トリガーがリッスンしている MongoDB Atlas コレクションで変更が発生した場合、(この記事で説明しているプッシュ メカニズムである主なソリューションに対し) MongoDB Atlas からのプル メカニズムを使用します。

考えられるユース ケース

MongoDB と Azure Synapse Analytics EDW と分析サービスは、さまざまなユース ケースに対応します。

小売

  • インテリジェンスを製品バンドルと製品のプロモーションに組み込む
  • customer 360 とハイパーパーソナル化の実装
  • 在庫の枯渇の予測とサプライ チェーンの注文の最適化
  • eコマースでの動的割引価格とスマート検索の実装

銀行と金融

  • 顧客の金融サービスをカスタマイズする
  • 不正なトランザクションの検出とブロック

遠距離通信

  • 次世代ネットワークを最適化する
  • エッジ ネットワークの価値を最大にする

自動車

  • 接続済み車両のパラメーター化の最適化
  • 接続済み車両の IoT 通信の異常を検出する

製造

  • 機械の予測メンテナンスを実現する
  • 保管と在庫管理を最適化する

考慮事項

これらの考慮事項は、ワークロードの品質向上に使用できる一連の基本原則である Azure Well-Architected Framework の要素を組み込んでいます。 詳細については、「Microsoft Azure Well-Architected Framework」を参照してください。

セキュリティ

セキュリティは、重要なデータやシステムの意図的な攻撃や悪用に対する保証を提供します。 詳細については、「セキュリティの重要な要素の概要」を参照してください。

Azure Functions はサーバーレス マネージド サービスであるため、アプリ リソースとプラットフォーム コンポーネントは強化されたセキュリティによって保護されます。 ただし、HTTPS プロトコルと最新の TLS バージョンを使用することをお勧めします。 また、入力を検証して MongoDB の変更ドキュメントであることを確認することをお勧めします。 Azure Functions のセキュリティに関する考慮事項については、「Azure Functions のセキュリティ保護」を参照してください。

MongoDB Atlas はサービスとしてのマネージド データベースであるため、MongoDB はプラットフォームのセキュリティを強化します。 MongoDB には、データベース アクセス、ネットワーク セキュリティ、保存時と転送中の暗号化、データ主権など、格納されているデータに対してセキュリティを 360 度確保するメカニズムが複数あります。 MongoDB Atlas のセキュリティに関するホワイト ペーパーと、MongoDB 内のデータがデータ ライフサイクル全体にわたってセキュリティで保護されていることを確認するのに役立つその他の記事については、 MongoDB Atlas のセキュリティ に関するページを参照してください。

コストの最適化

コストの最適化とは、不要な費用を削減し、運用効率を向上させることです。 詳しくは、 コスト最適化の柱の概要に関する記事をご覧ください。

Azure 製品の構成とコストの見積もりには Azure 料金計算ツールを使用してください。 Azure では、使用するリソースの種類の適切な数を特定し、支出を時間の経過に伴って分析し、過剰に出費することなくビジネス ニーズに合わせてスケーリングし、不要なコストを抑えることができます。 Azure 関数は、呼び出されたときにのみコストが発生します。 ただし、MongoDB Atlas の変更量によっては、別の一時コレクションに変更を格納し、バッチが特定の制限を超えた場合にのみ Azure 関数がトリガーされるように Atlas 関数でバッチ処理メカニズムを使用することを評価できます。

Atlas クラスターの詳細については、 MongoDB Atlas のコストを削減する 5 つの方法 および クラスター構成コストに関する説明を参照してください。 MongoDB の価格に関するページ では、MongoDB Atlas クラスターおよび MongoDB Atlas 開発者データ プラットフォームのその他のオファリングの価格オプションを理解するのに役立ちます。 Atlas Data Federation は Azure にデプロイでき、 Azure Blob Storage を (プレビュー段階で) サポートしています。 バッチ処理を使用してコストを最適化することを検討している場合は、MongoDB の一時コレクションではなく Blob Storage への書き込みを検討してください。

パフォーマンス効率

パフォーマンス効率とは、ユーザーによって行われた要求に合わせて効率的な方法でワークロードをスケーリングできることです。 詳細については、「パフォーマンス効率の柱の概要」を参照してください。

Atlas トリガーと Azure 関数は、パフォーマンスとスケーラビリティにおいて実績があります。 Azure Functions のパフォーマンスとスケーラビリティに関する考慮事項については、「Durable Functions のパフォーマンスとスケーリング (Azure Functions)」を参照してください。 MongoDB Atlas インスタンスのパフォーマンスを向上させるいくつかの考慮事項については、 オンデマンドでのスケーリング に関する説明を参照してください。 MongoDB Atlas の構成のベスト プラクティスについては、 MongoDB のパフォーマンスのベスト プラクティス ガイド に関するページを参照してください。

まとめ

MongoDB Atlas は Azure Synapse Analytics とシームレスに統合でき、Atlas のお客様は Atlas を Azure Synapse Analytics のソースまたはシンクとして簡単に使用できます。 このソリューションを使用すると、複雑な分析と AI の推論に Azure Synapse Analytics の MongoDB オペレーショナル データをリアルタイムで使用できます。

このシナリオのデプロイ

MongoDB Atlas を Azure Synapse Analytics にリアルタイムで同期する

共同作成者

この記事は、Microsoft によって保守されています。 当初の寄稿者は以下のとおりです。

プリンシパルの作成者:

  • Diana Annie Jenosh | シニア ソリューション アーキテクト - MongoDB Partners チーム
  • Venkatesh Shanbag| シニア ソリューション アーキテクト - MongoDB Partners チーム

その他の共同作成者:

  • Sunil Sabat | プリンシパル プログラム マネージャー - ADF チーム
  • Wee Hyong Tok | PM のプリンシパル ディレクター - ADF チーム

パブリックでない LinkedIn プロファイルを表示するには、LinkedIn にサインインします。

次のステップ