Configure dataflow endpoints
Important
Azure IoT Operations Preview – enabled by Azure Arc is currently in preview. You shouldn't use this preview software in production environments.
You'll need to deploy a new Azure IoT Operations installation when a generally available release is made available. You won't be able to upgrade a preview installation.
See the Supplemental Terms of Use for Microsoft Azure Previews for legal terms that apply to Azure features that are in beta, preview, or otherwise not yet released into general availability.
To get started with dataflows, you need to configure endpoints. An endpoint is the connection point for the dataflow. You can use an endpoint as a source or destination for the dataflow. Some endpoint types can be used as both sources and destinations, while others are for destinations only. A dataflow needs at least one source endpoint and one destination endpoint.
The following example shows a custom resource definition with all of the configuration options. The required fields are dependent on the endpoint type. Review the sections for each endpoint type for configuration guidance.
apiVersion: connectivity.iotoperations.azure.com/v1beta1
kind: DataflowEndpoint
metadata:
name: <endpoint-name>
spec:
endpointType: <endpointType> # mqtt, kafka, or localStorage
authentication:
method: <method> # systemAssignedManagedIdentity, x509Credentials, userAssignedManagedIdentity, or serviceAccountToken
systemAssignedManagedIdentitySettings: # Required if method is systemAssignedManagedIdentity
audience: https://eventgrid.azure.net
### OR
# x509CredentialsSettings: # Required if method is x509Credentials
# certificateSecretName: x509-certificate
### OR
# userAssignedManagedIdentitySettings: # Required if method is userAssignedManagedIdentity
# clientId: <id>
# tenantId: <id>
# audience: https://eventgrid.azure.net
### OR
# serviceAccountTokenSettings: # Required if method is serviceAccountToken
# audience: my-audience
mqttSettings: # Required if endpoint type is mqtt
host: example.westeurope-1.ts.eventgrid.azure.net:8883
tls: # Omit for no TLS or MQTT.
mode: <mode> # enabled or disabled
trustedCaCertificateConfigMap: ca-certificates
sharedSubscription:
groupMinimumShareNumber: 3 # Required if shared subscription is enabled.
groupName: group1 # Required if shared subscription is enabled.
clientIdPrefix: <prefix>
retain: keep
sessionExpirySeconds: 3600
qos: 1
protocol: mqtt
maxInflightMessages: 100
Name | Description |
---|---|
endpointType |
Type of the endpoint. Values: mqtt , kafka , dataExplorer , dataLakeStorage , fabricOneLake , or localStorage . |
authentication.method |
Method of authentication. Values: systemAssignedManagedIdentity , x509Credentials , userAssignedManagedIdentity , or serviceAccountToken . |
authentication.systemAssignedManagedIdentitySettings.audience |
Audience of the service to authenticate against. Defaults to https://eventgrid.azure.net . |
authentication.x509CredentialsSettings.certificateSecretName |
Secret name of the X.509 certificate. |
authentication.userAssignedManagedIdentitySettings.clientId |
Client ID for the user-assigned managed identity. |
authentication.userAssignedManagedIdentitySettings.tenantId |
Tenant ID. |
authentication.userAssignedManagedIdentitySettings.audience |
Audience of the service to authenticate against. Defaults to https://eventgrid.azure.net . |
authentication.serviceAccountTokenSettings.audience |
Audience of the service account. Optional, defaults to the broker internal service account audience. |
mqttSettings.host |
Host of the MQTT broker in the form of <hostname>:<port>. Connects to MQTT broker if omitted. |
mqttSettings.tls |
TLS configuration. Omit for no TLS or MQTT broker. |
mqttSettings.tls.mode |
Enable or disable TLS. Values: enabled or disabled . Defaults to disabled . |
mqttSettings.tls.trustedCaCertificateConfigMap |
Trusted certificate authority (CA) certificate config map. No CA certificate if omitted. No CA certificate works for public endpoints like Azure Event Grid. |
mqttSettings.sharedSubscription |
Shared subscription settings. No shared subscription if omitted. |
mqttSettings.sharedSubscription.groupMinimumShareNumber |
Number of clients to use for shared subscription. |
mqttSettings.sharedSubscription.groupName |
Shared subscription group name. |
mqttSettings.clientIdPrefix |
Client ID prefix. Client ID generated by the dataflow is <prefix>-id. No prefix if omitted. |
mqttSettings.retain |
Whether or not to keep the retain setting. Values: keep or never . Defaults to keep . |
mqttSettings.sessionExpirySeconds |
Session expiry in seconds. Defaults to 3600 . |
mqttSettings.qos |
Quality of service. Values: 0 or 1 . Defaults to 1 . |
mqttSettings.protocol |
Use MQTT or web sockets. Values: mqtt or websockets . Defaults to mqtt . |
mqttSettings.maxInflightMessages |
The maximum number of messages to keep in flight. For subscribe, it's the receive maximum. For publish, it's the maximum number of messages to send before waiting for an acknowledgment. Default is 100 . |
Endpoint types for use as sources and destinations
The following endpoint types are used as sources and destinations.
MQTT
MQTT endpoints are used for MQTT sources and destinations. You can configure the endpoint, Transport Layer Security (TLS), authentication, and other settings.
MQTT broker
To configure an MQTT broker endpoint with default settings, you can omit the host field, along with other optional fields. This configuration allows you to connect to the default MQTT broker without any extra configuration in a durable way, no matter how the broker changes.
apiVersion: connectivity.iotoperations.azure.com/v1beta1
kind: DataflowEndpoint
metadata:
name: mq
spec:
endpointType: mqtt
authentication:
method: serviceAccountToken
serviceAccountTokenSettings:
audience: aio-mq
mqttSettings:
{}
Event Grid
To configure an Azure Event Grid MQTT broker endpoint, use managed identity for authentication.
apiVersion: connectivity.iotoperations.azure.com/v1beta1
kind: DataflowEndpoint
metadata:
name: eventgrid
spec:
endpointType: mqtt
authentication:
method: systemAssignedManagedIdentity
systemAssignedManagedIdentitySettings:
audience: "https://eventgrid.azure.net"
mqttSettings:
host: example.westeurope-1.ts.eventgrid.azure.net:8883
tls:
mode: Enabled
Other MQTT brokers
For other MQTT brokers, you can configure the endpoint, TLS, authentication, and other settings as needed.
spec:
endpointType: mqtt
authentication:
...
mqttSettings:
host: example.mqttbroker.com:8883
tls:
mode: Enabled
trustedCaCertificateConfigMap: <your CA certificate config map>
Under authentication
, you can configure the authentication method for the MQTT broker. Supported methods include X.509:
authentication:
method: x509Credentials
x509CredentialsSettings:
certificateSecretName: <your x509 secret name>
Important
When you use X.509 authentication with an Event Grid MQTT broker, go to the Event Grid namespace > Configuration and check these settings:
- Enable MQTT: Select the checkbox.
- Enable alternative client authentication name sources: Select the checkbox.
- Certificate Subject Name: Select this option in the dropdown list.
- Maximum client sessions per authentication name: Set to 3 or more.
The alternative client authentication and maximum client sessions options allow dataflows to use client certificate subject name for authentication instead of MQTT CONNECT Username
. This capability is important so that dataflows can spawn multiple instances and still be able to connect. To learn more, see Event Grid MQTT client certificate authentication and Multi-session support.
System-assigned managed identity:
authentication:
method: systemAssignedManagedIdentity
systemAssignedManagedIdentitySettings:
# Audience of the service to authenticate against
# Optional; defaults to the audience for Event Grid MQTT Broker
audience: https://eventgrid.azure.net
User-assigned managed identity:
authentication:
method: userAssignedManagedIdentity
userAssignedManagedIdentitySettings:
clientId: <id>
tenantId: <id>
Kubernetes SAT:
authentication:
method: serviceAccountToken
serviceAccountTokenSettings:
audience: <your service account audience>
You can also configure shared subscriptions, QoS, MQTT version, client ID prefix, keep alive, clean session, session expiry, retain, and other settings.
spec:
endpointType: mqtt
mqttSettings:
sharedSubscription:
groupMinimumShareNumber: 3
groupName: group1
qos: 1
mqttVersion: v5
clientIdPrefix: dataflow
keepRetain: enabled
Kafka
Kafka endpoints are used for Kafka sources and destinations. You can configure the endpoint, TLS, authentication, and other settings.
Azure Event Hubs
To configure an Azure Event Hubs Kafka, we recommend that you use managed identity for authentication.
apiVersion: connectivity.iotoperations.azure.com/v1beta1
kind: DataflowEndpoint
metadata:
name: kafka
spec:
endpointType: kafka
authentication:
method: systemAssignedManagedIdentity
systemAssignedManagedIdentitySettings:
audience: <your Event Hubs namespace>.servicebus.windows.net
kafkaSettings:
host: <your Event Hubs namespace>.servicebus.windows.net:9093
tls:
mode: Enabled
consumerGroupId: mqConnector
Other Kafka brokers
For example, to configure a Kafka endpoint, set the host, TLS, authentication, and other settings as needed.
apiVersion: connectivity.iotoperations.azure.com/v1beta1
kind: DataflowEndpoint
metadata:
name: kafka
spec:
endpointType: kafka
authentication:
...
kafkaSettings:
host: example.kafka.com:9093
tls:
mode: Enabled
consumerGroupId: mqConnector
Under authentication
, you can configure the authentication method for the Kafka broker. Supported methods include SASL, X.509, system-assigned managed identity, and user-assigned managed identity.
authentication:
method: sasl
saslSettings:
saslType: PLAIN
tokenSecretName: <your token secret name>
# OR
method: x509Credentials
x509CredentialsSettings:
certificateSecretName: <your x509 secret name>
# OR
method: systemAssignedManagedIdentity
systemAssignedManagedIdentitySettings:
audience: https://<your Event Hubs namespace>.servicebus.windows.net
# OR
method: userAssignedManagedIdentity
userAssignedManagedIdentitySettings:
clientId: <id>
tenantId: <id>
Configure settings specific to source endpoints
For Kafka endpoints, you can configure settings specific for using the endpoint as a source. These settings have no effect if the endpoint is used as a destination.
spec:
endpointType: kafka
kafkaSettings:
consumerGroupId: fromMq
Configure settings specific to destination endpoints
For Kafka endpoints, you can configure settings specific for using the endpoint as a destination. These settings have no effect if the endpoint is used as a source.
spec:
endpointType: kafka
kafkaSettings:
compression: gzip
batching:
latencyMs: 100
maxBytes: 1000000
maxMessages: 1000
partitionStrategy: static
kafkaAcks: all
copyMqttProperties: enabled
Important
By default, data flows don't send MQTT message user properties to Kafka destinations. These user properties include values such as subject
that stores the name of the asset sending the message. To include user properties in the Kafka message, you must update the DataflowEndpoint
configuration to include copyMqttProperties: enabled
.
Endpoint type for destinations only
The following endpoint type is used for destinations only.
Local storage and Edge Storage Accelerator
Use the local storage option to send data to a locally available persistent volume, through which you can upload data via Edge Storage Accelerator edge volumes. In this case, the format must be Parquet.
apiVersion: connectivity.iotoperations.azure.com/v1beta1
kind: DataflowEndpoint
metadata:
name: esa
spec:
endpointType: localStorage
localStorageSettings:
persistentVolumeClaimRef: <your PVC name>