Migrating from Microsoft.Azure.EventHubs to Azure.Messaging.EventHubs . What do i need to do to make sure new SDK deployment picks up from checkpoint made by older SDK
Migrating from Microsoft.Azure.EventHubs to Azure.Messaging.EventHubs . Read from docs online that new SDK doesnt support old checkpointing format . What do i need to do to make sure new SDK deployment picks up from checkpoint made by older SDK
Azure Event Hubs
-
Soumya Prateek Raul 40 Reputation points • Microsoft Employee
2024-06-19T08:29:21.02+00:00 What is the SDK default in case checkpoints are absent during startup
.
-
PRADEEPCHEEKATLA-MSFT 83,966 Reputation points • Microsoft Employee
2024-06-19T09:30:39.48+00:00 @Soumya Prateek Raul - Thanks for the question and using MS Q&A platform.
Can you please share the document which you are referring to?
-
Soumya Prateek Raul 40 Reputation points • Microsoft Employee
2024-06-19T09:47:36.93+00:00 The query comes from a stackoverflow post i came across - https://stackoverflow.com/questions/61370590/can-one-migrate-from-microsoft-azure-eventhubs-library-to-azure-messaging-eventh
-
PRADEEPCHEEKATLA-MSFT 83,966 Reputation points • Microsoft Employee
2024-06-20T07:38:31.9833333+00:00 @Soumya Prateek Raul - It seems like you are looking to migrate from Microsoft.Azure.EventHubs to Azure.Messaging.EventHubs and want to make sure that the new SDK deployment picks up from the checkpoint made by the older SDK.
According to the documentation, the new SDK does not support the old checkpointing format. Therefore, you will need to migrate your checkpoints to the new format.
To migrate your checkpoints, you can use the
EventProcessorClient
class in the new SDK. This class provides a method calledLoadBalancingStrategy.CheckpointStore
that you can use to create a new checkpoint store that is compatible with the new SDK.Here is an example of how you can use the
EventProcessorClient
class to migrate your checkpoints:var connectionString = "your-event-hub-connection-string"; var eventHubName = "your-event-hub-name"; var consumerGroup = "your-consumer-group-name"; var checkpointStore = new BlobCheckpointStore("your-blob-connection-string", "your-blob-container-name"); var eventProcessorClient = new EventProcessorClient( checkpointStore, consumerGroup, connectionString, eventHubName); await eventProcessorClient.StartProcessingAsync();
In this example, we are creating a new
BlobCheckpointStore
that is compatible with the new SDK. We then create a newEventProcessorClient
and pass in the new checkpoint store, consumer group, connection string, and event hub name. Finally, we start processing events using the newEventProcessorClient
.I hope this helps! Let me know if you have any further questions.
-
PRADEEPCHEEKATLA-MSFT 83,966 Reputation points • Microsoft Employee
2024-06-24T08:25:51.3466667+00:00 @Soumya Prateek Raul - We haven’t heard from you on the last response and was just checking back to see if you have a resolution yet. In case if you have any resolution please do share that same with the community as it can be helpful to others. Otherwise, will respond with more details and we will try to help.
-
Soumya Prateek Raul 40 Reputation points • Microsoft Employee
2024-06-24T13:04:19.6133333+00:00 The new eventhub SDK follows a different folder format for the checkpointing store than the old SDK.
On the absence of a checkpoint store the new SDK by default would start reading events from the oldest event present in the eventhub. There are two ways to override this behavior
a) Manually create a checkpoint store supporting the new SDK format. Pass partition offset as desired.
b) Pass a function to [ eventProcessorClient.PartitionInitializingAsync ] that would pass a DefaultStartingPosition to the eventProcessorClient.
We followed option b.
eventProcessorClient.PartitionInitializingAsync += this.InitializePartitionAsync; private Task InitializePartitionAsync(PartitionInitializingEventArgs args){ // Because of SDK upgrade , the default starting position is reset to the oldest event in the hub , as we don't want to process very old events we are setting it to [Current Time - 30 minutes ] events generated in last 30 mins // This is a one-time change and will be reverted args.DefaultStartingPosition = EventPosition.FromEnqueuedTime(DateTimeOffset.Now.AddMinutes(-30)); return Task.CompletedTask; }
-
PRADEEPCHEEKATLA-MSFT 83,966 Reputation points • Microsoft Employee
2024-06-30T14:59:45.81+00:00 @Soumya Prateek Raul - Thank you for sharing this information. It's good to know that you were able to override the default behavior of the new SDK by passing a function to
eventProcessorClient.PartitionInitializingAsync
.As you mentioned, the new SDK follows a different folder format for the checkpointing store than the old SDK. If you want to use the new SDK with the checkpoint data created by the old SDK, you need to migrate the checkpoint data to the new format. You can use the
BlobCheckpointStore
class in the new SDK to store the checkpoint data in the new format.Here's an example of how to use the
BlobCheckpointStore
class to store the checkpoint data in the new format:from azure.eventhub import EventHubConsumerClient from azure.eventhub.extensions.checkpointstoreblob import BlobCheckpointStore from azure.messaging.eventhubs import EventProcessorClient # Create a BlobCheckpointStore object to store the checkpoint data in the new format checkpoint_store = BlobCheckpointStore.from_connection_string( "<connection-string>", "<blob-container-name>" ) # Create an EventProcessorClient object to read from the Event Hub event_processor_client = EventProcessorClient( eventhub_connection_str="<connection-string>", consumer_group="<consumer-group-name>", eventhub_name="<event-hub-name>", checkpoint_store=checkpoint_store ) # Start reading from the Event Hub event_processor_client.start()
In this example,, you create a
BlobCheckpointStore
object to store the checkpoint data in the new format, and then create anEventProcessorClient
object to read from the Event Hub. You pass theBlobCheckpointStore
object to theEventProcessorClient
constructor as thecheckpoint_store
parameter.After creating the
EventProcessorClient
object, you call thestart
method to start reading from the Event Hub.I hope this helps! Let me know if you have any other questions.
Sign in to comment