你当前正在访问 Microsoft Azure Global Edition 技术文档网站。 如果需要访问由世纪互联运营的 Microsoft Azure 中国技术文档网站,请访问 https://docs.azure.cn。
重要
Azure Arc 启用的 Azure IoT 操作预览版目前处于预览状态。 不应在生产环境中使用此预览版软件。
在正式版推出后,你需要部署新的 Azure IoT 操作安装。 无法升级预览版安装。
有关 beta 版本、预览版或尚未正式发布的版本的 Azure 功能所适用的法律条款,请参阅 Microsoft Azure 预览版的补充使用条款。
数据流是数据使用可选转换从源到目标采用的路径。 你可以通过创建数据流自定义资源或使用 Azure IoT 操作工作室门户来配置数据流。 数据流由三个部分组成:源、转换和目标。
若要定义源和目标,需要配置数据流终结点。 转换是可选的,可以包括扩充数据、筛选数据并将数据映射到其他字段等操作。
本文介绍如何使用示例(包括源、转换和目标)创建数据流。
先决条件
创建数据流
有了数据流终结点后,可以使用它们来创建数据流。 回想一下,数据流由三个部分组成:源、转换和目标。
若要在操作体验门户中创建数据流,请选择“数据流”>“创建数据流”。
数据流配置的整体结构如下所示:
apiVersion: connectivity.iotoperations.azure.com/v1beta1
kind: Dataflow
metadata:
name: my-dataflow
namespace: azure-iot-operations
spec:
profileRef: default
mode: Enabled
operations:
- operationType: Source
sourceSettings:
# See source configuration section
- operationType: BuiltInTransformation
builtInTransformationSettings:
# See transformation configuration section
- operationType: Destination
destinationSettings:
# See destination configuration section
查看以下部分,了解如何配置数据流的操作类型。
若要配置数据流的源,请指定终结点引用和数据源。 可以指定终结点的数据源列表。
将资产用作源
可以使用资产作为数据流的源。 这仅在操作体验门户中可用。
在“源详细信息”下,选择“资产”。
选择要用作源终结点的资产。
选择“继续”。
将显示所选资产的数据点列表。
选择“应用”以将资产用作源终结点。
使用 MQTT 作为源
在“源详细信息”下,选择“MQTT”。
输入要侦听传入消息的 MQTT 主题。
从下拉列表中选择“消息架构”或上传新架构。 如果源数据具有可选字段或带有不同类型的字段,请指定反序列化架构以确保一致性。 例如,数据可能包含并非在所有消息中都存在的字段。 如果没有架构,转换就无法处理这些字段,因为它们会有空值。 具有架构时,可以指定默认值或忽略字段。
选择“应用”。
例如,若要使用一个 MQTT 终结点和两个 MQTT 主题筛选器配置源,请使用以下配置:
sourceSettings:
endpointRef: mq
dataSources:
- thermostats/+/telemetry/temperature/#
- humidifiers/+/telemetry/humidity/#
由于 dataSources
允许在不修改终结点配置的情况下指定 MQTT 或 Kafka 主题,因此即使主题不同,也可以为多个数据流重复使用终结点。 若要了解详细信息,请参阅重复使用数据流终结点。
指定要反序列化数据的架构
如果源数据具有可选字段或带有不同类型的字段,请指定反序列化架构以确保一致性。 例如,数据可能包含并非在所有消息中都存在的字段。 如果没有架构,转换就无法处理这些字段,因为它们会有空值。 具有架构时,可以指定默认值或忽略字段。
spec:
operations:
- operationType: Source
sourceSettings:
serializationFormat: Json
schemaRef: aio-sr://exampleNamespace/exampleAvroSchema:1.0.0
若要指定架构,请创建文件并将其存储在架构注册表中。
{
"type": "record",
"name": "Temperature",
"fields": [
{"name": "deviceId", "type": "string"},
{"name": "temperature", "type": "float"}
]
}
注意
唯一支持的序列化格式是 JSON。 架构是可选的。
有关架构注册表的详细信息,请参阅了解消息架构。
共享订阅
若要将共享订阅与 MQTT 源配合使用,可以用 $shared/<subscription-group>/<topic>
形式指定共享订阅主题。
sourceSettings:
dataSources:
- $shared/myGroup/thermostats/+/telemetry/temperature/#
注意
如果数据流配置文件中的实例计数大于 1,则必须使用共享订阅主题。
转换操作用于在将数据发送到目标之前转换源中的数据。 转换是可选的。 如果不需要对数据进行更改,请不要在数据流配置中添加转换操作。 无论在配置中指定的顺序如何,多个转换都将分阶段链接在一起。 阶段的顺序始终为
- 扩充:在给定数据集和匹配条件的情况下,向源数据添加额外数据。
- 筛选:根据条件筛选数据。
- 映射:使用可选转换将数据从一个字段移到另一个字段。
在操作体验门户中,选择“数据流”>“添加转换(可选)”。
builtInTransformationSettings:
datasets:
# ...
filter:
# ...
map:
# ...
扩充:添加引用数据
若要扩充数据,可以在 Azure IoT 操作的分布式状态存储 (DSS) 中使用引用数据集。 数据集用于根据条件向源数据添加额外数据。 条件指定为与数据集中的字段匹配的源数据中的字段。
分布式状态存储中的键名称对应于数据流配置中的数据集。
例如,可以使用源数据中的 deviceId
字段来匹配数据集中的 asset
字段:
builtInTransformationSettings:
datasets:
- key: assetDataset
inputs:
- $source.deviceId # ------------- $1
- $context(assetDataset).asset # - $2
expression: $1 == $2
如果数据集具有含 asset
字段的记录,类似于:
{
"asset": "thermostat1",
"location": "room1",
"manufacturer": "Contoso"
}
源中 deviceId
字段与 thermostat1
匹配的数据具有在 filter
和 map
阶段可用的 location
和 manufacturer
字段。
你可以使用 DSS 集工具示例将示例数据加载到 DSS。
有关条件语法的详细信息,请参阅使用数据流扩充数据和使用数据流转换数据。
筛选器:基于条件筛选数据
若要按条件筛选数据,可以使用 filter
阶段。 条件指定为与值匹配的源数据中的字段。
在“转换(可选)”下,选择“筛选器”>“添加”。
选择要包含在数据集中的数据点。
添加筛选器条件和说明。
选择“应用”。
例如,可以使用源数据中的 temperature
字段来筛选数据:
builtInTransformationSettings:
filter:
- inputs:
- temperature ? $last # - $1
expression: "$1 > 20"
如果 temperature
字段大于 20,则系统会将数据传递到下一阶段。 如果 temperature
字段小于或等于 20,则系统会筛选数据。
映射:将数据从一个字段移动到另一个字段
若要将数据映射到另一个具有可选转换的字段,可以使用 map
操作。 系统将转换指定为使用源数据中的字段的公式。
在操作体验门户中,当前支持使用“计算”转换进行映射。
在“转换(可选)”下,选择“计算”>“添加”。
输入必填字段和表达式。
选择“应用”。
例如,可以使用源数据中的 temperature
字段将温度转换为摄氏度,并将其存储在 temperatureCelsius
字段中。 还可以使用上下文化数据集中的 location
字段丰富源数据:
builtInTransformationSettings:
map:
- inputs:
- temperature # - $1
output: temperatureCelsius
expression: "($1 - 32) * 5/9"
- inputs:
- $context(assetDataset).location
output: location
若要了解详细信息,请参阅使用数据流映射数据和使用数据流转换数据。
根据架构序列化数据
如果要在将数据发送到目标之前对其进行序列化,则需要指定架构和序列化格式。 否则,数据会以 JSON 格式序列化,并推断出类型。 请记住,存储终结点(如 Microsoft Fabric 或 Azure Data Lake)需要架构来确保数据一致性。
builtInTransformationSettings:
serializationFormat: Parquet
schemaRef: aio-sr://<NAMESPACE>/<SCHEMA>:<VERSION>
若要指定架构,可以使用架构定义创建架构自定义资源。
有关架构注册表的详细信息,请参阅了解消息架构。
{
"type": "record",
"name": "Temperature",
"fields": [
{"name": "deviceId", "type": "string"},
{"name": "temperatureCelsius", "type": "float"}
{"name": "location", "type": "string"}
]
}
支持的序列化格式为 JSON、Parquet 和 Delta。
若要配置数据流的目标,请指定终结点引用和数据目标。 可以为终结点指定数据目标的列表,它们是 MQTT 或 Kafka 主题。
选择要用作目标的数据流终结点。
选择“继续”以配置目标。
根据目标类型添加映射详细信息。
例如,若要使用前面创建的 MQTT 终结点和静态 MQTT 主题配置目标,请使用以下配置:
destinationSettings:
endpointRef: mq
dataDestination: factory
如果你已创建存储终结点(如 Microsoft Fabric),请使用数据目标字段指定表或容器名称:
destinationSettings:
endpointRef: adls
dataDestination: telemetryTable
示例
以下示例是一个数据流配置,它将 MQTT 终结点用于源和目标。 源会筛选 MQTT 主题 thermostats/+/telemetry/temperature/#
和 humidifiers/+/telemetry/humidity/#
中的数据。 该转换会将温度转换为华氏度,并筛选温度小于 100000 的数据。 目标会将数据发送到 MQTT 主题 factory
。
apiVersion: connectivity.iotoperations.azure.com/v1beta1
kind: Dataflow
metadata:
name: my-dataflow
namespace: azure-iot-operations
spec:
profileRef: default
mode: Enabled
operations:
- operationType: Source
sourceSettings:
endpointRef: mq
dataSources:
- thermostats/+/telemetry/temperature/#
- humidifiers/+/telemetry/humidity/#
- operationType: builtInTransformation
builtInTransformationSettings:
filter:
- inputs:
- 'temperature.Value'
- '"Tag 10".Value'
expression: "$1*$2<100000"
map:
- inputs:
- '*'
output: '*'
- inputs:
- temperature.Value
output: TemperatureF
expression: cToF($1)
- inputs:
- '"Tag 10".Value'
output: 'Tag 10'
- operationType: Destination
destinationSettings:
endpointRef: mq
dataDestination: factory
验证数据流是否正常工作
遵循教程:到 Azure 事件网格的双向 MQTT 桥,验证数据流是否正常工作。
导出数据流配置
若要导出数据流配置,可以使用操作体验门户或导出数据流自定义资源。
选择要导出的数据流,然后从工具栏中选择“导出”。
kubectl get dataflow my-dataflow -o yaml > my-dataflow.yaml