Enrich data by using dataflows

Important

This page includes instructions for managing Azure IoT Operations components using Kubernetes deployment manifests, which is in preview. This feature is provided with several limitations, and shouldn't be used for production workloads.

See the Supplemental Terms of Use for Microsoft Azure Previews for legal terms that apply to Azure features that are in beta, preview, or otherwise not yet released into general availability.

You can enrich data by using the contextualization datasets function. When incoming records are processed, you can query these datasets based on conditions that relate to the fields of the incoming record. This capability allows for dynamic interactions. Data from these datasets can be used to supplement information in the output fields and participate in complex calculations during the mapping process.

To load sample data into the state store, use the state store CLI.

For example, consider the following dataset with a few records, represented as JSON records:

{
  "Position": "Analyst",
  "BaseSalary": 70000,
  "WorkingHours": "Regular"
},
{
  "Position": "Receptionist",
  "BaseSalary": 43000,
  "WorkingHours": "Regular"
}

The mapper accesses the reference dataset stored in the Azure IoT Operations state store by using a key value based on a condition specified in the mapping configuration. Key names in the state store correspond to a dataset in the dataflow configuration.

datasets: [
  {
    key: 'position',
    inputs: [
      '$source.Position' //  - $1
      '$context.Position' // - $2
    ],
    expression: '$1 == $2'
  }
]

When a new record is being processed, the mapper performs the following steps:

  • Data request: The mapper sends a request to the state store to retrieve the dataset stored under the key Position.
  • Record matching: The mapper then queries this dataset to find the first record where the Position field in the dataset matches the Position field of the incoming record.
{
  inputs: [
    '$context(position).WorkingHours' //  - $1 
  ]
  output: 'WorkingHours'
}
{
  inputs: [
    'BaseSalary' // - - - - - - - - - - - - $1
    '$context(position).BaseSalary' //  - - $2
  ]
  output: 'BaseSalary'
  expression: 'if($1 == (), $2, $1)'
}

In this example, the WorkingHours field is added to the output record, while the BaseSalary is used conditionally only when the incoming record doesn't contain the BaseSalary field (or the value is null if it's a nullable field). The request for the contextualization data doesn't happen with every incoming record. The mapper requests the dataset and then it receives notifications from the state store about the changes, while it uses a cached version of the dataset.

It's possible to use multiple datasets:

datasets: [
  {
    key: 'position'
    inputs: [
      '$source.Position'  // - $1
      '$context.Position' // - $2
    ],
    expression: '$1 == $2'
  }
  {
    key: 'permissions'
    inputs: [
      '$source.Position'  // - $1
      '$context.Position' // - $2
    ],
    expression: '$1 == $2'
  }
]

Then use the references mixed:

inputs: [
  '$context(position).WorkingHours'  // - $1
  '$context(permissions).NightShift' // - $2
]

The input references use the key of the dataset like position or permission. If the key in state store is inconvenient to use, you can define an alias:

datasets: [
  {
    key: 'datasets.parag10.rule42 as position'
    inputs: [
      '$source.Position'  // - $1
      '$context.Position' // - $2
    ],
    expression: '$1 == $2'
  }
]

The configuration renames the dataset with the key datasets.parag10.rule42 to position.