Sending and Receiving Avro formatted events to Event Hub

CareyBoldenow 96 Reputation points
2021-01-29T15:43:53.833+00:00

I have been working with the Azure Schema Registry and can successfully register Avro schemas and retrieve them using the Azure Schema Registry Java SDK. However, I cannot work out how to serialize objects using these Avro schemas and then send the resulting binary payload to Event Hub. The same holds for the consuming side: how do I consume an Avro-formatted event from Event Hub and then deserialize it using the corresponding Avro schema?

Looking at the Azure Schema Registry documentation at https://video2.skills-academy.com/en-us/azure/event-hubs/schema-registry-overview, the diagram there seems to indicate that we have to use the Kafka producer/consumer and all of its Avro-related capabilities to do this. Is that correct?

There are plenty of good examples on Microsoft Learn of how to use the Azure Schema Registry SDKs, but I could not find any that show how to serialize and deserialize data with Avro when sending to or consuming from Event Hub (e.g. serializing/deserializing using EventData).

Therefore, if anyone has a Java-based example that produces and consumes events to/from Event Hub using Avro and the Azure Schema Registry, please point me in that direction.

Thanks for any help you can offer!

Azure Event Hubs
An Azure real-time data ingestion service.

3 answers

  1. Serkant Karaca 21 Reputation points Microsoft Employee
    2021-02-02T18:21:33.707+00:00
    1 person found this answer helpful.

  2. CareyBoldenow 96 Reputation points
    2021-02-04T23:26:25.213+00:00

    I was finally able to get all of this working using simple Java-based Azure Functions. In case anyone wants to know how, I am publishing my code below.

    This is a somewhat contrived function that is triggered by an HTTP request and has an output binding to Event Hub. It uses the same PlayingCard Avro implementation included in @Serkant Karaca's reply above and sends the serialized output of that object to Event Hub.

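    // Imports assumed from the Azure SDK and Azure Functions Java libraries.
    // PlayingCard and PlayingCardSuit are the Avro-generated sample classes (package not shown).
    import com.azure.core.credential.TokenCredential;
    import com.azure.data.schemaregistry.SchemaRegistryAsyncClient;
    import com.azure.data.schemaregistry.SchemaRegistryClientBuilder;
    import com.azure.data.schemaregistry.avro.SchemaRegistryAvroSerializer;
    import com.azure.data.schemaregistry.avro.SchemaRegistryAvroSerializerBuilder;
    import com.azure.identity.ClientSecretCredentialBuilder;
    import com.microsoft.azure.functions.ExecutionContext;
    import com.microsoft.azure.functions.HttpMethod;
    import com.microsoft.azure.functions.HttpRequestMessage;
    import com.microsoft.azure.functions.annotation.AuthorizationLevel;
    import com.microsoft.azure.functions.annotation.EventHubOutput;
    import com.microsoft.azure.functions.annotation.FunctionName;
    import com.microsoft.azure.functions.annotation.HttpTrigger;
    import java.io.ByteArrayOutputStream;
    import java.util.Optional;

    public class Function {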
      
        @FunctionName("HttpExample")  
        @EventHubOutput(name = "event", eventHubName = "myEventHub", connection = "AzureEventHubConnection")  
        public byte[] run(  
                @HttpTrigger(  
                        name = "req",  
                        route = "message",  
                        methods = {HttpMethod.POST},  
                        authLevel = AuthorizationLevel.ANONYMOUS)  
                        HttpRequestMessage<Optional<String>> request,  
                final ExecutionContext context) {  
      
            TokenCredential tokenCredential = new ClientSecretCredentialBuilder()  
                    .tenantId("myTenantId")  
                    .clientId("myClientId")  
                    .clientSecret("myClientSecret")  
                    .build();  
      
            SchemaRegistryAsyncClient schemaRegistryAsyncClient = new SchemaRegistryClientBuilder()  
                    .credential(tokenCredential)  
                    .endpoint("myEventHubNamespaceEndpoint")  
                    .buildAsyncClient();  
      
            SchemaRegistryAvroSerializer schemaRegistryAvroSerializer = new SchemaRegistryAvroSerializerBuilder()  
                    .schemaRegistryAsyncClient(schemaRegistryAsyncClient)  
                    .schemaGroup("mySchemaGroup")  
                    .avroSpecificReader(true)  
                    .autoRegisterSchema(true)  
                    .buildSerializer();  
      
      
            PlayingCard playingCard = new PlayingCard();  
            playingCard.setCardValue(5);  
            playingCard.setIsFaceCard(false);  
            playingCard.setPlayingCardSuit(PlayingCardSuit.SPADES);  
      
            ByteArrayOutputStream byteOutStream = new ByteArrayOutputStream();  
      
            schemaRegistryAvroSerializer.serialize(byteOutStream, playingCard);  
            return byteOutStream.toByteArray();  
        }  
    }  
    

    The next function is triggered when events are published to the Event Hub via the above Function and it simply deserializes the binary structure being consumed from the Event Hub.

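    // Imports assumed from the Azure SDK and Azure Functions Java libraries.
    // PlayingCard is the Avro-generated sample class (package not shown).
    import com.azure.core.credential.TokenCredential;
    import com.azure.core.util.serializer.TypeReference;
    import com.azure.data.schemaregistry.SchemaRegistryAsyncClient;
    import com.azure.data.schemaregistry.SchemaRegistryClientBuilder;
    import com.azure.data.schemaregistry.avro.SchemaRegistryAvroSerializer;
    import com.azure.data.schemaregistry.avro.SchemaRegistryAvroSerializerBuilder;
    import com.azure.identity.ClientSecretCredentialBuilder;
    import com.microsoft.azure.functions.ExecutionContext;
    import com.microsoft.azure.functions.annotation.Cardinality;
    import com.microsoft.azure.functions.annotation.EventHubTrigger;
    import com.microsoft.azure.functions.annotation.FunctionName;
    import java.io.ByteArrayInputStream;
    import java.util.List;

    public class Function {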
      
        @FunctionName("EventHubExample")  
        public void eventHubProcessor(  
            @EventHubTrigger(name = "messages",  
                    eventHubName = "myEventHub", connection = "AzureEventHubConnection", cardinality = Cardinality.MANY, dataType = "binary") List<byte[]> messages,  
            final ExecutionContext context )  
        {  
      
            TokenCredential tokenCredential = new ClientSecretCredentialBuilder()  
                .tenantId("myTenantId")  
                .clientId("myClientId")  
                .clientSecret("myClientSecret")  
                .build();  
      
      
            SchemaRegistryAsyncClient schemaRegistryAsyncClient = new SchemaRegistryClientBuilder()  
                    .credential(tokenCredential)  
                    .endpoint("myEventHubNamespaceEndpoint")  
                    .buildAsyncClient();  
      
            SchemaRegistryAvroSerializer schemaRegistryAvroSerializer = new SchemaRegistryAvroSerializerBuilder()  
                    .schemaRegistryAsyncClient(schemaRegistryAsyncClient)  
                    .schemaGroup("mySchemaGroup")  
                    .avroSpecificReader(true)  
                    .autoRegisterSchema(true)  
                    .buildSerializer();  
              
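            // The trigger delivers a batch (Cardinality.MANY), so deserialize every event rather than only the first.
            for (byte[] message : messages) {
                ByteArrayInputStream in = new ByteArrayInputStream(message);

                PlayingCard playingCard = schemaRegistryAvroSerializer.deserialize(in, TypeReference.createInstance(PlayingCard.class));

                context.getLogger().info("PlayingCard: " + playingCard);
            }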
        }  
      
    }  
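
    Both functions above rebuild the credential, the registry client, and the serializer on every invocation. A possible refinement, not something tested in the functions above, is to build those objects once and reuse them. The sketch below shows the idea with plain JDK classes only: `SerializerHolderSketch` and `Lazy` are hypothetical names, and the string factory stands in for the real builder chain.

    ```java
    import java.util.concurrent.atomic.AtomicInteger;
    import java.util.function.Supplier;

    // Sketch only: a build-once holder for an expensive object such as the serializer.
    final class SerializerHolderSketch {

        // Hypothetical helper: calls the factory on first use, then returns the cached instance.
        // The factory must not return null.
        static final class Lazy<T> implements Supplier<T> {
            private final Supplier<T> factory;
            private volatile T value;

            Lazy(Supplier<T> factory) {
                this.factory = factory;
            }

            @Override
            public T get() {
                T result = value;
                if (result == null) {
                    synchronized (this) {
                        result = value;
                        if (result == null) {
                            result = factory.get();
                            value = result;
                        }
                    }
                }
                return result;
            }
        }

        // Counts how often the factory runs, for demonstration only.
        static final AtomicInteger BUILD_COUNT = new AtomicInteger();

        // Stand-in for the credential/client/serializer construction shown in the functions.
        static final Lazy<String> SERIALIZER =
                new Lazy<>(() -> "serializer#" + BUILD_COUNT.incrementAndGet());

        public static void main(String[] args) {
            System.out.println(SERIALIZER.get());
            System.out.println(SERIALIZER.get());
            System.out.println("built " + BUILD_COUNT.get() + " time(s)");
        }
    }
    ```

    In a function class, the static field would hold the real serializer, and each invocation would call `SERIALIZER.get()` instead of running the builders again.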
    
    1 person found this answer helpful.

  3. CareyBoldenow 96 Reputation points
    2021-02-02T20:12:18.25+00:00

    Hello @Serkant Karaca

    Yes, the code that you have linked above provides some very good examples of how to serialize/deserialize Java objects using Avro within the context of a standalone Java application, and I am able to make that work successfully. However, what I am still trying to figure out is how to do the same when serializing events (using Avro) to Event Hub and when consuming/deserializing from Event Hub.

    As you probably know, if you were doing this in Kafka, you simply need to define some properties in your producer and consumer like the following:

    properties.setProperty("value.deserializer", KafkaAvroDeserializer.class.getName());
    properties.setProperty("schema.registry.url", "myurl");
    properties.setProperty("specific.avro.reader", "true");
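
    For reference, a fuller version of that Kafka consumer configuration might look like the sketch below. It uses only `java.util.Properties` and refers to the Kafka and Confluent classes by name, so it compiles without those libraries. The class name `AvroConsumerConfig`, the host names, ports, and group id are placeholders chosen for illustration, and security settings are omitted.

    ```java
    import java.util.Properties;

    // Sketch only: gathers the consumer settings needed for Avro deserialization
    // against a Confluent Schema Registry.
    final class AvroConsumerConfig {

        static Properties build(String bootstrapServers, String groupId, String schemaRegistryUrl) {
            Properties properties = new Properties();
            properties.setProperty("bootstrap.servers", bootstrapServers);
            properties.setProperty("group.id", groupId);
            properties.setProperty("key.deserializer",
                    "org.apache.kafka.common.serialization.StringDeserializer");
            // The Avro deserializer looks up the writer schema in the registry for each record.
            properties.setProperty("value.deserializer",
                    "io.confluent.kafka.serializers.KafkaAvroDeserializer");
            properties.setProperty("schema.registry.url", schemaRegistryUrl);
            // Return generated SpecificRecord classes instead of GenericRecord.
            properties.setProperty("specific.avro.reader", "true");
            return properties;
        }

        public static void main(String[] args) {
            // Placeholder endpoints for a local setup.
            Properties p = build("localhost:9092", "playing-card-consumers", "http://localhost:8081");
            p.forEach((key, value) -> System.out.println(key + "=" + value));
        }
    }
    ```

    The point is that the consumer code itself never touches a schema: the deserializer class named in the configuration does the registry lookup.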

    So if I am implementing my Event Hub producers and consumers using something like Spring Cloud Stream Binder, or if I am just using an Azure Function with Event Hub input/output bindings, how do we conceptually do the same thing that Kafka does with the Confluent Schema Registry?

    What I have tried and failed to get to work is something like the following using Spring Cloud Stream Binder:

    My Producer:

    @EnableBinding(Source.class)  
    @RestController  
    public class EventhubSource {  
      
        @Autowired  
        private Source source;  
      
        @PostMapping("/messages")  
        public String postMessage(@RequestBody EventPayload payload) {  
      
            TokenCredential tokenCredential = new DefaultAzureCredentialBuilder().build();  
      
            SchemaRegistryAsyncClient schemaRegistryAsyncClient = new SchemaRegistryClientBuilder()  
                    .credential(tokenCredential)  
                    .endpoint("myendpoint")  
                    .buildAsyncClient();  
      
            SchemaRegistryAvroSerializer schemaRegistryAvroSerializer = new SchemaRegistryAvroSerializerBuilder()  
                    .schemaRegistryAsyncClient(schemaRegistryAsyncClient)  
                    .schemaGroup("myschemagroup")  
                    .avroSpecificReader(true)  
                    .autoRegisterSchema(true)  
                    .buildSerializer();  
      
      
            PlayingCard playingCard = new PlayingCard();  
            playingCard.setCardValue(5);  
            playingCard.setIsFaceCard(false);  
            playingCard.setPlayingCardSuit(PlayingCardSuit.SPADES);  
      
            ByteArrayOutputStream byteOutStream = new ByteArrayOutputStream();  
      
            schemaRegistryAvroSerializer.serialize(byteOutStream, playingCard);  
            source.output().send(new GenericMessage<>(byteOutStream.toByteArray()));  
      
            return payload.toString();  
      
      
        }
    }
    

    Essentially, I am using the code you referenced above and then, at the very end, writing the byte array of the serialized output stream to Event Hub. The message reaches Event Hub; however, when I then try to consume it, I run into various issues.

    For example, with the following:

    @EnableBinding(Sink.class)  
    public class EventhubSink {  
      
        @StreamListener(Sink.INPUT)  
        public void handleMessage(byte[] message, @Header(AzureHeaders.CHECKPOINTER) Checkpointer checkpointer) {  
      
             checkpointer.success()  
                    .doOnSuccess(s -> deserializeEvent(message))  
                    .doOnError(System.out::println)  
                    .subscribe();  
        }  
      
        private void deserializeEvent(byte[] eMessage) {  
            try {  
                TokenCredential tokenCredential = new DefaultAzureCredentialBuilder().build();  
      
                SchemaRegistryAsyncClient schemaRegistryAsyncClient = new SchemaRegistryClientBuilder()  
                        .credential(tokenCredential)  
                        .endpoint("myendpoint")  
                        .buildAsyncClient();  
      
                 SchemaRegistryAvroSerializer schemaRegistryAvroSerializer = new SchemaRegistryAvroSerializerBuilder()  
                        .schemaRegistryAsyncClient(schemaRegistryAsyncClient)  
                        .schemaGroup("myschemagroup")  
                        .avroSpecificReader(true)  
                        .autoRegisterSchema(true)  
                        .buildSerializer();  
      
                 InputStream targetStream = new ByteArrayInputStream(eMessage);  
                PlayingCard playingCard = schemaRegistryAvroSerializer.deserialize(targetStream,  
                        TypeReference.createInstance(PlayingCard.class));  
      
                targetStream.close();  
            } catch(Exception e) {  
                e.printStackTrace();  
            }  
        }
    }
    

    I get the following error when trying to deserialize the message (line 30) coming back from Event Hub:

    java.lang.IllegalStateException: block()/blockFirst()/blockLast() are blocking, which is not supported in thread reactor-http-kqueue-1
    at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:83)
    at reactor.core.publisher.Mono.block(Mono.java:1680)
    at com.azure.data.schemaregistry.avro.SchemaRegistryAvroSerializer.deserialize(SchemaRegistryAvroSerializer.java:50)
    at com.bestbuy.scc.poc.EventhubSink.deserializeEvent(EventhubSink.java:60)
    at com.bestbuy.scc.poc.EventhubSink.lambda$handleMessage$0(EventhubSink.java:31)
    at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.onComplete(MonoPeekTerminal.java:282)
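
    The trace suggests the cause: `SchemaRegistryAvroSerializer.deserialize` calls `Mono.block()`, and because `deserializeEvent` runs inside the `doOnSuccess` callback it executes on the `reactor-http-kqueue-1` thread, where Reactor refuses blocking calls. One commonly used option in Reactor is to move the callback to a scheduler that permits blocking, for example with `publishOn(Schedulers.boundedElastic())` before `doOnSuccess`. The sketch below shows the same idea with plain JDK classes rather than Reactor, so it has no dependencies: `BlockingOffloadSketch` and `blockingDeserialize` are hypothetical stand-ins, and the sleep simulates the schema lookup.

    ```java
    import java.nio.charset.StandardCharsets;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    // Sketch only: run a blocking call on a worker pool instead of the callback thread.
    final class BlockingOffloadSketch {

        // Hypothetical stand-in for a deserializer that blocks, e.g. while fetching a schema.
        static String blockingDeserialize(byte[] payload) {
            try {
                Thread.sleep(50); // simulate blocking I/O
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
            return new String(payload, StandardCharsets.UTF_8);
        }

        // Submits the blocking call to the worker pool; the caller gets a future back immediately.
        static CompletableFuture<String> deserializeOffThread(byte[] payload, ExecutorService workers) {
            return CompletableFuture.supplyAsync(() -> blockingDeserialize(payload), workers);
        }

        public static void main(String[] args) {
            ExecutorService workers = Executors.newFixedThreadPool(2);
            try {
                byte[] payload = "5 of SPADES".getBytes(StandardCharsets.UTF_8);
                // join() is acceptable here because main is an ordinary thread, not an event loop.
                String card = deserializeOffThread(payload, workers).join();
                System.out.println("Deserialized: " + card);
            } finally {
                workers.shutdown();
            }
        }
    }
    ```

    Whether this resolves the issue in the Spring Cloud Stream setup above is untested; it only illustrates keeping blocking work off the event-loop thread.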

    The Spring Cloud Stream Binder does have message converters, but they seem to be limited to its own schema registry client or the Confluent one it offers, not the Azure Schema Registry client, so I am not sure whether I am missing something there. That being said, I would be fine trying this with just the Azure SDK (no Spring Cloud Stream Binder), but again I am just looking for a jumpstart on how that would be implemented for an Event Hub producer and consumer.