Configurar fluxos de dados no Azure IoT Operations
Importante
A Versão Prévia das Operações da Internet das Coisas do Azure – habilitadas pelo Azure Arc – está atualmente em versão prévia. Você não deve usar esse software em versão prévia em ambientes de produção.
Você precisará implantar uma nova instalação das Operações da Internet das Coisas do Azure quando uma versão em disponibilidade geral for disponibilizada. Você não poderá atualizar uma instalação de versão prévia.
Veja os Termos de Uso Complementares para Versões Prévias do Microsoft Azure para obter termos legais que se aplicam aos recursos do Azure que estão em versão beta, versão prévia ou que, de outra forma, ainda não foram lançados em disponibilidade geral.
Um fluxo de dados é o caminho que os dados levam da origem para o destino com transformações opcionais. Você pode configurar o fluxo de dados criando um recurso personalizado Fluxo de dados ou usando o portal do Azure IoT Operations Studio. Um fluxo de dados é composto de três partes: a origem, a transformação e o destino.
Para definir a origem e o destino, você precisa configurar os pontos de extremidade do fluxo de dados. A transformação é opcional e pode incluir operações como enriquecimento de dados, filtragem de dados e mapeamento de dados para outro campo.
Você pode usar a experiência de operações no Operações do Azure IoT para criar um fluxo de dados. A experiência de operações fornece uma interface visual para configurar o fluxo de dados. Você também pode usar o Bicep para criar um fluxo de dados usando um arquivo de modelo do Bicep ou usar o Kubernetes para criar um fluxo de dados usando um arquivo YAML.
Continue lendo para saber como configurar a fonte, a transformação e o destino.
Pré-requisitos
Você pode implantar fluxos de dados assim que tiver uma instância das Operações do Azure IoT Versão Prévia usando o perfil de fluxo de dados e o ponto de extremidade padrão. No entanto, talvez você queira configurar perfis de fluxo de dados e pontos de extremidade para personalizar o fluxo de dados.
Perfil de fluxo de dados
O perfil de fluxo de dados especifica o número de instâncias a serem usadas pelos fluxos de dados sob ele. Se você não precisar de vários grupos de fluxos de dados com diferentes configurações de escala, poderá usar o perfil de fluxo de dados padrão. Para saber como configurar um perfil de fluxo de dados, confira Configurar perfis de fluxo de dados.
Pontos de extremidade de fluxo de dados
Os pontos de extremidade de fluxo de dados são necessários para configurar a fonte e o destino do fluxo de dados. Para começar rapidamente, você pode usar o ponto de extremidade de fluxo de dados padrão para o agente MQTT local. Você também pode criar outros tipos de pontos de extremidade de fluxo de dados, como Kafka, Hubs de Eventos ou Azure Data Lake Storage. Para saber como configurar cada tipo de ponto de extremidade de fluxo de dados, veja Configurar pontos de extremidade de fluxo de dados.
Introdução
Quando tiver os pré-requisitos, você poderá começar a criar um fluxo de dados.
Para criar um fluxo de dados na experiência de operações, selecione Fluxo de dados>Criar fluxo de dados. Em seguida, você verá a página em que poderá configurar a fonte, a transformação e o destino do fluxo de dados.
Examine as seções a seguir para saber como configurar os tipos de operação do fluxo de dados.
Origem
Para configurar uma fonte para o fluxo de dados, especifique a referência do ponto de extremidade e uma lista de fontes de dados para o ponto de extremidade.
Usar Ativo como fonte
Você pode usar um ativo como fonte para o fluxo de dados. O uso de um ativo como fonte só está disponível na experiência de operações.
Em Detalhes da fonte, selecione Ativo.
Selecione o ativo que você deseja usar como ponto de extremidade de origem.
Selecione Continuar.
Uma lista de pontos de dados para o ativo selecionado é exibida.
Selecione Aplicar para usar o ativo como o ponto de extremidade de origem.
Usar o ponto de extremidade MQTT padrão como fonte
Em Detalhes da fonte, selecione MQTT.
Insira as seguintes configurações para a fonte MQTT:
Configuração Descrição Tópico do MQTT O filtro de tópico MQTT a ser assinado para as mensagens de entrada. Confira Configurar tópicos do MQTT ou do Kafka. Esquema de mensagem O esquema a ser usado para desserializar as mensagens de entrada. Confira Especificar o esquema para desserializar dados. Escolha Aplicar.
Para obter mais informações sobre o ponto de extremidade MQTT padrão e a criação de um ponto de extremidade MQTT como fonte de fluxo de dados, confira Ponto de extremidade MQTT.
Usar o ponto de extremidade de fluxo de dados MQTT ou Kafka personalizado como fonte
Se você criou um ponto de extremidade de fluxo de dados MQTT ou Kafka personalizado (por exemplo, para usar com a Grade de Eventos ou Hubs de Eventos), poderá usá-lo como a fonte para o fluxo de dados. Lembre-se de que os pontos de extremidade do tipo de armazenamento, como Data Lake ou Fabric OneLake, não podem ser usados como fonte.
Para configurar, use o YAML do Kubernetes ou o Bicep. Substitua os valores de espaço reservado por seu nome de ponto de extremidade personalizado e tópicos.
No momento, não há suporte para o uso de um ponto de extremidade MQTT ou Kafka personalizado como fonte na experiência de operações.
Configurar fontes de dados (tópicos MQTT ou Kafka)
Você pode especificar vários tópicos MQTT ou Kafka em uma fonte sem precisar modificar a configuração do ponto de extremidade do fluxo de dados. Isso significa que o mesmo ponto de extremidade pode ser reutilizado em vários fluxos de dados, mesmo que os tópicos variem. Para obter mais informações, confira Reutilizar pontos de extremidade de fluxo de dados.
Tópicos do MQTT
Quando a fonte for um ponto de extremidade MQTT (Grade de Eventos incluída), você pode usar o filtro de tópico MQTT para assinar mensagens de entrada. O filtro de tópico pode incluir curingas para assinar vários tópicos. Por exemplo, thermostats/+/telemetry/temperature/#
assina todas as mensagens de telemetria de temperatura dos termostatos. Para configurar os filtros de tópico do MQTT:
Nos Detalhes da fonte de fluxo de dados da experiência de operações, selecione MQTT e, em seguida, use o campo Tópico MQTT para especificar o filtro de tópico MQTT a ser assinado para as mensagens de entrada.
Observação
Somente um filtro de tópico MQTT pode ser especificado na experiência de operações. Para usar vários filtros de tópico MQTT, use o Bicep ou o Kubernetes.
Tópico do Kafka
Quando a fonte for um ponto de extremidade Kafka (Hubs de Eventos incluídos), especifique os tópicos kafka individuais aos quais assinar mensagens de entrada. Não há suporte para curingas, portanto, você deve especificar cada tópico estaticamente.
Observação
Ao usar os Hubs de Eventos por meio do ponto de extremidade do Kafka, cada hub de eventos individual dentro do namespace é o tópico do Kafka. Por exemplo, se você tiver um namespace dos Hubs de Eventos com dois hubs de eventos, thermostats
e humidifiers
, poderá especificar cada hub de eventos como um tópico do Kafka.
Para configurar os tópicos do Kafka:
No momento, não há suporte para o uso de um ponto de extremidade do Kafka como uma fonte na experiência de operações.
Especificar esquema para desserializar dados
Se os dados de origem tiverem campos opcionais ou campos com tipos diferentes, especifique um esquema de desserialização para garantir a consistência. Por exemplo, os dados podem ter campos que não estão presentes em todas as mensagens. Sem o esquema, a transformação não pode manipular esses campos, pois eles teriam valores vazios. Com o esquema, você pode especificar valores padrão ou ignorar os campos.
Especificar o esquema só é relevante ao usar a fonte MQTT ou Kafka. Se a fonte for um ativo, o esquema será automaticamente inferido da definição de ativo.
Para configurar o esquema usado para desserializar as mensagens de entrada de uma fonte:
Nos Detalhes da fonte de fluxo de dados da experiência de operações, selecione MQTT e use o campo Esquema de mensagem para especificar o esquema. Você pode usar o botão Carregar para carregar um arquivo de esquema primeiro. Para saber mais, confira Entenda os esquemas de mensagens.
Assinaturas compartilhadas
Para usar assinaturas compartilhadas com fontes MQTT, você pode especificar o tópico de assinatura compartilhada no formato de $shared/<GROUP_NAME>/<TOPIC_FILTER>
.
Em Detalhes da fonte do fluxo de dados da experiência de operações, selecione MQTT e use o campo Tópico MQTT para especificar o grupo de assinatura compartilhada e o tópico.
Observação
Se a contagem de instâncias no perfil do fluxo de dados for maior que 1, o prefixo do tópico da assinatura compartilhada será automaticamente adicionado ao filtro de tópicos.
Transformação
A operação de transformação é o local em que você pode transformar os dados da fonte antes de enviá-los para o destino. As transformações são opcionais. Se você não precisar fazer alterações nos dados, não inclua a operação de transformação na configuração do fluxo de dados. Várias transformações são encadeadas em fases, independentemente da ordem em que são especificadas na configuração. A ordem dos estágios é sempre:
- Enriquecer: adicione dados adicionais aos dados de origem, considerando um conjunto de dados e uma condição correspondente.
- Filtro: Filtra os dados com base em uma condição.
- Mapa: mova dados de um campo para outro com uma conversão opcional.
Na experiência de operações, selecione Fluxo de dados>Add transformação (opcional).
Enriquecer: adicionar dados de referência
Para enriquecer os dados, você pode usar o conjunto de dados de referência no DSS (repositório de estado distribuído) das Operações do Azure IoT. O conjunto de dados é usado para adicionar dados extras aos dados de origem com base em uma condição. A condição é especificada como um campo nos dados de origem que corresponde a um campo no conjunto de dados.
Você pode carregar dados de exemplo no DSS usando a amostra de ferramenta de conjunto do DSS. Os nomes de chave no repositório de estado distribuído correspondem a um conjunto de dados na configuração do fluxo de dados.
Atualmente, a operação de enriquecimento não está disponível na experiência de operações.
Para obter mais informações sobre a sintaxe de condição, confira Enriquecer dados usando fluxos de dados e Converter dados usando fluxos de dados.
Filtro: filtrar dados com base em uma condição
Para filtrar os dados em uma condição, você pode usar o estágio filter
. A condição é especificada como um campo nos dados de origem que corresponde a um valor.
Em Transformar (opcional), selecione Filtrar>Adicionar.
Escolha os pontos de dados a serem incluídos no conjunto de dados.
Adicione uma condição de filtro e uma descrição.
Escolha Aplicar.
Por exemplo, você pode usar uma condição de filtro como temperature > 20
para filtrar dados menores ou iguais a 20 com base no campo de temperatura.
Mapeamento: mover dados de um campo para outro
Para mapear os dados para outro campo com conversão opcional, você pode usar a operação map
. A conversão é especificada como uma fórmula que usa os campos nos dados de origem.
Na experiência de operações, atualmente há suporte para mapeamento usando transformações de computação.
Em Transformar (opcional), selecione Calcular >Adicionar.
Insira os campos e expressões obrigatórios.
Escolha Aplicar.
Para saber mais, confira Mapear dados usando fluxos de dados e Converter dados usando fluxos de dados.
Serializar dados de acordo com um esquema
Se você quiser serializar os dados antes de enviá-los ao destino, precisará especificar um esquema e um formato de serialização. Caso contrário, os dados são serializados em JSON com os tipos inferidos. Os pontos de extremidade de armazenamento, como o Microsoft Fabric ou o Azure Data Lake, exigem um esquema para garantir a consistência dos dados. Os formatos de serialização com suporte são o Parquet e o Delta.
Atualmente, não há suporte para a especificação do esquema de saída e da serialização na experiência de operações.
Para obter mais informações sobre o registro de esquemas, veja Entender esquemas de mensagens.
Destino
Para configurar um destino para o fluxo de dados, especifique a referência do ponto de extremidade e o destino dos dados. Você pode especificar uma lista de destinos de dados para o ponto de extremidade.
Para enviar dados a um destino que não seja o agente MQTT local, crie um ponto de extremidade de fluxo de dados. Para saber como, confira Configurar pontos de extremidade de fluxo de dados.
Importante
Os pontos de extremidade de armazenamento exigem uma referência de esquema. Se você criou pontos de extremidade de destino de armazenamento para o Microsoft Fabric OneLake, ADLS Gen 2, Azure Data Explorer e Armazenamento Local, especifique a referência do esquema.
Selecione o ponto de extremidade do fluxo de dados a ser usado como destino.
Selecione Continuar para configurar o destino.
Insira as configurações necessárias para o destino, incluindo o tópico ou a tabela para o qual enviar os dados. Confira Configurar o destino dos dados (tópico, contêiner ou tabela) para obter mais informações.
Configurar o destino dos dados (tópico, contêiner ou tabela)
Semelhante às fontes de dados, o destino dos dados é um conceito usado para manter os pontos de extremidade do fluxo de dados reutilizáveis em vários fluxos de dados. Essencialmente, ele representa o subdiretório na configuração do ponto de extremidade do fluxo de dados. Por exemplo, se o ponto de extremidade do fluxo de dados for um ponto de extremidade de armazenamento, o destino de dados será a tabela na conta de armazenamento. Se o ponto de extremidade do fluxo de dados for um ponto de extremidade do Kafka, o destino dos dados será o tópico do Kafka.
Tipo de ponto de extremidade | Significado do destino de dados | Descrição |
---|---|---|
MQTT (ou Grade de Eventos) | Tópico | O tópico MQTT para onde os dados são enviados. Há suporte apenas para tópicos estáticos, sem curingas. |
Kafka (ou Hubs de Eventos) | Tópico | O tópico do Kafka para onde os dados são enviados. Há suporte apenas para tópicos estáticos, sem curingas. Se o ponto de extremidade for um namespace dos Hubs de Eventos, o destino dos dados será o hub de eventos individual dentro do namespace. |
Armazenamento do Azure Data Lake | Contêiner | O contêiner na conta de armazenamento. Não é a tabela. |
Microsoft Fabric OneLake | Arquivo ou Pasta | Corresponde ao tipo de caminho configurado para o ponto de extremidade. |
Azure Data Explorer | Tabela | A tabela no banco de dados do Azure Data Explorer. |
Armazenamento local | Pasta | O nome da pasta ou do diretório na montagem do volume persistente do armazenamento local. |
Para configurar o destino de dados:
Ao usar a experiência de operações, o campo de destino dos dados é interpretado automaticamente com base no tipo de ponto de extremidade. Por exemplo, se o ponto de extremidade do fluxo de dados for um ponto de extremidade de armazenamento, a página de detalhes do destino solicitará que você insira o nome do contêiner. Se o ponto de extremidade do fluxo de dados for um ponto de extremidade MQTT, a página de detalhes do destino solicitará que você insira o tópico e assim por diante.
Exemplo
O exemplo a seguir é uma configuração de fluxo de dados que usa o ponto de extremidade MQTT para a origem e o destino. A fonte filtra os dados dos tópicos MQTT thermostats/+/telemetry/temperature/#
e humidifiers/+/telemetry/humidity/#
. A transformação converte a temperatura para Fahrenheit e filtra os dados onde a temperatura é menor que 100000. O destino envia os dados para o tópico MQTT factory
.
apiVersion: connectivity.iotoperations.azure.com/v1beta1
kind: Dataflow
metadata:
name: my-dataflow
namespace: azure-iot-operations
spec:
profileRef: default
mode: Enabled
operations:
- operationType: Source
sourceSettings:
endpointRef: default
dataSources:
- thermostats/+/telemetry/temperature/#
- humidifiers/+/telemetry/humidity/#
- operationType: builtInTransformation
builtInTransformationSettings:
filter:
- inputs:
- 'temperature.Value'
- '"Tag 10".Value'
expression: "$1*$2<100000"
map:
- inputs:
- '*'
output: '*'
- inputs:
- temperature.Value
output: TemperatureF
expression: cToF($1)
- inputs:
- '"Tag 10".Value'
output: 'Tag 10'
- operationType: Destination
destinationSettings:
endpointRef: default
dataDestination: factory
Verifique se um fluxo de dados está funcionando
Siga Tutorial: Ponte MQTT bidirecional para o Grade de Eventos do Azure para verificar se o fluxo de dados está funcionando.
Exportar configuração de fluxo de dados
Para exportar a configuração do fluxo de dados, você poderá usar a experiência de operações ou exportar o recurso personalizado do fluxo de dados.
Selecione o fluxo de dados que deseja exportar e selecione Exportar na barra de ferramentas.