How to deserialize an IDictionary Object load into an Azure EventHub

BEPV 0 Reputation points
2024-04-14T22:22:19.27+00:00

I am trying to deserialize data that is coming from Azure Event Hub using Kafka integration, the data is loaded into the EventHub using .net app and the message is an IDictionary object as shown below, after reading from the EventHub the data that is loaded into the dataframe is a binary value and I tried to deserialice in python using an explicit cast to string and also using hex function but I am still not able to deserialize the data correctly, could you please guide me on how to deserialize it.

### Read message using Kafka integration

sasl_config = f'org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required clientId="{client_id}" clientSecret="{client_secret}" scope="https://{event_hubs_server}/.default" ssl.protocol="SSL";'
kafka_options = {
    # Port 9093 is the EventHubs Kafka port
    "kafka.bootstrap.servers": f"{event_hubs_server}:9093",
    "kafka.sasl.jaas.config": sasl_config,
    "kafka.sasl.oauthbearer.token.endpoint.url": f"https://login.microsoft.com/{tenant_id}/oauth2/v2.0/token",
    "subscribe": event_hubs_topic,
    "kafka.security.protocol": "SASL_SSL",
    "kafka.sasl.mechanism": "OAUTHBEARER",
    "startingOffsets":"earliest",
    "kafka.sasl.login.callback.handler.class": "org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler",
    "maxOffsetsPerTrigger": 1000
}
df = spark.readStream.format("kafka").options(**kafka_options).load()




### Save message into EventHub

Dictionary<string, object> metadata = new Dictionary<string, object>
{
    ["Id"] = Id,
    ["Document"] = Document,   
    ["IsTest"] = false
};

Dictionary<byte[], IDictionary<string, object>> messages = new Dictionary<byte[], IDictionary<string, object>>() { { guid.ToByteArray(), metadata } };
await EventHubProducer.SendBatchMessageAsync(messages, Id).ConfigureAwait(false);


Azure Event Hubs
Azure Event Hubs
An Azure real-time data ingestion service.
591 questions
Azure Synapse Analytics
Azure Synapse Analytics
An Azure analytics service that brings together data integration, enterprise data warehousing, and big data analytics. Previously known as Azure SQL Data Warehouse.
4,621 questions
{count} votes