Ler entrada em qualquer formato usando desserializadores personalizados do .NET (Visualização)

Importante

O desserializador .net personalizado para o Azure Stream Analytics será desativado em 30 de setembro de 2024. Após essa data, não será possível usar o recurso. Faça a transição para um desserializador interno JSON, AVRO ou CSV até essa data.

Os desserializadores personalizados do .NET permitem que seu trabalho do Azure Stream Analytics leia dados de formatos fora dos três formatos de dados internos. Este artigo explica o formato de serialização e as interfaces que definem os desserializadores personalizados do .NET para trabalhos de nuvem e borda do Azure Stream Analytics. Há também exemplos de desserializadores para o buffer de protocolo e o formato CSV.

Desserializador personalizado do .NET

Os exemplos de código a seguir são as interfaces que definem o desserializador personalizado e implementam StreamDeserializer<T>o .

UserDefinedOperator é a classe base para todos os operadores de streaming personalizados. Ele inicializa StreamingContexto , que fornece contexto, que inclui mecanismo para publicar diagnósticos para os quais você precisará depurar quaisquer problemas com seu desserializador.

    public abstract class UserDefinedOperator
    {
        public abstract void Initialize(StreamingContext streamingContext);
    }

O trecho de código a seguir é a desserialização para streaming de dados.

Os erros puláveis devem ser emitidos usando IStreamingDiagnostics o método Initialize passado pelo UserDefinedOperator. Todas as exceções serão tratadas como erros e o desserializador será recriado. Após alguns erros, o trabalho irá para um status de falha.

StreamDeserializer<T> Desserializa um fluxo em objeto do tipo T. Devem ser preenchidas as seguintes condições:

  1. T é uma classe ou um struct.
  2. Todos os campos públicos em T são
    1. Um de [sbyte, byte, short, ushort, int, uint, long, DateTime, string, float, double] ou seus equivalentes anuláveis.
    2. Outra struct ou classe seguindo as mesmas regras.
    3. Matriz do tipo T2 que segue as mesmas regras.
    4. IListT2 onde T2 segue as mesmas regras.
    5. Não tem nenhum tipo recursivo.

O parâmetro stream é o fluxo que contém o objeto serializado. Deserialize Retorna uma coleção de T instâncias.

    public abstract class StreamDeserializer<T> : UserDefinedOperator
    {
        public abstract IEnumerable<T> Deserialize(Stream stream);
    }

StreamingContext fornece contexto, que inclui mecanismo para publicar diagnósticos para o operador do usuário.

    public abstract class StreamingContext
    {
        public abstract StreamingDiagnostics Diagnostics { get; }
    }

StreamingDiagnostics é o diagnóstico para operadores definidos pelo usuário, incluindo serializador, desserializador e funções definidas pelo usuário.

WriteError Grava uma mensagem de erro nos logs de recursos e envia o erro para o diagnóstico.

briefMessage é uma breve mensagem de erro. Essa mensagem aparece no diagnóstico e é usada pela equipe do produto para fins de depuração. Não inclua informações confidenciais e mantenha a mensagem com menos de 200 caracteres

detailedMessage é uma mensagem de erro detalhada que só é adicionada aos seus logs de recursos no armazenamento. Esta mensagem deve ter menos de 2000 caracteres.

    public abstract class StreamingDiagnostics
    {
        public abstract void WriteError(string briefMessage, string detailedMessage);
    }

Exemplos de desserializador

Esta seção mostra como escrever desserializadores personalizados para Protobuf e CSV. Para obter mais exemplos, como o formato AVRO para Captura de Hubs de Eventos, visite o Azure Stream Analytics no GitHub.

Formato de buffer de protocolo (Protobuf)

Este é um exemplo usando o formato de buffer de protocolo.

Suponha a seguinte definição de buffer de protocolo.

syntax = "proto3";
// protoc.exe from nuget "Google.Protobuf.Tools" is used to generate .cs file from this schema definition.
// Run below command to generate the csharp class
// protoc.exe --csharp_out=. MessageBodyProto.proto

package SimulatedTemperatureSensor;
message MessageBodyProto {
    message Ambient {
      double temperature = 1;
      int64 humidity = 2;
    }

    message Machine {
      double temperature = 1;
      double pressure = 2;
    }

    Machine machine = 1;
    Ambient ambient = 2;
    string timeCreated = 3;
}

Em execução protoc.exe a partir do Google.Protobuf.Tools NuGet gera um arquivo .cs com a definição. O arquivo gerado não é mostrado aqui. Você deve garantir que a versão do Protobuf NuGet que você usa em seu projeto do Stream Analytics corresponda à versão do Protobuf que foi usada para gerar a entrada.

O trecho de código a seguir é a implementação do desserializador assumindo que o arquivo gerado está incluído no projeto. Esta implementação é apenas um wrapper fino sobre o arquivo gerado.

    public class MessageBodyDeserializer : StreamDeserializer<SimulatedTemperatureSensor.MessageBodyProto>
    {
        public override IEnumerable<SimulatedTemperatureSensor.MessageBodyProto> Deserialize(Stream stream)
        {
            while (stream.Position < stream.Length)
            {
                yield return SimulatedTemperatureSensor.MessageBodyProto.Parser.ParseDelimitedFrom(stream);
            }
        }

        public override void Initialize(StreamingContext streamingContext)
        {
        }
    }

CSV

O trecho de código a seguir é um desserializador CSV simples que também demonstra erros de propagação.

using System.Collections.Generic;
using System.IO;

using Microsoft.Azure.StreamAnalytics;
using Microsoft.Azure.StreamAnalytics.Serialization;

namespace ExampleCustomCode.Serialization
{
    public class CustomCsvDeserializer : StreamDeserializer<CustomEvent>
    {
        private StreamingDiagnostics streamingDiagnostics;

        public override IEnumerable<CustomEvent> Deserialize(Stream stream)
        {
            using (var sr = new StreamReader(stream))
            {
                string line = sr.ReadLine();
                while (line != null)
                {
                    if (line.Length > 0 && !string.IsNullOrWhiteSpace(line))
                    {
                        string[] parts = line.Split(',');
                        if (parts.Length != 3)
                        {
                            streamingDiagnostics.WriteError("Did not get expected number of columns", $"Invalid line: {line}");
                        }
                        else
                        {
                            yield return new CustomEvent()
                            {
                                Column1 = parts[0],
                                Column2 = parts[1],
                                Column3 = parts[2]
                            };
                        }
                    }

                    line = sr.ReadLine();
                }
            }
        }

        public override void Initialize(StreamingContext streamingContext)
        {
            this.streamingDiagnostics = streamingContext.Diagnostics;
        }
    }

    public class CustomEvent
    {
        public string Column1 { get; set; }

        public string Column2 { get; set; }

        public string Column3 { get; set; }
    }
}

Formato de serialização para APIs REST

Cada entrada do Stream Analytics tem um formato de serialização. Para obter mais informações sobre opções de entrada, consulte a documentação da API REST de entrada.

O código JavaScript a seguir é um exemplo do formato de serialização do desserializador .NET ao usar a API REST:

{    
   "properties":{    
      "type":"stream",  
      "serialization":{    
         "type":"CustomCLR",  
         "properties":{    
            "serializationDllPath":"<path to the dll inside UserCustomCode\CLR\ folder>", 
            "serializationClassName":"<Full name of the deserializer class name>" 
         }  
      }
   }  
}  

serializationClassName deve ser uma classe que implementa StreamDeserializer<T>. Isso é descrito na seção a seguir.

Suporte de região

Esse recurso está disponível nas seguintes regiões ao usar o SKU padrão:

  • E.U.A. Centro-Oeste
  • Europa do Norte
  • E.U.A. Leste
  • E.U.A. Oeste
  • E.U.A. Leste 2
  • Europa Ocidental

Você pode solicitar suporte para mais regiões. No entanto, não há essa restrição de região ao usar clusters do Stream Analytics.

Perguntas mais frequentes

Quando esse recurso estará disponível em todas as regiões do Azure?

Esta funcionalidade está disponível em 6 regiões. Se você estiver interessado em usar essa funcionalidade em outra região, envie uma solicitação. O suporte para todas as regiões do Azure está no roteiro.

Posso acessar MetadataPropertyValue a partir de minhas entradas semelhantes à função GetMetadataPropertyValue?

Esta funcionalidade não é suportada. Se você precisar desse recurso, você pode votar para esta solicitação no UserVoice.

Posso compartilhar minha implementação do desserializador com a comunidade para que outros possam se beneficiar?

Depois de implementar seu desserializador, você pode ajudar outras pessoas compartilhando-o com a comunidade. Envie seu código para o repositório GitHub do Azure Stream Analytics.

Quais são as outras limitações do uso de desserializadores personalizados no Stream Analytics?

Se sua entrada for do formato Protobuf com um esquema contendo MapField tipo, você não poderá implementar um desserializador personalizado. Além disso, os desserializadores personalizados não suportam dados de exemplo ou dados de visualização.

Passos Seguintes