Usar hubs de eventos e .NET para enviar e receber mensagens de tópicos do Atlas Kafka

Este início rápido ensina como enviar e receber eventos de tópicos do Atlas Kafka . Usaremos Hubs de Eventos do Azure e a biblioteca .NET do Azure.Messaging.EventHubs.

Pré-requisitos

Se você for novo nos Hubs de Eventos, consulte Visão geral dos Hubs de Eventos antes de concluir este início rápido.

Para seguir este início rápido, você precisa de certos pré-requisitos em vigor:

  • Uma assinatura do Microsoft Azure. Para usar os serviços do Azure, incluindo hubs de eventos, você precisa de uma assinatura do Azure. Se você não tiver uma conta do Azure, poderá se inscrever para uma avaliação gratuita ou usar seus benefícios de assinante do MSDN ao criar uma conta.
  • Microsoft Visual Studio 2022. A biblioteca de clientes dos Hubs de Eventos usa novos recursos que foram introduzidos no C# 8.0. Você ainda pode usar a biblioteca com versões anteriores do C#, mas a nova sintaxe não estará disponível. Para usar a sintaxe completa, é recomendável compilar com o SDK do .NET Core 3.0 ou superior e a versão do idioma definida como latest. Se você estiver usando uma versão do Visual Studio antes do Visual Studio 2019, ele não terá as ferramentas necessárias para criar projetos C# 8.0. O Visual Studio 2022, incluindo a edição gratuita da Comunidade, pode ser baixado aqui.
  • Uma conta ativa do Microsoft Purview.
  • Um Hubs de Eventos configurado com sua conta do Microsoft Purview para enviar e receber mensagens:
    • Sua conta já pode estar configurada. Você pode marcar sua conta do Microsoft Purview no portal do Azure em Configurações, configuração kafka. Se ainda não estiver configurado, siga este guia.

Publicar mensagens no Microsoft Purview

Vamos criar um aplicativo de console do .NET Core que envia eventos para o Microsoft Purview por meio do tópico Kafka dos Hubs de Eventos, ATLAS_HOOK.

Para publicar mensagens no Microsoft Purview, você precisará de um Hubs de Eventos gerenciados ou pelo menos um Hubs de Eventos com uma configuração de gancho..

Criar um projeto do Visual Studio

Em seguida, crie um aplicativo de console .NET do C# no Visual Studio:

  1. Inicie o Visual Studio.
  2. Na janela Iniciar, selecione Criar um aplicativo de console do projeto>(.NET Framework). É necessário .NET versão 4.5.2 ou superior.
  3. No nome do projeto, insira PurviewKafkaProducer.
  4. Selecione Criar para criar o projeto.

Criar um aplicativo de console

  1. Inicie o Visual Studio 2022.
  2. Selecione Criar um novo projeto.
  3. Na caixa de diálogo Criar um novo projeto , faça as seguintes etapas: Se você não vir essa caixa de diálogo, selecione Arquivo no menu, selecione Novo e, em seguida, Projeto.
    1. Selecione C# para a linguagem de programação.
    2. Selecione Console para o tipo do aplicativo.
    3. Selecione Aplicativo de Console (.NET Core) na lista de resultados.
    4. Em seguida, selecione Avançar.

Adicionar o pacote NuGet dos Hubs de Eventos

  1. Selecione Ferramentas>NuGet Package Manager>Package Manager Console no menu.

  2. Execute o seguinte comando para instalar o pacote NuGet do Azure.Messaging.EventHubs e o pacote NuGet do Azure.Messaging.EventHubs.Producer :

    Install-Package Azure.Messaging.EventHubs
    
    Install-Package Azure.Messaging.EventHubs.Producer
    

Gravar código que envia mensagens para o hub de eventos

  1. Adicione as seguintes using instruções à parte superior do arquivo Program.cs :

    using System;
    using System.Text;
    using System.Threading.Tasks;
    using Azure.Messaging.EventHubs;
    using Azure.Messaging.EventHubs.Producer;
    
  2. Adicione constantes à classe para a Program cadeia de conexão hubs de eventos e o nome dos Hubs de Eventos.

    private const string connectionString = "<EVENT HUBS NAMESPACE - CONNECTION STRING>";
    private const string eventHubName = "<EVENT HUB NAME>";
    
  3. Substitua o Main método pelo método a seguir async Main e adicione um async ProduceMessage para enviar mensagens por push ao Microsoft Purview. Consulte os comentários no código para obter detalhes.

        static async Task Main()
        {
            // Read from the default consumer group: $Default
            string consumerGroup = EventHubConsumerClient.DefaultConsumerGroupName;
    
     		/ Create an event producer client to add events in the event hub
            EventHubProducerClient producer = new EventHubProducerClient(ehubNamespaceConnectionString, eventHubName);
    
     		await ProduceMessage(producer);
        }
    
     	static async Task ProduceMessage(EventHubProducerClient producer)
    
        {
     		// Create a batch of events 
     		using EventDataBatch eventBatch = await producerClient.CreateBatchAsync();
    
     		// Add events to the batch. An event is a represented by a collection of bytes and metadata. 
     		eventBatch.TryAdd(new EventData(Encoding.UTF8.GetBytes("<First event>")));
     		eventBatch.TryAdd(new EventData(Encoding.UTF8.GetBytes("<Second event>")));
     		eventBatch.TryAdd(new EventData(Encoding.UTF8.GetBytes("<Third event>")));
    
     		// Use the producer client to send the batch of events to the event hub
     		await producerClient.SendAsync(eventBatch);
     		Console.WriteLine("A batch of 3 events has been published.");
    
     	}
    
  4. Compile o projeto. Assegure-se de que não existem erros.

  5. Execute o programa e aguarde a mensagem de confirmação.

    Observação

    Para obter o código-fonte completo com mais comentários informativos, consulte este arquivo no GitHub

Código de exemplo que cria uma tabela sql com duas colunas usando uma mensagem Create Entity JSON

	
	{
    "msgCreatedBy":"nayenama",
    "message":{
        "type":"ENTITY_CREATE_V2",
        "user":"admin",
        "entities":{
            "entities":[
                {
                    "typeName":"azure_sql_table",
                    "attributes":{
                        "owner":"admin",
                        "temporary":false,
                        "qualifiedName":"mssql://nayenamakafka.eventhub.sql.net/salespool/dbo/SalesOrderTable",
                        "name":"SalesOrderTable",
                        "description":"Sales Order Table added via Kafka"
                    },
                    "relationshipAttributes":{
                        "columns":[
                            {
                                "guid":"-1102395743156037",
                                "typeName":"azure_sql_column",
                                "uniqueAttributes":{
                                    "qualifiedName":"mssql://nayenamakafka.eventhub.sql.net/salespool/dbo/SalesOrderTable#OrderID"
                                }
                            },
                            {
                                "guid":"-1102395743156038",
                                "typeName":"azure_sql_column",
                                "uniqueAttributes":{
                                    "qualifiedName":"mssql://nayenamakafka.eventhub.sql.net/salespool/dbo/SalesOrderTable#OrderDate"
                                }
                            }
                        ]
                    },
                    "guid":"-1102395743156036",
                    "version":0
                }
            ],
            "referredEntities":{
                "-1102395743156037":{
                    "typeName":"azure_sql_column",
                    "attributes":{
                        "owner":null,
                        "userTypeId":61,
                        "qualifiedName":"mssql://nayenamakafka.eventhub.sql.net/salespool/dbo/SalesOrderTable#OrderID",
                        "precision":23,
                        "length":8,
                        "description":"Sales Order ID",
                        "scale":3,
                        "name":"OrderID",
                        "data_type":"int"
                    },
                    "relationshipAttributes":{
                        "table":{
                            "guid":"-1102395743156036",
                            "typeName":"azure_sql_table",
                            "entityStatus":"ACTIVE",
                            "displayText":"SalesOrderTable",
                            "uniqueAttributes":{
                                "qualifiedName":"mssql://nayenamakafka.eventhub.sql.net/salespool/dbo/SalesOrderTable"
                            }
                        }
                    },
                    "guid":"-1102395743156037",
                    "version":2
                },
                "-1102395743156038":{
                    "typeName":"azure_sql_column",
                    "attributes":{
                        "owner":null,
                        "userTypeId":61,
                        "qualifiedName":"mssql://nayenamakafka.eventhub.sql.net/salespool/dbo/SalesOrderTable#OrderDate",
                        "description":"Sales Order Date",
                        "scale":3,
                        "name":"OrderDate",
                        "data_type":"datetime"
                    },
                    "relationshipAttributes":{
                        "table":{
                            "guid":"-1102395743156036",
                            "typeName":"azure_sql_table",
                            "entityStatus":"ACTIVE",
                            "displayText":"SalesOrderTable",
                            "uniqueAttributes":{
                                "qualifiedName":"mssql://nayenamakafka.eventhub.sql.net/salespool/dbo/SalesOrderTable"
                            }
                        }
                    },
                    "guid":"-1102395743156038",
                    "status":"ACTIVE",
                    "createdBy":"ServiceAdmin",
                    "version":0
                }
            }
        }
    },
    "version":{
        "version":"1.0.0"
    },
    "msgCompressionKind":"NONE",
    "msgSplitIdx":1,
    "msgSplitCount":1
}


Receber mensagens do Microsoft Purview

Em seguida, saiba como escrever um aplicativo de console do .NET Core que recebe mensagens de hubs de eventos usando um processador de eventos. O processador de eventos gerencia pontos de verificação persistentes e recepções paralelas de hubs de eventos. Isso simplifica o processo de recebimento de eventos. Você precisa usar o hub de eventos ATLAS_ENTITIES para receber mensagens do Microsoft Purview.

Para receber mensagens do Microsoft Purview, você precisará de um Hubs de Eventos gerenciados ou de uma configuração de notificação dos Hubs de Eventos..

Aviso

O SDK dos Hubs de Eventos usa a versão mais recente da API de Armazenamento disponível. Essa versão pode não estar necessariamente disponível em sua plataforma do Stack Hub. Se você executar esse código no Azure Stack Hub, sofrerá erros de runtime, a menos que você direcione a versão específica que está usando. Se você estiver usando Armazenamento de Blobs do Azure como um repositório de ponto de verificação, examine a versão com suporte da API de Armazenamento do Azure para o build do Azure Stack Hub e, em seu código, direcione essa versão.

A versão mais alta disponível do serviço de Armazenamento é a versão 2019-02-02. Por padrão, a biblioteca de clientes SDK dos Hubs de Eventos usa a versão mais alta disponível no Azure (2019-07-07 no momento da versão do SDK). Se você estiver usando o Azure Stack Hub versão 2005, além de seguir as etapas desta seção, também precisará adicionar um código que tenha como destino a API do serviço de armazenamento versão 2019-02-02. Para saber como direcionar uma versão específica da API de Armazenamento, consulte este exemplo no GitHub.

Criar um Armazenamento do Azure e um contêiner de blob

Usaremos o Armazenamento do Azure como o repositório de ponto de verificação. Use as etapas a seguir para criar uma conta de Armazenamento do Azure.

  1. Configurar uma conta de armazenamento do Azure

  2. Criar um contêiner de blob

  3. Obter a cadeia de conexão para a conta de armazenamento

    Anote a cadeia de conexão e o nome do contêiner. Você os usará no código de recebimento.

Criar um projeto do Visual Studio para o receptor

  1. Na janela Gerenciador de Soluções, selecione e segure (ou clique com o botão direito do mouse) na solução EventHubQuickStart, aponte para Adicionar e selecione Novo Projeto.
  2. Selecione Aplicativo de Console (.NET Core)e selecione Avançar.
  3. Insira PurviewKafkaConsumer para o nome do projeto e selecione Criar.

Adicionar o pacote NuGet dos Hubs de Eventos

  1. Selecione Ferramentas>NuGet Package Manager>Package Manager Console no menu.

  2. Execute o seguinte comando para instalar o pacote NuGet do Azure.Messaging.EventHubs :

    Install-Package Azure.Messaging.EventHubs
    
  3. Execute o seguinte comando para instalar o pacote NuGet do Azure.Messaging.EventHubs.Processor :

    Install-Package Azure.Messaging.EventHubs.Processor
    

Atualizar o método Main

  1. Adicione as instruções a seguir using na parte superior do arquivo Program.cs .

    using System;
    using System.Text;
    using System.Threading.Tasks;
    using Azure.Storage.Blobs;
    using Azure.Messaging.EventHubs;
    using Azure.Messaging.EventHubs.Consumer;
    using Azure.Messaging.EventHubs.Processor;
    
  2. Adicione constantes à classe para a Program cadeia de conexão Hubs de Eventos e o nome do hub de eventos. Substitua espaços reservados em colchetes pelos valores reais que você obteve quando criou o hub de eventos e a conta de armazenamento (chaves de acesso – cadeia de conexão primária). Verifique se a é a {Event Hubs namespace connection string} cadeia de conexão no nível do namespace e não a cadeia de caracteres do hub de eventos.

        private const string ehubNamespaceConnectionString = "<EVENT HUBS NAMESPACE - CONNECTION STRING>";
        private const string eventHubName = "<EVENT HUB NAME>";
        private const string blobStorageConnectionString = "<AZURE STORAGE CONNECTION STRING>";
        private const string blobContainerName = "<BLOB CONTAINER NAME>";
    

    Use ATLAS_ENTITIES como o nome do hub de eventos ao enviar mensagens para o Microsoft Purview.

  3. Substitua o Main método pelo método a seguir async Main . Consulte os comentários no código para obter detalhes.

        static async Task Main()
        {
            // Read from the default consumer group: $Default
            string consumerGroup = EventHubConsumerClient.DefaultConsumerGroupName;
    
            // Create a blob container client that the event processor will use 
            BlobContainerClient storageClient = new BlobContainerClient(blobStorageConnectionString, blobContainerName);
    
            // Create an event processor client to process events in the event hub
            EventProcessorClient processor = new EventProcessorClient(storageClient, consumerGroup, ehubNamespaceConnectionString, eventHubName);
    
            // Register handlers for processing events and handling errors
            processor.ProcessEventAsync += ProcessEventHandler;
            processor.ProcessErrorAsync += ProcessErrorHandler;
    
            // Start the processing
            await processor.StartProcessingAsync();
    
            // Wait for 10 seconds for the events to be processed
            await Task.Delay(TimeSpan.FromSeconds(10));
    
            // Stop the processing
            await processor.StopProcessingAsync();
        }    
    
  4. Agora, adicione os seguintes métodos de manipulador de eventos e erros à classe.

        static async Task ProcessEventHandler(ProcessEventArgs eventArgs)
        {
            // Write the body of the event to the console window
            Console.WriteLine("\tReceived event: {0}", Encoding.UTF8.GetString(eventArgs.Data.Body.ToArray()));
    
            // Update checkpoint in the blob storage so that the app receives only new events the next time it's run
            await eventArgs.UpdateCheckpointAsync(eventArgs.CancellationToken);
        }
    
        static Task ProcessErrorHandler(ProcessErrorEventArgs eventArgs)
        {
            // Write details about the error to the console window
            Console.WriteLine($"\tPartition '{ eventArgs.PartitionId}': an unhandled exception was encountered. This was not expected to happen.");
            Console.WriteLine(eventArgs.Exception.Message);
            return Task.CompletedTask;
        }    
    
  5. Compile o projeto. Assegure-se de que não existem erros.

    Observação

    Para obter o código-fonte completo com mais comentários informativos, consulte este arquivo no GitHub.

  6. Execute o aplicativo receptor.

Um exemplo de uma mensagem recebida do Microsoft Purview

{
	"version":
		{"version":"1.0.0",
		 "versionParts":[1]
		},
		 "msgCompressionKind":"NONE",
		 "msgSplitIdx":1,
		 "msgSplitCount":1,
		 "msgSourceIP":"10.244.155.5",
		 "msgCreatedBy":
		 "",
		 "msgCreationTime":1618588940869,
		 "message":{
			"type":"ENTITY_NOTIFICATION_V2",
			"entity":{
				"typeName":"azure_sql_table",
					"attributes":{
						"owner":"admin",
						"createTime":0,
						"qualifiedName":"mssql://nayenamakafka.eventhub.sql.net/salespool/dbo/SalesOrderTable",
						"name":"SalesOrderTable",
						"description":"Sales Order Table"
						},
						"guid":"ead5abc7-00a4-4d81-8432-d5f6f6f60000",
						"status":"ACTIVE",
						"displayText":"SalesOrderTable"
					},
					"operationType":"ENTITY_UPDATE",
					"eventTime":1618588940567
				}
}

Próximas etapas

Confira mais exemplos no GitHub.