Moving events from one Event Hub to another

Miguel_0101
2021-04-14T16:07:30.307+00:00

Hi everyone,
I have a Python program that reads large volumes of events from one Event Hub, does some complex processing, and sends the results to another Event Hub. We are using the async versions of EventHubConsumerClient and EventHubProducerClient (azure.eventhub.aio) to create the client connections.
We have tried two ways of creating the connections, and each of them has problems:

First Option:
We create a new producer connection every time we receive an event batch from the source Event Hub. This option delivers all of the events (at least, that is our belief), but CPU usage shoots up and we have memory leak issues.

It looks something like this:

async def on_event_batch(partition_context, event_batch):
    producer = EventHubProducerClient.from_connection_string(...)
    async with producer:
        event_data_batch = await producer.create_batch(partition_key=partition_context.partition_id)
        ...

Second Option:
We create a singleton producer and reuse it every time we send an event batch to the target Event Hub.
This option reduces CPU usage and memory leaks, but we lose a considerable number of events.

A schematic version looks something like this:

producer = EventHubProducerClient.from_connection_string(...)
async def on_event_batch(partition_context, event_batch):
    event_data_batch = await producer.create_batch(partition_key=partition_context.partition_id)
    ...

The second option seems more logical to us, but we cannot solve the lost-events issue.
Any help is really appreciated.
Best regards

A more complete version of the Second Option code (a single, shared producer):

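import asyncio
import json

from azure.eventhub import EventData
from azure.eventhub.aio import EventHubConsumerClient, EventHubProducerClient
from azure.eventhub.extensions.checkpointstoreblobaio import BlobCheckpointStore

import ...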

with open(configFilePath) as json_data_file:
    config = json.load(json_data_file)

producer = EventHubProducerClient.from_connection_string(conn_str = config.get('EventHub').get('Producer').get('connection_str')
                                                         , eventhub_name = config.get('EventHub').get('Producer').get('eventhub_name'))
async def on_event_batch(partition_context, event_batch):

    print(partition_context.partition_id)
    # Create a batch
    event_data_batch = await producer.create_batch(partition_key=partition_context.partition_id)
    for x in event_batch:
        can_event, devicetype = canonizer.main(x.body_as_str())
        ED_can_event = EventData(str(can_event))
        try:
            event_data_batch.add(ED_can_event) # Add event data to the batch.
        except Exception as e:
            # add() raises ValueError when the batch is full: send it and start a new one.
            print(e)
            try:
                await producer.send_batch(event_data_batch)
                event_data_batch = await producer.create_batch(partition_key=partition_context.partition_id)
                event_data_batch.add(ED_can_event)
            except Exception as e:
                # If the send fails, the events in the old batch (and ED_can_event) are discarded here.
                print(e)
                event_data_batch = await producer.create_batch(partition_key=partition_context.partition_id)
    try:
        await producer.send_batch(event_data_batch) # Send batch of events to the event hub.
    except Exception as e:
        # The failure is only printed; the checkpoint below still advances,
        # so these events will not be read again.
        print(e)

    await partition_context.update_checkpoint()


async def main(max_batch_size = 250):

    checkpoint_store = BlobCheckpointStore.from_connection_string(config.get('connection_str'),config.get('container_name'))

    consumer = EventHubConsumerClient.from_connection_string(conn_str= config.get('EventHub').get('Consumer').get('connection_str')
                                                            , consumer_group = config.get('EventHub').get('Consumer').get('consumer_group')
                                                            , eventhub_name= config.get('EventHub').get('Consumer').get('eventhub_name')
                                                            , checkpoint_store=checkpoint_store)
    async with consumer, producer:
        await consumer.receive_batch(
            on_event_batch=on_event_batch,
            max_batch_size=max_batch_size,
        )

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())
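In the shared-producer code above (the one with the module-level `producer`), one plausible source of the lost events is that send failures are caught, printed and discarded, while `partition_context.update_checkpoint()` still runs unconditionally, so the consumer never re-reads events that failed to send. Below is a minimal sketch, not an official SDK pattern, of a callback body that lets send errors propagate and checkpoints only after every batch has been sent. `forward_batch` and `convert` are hypothetical names; `convert` stands in for the `canonizer.main(...)` plus `EventData(...)` step. It relies on `EventDataBatch.add` raising `ValueError` when the batch is full. How the consumer client reacts to an exception raised from `on_event_batch` (for example, reporting it to `on_error` and resuming the partition from the last checkpoint) is worth confirming in the azure-eventhub documentation for your SDK version.

```python
async def forward_batch(producer, partition_context, events, convert):
    """Convert and forward `events` with `producer`, then checkpoint.

    Hypothetical helper: `convert` turns one received event into the object
    to send (e.g. an EventData). Errors from send_batch are NOT caught, so a
    failed send leaves the checkpoint where it was.
    """
    if not events:
        return  # nothing received, nothing to checkpoint

    partition_key = partition_context.partition_id
    batch = await producer.create_batch(partition_key=partition_key)
    for event in events:
        outgoing = convert(event)
        try:
            batch.add(outgoing)
        except ValueError:
            # The batch is full: send it and start a new one for this event.
            await producer.send_batch(batch)
            batch = await producer.create_batch(partition_key=partition_key)
            # An event too large even for an empty batch raises here
            # instead of being skipped silently.
            batch.add(outgoing)
    await producer.send_batch(batch)

    # Reached only if every send above succeeded.
    await partition_context.update_checkpoint()
```

Inside `on_event_batch`, the body would then reduce to something like `await forward_batch(producer, partition_context, event_batch, lambda x: EventData(str(canonizer.main(x.body_as_str())[0])))`. Transient failures are already retried inside the client (see the `retry_total` keyword of the client constructors), so an exception that reaches this code usually deserves to stop the checkpoint rather than be printed and ignored.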

A more complete version of the First Option code (a new producer for each batch):

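import asyncio
import json
import time

from azure.eventhub import EventData
from azure.eventhub.aio import EventHubConsumerClient, EventHubProducerClient
from azure.eventhub.extensions.checkpointstoreblobaio import BlobCheckpointStore

import ...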

with open(configFilePath) as json_data_file:
    config = json.load(json_data_file)

async def on_event_batch(partition_context, event_batch):

    producer = EventHubProducerClient.from_connection_string(conn_str = config.get('connection_str')
                                                         , eventhub_name = config.get('eventhub_name'))
    async with producer:
        # Create a batch
        event_data_batch = await producer.create_batch(partition_key=partition_context.partition_id)
        for x in event_batch:
            can_event, devicetype = canonizer.main(x.body_as_str()) # The first element is the frame; the second is the type of the sending device
            ED_can_event = EventData(str(can_event))
            ED_can_event.properties = x.properties
            ED_can_event.properties.update({"DeviceType": devicetype, "Canonized_At": int(time.time())})

            try:
                event_data_batch.add(ED_can_event)

            except Exception as e:
                # add() raises ValueError when the batch is full: send it and start a new one.
                print(e)
                try:
                    await producer.send_batch(event_data_batch)
                    event_data_batch = await producer.create_batch(partition_key=partition_context.partition_id)
                    event_data_batch.add(ED_can_event)
                except Exception as e:
                    # If the send fails, the events in the old batch (and ED_can_event) are discarded here.
                    print(e)
                    event_data_batch = await producer.create_batch(partition_key=partition_context.partition_id)
        try:
            await producer.send_batch(event_data_batch) # Send batch of events to the event hub.
        except Exception as e:
            # The failure is only printed; the checkpoint below still advances,
            # so these events will not be read again.
            print(e)
        await partition_context.update_checkpoint()

async def main(max_batch_size = 250):
    checkpoint_store = BlobCheckpointStore.from_connection_string(config.get('Storage').get('connection_str'),config.get('Storage').get('container_name'))    
    consumer = EventHubConsumerClient.from_connection_string(conn_str= config.get('EventHub').get('Consumer').get('connection_str')
                                                            , consumer_group = config.get('EventHub').get('Consumer').get('consumer_group')
                                                            , eventhub_name= config.get('EventHub').get('Consumer').get('eventhub_name')
                                                            , checkpoint_store=checkpoint_store)
    async with consumer:
        await consumer.receive_batch(
            on_event_batch=on_event_batch,
            max_batch_size=max_batch_size)

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())
Azure Event Hubs

Accepted answer
  Saurabh Sharma, Microsoft Employee
    2021-04-14T22:09:20.837+00:00

    Hi @Miguel_0101 ,

    The first option is not recommended because every batch pays the overhead of opening a new TCP connection and performing an SSL/TLS handshake. This leads to high CPU cost and high API latency.

    The second option is not recommended because a single client becomes a bottleneck.

    I would recommend creating a pool of producer clients (for example, 10) and picking one at random for each batch. You can increase the pool size if you need more throughput. This is a common practice for many TCP clients.
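    A minimal sketch of such a pool, assuming the async azure-eventhub clients (which support `async with`). `ProducerPool`, `factory` and `pick` are hypothetical names, not part of the SDK; the factory would typically be a lambda wrapping `EventHubProducerClient.from_connection_string(...)`. `contextlib.AsyncExitStack` makes sure every client that was opened also gets closed.

```python
import random
from contextlib import AsyncExitStack


class ProducerPool:
    """Hypothetical fixed-size pool of async producer clients.

    `factory` is any zero-argument callable returning an async context
    manager, e.g. a lambda wrapping EventHubProducerClient.from_connection_string.
    """

    def __init__(self, factory, size=10):
        if size < 1:
            raise ValueError("size must be at least 1")
        self.producers = tuple(factory() for _ in range(size))
        self._stack = AsyncExitStack()

    async def __aenter__(self):
        # Enter every client; if one fails, those already entered are closed.
        async with AsyncExitStack() as stack:
            for producer in self.producers:
                await stack.enter_async_context(producer)
            self._stack = stack.pop_all()
        return self

    async def __aexit__(self, exc_type, exc, tb):
        # Close all clients in reverse order of opening.
        await self._stack.aclose()

    def pick(self):
        # Random choice, as suggested above; itertools.cycle would give round-robin.
        return random.choice(self.producers)
```

    In `main`, the pool would be entered together with the consumer (`async with consumer, pool:`), and `on_event_batch` would call `pool.pick()` instead of using a single module-level producer. Random selection spreads load only statistically; a round-robin iterator is a deterministic alternative if that matters.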

    Hope this helps. Please let me know if you have any other questions.

    Thanks
    Saurabh

    ----------

    Please do not forget to "Accept the answer" wherever the information provided helps you, as this can help others in the community.

