Creazione di un flusso di dati

Importante

Anteprima delle operazioni di Azure IoT: abilitata da Azure Arc è attualmente in anteprima. Non è consigliabile usare questo software di anteprima negli ambienti di produzione.

Sarà necessario distribuire una nuova installazione di Azure IoT Operations quando viene resa disponibile una versione disponibile a livello generale. Non sarà possibile aggiornare un'installazione di anteprima.

Vedere le condizioni per l'utilizzo supplementari per le anteprime di Microsoft Azure per termini legali aggiuntivi che si applicano a funzionalità di Azure in versione beta, in anteprima o in altro modo non ancora disponibili a livello generale.

Un flusso di dati è il percorso che i dati prendono dall'origine alla destinazione con trasformazioni facoltative. È possibile configurare il flusso di dati usando il portale operazioni di Azure IoT o creando una risorsa personalizzata flusso di dati . Prima di creare un flusso di dati, è necessario configurare gli endpoint del flusso di dati per le origini dati e le destinazioni.

L'esempio seguente è una configurazione del flusso di dati con un endpoint di origine MQTT, trasformazioni e un endpoint di destinazione Kafka:

apiVersion: connectivity.iotoperations.azure.com/v1beta1
kind: Dataflow
metadata:
  name: my-dataflow
spec:
  profileRef: my-dataflow-profile
  mode: enabled
  operations:
    - operationType: source
      name: my-source
      sourceSettings:
        endpointRef: mq
        dataSources:
          - thermostats/+/telemetry/temperature/#
          - humidifiers/+/telemetry/humidity/#
        serializationFormat: json
    - operationType: builtInTransformation
      name: my-transformation
      builtInTransformationSettings:
        filter:
          - inputs:
              - 'temperature.Value'
              - '"Tag 10".Value'
            expression: "$1*$2<100000"
        map:
          - inputs:
              - '*'
            output: '*'
          - inputs:
              - temperature.Value
            output: TemperatureF
            expression: cToF($1)
          - inputs:
              - '"Tag 10".Value'
            output: 'Tag 10'
        serializationFormat: json
    - operationType: destination
      name: my-destination
      destinationSettings:
        endpointRef: kafka
        dataDestination: factory
Nome Descrizione
profileRef Riferimento al profilo del flusso di dati.
mode Modalità del flusso di dati: enabled o disabled.
operations[] Operazioni eseguite dal flusso di dati.
operationType Tipo di operazione: source, destinationo builtInTransformation.

Esaminare le sezioni seguenti per informazioni su come configurare i tipi di operazione del flusso di dati.

Configurare l'origine

Per configurare un'origine per il flusso di dati, specificare il riferimento all'endpoint e l'origine dati. È possibile specificare un elenco di origini dati per l'endpoint. Ad esempio, argomenti MQTT o Kafka. La definizione seguente è un esempio di configurazione del flusso di dati con un endpoint di origine e un'origine dati:

apiVersion: connectivity.iotoperations.azure.com/v1beta1
kind: Dataflow
metadata:
  name: mq-to-kafka
  namespace: azure-iot-operations
spec:
  profileRef: example-dataflow
  operations:
    - operationType: source
      sourceSettings:
        endpointRef: mq-source
        dataSources:
        - azure-iot-operations/data/thermostat
Nome Descrizione
operationType source
sourceSettings Impostazioni per l'operazione source .
sourceSettings.endpointRef Riferimento all'endpoint source .
sourceSettings.dataSources Origini dati per l'operazione source . Sono supportati i caratteri jolly # e +.

Configurare la trasformazione

L'operazione di trasformazione consente di trasformare i dati dall'origine prima di inviarli alla destinazione. Le trasformazioni sono facoltative. Se non è necessario apportare modifiche ai dati, non includere l'operazione di trasformazione nella configurazione del flusso di dati. Più trasformazioni vengono concatenate in fasi indipendentemente dall'ordine in cui sono specificate nella configurazione.

spec:
  operations:
  - operationType: builtInTransformation
    name: transform1
    builtInTransformationSettings:
      datasets:
        # ...
      filter:
        # ...
      map:
        # ...
Nome Descrizione
operationType builtInTransformation
name Nome della trasformazione.
builtInTransformationSettings Impostazioni per l'operazione builtInTransformation .
builtInTransformationSettings.datasets Aggiungere altri dati ai dati di origine in base a un set di dati e a una condizione per la corrispondenza.
builtInTransformationSettings.filter Filtrare i dati in base a una condizione.
builtInTransformationSettings.map Spostare i dati da un campo a un altro con una conversione facoltativa.

Arricchire: Aggiungere dati di riferimento

Per arricchire i dati, è possibile usare il set di dati di riferimento nell'archivio stati distribuito di Azure IoT Operations (DSS). Il set di dati viene usato per aggiungere dati aggiuntivi ai dati di origine in base a una condizione. La condizione viene specificata come campo nei dati di origine che corrispondono a un campo nel set di dati.

Nome Descrizione
builtInTransformationSettings.datasets.key Set di dati usato per l'arricchimento (chiave in DSS).
builtInTransformationSettings.datasets.expression Condizione per l'operazione di arricchimento.

I nomi delle chiavi nell'archivio di stati distribuiti corrispondono a un set di dati nella configurazione del flusso di dati.

Ad esempio, è possibile usare il campo deviceId nei dati di origine per trovare la corrispondenza con il campo asset nel set di dati:

spec:
  operations:
  - operationType: builtInTransformation
    name: transform1
    builtInTransformationSettings:
      datasets:
      - key: assetDataset
        inputs:
          - $source.deviceId # ------------- $1
          - $context(assetDataset).asset # - $2
        expression: $1 == $2

Se il set di dati ha un record con il campo asset, simile al seguente:

{
  "asset": "thermostat1",
  "location": "room1",
  "manufacturer": "Contoso"
}

I dati dell'origine con la corrispondenza dei deviceId campi hanno i location campi e manufacturer disponibili in filter e map le thermostat1 fasi.

È possibile caricare dati di esempio nel DSS usando l'esempio di strumento set di DSS.

Per altre informazioni sulla sintassi delle condizioni, vedere Arricchire i dati usando flussi di dati e Convertire i dati tramite flussi di dati.

Filtro: filtrare i dati in base a una condizione

Per filtrare i dati in base a una condizione, è possibile usare la fase filter. La condizione viene specificata come campo nei dati di origine che corrispondono a un valore.

Nome Descrizione
builtInTransformationSettings.filter.inputs[] Input per valutare una condizione di filtro.
builtInTransformationSettings.filter.expression Condizione per la valutazione del filtro.

Ad esempio, è possibile usare il campo temperature nei dati di origine per filtrare i dati:

spec:
  operations:
  - operationType: builtInTransformation
    name: transform1
    builtInTransformationSettings:
      filter:
        - inputs:
          - temperature ? $last # - $1
          expression: "$1 > 20"

Se il campo temperature è maggiore di 20, i dati vengono passati alla fase successiva. Se il campo temperature è minore o uguale a 20, i dati vengono filtrati.

Mappa: spostare i dati da un campo a un altro

Per eseguire il mapping dei dati a un altro campo con la conversione facoltativa, è possibile usare l'operazione map. La conversione viene specificata come formula che utilizza i campi nei dati di origine.

Nome Descrizione
builtInTransformationSettings.map[].inputs[] Input per l'operazione di mapping
builtInTransformationSettings.map[].output Campo di output per l'operazione di mapping
builtInTransformationSettings.map[].expression Formula di conversione per l'operazione di mapping

Ad esempio, è possibile usare il campo temperature nei dati di origine per convertire la temperatura in Celsius e archiviarla nel campo temperatureCelsius. È anche possibile arricchire i dati di origine con il campo location dal set di dati di contestualizzazione:

spec:
  operations:
  - operationType: builtInTransformation
    name: transform1
    builtInTransformationSettings:
      map:
        - inputs:
          - temperature # - $1
          output: temperatureCelsius
          expression: "($1 - 32) * 5/9"
        - inputs:
          - $context(assetDataset).location  
          output: location

Per altre informazioni, vedere Eseguire il mapping dei dati usando flussi di dati e Convertire i dati usando i flussi di dati.

Configurare la destinazione

Per configurare una destinazione per il flusso di dati, è necessario specificare l'endpoint e un percorso (argomento o tabella) per la destinazione.

Nome Descrizione
destinationSettings.endpointRef Riferimento all'endpoint destination
destinationSettings.dataDestination Destinazione dei dati

Configurare le informazioni di riferimento sugli endpoint di destinazione

Per configurare l'endpoint per la destinazione, è necessario specificare l'ID e il riferimento all'endpoint:

spec:
  operations:
  - operationType: destination
    name: destination1
    destinationSettings:
      endpointRef: eventgrid

Configurare il percorso di destinazione

Dopo aver ottenuto l'endpoint, è possibile configurare il percorso per la destinazione. Se la destinazione è un endpoint MQTT o Kafka, usare il percorso per specificare l'argomento:

- operationType: destination
  destinationSettings:
    endpointRef: eventgrid
    dataDestination: factory

Per gli endpoint di archiviazione come Microsoft Fabric, usare il percorso per specificare il nome della tabella:

- operationType: destination
  destinationSettings:
    endpointRef: adls
    dataDestination: telemetryTable