你当前正在访问 Microsoft Azure Global Edition 技术文档网站。 如果需要访问由世纪互联运营的 Microsoft Azure 中国技术文档网站,请访问 https://docs.azure.cn

启用 MongoDB Atlas 数据更改到 Azure Synapse Analytics 的实时同步

Azure Synapse Analytics

实时分析可以帮助你快速做出决策,并根据当前见解执行自动化操作。 还可以帮助你提供增强的客户体验。 此解决方案介绍如何使 Azure Synapse Analytics 数据池与 MongoDB 中的操作数据更改保持同步。

体系结构

下图显示了如何实现从 Atlas 到 Azure Synapse Analytics 的实时同步。 此简单流可确保 MongoDB Atlas 集合中发生的任何更改都复制到 Azure Synapse Analytics 工作区中的默认 Azure Data Lake Storage 存储库。 数据在 Data Lake Storage 中后,可以根据分析要求使用 Azure Synapse Analytics 管道将数据推送到专用 SQL 池、Spark 池或其他解决方案。

显示了从 MongoDB Atlas 到 Azure Synapse Analytics 实施实时同步的体系结构的示意图。

下载此体系结构的 PowerPoint 文件

数据流

捕获 MongoDB Atlas 操作数据存储 (ODS) 中的实时更改,并将其提供给 Azure Synapse Analytics 工作区中的 Data Lake Storage,以便实时分析用例、实时报表和仪表板。

  1. Atlas 触发器捕获 MongoDB Atlas 操作/事务数据存储中的数据更改。

  2. Atlas 数据库触发器观察事件时,它会将更改类型和已更改的文档(完整或增量)传递给 Atlas 函数

  3. Atlas 函数触发 Azure 函数,传递更改事件和 JSON 文档。

  4. Azure Functions 使用 Azure 存储文件 Data Lake 客户端库将更改的文档写入 Azure Synapse Analytics 工作区中配置的 Data Lake Storage。

  5. 数据在 Data Lake Storage 中后,可以发送到专用 SQL 池、Spark 池和其他解决方案。 或者,也可以使用 Azure Synapse Analytics 数据流或复制管道将数据从 JSON 转换为 Parquet 或 Delta 格式,以便对当前数据运行其他 BI 报告或 AI/机器学习。

组件

  • MongoDB Atlas 更改流可用于通知应用程序对集合、数据库或部署群集所做的更改。 更改流可让应用程序访问实时数据更改,并让其立即对更改做出反应。 此功能在物联网事件跟踪和金融数据更改等需要立即发出警报并立即执行响应操作的用例中至关重要。 Atlas 触发器使用更改流监视集合中的更改,并自动调用关联的 Atlas 函数来响应触发器事件。
  • Atlas 触发器响应特定集合中的文档插入、更新和删除,并且可以自动调用 Atlas 函数以响应更改事件。
  • Atlas 函数是无服务器服务器端 JavaScript 代码实现,可以根据调用 Atlas 触发器的事件执行操作。 将 Atlas 触发器与 Atlas 函数相结合可以简化事件驱动体系结构的实现。
  • Azure Functions 是事件驱动的无服务器计算平台,可用于使用所选编程语言高效开发应用程序。 还可以使用它与其他 Azure 服务无缝连接。 在此方案中,Azure 函数捕获更改事件,并使用它通过 Azure 存储文件 Data Lake 客户端库将包含已更改数据的 Blob 写入 Data Lake Storage。
  • Data Lake Storage 是 Azure Synapse Analytics 中的默认存储解决方案。 可以使用无服务器池直接查询数据。
  • Azure Synapse Analytics 中的管道数据流可用于将包含 MongoDB 已更改数据的 blob 推送到专用 SQL 池或 Spark 池做进一步分析。 管道可让你使用存储事件触发器计划触发器为实时和近实时用例生成解决方案,从而处理 Data Lake Storage 中更改的数据集。 此集成可加速更改数据集的下游消耗。

显示了 Azure Synapse Analytics 管道如何将数据推送到池的示意图。

备选方法

此解决方案使用 Atlas 触发器包装代码来侦听 Atlas 更改流,并触发 Azure Functions 以响应更改事件。 因此,实现比以前提供的替代解决方案要容易得多。 对于该解决方案,需要编写代码来侦听 Azure 应用程序服务 Web 应用中的更改流。

另一种替代方法是使用 MongoDB Spark Connector 读取 MongoDB 流数据并将其写入 Delta 表。 代码在属于 Azure Synapse Analytics 中管道一部分的 Spark Notebook 中持续运行。 有关实现此解决方案的详细信息,请参阅使用 Spark 流式处理从 Atlas 同步到 Azure Synapse Analytics

但是,将 Atlas 触发器与 Azure Functions 配合使用可提供完全无服务器的解决方案。 由于它是无服务器的,因此该解决方案提供可靠的可伸缩性和成本优化。 定价基于即用即付成本模型。 在调用 Azure Functions 终结点之前,可以使用 Atlas 函数合并几个更改事件来节省更多资金。 此策略在大量流量方案中非常有用。

此外,Microsoft Fabric 可统一数据资产,更轻松地对数据运行分析和 AI,以便快速获取见解。 Azure Synapse Analytics 数据工程、数据科学、数据仓库和 Fabric 中的实时分析现在可以更好地利用推送到 OneLake 的 MongoDB 数据。 可以使用 Atlas 的数据流 Gen2 和数据管道连接器将 Atlas 数据直接加载到 OneLake。 这种无代码机制提供了从 Atlas 引入数据到 OneLake 的强大方法。

显示了 Microsoft Fabric 如何将数据推送到 OneLake 的示意图。

在 Fabric 中,可以使用 OneLake 快捷方式直接引用推送到 Data Lake Storage 的数据,而无需任何提取、转换、加载 (ETL)。

可以将数据推送到 Power BI,以便为 BI 报告创建报表和可视化效果。

方案详细信息

MongoDB Atlas 是许多企业应用程序的运营数据层,存储来自内部应用程序、面向客户的服务和来自多个渠道的第三方 API 的数据。 可以使用 Azure Synapse Analytics 中的数据管道将此数据与其他传统应用程序中的关系数据相结合,以及与日志、对象存储和点击流等源的非结构化数据相结合。

企业使用 MongoDB 功能,例如聚合分析节点Atlas 搜索矢量搜索Atlas Data LakeAtlas SQL 接口数据联合图表,以启用应用程序驱动的智能。 但是,MongoDB 中的事务数据会提取、转换并加载到 Azure Synapse Analytics 专用 SQL 池或 Spark 池,用于批处理、AI/机器学习和数据仓库 BI 分析和智能。

Atlas 和 Azure Synapse Analytics 之间的数据移动有两种方案:批量集成和实时同步。

批量集成

可以使用批处理和微批处理集成将数据从 Atlas 移到 Azure Synapse Analytics 中的 Data Lake Storage。 可以一次性提取整个历史数据,也可以根据筛选条件提取增量数据。

MongoDB 本地实例和 MongoDB Atlas 可以作为 Azure Synapse Analytics 中的源或接收器资源集成。 有关连接器的信息,请参阅在 MongoDB 之间复制数据在 MongoDB Atlas 之间复制数据

利用源连接器,可以方便地对存储在本地 MongoDB 和/或 Atlas 中的操作数据运行 Azure Synapse Analytics。 可以使用源连接器从 Atlas 提取数据,并将数据加载到 Parquet、Avro、JSON 和文本格式或 CSV Blob 存储中的 Data Lake Storage。 然后,可以在多数据库、多云或混合云方案中转换或联接来自其他数据源的其他文件。 此用例在企业数据仓库 (EDW) 和大规模分析方案中很常见。 还可以使用接收器连接器将分析结果存储回 Atlas 中。 有关批处理集成的详细信息,请参阅使用 Azure Synapse Analytics 分析 MongoDB Atlas 上的操作数据

实时同步

本文中所述的体系结构可帮助实现实时同步,使 Azure Synapse Analytics 存储与 MongoDB 的操作数据保持最新。

此解决方案主要有两个功能:

  • 捕获 Atlas 中的更改
  • 触发 Azure 函数以将更改传播到 Azure Synapse Analytics

捕获 Atlas 中的更改

可以使用 Atlas 触发器捕获更改,以在添加触发器 UI 中或通过 Atlas App Services 管理 API进行配置。 触发器侦听数据库事件(如插入、更新和删除)引起的数据库更改。 检测到更改事件时,Atlas 触发器还会触发 Atlas 函数。 可以使用添加触发器 UI 添加函数。 还可以使用 Atlas Admin API 创建 Atlas 函数并将其关联为触发器调用终结点。

以下屏幕截图显示了可用于创建和编辑 Atlas 触发器的窗体。 在触发器源详细信息部分中,指定触发器监视更改事件的集合及其监视的数据库事件(插入、更新、删除和/或替换)。

显示了创建 Atlas 触发器的表单的屏幕截图。

触发器可以调用 Atlas 函数,以响应为其启用的事件。 以下屏幕截图显示了作为 Atlas 函数添加的简单 JavaScript 代码,以响应数据库触发器而调用。 Atlas 函数调用 Azure 函数,根据触发器的启用情况,将更改事件的元数据与插入、更新、删除或替换的文档一起传递。

显示了添加到触发器中的 JavaScript 代码的屏幕截图。

Atlas 函数代码

Atlas 函数代码通过将请求正文中的整个 changeEvent 传递给 Azure 函数来触发与 Azure 函数终结点关联的 Azure 函数。

需要将 <Azure function URL endpoint> 占位符替换为实际的 Azure 函数 URL 终结点。

exports =  function(changeEvent) {

    // Invoke Azure function that inserts the change stream into Data Lake Storage.
    console.log(typeof fullDocument);
    const response =  context.http.post({
        url: "<Azure function URL endpoint>",
        body: changeEvent,
        encodeBodyAsJSON: true
    });
    return response;
};

触发 Azure 函数以将更改传播到 Azure Synapse Analytics

Atlas 函数编码为调用 Azure 函数,以将更改文档写入 Azure Synapse Analytics 中的 Data Lake Storage。 Azure 函数使用适用于 Python SDK 的 Azure Data Lake Storage 客户端库来创建表示存储帐户的 DataLakeServiceClient 类的实例。

Azure 函数使用存储密钥进行身份验证。 还可以使用 Microsoft Entra ID OAuth 实现。 从配置的 OS 环境变量中提取与 Dake Lake Storage 相关的 storage_account_key 及其他属性。 对请求正文解码后,从请求正文分析 fullDocument(整个插入或更新的文档),然后由 Data Lake 客户端函数 append_dataflush_data 写入 Data Lake Storage。

对于删除操作,使用 fullDocumentBeforeChange 而不是 fullDocumentfullDocument 在删除操作中没有任何值,因此代码将提取在 fullDocumentBeforeChange 中捕获的已删除文档。 请注意,仅当文档预映像设置设为打开时才会填充 fullDocumentBeforeChange,如上一屏幕截图所示。

import json
import logging
import os
import azure.functions as func
from azure.storage.filedatalake import DataLakeServiceClient

def main(req: func.HttpRequest) -> func.HttpResponse:
    logging.info('Python HTTP trigger function processed a new request.')
    logging.info(req)
    storage_account_name = os.environ["storage_account_name"]
    storage_account_key = os.environ["storage_account_key"]
    storage_container = os.environ["storage_container"]
    storage_directory = os.environ["storage_directory"]
    storage_file_name = os.environ["storage_file_name"]
    service_client = DataLakeServiceClient(account_url="{}://{}.dfs.core.windows.net".format(
            "https", storage_account_name), credential=storage_account_key)
    json_data = req.get_body()
    logging.info(json_data)
    object_id = "test"
    try:
        json_string = json_data.decode("utf-8")
        json_object = json.loads(json_string)

        if json_object["operationType"] == "delete":
            object_id = json_object["fullDocumentBeforeChange"]["_id"]["$oid"]
            data = {"operationType": json_object["operationType"], "data":json_object["fullDocumentBeforeChange"]}
        else:
            object_id = json_object["fullDocument"]["_id"]["$oid"]
            data = {"operationType": json_object["operationType"], "data":json_object["fullDocument"]}
        
        logging.info(object_id)
        encoded_data = json.dumps(data)
    except Exception as e:
        logging.info("Exception occurred : "+ str(e)) 
        
    file_system_client = service_client.get_file_system_client(file_system=storage_container)
    directory_client = file_system_client.get_directory_client(storage_directory)
    file_client = directory_client.create_file(storage_file_name + "-" + str(object_id) + ".txt")
    file_client.append_data(data=encoded_data, offset=0, length=len(encoded_data))
    file_client.flush_data(len(encoded_data))
    return func.HttpResponse(f"This HTTP triggered function executed successfully.")

到目前为止,你已了解 Atlas 触发器如何捕获发生的任何更改,并通过 Atlas 函数将其传递给 Azure 函数,以及 Azure 函数将更改文档作为 Azure Synapse Analytics 工作区的 Data Lake Storage 中的新文件写入。

将文件添加到 Data Lake Storage 后,可以设置存储事件触发器来触发管道,然后可将更改文档写入专用 SQL 池或 Spark 池表。 管道可以使用复制活动,并使用数据流转换数据。 或者,如果最终目标是专用 SQL 池,则可以修改 Azure 函数以直接写入 Azure Synapse Analytics 中的专用 SQL 池。 对于 SQL 池,获取 SQL 池连接的 ODBC 连接字符串。 有关可用于使用连接字符串查询 SQL 池表的 Python 代码示例,请参阅使用 Python 查询数据库。 可以修改此代码以使用 Insert 查询写入专用 SQL 池。 需要分配配置设置和角色,以使函数能够写入专用 SQL 池。 有关这些设置和角色的信息超出了本文的范围。

如果想要近实时解决方案,并且不需要数据实时同步,则使用计划的管道运行可能是一个不错的选择。 可以设置计划触发器以业务能够承受的近实时频率触发包含复制活动或数据流的管道,以便使用 MongoDB 连接器从上次计划运行与当前运行之间插入、更新或删除的 MongoDB 提取数据。 管道使用 MongoDB 连接器作为源连接器从 MongoDB Atlas 提取增量数据,并将其作为接收器连接推送到 Data Lake Storage 或 Azure Synapse Analytics 专用 SQL 池。 此解决方案使用拉取机制(与本文中所述的主要解决方案 - 推送机制 - 相反),因为 MongoDB Atlas 集合中发生了 Atlas 触发器所侦听的 MongoDB Atlas 集合中的更改。

可能的用例

MongoDB 与 Azure Synapse Analytics EDW 和分析服务可以提供许多用例:

Retail

  • 将智能构建到产品捆绑和产品促销中
  • 实现客户 360 和超个性化
  • 预测库存枯竭和优化供应链订单
  • 在电子商务中实现动态折扣定价和智能搜索

银行和财务行业

  • 自定义客户金融服务
  • 检测和阻止欺诈性交易

电信

  • 优化下一代网络
  • 最大化边缘网络的价值

汽车

  • 优化联网车辆的参数化
  • 检测联网车辆的 IoT 通信中的异常

制造

  • 为机械提供预测性维护
  • 优化存储和库存管理

注意事项

这些注意事项实施 Azure 架构良好的框架的支柱原则,即一套可用于改进工作负载质量的指导原则。 有关详细信息,请参阅 Microsoft Azure 架构良好的框架

安全性

安全性针对蓄意攻击及滥用宝贵数据和系统提供保障措施。 有关详细信息,请参阅安全性支柱概述

Azure Functions 是一种无服务器托管服务,因此应用资源和平台组件受增强的安全性保护。 但是,我们建议使用 HTTPS 协议和最新的 TLS 版本。 验证输入以确保它是 MongoDB 更改文档也是一种好的做法。 有关 Azure Functions 的安全注意事项,请参阅保护 Azure Functions

MongoDB Atlas 是一种托管数据库即服务,因此 MongoDB 可提供增强的平台安全性。 MongoDB 提供多种机制来帮助确保存储数据的 360 度安全性,包括数据库访问、网络安全、静态加密和传输中加密,以及数据主权。 请参阅 MongoDB Atlas 安全白皮书和其他文章的 MongoDB Atlas 安全性,以帮助你确保 MongoDB 中的数据在整个数据生命周期内都安全。

成本优化

成本优化就是减少不必要的费用和提高运营效率。 有关详细信息,请参阅成本优化支柱概述

若要估算 Azure 产品和配置的成本,请使用 Azure 定价计算器。 Azure 有助于避免不必要的成本,方法是确定要使用的正确资源数、分析一段时间内的支出,以及在不超支的情况下进行缩放以满足业务需求。 Azure 函数仅在被调用时才会产生费用。 但是,根据 MongoDB Atlas 中的更改量,可以使用 Atlas 函数中的批处理机制来评估将更改存储在另一个临时集合中,并且仅当批处理超出特定限制时才触发 Azure 函数。

有关 Atlas 群集的信息,请参阅 5 种使用 MongoDB Atlas 降低成本的方法群集配置成本MongoDB 定价页可帮助你了解 MongoDB Atlas 群集和其他 MongoDB Atlas 开发人员数据平台的定价选项。 Atlas 数据联合可以部署在 Azure 中,支持 Azure Blob 存储(以预览版提供)。 如果考虑使用批处理来优化成本,请考虑写入 Blob 存储而不是 MongoDB 临时集合。

性能效率

性能效率是指工作负荷能够以高效的方式扩展以满足用户对它的需求。 有关详细信息,请参阅性能效率要素概述

Atlas 触发器和 Azure 函数经过时间检验,可提高性能和可伸缩性。 请参阅 Durable Functions (Azure Functions) 中的性能和缩放以了解 Azure Functions 的性能和可伸缩性注意事项。 有关提高 MongoDB Atlas 实例性能的一些注意事项,请参阅按需缩放。 有关 MongoDB Atlas 配置的最佳做法指南,请参阅 MongoDB 性能最佳做法指南

结论

MongoDB Atlas 与 Azure Synapse Analytics 无缝集成,可让 Atlas 客户轻松地将 Atlas 用作 Azure Synapse Analytics 的源或接收器。 借助此解决方案,可以从 Azure Synapse Analytics 实时使用 MongoDB 操作数据进行复杂的分析和 AI 推理。

部署此方案

MongoDB Atlas 到 Azure Synapse Analytics 的实时同步

作者

本文由 Microsoft 维护, 它最初是由以下贡献者撰写的。

主要作者:

其他参与者:

若要查看非公开领英个人资料,请登录领英。

后续步骤