Creazione di un flusso di dati
Importante
Anteprima delle operazioni di Azure IoT: abilitata da Azure Arc è attualmente in anteprima. Non è consigliabile usare questo software di anteprima negli ambienti di produzione.
Sarà necessario distribuire una nuova installazione di Azure IoT Operations quando viene resa disponibile una versione disponibile a livello generale. Non sarà possibile aggiornare un'installazione di anteprima.
Vedere le condizioni per l'utilizzo supplementari per le anteprime di Microsoft Azure per termini legali aggiuntivi che si applicano a funzionalità di Azure in versione beta, in anteprima o in altro modo non ancora disponibili a livello generale.
Un flusso di dati è il percorso che i dati prendono dall'origine alla destinazione con trasformazioni facoltative. È possibile configurare il flusso di dati usando il portale operazioni di Azure IoT o creando una risorsa personalizzata flusso di dati . Prima di creare un flusso di dati, è necessario configurare gli endpoint del flusso di dati per le origini dati e le destinazioni.
L'esempio seguente è una configurazione del flusso di dati con un endpoint di origine MQTT, trasformazioni e un endpoint di destinazione Kafka:
apiVersion: connectivity.iotoperations.azure.com/v1beta1
kind: Dataflow
metadata:
name: my-dataflow
spec:
profileRef: my-dataflow-profile
mode: enabled
operations:
- operationType: source
name: my-source
sourceSettings:
endpointRef: mq
dataSources:
- thermostats/+/telemetry/temperature/#
- humidifiers/+/telemetry/humidity/#
serializationFormat: json
- operationType: builtInTransformation
name: my-transformation
builtInTransformationSettings:
filter:
- inputs:
- 'temperature.Value'
- '"Tag 10".Value'
expression: "$1*$2<100000"
map:
- inputs:
- '*'
output: '*'
- inputs:
- temperature.Value
output: TemperatureF
expression: cToF($1)
- inputs:
- '"Tag 10".Value'
output: 'Tag 10'
serializationFormat: json
- operationType: destination
name: my-destination
destinationSettings:
endpointRef: kafka
dataDestination: factory
Nome | Descrizione |
---|---|
profileRef |
Riferimento al profilo del flusso di dati. |
mode |
Modalità del flusso di dati: enabled o disabled . |
operations[] |
Operazioni eseguite dal flusso di dati. |
operationType |
Tipo di operazione: source , destination o builtInTransformation . |
Esaminare le sezioni seguenti per informazioni su come configurare i tipi di operazione del flusso di dati.
Configurare l'origine
Per configurare un'origine per il flusso di dati, specificare il riferimento all'endpoint e l'origine dati. È possibile specificare un elenco di origini dati per l'endpoint. Ad esempio, argomenti MQTT o Kafka. La definizione seguente è un esempio di configurazione del flusso di dati con un endpoint di origine e un'origine dati:
apiVersion: connectivity.iotoperations.azure.com/v1beta1
kind: Dataflow
metadata:
name: mq-to-kafka
namespace: azure-iot-operations
spec:
profileRef: example-dataflow
operations:
- operationType: source
sourceSettings:
endpointRef: mq-source
dataSources:
- azure-iot-operations/data/thermostat
Nome | Descrizione |
---|---|
operationType |
source |
sourceSettings |
Impostazioni per l'operazione source . |
sourceSettings.endpointRef |
Riferimento all'endpoint source . |
sourceSettings.dataSources |
Origini dati per l'operazione source . Sono supportati i caratteri jolly # e + . |
Configurare la trasformazione
L'operazione di trasformazione consente di trasformare i dati dall'origine prima di inviarli alla destinazione. Le trasformazioni sono facoltative. Se non è necessario apportare modifiche ai dati, non includere l'operazione di trasformazione nella configurazione del flusso di dati. Più trasformazioni vengono concatenate in fasi indipendentemente dall'ordine in cui sono specificate nella configurazione.
spec:
operations:
- operationType: builtInTransformation
name: transform1
builtInTransformationSettings:
datasets:
# ...
filter:
# ...
map:
# ...
Nome | Descrizione |
---|---|
operationType |
builtInTransformation |
name |
Nome della trasformazione. |
builtInTransformationSettings |
Impostazioni per l'operazione builtInTransformation . |
builtInTransformationSettings.datasets |
Aggiungere altri dati ai dati di origine in base a un set di dati e a una condizione per la corrispondenza. |
builtInTransformationSettings.filter |
Filtrare i dati in base a una condizione. |
builtInTransformationSettings.map |
Spostare i dati da un campo a un altro con una conversione facoltativa. |
Arricchire: Aggiungere dati di riferimento
Per arricchire i dati, è possibile usare il set di dati di riferimento nell'archivio stati distribuito di Azure IoT Operations (DSS). Il set di dati viene usato per aggiungere dati aggiuntivi ai dati di origine in base a una condizione. La condizione viene specificata come campo nei dati di origine che corrispondono a un campo nel set di dati.
Nome | Descrizione |
---|---|
builtInTransformationSettings.datasets.key |
Set di dati usato per l'arricchimento (chiave in DSS). |
builtInTransformationSettings.datasets.expression |
Condizione per l'operazione di arricchimento. |
I nomi delle chiavi nell'archivio di stati distribuiti corrispondono a un set di dati nella configurazione del flusso di dati.
Ad esempio, è possibile usare il campo deviceId
nei dati di origine per trovare la corrispondenza con il campo asset
nel set di dati:
spec:
operations:
- operationType: builtInTransformation
name: transform1
builtInTransformationSettings:
datasets:
- key: assetDataset
inputs:
- $source.deviceId # ------------- $1
- $context(assetDataset).asset # - $2
expression: $1 == $2
Se il set di dati ha un record con il campo asset
, simile al seguente:
{
"asset": "thermostat1",
"location": "room1",
"manufacturer": "Contoso"
}
I dati dell'origine con la corrispondenza dei deviceId
campi hanno i location
campi e manufacturer
disponibili in filter
e map
le thermostat1
fasi.
È possibile caricare dati di esempio nel DSS usando l'esempio di strumento set di DSS.
Per altre informazioni sulla sintassi delle condizioni, vedere Arricchire i dati usando flussi di dati e Convertire i dati tramite flussi di dati.
Filtro: filtrare i dati in base a una condizione
Per filtrare i dati in base a una condizione, è possibile usare la fase filter
. La condizione viene specificata come campo nei dati di origine che corrispondono a un valore.
Nome | Descrizione |
---|---|
builtInTransformationSettings.filter.inputs[] |
Input per valutare una condizione di filtro. |
builtInTransformationSettings.filter.expression |
Condizione per la valutazione del filtro. |
Ad esempio, è possibile usare il campo temperature
nei dati di origine per filtrare i dati:
spec:
operations:
- operationType: builtInTransformation
name: transform1
builtInTransformationSettings:
filter:
- inputs:
- temperature ? $last # - $1
expression: "$1 > 20"
Se il campo temperature
è maggiore di 20, i dati vengono passati alla fase successiva. Se il campo temperature
è minore o uguale a 20, i dati vengono filtrati.
Mappa: spostare i dati da un campo a un altro
Per eseguire il mapping dei dati a un altro campo con la conversione facoltativa, è possibile usare l'operazione map
. La conversione viene specificata come formula che utilizza i campi nei dati di origine.
Nome | Descrizione |
---|---|
builtInTransformationSettings.map[].inputs[] |
Input per l'operazione di mapping |
builtInTransformationSettings.map[].output |
Campo di output per l'operazione di mapping |
builtInTransformationSettings.map[].expression |
Formula di conversione per l'operazione di mapping |
Ad esempio, è possibile usare il campo temperature
nei dati di origine per convertire la temperatura in Celsius e archiviarla nel campo temperatureCelsius
. È anche possibile arricchire i dati di origine con il campo location
dal set di dati di contestualizzazione:
spec:
operations:
- operationType: builtInTransformation
name: transform1
builtInTransformationSettings:
map:
- inputs:
- temperature # - $1
output: temperatureCelsius
expression: "($1 - 32) * 5/9"
- inputs:
- $context(assetDataset).location
output: location
Per altre informazioni, vedere Eseguire il mapping dei dati usando flussi di dati e Convertire i dati usando i flussi di dati.
Configurare la destinazione
Per configurare una destinazione per il flusso di dati, è necessario specificare l'endpoint e un percorso (argomento o tabella) per la destinazione.
Nome | Descrizione |
---|---|
destinationSettings.endpointRef |
Riferimento all'endpoint destination |
destinationSettings.dataDestination |
Destinazione dei dati |
Configurare le informazioni di riferimento sugli endpoint di destinazione
Per configurare l'endpoint per la destinazione, è necessario specificare l'ID e il riferimento all'endpoint:
spec:
operations:
- operationType: destination
name: destination1
destinationSettings:
endpointRef: eventgrid
Configurare il percorso di destinazione
Dopo aver ottenuto l'endpoint, è possibile configurare il percorso per la destinazione. Se la destinazione è un endpoint MQTT o Kafka, usare il percorso per specificare l'argomento:
- operationType: destination
destinationSettings:
endpointRef: eventgrid
dataDestination: factory
Per gli endpoint di archiviazione come Microsoft Fabric, usare il percorso per specificare il nome della tabella:
- operationType: destination
destinationSettings:
endpointRef: adls
dataDestination: telemetryTable