Считывание входных данных любого формата с помощью пользовательских десериализаторов .NET (предварительная версия)

Внимание

Пользовательский десериализатор .net для Azure Stream Analytics будет прекращен 30 сентября 2024 года. После этой даты невозможно использовать эту функцию. Перейдите к встроенному десериализатору JSON, AVRO или CSV по этой дате.

Пользовательские десериализаторы .NET позволяют заданию Azure Stream Analytics считывать данные из форматов, не входящих в три встроенных формата данных. В этой статье описывается формат сериализации и интерфейсы, определяющие пользовательские десериализаторы .NET для облака Azure Stream Analytics облачных и пограничных заданий. Также приведены примеры десериализаторов для буфера протокола и формата CSV.

Создание пользовательского десериализатора .NET

Следующие примеры кода являются интерфейсами, определяющими пользовательский десериализатор и реализующими StreamDeserializer<T>.

UserDefinedOperator — базовый класс для всех операторов настраиваемой потоковой передачи. Он инициализирует StreamingContext, который предоставляет контекст, который включает механизм публикации диагностика, для которого потребуется выполнить отладку любых проблем с десериализатором.

    public abstract class UserDefinedOperator
    {
        public abstract void Initialize(StreamingContext streamingContext);
    }

В следующем фрагменте кода показана десериализация для потоковой передачи данных.

Ошибки, которые можно пропустить, следует выдавать с помощью IStreamingDiagnostics, передаваемого методом инициализации UserDefinedOperator. Все исключения будут рассматриваться как ошибки, и десериализатор будет создан повторно. После некоторых ошибок задание перейдет в состояние сбоя.

StreamDeserializer<T> десериализует поток в объект типа T. Должны выполняться следующие условия:

  1. T является классом или структурой.
  2. Все общедоступные поля в T являются либо
    1. одним из полей [sbyte, byte, short, ushort, int, uint, long, DateTime, string, float, double], либо их эквивалентами, допускающими значение NULL.
    2. Другая структура или класс, следующие тем же правилам.
    3. Массив типа T2, который следует тем же правилам.
    4. IListT2 где T2 следует тем же правилам.
    5. Не имеет рекурсивных типов.

Параметр stream — это поток, содержащий сериализованный объект. Deserialize возвращает коллекцию экземпляров T.

    public abstract class StreamDeserializer<T> : UserDefinedOperator
    {
        public abstract IEnumerable<T> Deserialize(Stream stream);
    }

StreamingContextпредоставляет контекст, включающий механизм публикации диагностика для оператора пользователя.

    public abstract class StreamingContext
    {
        public abstract StreamingDiagnostics Diagnostics { get; }
    }

StreamingDiagnostics — это диагностика для определяемых пользователем операторов, включающая сериализатор, десериализатор и определяемые пользователем функции.

WriteError записывает сообщение об ошибке в журналы ресурсов и отправляет ошибку диагностике.

briefMessage — краткое сообщение об ошибке. Это сообщение отображается в диагностике и используется группой разработки продукта для отладки. Не включайте конфиденциальную информацию и не сохраняйте сообщение менее 200 символов

detailedMessage — подробное сообщение об ошибке, которое добавляется только в журналы ресурсов в хранилище. Это сообщение должно быть короче 2000 символов.

    public abstract class StreamingDiagnostics
    {
        public abstract void WriteError(string briefMessage, string detailedMessage);
    }

Примеры десериализатора

В этом разделе показано, как создавать пользовательские десериализаторы для Protobuf и CSV. Дополнительные примеры, например avRO format for Event Hubs Capture, посетите Azure Stream Analytics на сайте GitHub.

Формат буфера протокола (Protobuf)

Это пример использования формата буфера протокола.

Предположим следующее определение буфера протокола.

syntax = "proto3";
// protoc.exe from nuget "Google.Protobuf.Tools" is used to generate .cs file from this schema definition.
// Run below command to generate the csharp class
// protoc.exe --csharp_out=. MessageBodyProto.proto

package SimulatedTemperatureSensor;
message MessageBodyProto {
    message Ambient {
      double temperature = 1;
      int64 humidity = 2;
    }

    message Machine {
      double temperature = 1;
      double pressure = 2;
    }

    Machine machine = 1;
    Ambient ambient = 2;
    string timeCreated = 3;
}

Запуск protoc.exe из Google.Protobuf.Tools NuGet создает CS-файл с определением. Созданный файл не отображается здесь. Необходимо убедиться, что версия NuGet Protobuf, используемая в проекте Stream Analytics, соответствует версии Protobuf, которая использовалась для создания входных данных.

Следующий фрагмент кода является реализацией десериализатора при условии, что созданный файл включен в проект. Эта реализация является просто тонкой оболочкой для созданного файла.

    public class MessageBodyDeserializer : StreamDeserializer<SimulatedTemperatureSensor.MessageBodyProto>
    {
        public override IEnumerable<SimulatedTemperatureSensor.MessageBodyProto> Deserialize(Stream stream)
        {
            while (stream.Position < stream.Length)
            {
                yield return SimulatedTemperatureSensor.MessageBodyProto.Parser.ParseDelimitedFrom(stream);
            }
        }

        public override void Initialize(StreamingContext streamingContext)
        {
        }
    }

CSV

Следующий фрагмент кода является простым десериализатором CSV, который также демонстрирует распространение ошибок.

using System.Collections.Generic;
using System.IO;

using Microsoft.Azure.StreamAnalytics;
using Microsoft.Azure.StreamAnalytics.Serialization;

namespace ExampleCustomCode.Serialization
{
    public class CustomCsvDeserializer : StreamDeserializer<CustomEvent>
    {
        private StreamingDiagnostics streamingDiagnostics;

        public override IEnumerable<CustomEvent> Deserialize(Stream stream)
        {
            using (var sr = new StreamReader(stream))
            {
                string line = sr.ReadLine();
                while (line != null)
                {
                    if (line.Length > 0 && !string.IsNullOrWhiteSpace(line))
                    {
                        string[] parts = line.Split(',');
                        if (parts.Length != 3)
                        {
                            streamingDiagnostics.WriteError("Did not get expected number of columns", $"Invalid line: {line}");
                        }
                        else
                        {
                            yield return new CustomEvent()
                            {
                                Column1 = parts[0],
                                Column2 = parts[1],
                                Column3 = parts[2]
                            };
                        }
                    }

                    line = sr.ReadLine();
                }
            }
        }

        public override void Initialize(StreamingContext streamingContext)
        {
            this.streamingDiagnostics = streamingContext.Diagnostics;
        }
    }

    public class CustomEvent
    {
        public string Column1 { get; set; }

        public string Column2 { get; set; }

        public string Column3 { get; set; }
    }
}

Формат сериализации для REST API

Все входные данные Stream Analytics имеют формат сериализации. Дополнительные сведения о параметрах ввода см. в документации по REST API ввода.

Следующий код JavaScript является примером формата сериализации десериализатора .NET при использовании REST API:

{    
   "properties":{    
      "type":"stream",  
      "serialization":{    
         "type":"CustomCLR",  
         "properties":{    
            "serializationDllPath":"<path to the dll inside UserCustomCode\CLR\ folder>", 
            "serializationClassName":"<Full name of the deserializer class name>" 
         }  
      }
   }  
}  

serializationClassName должен быть классом, реализующим StreamDeserializer<T>. Это описывается в следующем разделе.

Поддержка регионов

Эта функция доступна при использовании SKU "Стандартный" в следующих регионах:

  • Центрально-западная часть США
  • Северная Европа
  • Восточная часть США
  • Западная часть США
  • Восточная часть США 2
  • Западная Европа

Вы можете запросить поддержку для дополнительных регионов. Однако при использовании кластеров Stream Analytics такое ограничение не существует.

Часто задаваемые вопросы

Когда эта функция станет доступной во всех регионах Azure?

Указанная функция доступна в 6 регионах. Если вы хотите использовать эту функцию в другом регионе, отправьте запрос. Поддержка всех регионов Azure включена в план.

Можно ли получить доступ к MetadataPropertyValue из входных данных, похожих на функцию GetMetadataPropertyValue?

Эта функция не поддерживается. Если вам нужна эта возможность, вы можете проголосовать за этот запрос на UserVoice.

Можно ли поделиться реализацией десериализатора с сообществом, чтобы другие пользователи могли воспользоваться ею?

После реализации десериализатора вы можете помочь другим пользователям, предоставив ему общий доступ к сообществу. Отправьте код в репозиторий GitHub Azure Stream Analytics.

Каковы другие ограничения использования пользовательских десериализаторов в Stream Analytics?

Если входные данные являются форматом Protobuf с типом схемы, содержащей MapField тип, вы не сможете реализовать пользовательский десериализатор. Кроме того, пользовательские десериализаторы не поддерживают примеры данных или предварительный просмотр данных.

Next Steps