Руководство по Обработка событий Центров событий в Apache Kafka с использованием Stream Analytics

В этой статье показано, как выполняется потоковая передача данных в Центры событий и обработка этих данных с помощью Azure Stream Analytics. Здесь подробно описаны следующие действия:

  1. Создайте пространство имен Центров событий.
  2. создание клиента Kafka, который отправляет сообщения в концентратор событий;
  3. создание задания Stream Analytics, которое копирует данные из концентратора событий в хранилище BLOB-объектов Azure.

Вам не обязательно изменять протокол клиентов или запускать собственные кластеры при использовании конечной точки Kafka, предоставленной концентратором событий. Центры событий Azure поддерживают Apache Kafka 1.0. и более поздних версий.

Предварительные требования

Ниже указаны требования для работы с этим кратким руководством.

Создание пространства имен в Центрах событий

Если вы создаете пространство имен в Центрах событий, конечная точка Kafka для этого пространства имен включается автоматически. Вы можете выполнять потоковую передачу событий из приложений, использующих протокол Kafka, в концентраторы событий. Выполните пошаговые инструкции в статье Создание концентратора событий с помощью портала Azure, чтобы создать пространство имен в Центрах событий. Если вы используете выделенный кластер, ознакомьтесь с разделом Создание пространства имен и концентратора событий в выделенном кластере.

Примечание

Центры событий для Kafka не поддерживаются на уровне Базовый.

Отправка сообщений с использованием Kafka в Центрах событий

  1. Клонируйте репозиторий Центров событий Azure для Kafka на компьютер.

  2. Перейдите в папку azure-event-hubs-for-kafka/quickstart/java/producer.

  3. Обновите сведения о конфигурации для отправителя в файле по адресу src/main/resources/producer.config. Укажите имя и строку подключения для пространства имен концентратора событий.

    bootstrap.servers={EVENT HUB NAMESPACE}.servicebus.windows.net:9093
    security.protocol=SASL_SSL
    sasl.mechanism=PLAIN
    sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{CONNECTION STRING for EVENT HUB NAMESPACE}";
    
  4. Перейдите в azure-event-hubs-for-kafka/quickstart/java/producer/src/main/java/ и откройте файл TestDataReporter.java в любом редакторе по своему усмотрению.

  5. Закомментируйте следующую строку кода.

                //final ProducerRecord<Long, String> record = new ProducerRecord<Long, String>(TOPIC, time, "Test Data " + i);
    
  6. Добавьте следующую строку кода вместо закомментированного кода.

                final ProducerRecord<Long, String> record = new ProducerRecord<Long, String>(TOPIC, time, "{ \"eventData\": \"Test Data " + i + "\" }");            
    

    Этот код отправляет данные события в формате JSON. При настройке входных данных для задания Stream Analytics необходимо указать JSON как формат входных данных.

  7. Выполните код отправителя и потоковую передачу данных в Центры событий. При использовании командной строки Node.js перед выполнением этих команд на компьютере Windows переключитесь на папку azure-event-hubs-for-kafka/quickstart/java/producer.

    mvn clean package
    mvn exec:java -Dexec.mainClass="TestProducer"                                    
    

Проверка получения данных концентратором событий

  1. Выберите Центры событий в разделе Сущности. Убедитесь, что вы видите концентратор событий с именем test.

    Концентратор событий. Тестирование

  2. Убедитесь, что вы видите сообщения, входящие в концентратор событий.

    Концентратор событий. Сообщения

Обработка данных событий с помощью задания Stream Analytics

В этом разделе будет создано задание Azure Stream Analytics. Клиент Kafka отправит события в концентратор событий. Вы создадите задание Stream Analytics, которое принимает данные о событиях в качестве входных данных и выводит их в хранилище BLOB-объектов Azure. Если у вас нет учетной записи службы хранилища Azure, то потребуется ее создать.

Запрос в задании Stream Analytics проходит через данные без выполнения какой-либо аналитики. Можно создать запрос, преобразующий входные данные в выходные данные в другом формате или с полученными аналитическими сведениями.

Создание задания Stream Analytics

  1. Выберите + Создать ресурс на портале Azure.
  2. Выберите Analytics в меню Azure Marketplace, и затем выберите задание Stream Analytics.
  3. На странице Новое задание Stream Analytics выполните следующие действия.
    1. Введите имя для задания.

    2. Выберите свою подписку.

    3. Выберите Создать новую для группы ресурсов и введите имя. Вы также можете использовать существующую группу ресурсов.

    4. Выберите расположение для задания.

    5. Щелкните Создать, чтобы создать задание.

      Новое задание Stream Analytics

Настройка входных данных для задания

  1. В сообщении уведомления, щелкните Перейти к ресурсу, чтобы открыть страницу задания Stream Analytics.

  2. Выберите Входные данные в разделе Топология задания в меню слева.

  3. Выберите Добавить потоковый вход, а затем выберите Концентратор событий.

    Добавление концентратора событий в качестве входных данных

  4. На странице конфигурации входных данных концентратора событий выполните следующие действия.

    1. Укажите псевдоним для входных данных.

    2. Выберите свою подписку Azure.

    3. Выберите пространство имен концентратора событий созданное ранее.

    4. Выберите тестирование для концентратора событий.

    5. Щелкните Сохранить.

      Конфигурация входных данных концентратора событий

Настройка выходных данных для задания

  1. Выберите Выходные данные в меню раздела Топология задания.
  2. Выберите + Добавить на панели инструментов и выберите Хранилище BLOB-объектов
  3. На странице параметров выходных данных хранилища BLOB-объектов выполните следующие действия.
    1. Укажите псевдоним для выходных данных.

    2. Выберите свою подписку Azure.

    3. Выберите свою учетную запись службы хранилища Azure.

    4. Введите имя для контейнера, в котором хранятся выходные данные из запроса Stream Analytics.

    5. Щелкните Сохранить.

      Конфигурация выходных данных хранилища BLOB-объектов

Определение запроса

Настроив задание Stream Analytics для считывания входящего потока данных, необходимо создать преобразование, которое анализирует данные в реальном времени. Определите запрос преобразования с помощью языка запросов Stream Analytics. В этом пошаговом руководстве необходимо определить запрос, который проходит через данные без выполнения какого-либо преобразования.

  1. Выберите Запрос.

  2. В окне запроса замените [YourOutputAlias] псевдонимом выходных данных, созданным ранее.

  3. Замените [YourInputAlias] псевдонимом входных данных, созданным ранее.

  4. На панели инструментов щелкните Сохранить.

    Снимок экрана: окно запроса со значениями для входных и выходных переменных.

Выполнение задания Stream Analytics

  1. В меню слева выберите Обзор.

  2. Щелкните Запуск.

    Меню «Пуск»

  3. На странице Запуск задания выберите Запуск.

    Страница запуска задания

  4. Подождите, пока состояние задания не изменится с запуска на выполняется.

    Состояние задания. Выполняется

Тестирование сценария

  1. Запустите еще раз отправитель Kafka, чтобы отправить события в концентратор событий.

    mvn exec:java -Dexec.mainClass="TestProducer"                                    
    
  2. Убедитесь, что вы видите выходные данные сгенерированные в хранилище BLOB-объектов Azure. Вы увидите JSON-файл в контейнере с сотней строк, которые выглядят следующим образом.

    {"eventData":"Test Data 0","EventProcessedUtcTime":"2018-08-30T03:27:23.1592910Z","PartitionId":0,"EventEnqueuedUtcTime":"2018-08-30T03:27:22.9220000Z"}
    {"eventData":"Test Data 1","EventProcessedUtcTime":"2018-08-30T03:27:23.3936511Z","PartitionId":0,"EventEnqueuedUtcTime":"2018-08-30T03:27:22.9220000Z"}
    {"eventData":"Test Data 2","EventProcessedUtcTime":"2018-08-30T03:27:23.3936511Z","PartitionId":0,"EventEnqueuedUtcTime":"2018-08-30T03:27:22.9220000Z"}
    

    Задание Azure Stream Analytics получило входные данные из концентратора событий и в этом сценарии сохранило их в хранилище BLOB-объектов Azure.

Дальнейшие действия

Из этой статьи вы узнали, как выполнять потоковую передачу данных в Центры событий без необходимости менять клиенты протоколов или запускать собственные кластеры. Сведения о Центрах событий Azure для Apache Kafka см. в этом руководстве для разработчиков.