Create a dataflow
Important
Azure IoT Operations Preview – enabled by Azure Arc is currently in preview. You shouldn't use this preview software in production environments.
You'll need to deploy a new Azure IoT Operations installation when a generally available release is made available. You won't be able to upgrade a preview installation.
See the Supplemental Terms of Use for Microsoft Azure Previews for legal terms that apply to Azure features that are in beta, preview, or otherwise not yet released into general availability.
A dataflow is the path that data takes from the source to the destination with optional transformations. You can configure the dataflow by using the Azure IoT Operations portal or by creating a Dataflow custom resource. Before you create a dataflow, you must configure dataflow endpoints for the data sources and destinations.
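For reference, an endpoint is itself a custom resource. The following is a minimal sketch of what an MQTT source endpoint might look like; the `endpointType` and `mqttSettings` fields shown here are assumptions for illustration, so check the dataflow endpoint documentation for the exact schema:

```yaml
# Hypothetical sketch of a prerequisite dataflow endpoint.
# The spec fields (endpointType, mqttSettings.host) are assumptions;
# see the dataflow endpoint documentation for the exact schema.
apiVersion: connectivity.iotoperations.azure.com/v1beta1
kind: DataflowEndpoint
metadata:
  name: mq
spec:
  endpointType: mqtt
  mqttSettings:
    host: "aio-mq-dmqtt-frontend:1883"
```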
The following example is a dataflow configuration with an MQTT source endpoint, transformations, and a Kafka destination endpoint:
```yaml
apiVersion: connectivity.iotoperations.azure.com/v1beta1
kind: Dataflow
metadata:
  name: my-dataflow
spec:
  profileRef: my-dataflow-profile
  mode: enabled
  operations:
    - operationType: source
      name: my-source
      sourceSettings:
        endpointRef: mq
        dataSources:
          - thermostats/+/telemetry/temperature/#
          - humidifiers/+/telemetry/humidity/#
        serializationFormat: json
    - operationType: builtInTransformation
      name: my-transformation
      builtInTransformationSettings:
        filter:
          - inputs:
              - 'temperature.Value'
              - '"Tag 10".Value'
            expression: "$1*$2<100000"
        map:
          - inputs:
              - '*'
            output: '*'
          - inputs:
              - temperature.Value
            output: TemperatureF
            expression: cToF($1)
          - inputs:
              - '"Tag 10".Value'
            output: 'Tag 10'
        serializationFormat: json
    - operationType: destination
      name: my-destination
      destinationSettings:
        endpointRef: kafka
        dataDestination: factory
```
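Because Dataflow is a Kubernetes custom resource, you can apply a manifest like this one to the Arc-enabled cluster with `kubectl apply -f <file>`, assuming you have kubectl access to the cluster.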
| Name | Description |
|---|---|
| `profileRef` | Reference to the dataflow profile. |
| `mode` | Mode of the dataflow: `enabled` or `disabled`. |
| `operations[]` | Operations performed by the dataflow. |
| `operationType` | Type of operation: `source`, `destination`, or `builtInTransformation`. |
Review the following sections to learn how to configure the operation types of the dataflow.
Configure source
To configure a source for the dataflow, specify the endpoint reference and data source. You can specify a list of data sources for the endpoint, such as MQTT or Kafka topics. The following definition is an example of a dataflow configuration with a source endpoint and data source:
```yaml
apiVersion: connectivity.iotoperations.azure.com/v1beta1
kind: Dataflow
metadata:
  name: mq-to-kafka
  namespace: azure-iot-operations
spec:
  profileRef: example-dataflow
  operations:
    - operationType: source
      sourceSettings:
        endpointRef: mq-source
        dataSources:
          - azure-iot-operations/data/thermostat
```
| Name | Description |
|---|---|
| `operationType` | `source` |
| `sourceSettings` | Settings for the source operation. |
| `sourceSettings.endpointRef` | Reference to the source endpoint. |
| `sourceSettings.dataSources` | Data sources for the source operation. Wildcards (`#` and `+`) are supported. |
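For example, MQTT topic wildcards let a single source operation subscribe to a whole family of topics. The following fragment (the topic structure is illustrative) receives telemetry from every thermostat:

```yaml
- operationType: source
  sourceSettings:
    endpointRef: mq-source
    dataSources:
      # '+' matches exactly one topic level; '#' matches all remaining levels.
      - thermostats/+/telemetry/#
```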
Configure transformation
The transformation operation is where you can transform the data from the source before you send it to the destination. Transformations are optional. If you don't need to make changes to the data, don't include the transformation operation in the dataflow configuration. Multiple transformation stages are chained together in a fixed order, regardless of the order in which they're specified in the configuration.
```yaml
spec:
  operations:
    - operationType: builtInTransformation
      name: transform1
      builtInTransformationSettings:
        datasets:
          # ...
        filter:
          # ...
        map:
          # ...
```
| Name | Description |
|---|---|
| `operationType` | `builtInTransformation` |
| `name` | Name of the transformation. |
| `builtInTransformationSettings` | Settings for the builtInTransformation operation. |
| `builtInTransformationSettings.datasets` | Add other data to the source data given a dataset and condition to match. |
| `builtInTransformationSettings.filter` | Filter the data based on a condition. |
| `builtInTransformationSettings.map` | Move data from one field to another with an optional conversion. |
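The three stage types can be combined in a single transformation operation. As a sketch, the following configuration assembles the dataset, filter, and map fragments that are explained in the sections that follow:

```yaml
spec:
  operations:
    - operationType: builtInTransformation
      name: transform1
      builtInTransformationSettings:
        # Enrich: match source data against a DSS dataset.
        datasets:
          - key: assetDataset
            inputs:
              - $source.deviceId             # $1
              - $context(assetDataset).asset # $2
            expression: $1 == $2
        # Filter: keep only readings above 20.
        filter:
          - inputs:
              - temperature ? $last # $1
            expression: "$1 > 20"
        # Map: convert and rename the field.
        map:
          - inputs:
              - temperature # $1
            output: temperatureCelsius
            expression: "($1 - 32) * 5/9"
```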
Enrich: Add reference data
To enrich the data, you can use the reference dataset in the Azure IoT Operations distributed state store (DSS). The dataset is used to add extra data to the source data based on a condition. The condition is specified as a field in the source data that matches a field in the dataset.
| Name | Description |
|---|---|
| `builtInTransformationSettings.datasets.key` | Dataset used for enrichment (key in DSS). |
| `builtInTransformationSettings.datasets.expression` | Condition for the enrichment operation. |
Key names in the distributed state store correspond to a dataset in the dataflow configuration.
For example, you could use the `deviceId` field in the source data to match the `asset` field in the dataset:
```yaml
spec:
  operations:
    - operationType: builtInTransformation
      name: transform1
      builtInTransformationSettings:
        datasets:
          - key: assetDataset
            inputs:
              - $source.deviceId # ------------- $1
              - $context(assetDataset).asset # - $2
            expression: $1 == $2
```
If the dataset has a record with the `asset` field, similar to:
```json
{
  "asset": "thermostat1",
  "location": "room1",
  "manufacturer": "Contoso"
}
```
The data from the source with the `deviceId` field matching `thermostat1` has the `location` and `manufacturer` fields available in the `filter` and `map` stages.
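For instance, a later filter stage could keep only data whose matched dataset record is in `room1`. This hypothetical fragment assumes the same `$context` syntax shown in the map example works in `filter` inputs and that string literals compare with `==`:

```yaml
filter:
  - inputs:
      # Hypothetical: reference the enriched dataset field via $context.
      - $context(assetDataset).location # - $1
    expression: "$1 == 'room1'"
```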
You can load sample data into the DSS by using the DSS set tool sample.
For more information about condition syntax, see Enrich data by using dataflows and Convert data by using dataflows.
Filter: Filter data based on a condition
To filter the data on a condition, you can use the `filter` stage. The condition is specified as a field in the source data that matches a value.
| Name | Description |
|---|---|
| `builtInTransformationSettings.filter.inputs[]` | Inputs to evaluate a filter condition. |
| `builtInTransformationSettings.filter.expression` | Condition for the filter evaluation. |
For example, you could use the `temperature` field in the source data to filter the data:
```yaml
spec:
  operations:
    - operationType: builtInTransformation
      name: transform1
      builtInTransformationSettings:
        filter:
          - inputs:
              - temperature ? $last # - $1
            expression: "$1 > 20"
```
If the `temperature` field is greater than 20, the data is passed to the next stage. If the `temperature` field is less than or equal to 20, the data is filtered out.
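The `? $last` suffix on the input is part of the input path syntax; the intent here, as an assumption based on this example, is to fall back to the last known value when an incoming message omits `temperature`. See Convert data by using dataflows for the full syntax.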
Map: Move data from one field to another
To map the data to another field with an optional conversion, you can use the `map` operation. The conversion is specified as a formula that uses the fields in the source data.
| Name | Description |
|---|---|
| `builtInTransformationSettings.map[].inputs[]` | Inputs for the map operation. |
| `builtInTransformationSettings.map[].output` | Output field for the map operation. |
| `builtInTransformationSettings.map[].expression` | Conversion formula for the map operation. |
For example, you could use the `temperature` field in the source data to convert the temperature to Celsius and store it in the `temperatureCelsius` field. You could also enrich the source data with the `location` field from the contextualization dataset:
```yaml
spec:
  operations:
    - operationType: builtInTransformation
      name: transform1
      builtInTransformationSettings:
        map:
          - inputs:
              - temperature # - $1
            output: temperatureCelsius
            expression: "($1 - 32) * 5/9"
          - inputs:
              - $context(assetDataset).location
            output: location
```
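A map entry without an `expression`, like the `location` mapping above, moves the input to the output without conversion. The wildcard entry in the first example in this article works the same way at a larger scale, passing all fields through unchanged:

```yaml
map:
  # Copy every input field to the output as-is; more specific
  # entries (like temperatureCelsius above) add or override fields.
  - inputs:
      - '*'
    output: '*'
```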
To learn more, see Map data by using dataflows and Convert data by using dataflows.
Configure destination
To configure a destination for the dataflow, you need to specify the endpoint and a path (topic or table) for the destination.
| Name | Description |
|---|---|
| `destinationSettings.endpointRef` | Reference to the destination endpoint. |
| `destinationSettings.dataDestination` | Destination for the data. |
Configure destination endpoint reference
To configure the endpoint for the destination, specify the operation name and the endpoint reference:
```yaml
spec:
  operations:
    - operationType: destination
      name: destination1
      destinationSettings:
        endpointRef: eventgrid
```
Configure destination path
After you have the endpoint, you can configure the path for the destination. If the destination is an MQTT or Kafka endpoint, use the path to specify the topic:
```yaml
- operationType: destination
  destinationSettings:
    endpointRef: eventgrid
    dataDestination: factory
```
For storage endpoints like Microsoft Fabric, use the path to specify the table name:
```yaml
- operationType: destination
  destinationSettings:
    endpointRef: adls
    dataDestination: telemetryTable
```