Usar hubs de eventos e .NET para enviar e receber mensagens de tópicos do Atlas Kafka
Este início rápido ensina como enviar e receber eventos de tópicos do Atlas Kafka . Usaremos Hubs de Eventos do Azure e a biblioteca .NET do Azure.Messaging.EventHubs.
Pré-requisitos
Se você for novo nos Hubs de Eventos, consulte Visão geral dos Hubs de Eventos antes de concluir este início rápido.
Para seguir este início rápido, você precisa de certos pré-requisitos em vigor:
- Uma assinatura do Microsoft Azure. Para usar os serviços do Azure, incluindo hubs de eventos, você precisa de uma assinatura do Azure. Se você não tiver uma conta do Azure, poderá se inscrever para uma avaliação gratuita ou usar seus benefícios de assinante do MSDN ao criar uma conta.
-
Microsoft Visual Studio 2022. A biblioteca de clientes dos Hubs de Eventos usa novos recursos que foram introduzidos no C# 8.0. Você ainda pode usar a biblioteca com versões anteriores do C#, mas a nova sintaxe não estará disponível. Para usar a sintaxe completa, é recomendável compilar com o SDK do .NET Core 3.0 ou superior e a versão do idioma definida como
latest
. Se você estiver usando uma versão do Visual Studio antes do Visual Studio 2019, ele não terá as ferramentas necessárias para criar projetos C# 8.0. O Visual Studio 2022, incluindo a edição gratuita da Comunidade, pode ser baixado aqui. - Uma conta ativa do Microsoft Purview.
-
Um Hubs de Eventos configurado com sua conta do Microsoft Purview para enviar e receber mensagens:
- Sua conta já pode estar configurada. Você pode marcar sua conta do Microsoft Purview no portal do Azure em Configurações, configuração kafka. Se ainda não estiver configurado, siga este guia.
Publicar mensagens no Microsoft Purview
Vamos criar um aplicativo de console do .NET Core que envia eventos para o Microsoft Purview por meio do tópico Kafka dos Hubs de Eventos, ATLAS_HOOK.
Para publicar mensagens no Microsoft Purview, você precisará de um Hubs de Eventos gerenciados ou pelo menos um Hubs de Eventos com uma configuração de gancho..
Criar um projeto do Visual Studio
Em seguida, crie um aplicativo de console .NET do C# no Visual Studio:
- Inicie o Visual Studio.
- Na janela Iniciar, selecione Criar um aplicativo de console do projeto>(.NET Framework). É necessário .NET versão 4.5.2 ou superior.
- No nome do projeto, insira PurviewKafkaProducer.
- Selecione Criar para criar o projeto.
Criar um aplicativo de console
- Inicie o Visual Studio 2022.
- Selecione Criar um novo projeto.
- Na caixa de diálogo Criar um novo projeto , faça as seguintes etapas: Se você não vir essa caixa de diálogo, selecione Arquivo no menu, selecione Novo e, em seguida, Projeto.
- Selecione C# para a linguagem de programação.
- Selecione Console para o tipo do aplicativo.
- Selecione Aplicativo de Console (.NET Core) na lista de resultados.
- Em seguida, selecione Avançar.
Adicionar o pacote NuGet dos Hubs de Eventos
Selecione Ferramentas>NuGet Package Manager>Package Manager Console no menu.
Execute o seguinte comando para instalar o pacote NuGet do Azure.Messaging.EventHubs e o pacote NuGet do Azure.Messaging.EventHubs.Producer :
Install-Package Azure.Messaging.EventHubs
Install-Package Azure.Messaging.EventHubs.Producer
Gravar código que envia mensagens para o hub de eventos
Adicione as seguintes
using
instruções à parte superior do arquivo Program.cs :using System; using System.Text; using System.Threading.Tasks; using Azure.Messaging.EventHubs; using Azure.Messaging.EventHubs.Producer;
Adicione constantes à classe para a
Program
cadeia de conexão hubs de eventos e o nome dos Hubs de Eventos.private const string connectionString = "<EVENT HUBS NAMESPACE - CONNECTION STRING>"; private const string eventHubName = "<EVENT HUB NAME>";
Substitua o
Main
método pelo método a seguirasync Main
e adicione umasync ProduceMessage
para enviar mensagens por push ao Microsoft Purview. Consulte os comentários no código para obter detalhes.static async Task Main() { // Read from the default consumer group: $Default string consumerGroup = EventHubConsumerClient.DefaultConsumerGroupName; / Create an event producer client to add events in the event hub EventHubProducerClient producer = new EventHubProducerClient(ehubNamespaceConnectionString, eventHubName); await ProduceMessage(producer); } static async Task ProduceMessage(EventHubProducerClient producer) { // Create a batch of events using EventDataBatch eventBatch = await producerClient.CreateBatchAsync(); // Add events to the batch. An event is a represented by a collection of bytes and metadata. eventBatch.TryAdd(new EventData(Encoding.UTF8.GetBytes("<First event>"))); eventBatch.TryAdd(new EventData(Encoding.UTF8.GetBytes("<Second event>"))); eventBatch.TryAdd(new EventData(Encoding.UTF8.GetBytes("<Third event>"))); // Use the producer client to send the batch of events to the event hub await producerClient.SendAsync(eventBatch); Console.WriteLine("A batch of 3 events has been published."); }
Compile o projeto. Assegure-se de que não existem erros.
Execute o programa e aguarde a mensagem de confirmação.
Observação
Para obter o código-fonte completo com mais comentários informativos, consulte este arquivo no GitHub
Código de exemplo que cria uma tabela sql com duas colunas usando uma mensagem Create Entity JSON
{
"msgCreatedBy":"nayenama",
"message":{
"type":"ENTITY_CREATE_V2",
"user":"admin",
"entities":{
"entities":[
{
"typeName":"azure_sql_table",
"attributes":{
"owner":"admin",
"temporary":false,
"qualifiedName":"mssql://nayenamakafka.eventhub.sql.net/salespool/dbo/SalesOrderTable",
"name":"SalesOrderTable",
"description":"Sales Order Table added via Kafka"
},
"relationshipAttributes":{
"columns":[
{
"guid":"-1102395743156037",
"typeName":"azure_sql_column",
"uniqueAttributes":{
"qualifiedName":"mssql://nayenamakafka.eventhub.sql.net/salespool/dbo/SalesOrderTable#OrderID"
}
},
{
"guid":"-1102395743156038",
"typeName":"azure_sql_column",
"uniqueAttributes":{
"qualifiedName":"mssql://nayenamakafka.eventhub.sql.net/salespool/dbo/SalesOrderTable#OrderDate"
}
}
]
},
"guid":"-1102395743156036",
"version":0
}
],
"referredEntities":{
"-1102395743156037":{
"typeName":"azure_sql_column",
"attributes":{
"owner":null,
"userTypeId":61,
"qualifiedName":"mssql://nayenamakafka.eventhub.sql.net/salespool/dbo/SalesOrderTable#OrderID",
"precision":23,
"length":8,
"description":"Sales Order ID",
"scale":3,
"name":"OrderID",
"data_type":"int"
},
"relationshipAttributes":{
"table":{
"guid":"-1102395743156036",
"typeName":"azure_sql_table",
"entityStatus":"ACTIVE",
"displayText":"SalesOrderTable",
"uniqueAttributes":{
"qualifiedName":"mssql://nayenamakafka.eventhub.sql.net/salespool/dbo/SalesOrderTable"
}
}
},
"guid":"-1102395743156037",
"version":2
},
"-1102395743156038":{
"typeName":"azure_sql_column",
"attributes":{
"owner":null,
"userTypeId":61,
"qualifiedName":"mssql://nayenamakafka.eventhub.sql.net/salespool/dbo/SalesOrderTable#OrderDate",
"description":"Sales Order Date",
"scale":3,
"name":"OrderDate",
"data_type":"datetime"
},
"relationshipAttributes":{
"table":{
"guid":"-1102395743156036",
"typeName":"azure_sql_table",
"entityStatus":"ACTIVE",
"displayText":"SalesOrderTable",
"uniqueAttributes":{
"qualifiedName":"mssql://nayenamakafka.eventhub.sql.net/salespool/dbo/SalesOrderTable"
}
}
},
"guid":"-1102395743156038",
"status":"ACTIVE",
"createdBy":"ServiceAdmin",
"version":0
}
}
}
},
"version":{
"version":"1.0.0"
},
"msgCompressionKind":"NONE",
"msgSplitIdx":1,
"msgSplitCount":1
}
Receber mensagens do Microsoft Purview
Em seguida, saiba como escrever um aplicativo de console do .NET Core que recebe mensagens de hubs de eventos usando um processador de eventos. O processador de eventos gerencia pontos de verificação persistentes e recepções paralelas de hubs de eventos. Isso simplifica o processo de recebimento de eventos. Você precisa usar o hub de eventos ATLAS_ENTITIES para receber mensagens do Microsoft Purview.
Para receber mensagens do Microsoft Purview, você precisará de um Hubs de Eventos gerenciados ou de uma configuração de notificação dos Hubs de Eventos..
Aviso
O SDK dos Hubs de Eventos usa a versão mais recente da API de Armazenamento disponível. Essa versão pode não estar necessariamente disponível em sua plataforma do Stack Hub. Se você executar esse código no Azure Stack Hub, sofrerá erros de runtime, a menos que você direcione a versão específica que está usando. Se você estiver usando Armazenamento de Blobs do Azure como um repositório de ponto de verificação, examine a versão com suporte da API de Armazenamento do Azure para o build do Azure Stack Hub e, em seu código, direcione essa versão.
A versão mais alta disponível do serviço de Armazenamento é a versão 2019-02-02. Por padrão, a biblioteca de clientes SDK dos Hubs de Eventos usa a versão mais alta disponível no Azure (2019-07-07 no momento da versão do SDK). Se você estiver usando o Azure Stack Hub versão 2005, além de seguir as etapas desta seção, também precisará adicionar um código que tenha como destino a API do serviço de armazenamento versão 2019-02-02. Para saber como direcionar uma versão específica da API de Armazenamento, consulte este exemplo no GitHub.
Criar um Armazenamento do Azure e um contêiner de blob
Usaremos o Armazenamento do Azure como o repositório de ponto de verificação. Use as etapas a seguir para criar uma conta de Armazenamento do Azure.
Obter a cadeia de conexão para a conta de armazenamento
Anote a cadeia de conexão e o nome do contêiner. Você os usará no código de recebimento.
Criar um projeto do Visual Studio para o receptor
- Na janela Gerenciador de Soluções, selecione e segure (ou clique com o botão direito do mouse) na solução EventHubQuickStart, aponte para Adicionar e selecione Novo Projeto.
- Selecione Aplicativo de Console (.NET Core)e selecione Avançar.
- Insira PurviewKafkaConsumer para o nome do projeto e selecione Criar.
Adicionar o pacote NuGet dos Hubs de Eventos
Selecione Ferramentas>NuGet Package Manager>Package Manager Console no menu.
Execute o seguinte comando para instalar o pacote NuGet do Azure.Messaging.EventHubs :
Install-Package Azure.Messaging.EventHubs
Execute o seguinte comando para instalar o pacote NuGet do Azure.Messaging.EventHubs.Processor :
Install-Package Azure.Messaging.EventHubs.Processor
Atualizar o método Main
Adicione as instruções a seguir
using
na parte superior do arquivo Program.cs .using System; using System.Text; using System.Threading.Tasks; using Azure.Storage.Blobs; using Azure.Messaging.EventHubs; using Azure.Messaging.EventHubs.Consumer; using Azure.Messaging.EventHubs.Processor;
Adicione constantes à classe para a
Program
cadeia de conexão Hubs de Eventos e o nome do hub de eventos. Substitua espaços reservados em colchetes pelos valores reais que você obteve quando criou o hub de eventos e a conta de armazenamento (chaves de acesso – cadeia de conexão primária). Verifique se a é a{Event Hubs namespace connection string}
cadeia de conexão no nível do namespace e não a cadeia de caracteres do hub de eventos.private const string ehubNamespaceConnectionString = "<EVENT HUBS NAMESPACE - CONNECTION STRING>"; private const string eventHubName = "<EVENT HUB NAME>"; private const string blobStorageConnectionString = "<AZURE STORAGE CONNECTION STRING>"; private const string blobContainerName = "<BLOB CONTAINER NAME>";
Use ATLAS_ENTITIES como o nome do hub de eventos ao enviar mensagens para o Microsoft Purview.
Substitua o
Main
método pelo método a seguirasync Main
. Consulte os comentários no código para obter detalhes.static async Task Main() { // Read from the default consumer group: $Default string consumerGroup = EventHubConsumerClient.DefaultConsumerGroupName; // Create a blob container client that the event processor will use BlobContainerClient storageClient = new BlobContainerClient(blobStorageConnectionString, blobContainerName); // Create an event processor client to process events in the event hub EventProcessorClient processor = new EventProcessorClient(storageClient, consumerGroup, ehubNamespaceConnectionString, eventHubName); // Register handlers for processing events and handling errors processor.ProcessEventAsync += ProcessEventHandler; processor.ProcessErrorAsync += ProcessErrorHandler; // Start the processing await processor.StartProcessingAsync(); // Wait for 10 seconds for the events to be processed await Task.Delay(TimeSpan.FromSeconds(10)); // Stop the processing await processor.StopProcessingAsync(); }
Agora, adicione os seguintes métodos de manipulador de eventos e erros à classe.
static async Task ProcessEventHandler(ProcessEventArgs eventArgs) { // Write the body of the event to the console window Console.WriteLine("\tReceived event: {0}", Encoding.UTF8.GetString(eventArgs.Data.Body.ToArray())); // Update checkpoint in the blob storage so that the app receives only new events the next time it's run await eventArgs.UpdateCheckpointAsync(eventArgs.CancellationToken); } static Task ProcessErrorHandler(ProcessErrorEventArgs eventArgs) { // Write details about the error to the console window Console.WriteLine($"\tPartition '{ eventArgs.PartitionId}': an unhandled exception was encountered. This was not expected to happen."); Console.WriteLine(eventArgs.Exception.Message); return Task.CompletedTask; }
Compile o projeto. Assegure-se de que não existem erros.
Observação
Para obter o código-fonte completo com mais comentários informativos, consulte este arquivo no GitHub.
Execute o aplicativo receptor.
Um exemplo de uma mensagem recebida do Microsoft Purview
{
"version":
{"version":"1.0.0",
"versionParts":[1]
},
"msgCompressionKind":"NONE",
"msgSplitIdx":1,
"msgSplitCount":1,
"msgSourceIP":"10.244.155.5",
"msgCreatedBy":
"",
"msgCreationTime":1618588940869,
"message":{
"type":"ENTITY_NOTIFICATION_V2",
"entity":{
"typeName":"azure_sql_table",
"attributes":{
"owner":"admin",
"createTime":0,
"qualifiedName":"mssql://nayenamakafka.eventhub.sql.net/salespool/dbo/SalesOrderTable",
"name":"SalesOrderTable",
"description":"Sales Order Table"
},
"guid":"ead5abc7-00a4-4d81-8432-d5f6f6f60000",
"status":"ACTIVE",
"displayText":"SalesOrderTable"
},
"operationType":"ENTITY_UPDATE",
"eventTime":1618588940567
}
}
Próximas etapas
Confira mais exemplos no GitHub.