How to move data from Event Hubs to Cosmos DB

Chandrakant Pawle 81 Reputation points
2023-03-20T05:25:50.02+00:00

How can I move data from Event Hubs to Cosmos DB? Which services do we need to use for this?

Azure Functions
An Azure service that provides an event-driven serverless compute platform.
Azure Event Hubs
An Azure real-time data ingestion service.
Azure Cosmos DB
An Azure NoSQL database service for app development.

Accepted answer
  1. TP 82,656 Reputation points
    2023-03-20T05:45:42.3666667+00:00

    Hi,

    You can use an Azure Function with an Azure Event Hubs trigger and a Cosmos DB output binding to move data from Event Hubs to Cosmos DB.

    Azure Event Hubs trigger for Azure Functions

    https://video2.skills-academy.com/en-us/azure/azure-functions/functions-bindings-event-hubs-trigger

    Azure Cosmos DB output binding for Azure Functions 2.x and higher

    https://video2.skills-academy.com/en-us/azure/azure-functions/functions-bindings-cosmosdb-v2-output
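
    As a rough illustration of the trigger plus output binding combination, here is a minimal sketch in Python. It assumes the Python v2 programming model (the `azure-functions` package) and version 4.x of the Cosmos DB extension, where the binding parameter is `container_name`. The hub, database, and container names, the two connection setting names, and the `to_document` helper are all hypothetical, and the event body is assumed to be a UTF-8 JSON object. The guarded import is only there so the mapping helper can be run locally without the package installed.

```python
import json
import uuid

try:
    # Available inside a Function App (azure-functions package).
    import azure.functions as func
except ImportError:
    # Lets the mapping helper below run without the package installed.
    func = None


def to_document(body: bytes) -> dict:
    """Map a raw event body (assumed to be a UTF-8 JSON object) to a Cosmos DB item."""
    item = json.loads(body.decode("utf-8"))
    # Cosmos DB items carry a string "id"; generate one when the event has none.
    item.setdefault("id", str(uuid.uuid4()))
    return item


if func is not None:
    app = func.FunctionApp()

    @app.event_hub_message_trigger(
        arg_name="event",
        event_hub_name="my-event-hub",      # hypothetical hub name
        connection="EventHubConnection",    # hypothetical app setting name
    )
    @app.cosmos_db_output(
        arg_name="doc",
        database_name="my-database",        # hypothetical database name
        container_name="events",            # hypothetical container name
        connection="CosmosDbConnection",    # hypothetical app setting name
    )
    def eventhub_to_cosmos(event: func.EventHubEvent,
                           doc: func.Out[func.Document]) -> None:
        # Each incoming event is written as one item through the output binding.
        doc.set(func.Document.from_dict(to_document(event.get_body())))
```

    Keeping the mapping in a plain function means it can be checked without the Functions runtime, for example `to_document(b'{"deviceId": "d1"}')` returns the parsed object with a generated `id` added.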

    Please click Accept Answer if the above was helpful.

    Thanks.

    -TP

1 additional answer

  1. Francesco Cesareo 56 Reputation points
    2024-02-07T11:22:20.55+00:00

    Hi, I need a clarification about this discussion. I have a function set up this way (Event Hubs trigger and Cosmos DB output binding).
    Intermittently, I get a 408 error with the following diagnostic info:

    {
        "Summary": {
            "GatewayCalls": {
                "(408, 0)": 1
            }
        },
        "name": "UpsertItemAsync",
        "id": "b3563c9d-a18f-4091-b18e-9af2c18f593a",
        "start time": "07:49:11:492",
        "duration in milliseconds": 10333.2846,
        "data": {
            "Client Configuration": {
                "Client Created Time Utc": "2024-02-04T00:02:41.7472248Z",
                "MachineId": "hashedMachineName:18544af8-8085-8109-67b5-ad29146331a3",
                "NumberOfClientsCreated": 1,
                "NumberOfActiveClients": 1,
                "ConnectionMode": "Gateway",
                "User Agent": "cosmos-netstandard-sdk/3.31.0|1|X64|Linux 5.15.138.1-4.cm2 1 SMP |.NET Core 3.1.25|N|",
                "ConnectionConfig": {
                    "gw": "(cps:50, urto:10, p:False, httpf: False)",
                    "rntbd": "(cto: 5, icto: -1, mrpc: 30, mcpe: 65535, erd: True, pr: ReuseUnicastPort)",
                    "other": "(ed:False, be:False)"
                },
                "ConsistencyConfig": "(consistency: NotSet, prgns:[], apprgn: )",
                "ProcessorCount": 2
            }
        },
        "children": [
            {
                "name": "ItemSerialize",
                "id": "51f64eaa-11d2-4b07-a88a-26e9f97c8654",
                "start time": "07:49:11:492",
                "duration in milliseconds": 0.0244
            },
            {
                "name": "Get PkValue From Stream",
                "id": "683d006c-8988-4928-bbe6-cabef1703ec2",
                "start time": "07:49:11:492",
                "duration in milliseconds": 0.0756,
                "children": [
                    {
                        "name": "Get Collection Cache",
                        "id": "552086c2-80c4-40eb-a5ab-92b6cba5513d",
                        "start time": "07:49:11:492",
                        "duration in milliseconds": 0.0019
                    }
                ]
            },
            {
                "name": "Microsoft.Azure.Cosmos.Handlers.RequestInvokerHandler",
                "id": "043938f4-8d8f-453c-b7c3-2743ae0577a8",
                "start time": "07:49:11:492",
                "duration in milliseconds": 10332.7954,
                "children": [
                    {
                        "name": "Microsoft.Azure.Cosmos.Handlers.DiagnosticsHandler",
                        "id": "8d3163e1-7aea-4e9c-b3cf-772f905d9e90",
                        "start time": "07:49:11:492",
                        "duration in milliseconds": 10332.7679,
                        "data": {
                            "System Info": {
                                "systemHistory": [
                                    {
                                        "dateUtc": "2024-02-07T07:47:47.7640332Z",
                                        "cpu": 4.851,
                                        "memory": 4899976,
                                        "threadInfo": {
                                            "isThreadStarving": "False",
                                            "threadWaitIntervalInMs": 0.0283,
                                            "availableThreads": 32758,
                                            "minThreads": 12,
                                            "maxThreads": 32767
                                        },
                                        "numberOfOpenTcpConnection": 0
                                    },
                                    {
                                        "dateUtc": "2024-02-07T07:48:07.7649049Z",
                                        "cpu": 5.36,
                                        "memory": 4900792,
                                        "threadInfo": {
                                            "isThreadStarving": "False",
                                            "threadWaitIntervalInMs": 0.0337,
                                            "availableThreads": 32758,
                                            "minThreads": 12,
                                            "maxThreads": 32767
                                        },
                                        "numberOfOpenTcpConnection": 0
                                    },
                                    {
                                        "dateUtc": "2024-02-07T07:48:17.7663096Z",
                                        "cpu": 5.303,
                                        "memory": 4900860,
                                        "threadInfo": {
                                            "isThreadStarving": "False",
                                            "threadWaitIntervalInMs": 0.0286,
                                            "availableThreads": 32758,
                                            "minThreads": 12,
                                            "maxThreads": 32767
                                        },
                                        "numberOfOpenTcpConnection": 0
                                    },
                                    {
                                        "dateUtc": "2024-02-07T07:48:37.7630816Z",
                                        "cpu": 4.439,
                                        "memory": 4901360,
                                        "threadInfo": {
                                            "isThreadStarving": "False",
                                            "threadWaitIntervalInMs": 0.0307,
                                            "availableThreads": 32758,
                                            "minThreads": 12,
                                            "maxThreads": 32767
                                        },
                                        "numberOfOpenTcpConnection": 0
                                    },
                                    {
                                        "dateUtc": "2024-02-07T07:48:47.7662562Z",
                                        "cpu": 7.053,
                                        "memory": 4901132,
                                        "threadInfo": {
                                            "isThreadStarving": "False",
                                            "threadWaitIntervalInMs": 0.0268,
                                            "availableThreads": 32758,
                                            "minThreads": 12,
                                            "maxThreads": 32767
                                        },
                                        "numberOfOpenTcpConnection": 0
                                    },
                                    {
                                        "dateUtc": "2024-02-07T07:49:07.7670392Z",
                                        "cpu": 4.826,
                                        "memory": 4906480,
                                        "threadInfo": {
                                            "isThreadStarving": "False",
                                            "threadWaitIntervalInMs": 0.0274,
                                            "availableThreads": 32758,
                                            "minThreads": 12,
                                            "maxThreads": 32767
                                        },
                                        "numberOfOpenTcpConnection": 0
                                    }
                                ]
                            }
                        },
                        "children": [
                            {
                                "name": "Microsoft.Azure.Cosmos.Handlers.RetryHandler",
                                "id": "8312b092-9405-4aec-b08d-0ca4b1f2b04b",
                                "start time": "07:49:11:492",
                                "duration in milliseconds": 10332.75,
                                "children": [
                                    {
                                        "name": "Microsoft.Azure.Cosmos.Handlers.RouterHandler",
                                        "id": "cdcc729c-59fb-4a6f-af6d-b969759fbde9",
                                        "start time": "07:49:11:492",
                                        "duration in milliseconds": 10327.4867,
                                        "children": [
                                            {
                                                "name": "Microsoft.Azure.Cosmos.Handlers.TransportHandler",
                                                "id": "c9cb2432-8794-4823-bb86-a4ba2eb6f9f4",
                                                "start time": "07:49:11:492",
                                                "duration in milliseconds": 10327.484,
                                                "children": [
                                                    {
                                                        "name": "Microsoft.Azure.Cosmos.GatewayStoreModel Transport Request",
                                                        "id": "ec47a4e5-2fb2-4203-be41-835dbce9ef52",
                                                        "start time": "07:49:11:492",
                                                        "duration in milliseconds": 10325.3752,
                                                        "data": {
                                                            "Client Side Request Stats": {
                                                                "Id": "AggregatedClientSideRequestStatistics",
                                                                "ContactedReplicas": [],
                                                                "RegionsContacted": [],
                                                                "FailedReplicas": [],
                                                                "AddressResolutionStatistics": [],
                                                                "StoreResponseStatistics": [],
                                                                "HttpResponseStats": [
                                                                    {
                                                                        "StartTimeUTC": "2024-02-07T07:49:11.4926451Z",
                                                                        "DurationInMs": 10322.848,
                                                                        "RequestUri": "https://HIDDEN/colls/events/docs",
                                                                        "ResourceType": "Document",
                                                                        "HttpMethod": "POST",
                                                                        "ActivityId": "dd1168e3-0478-421a-a9ba-be0dcc7a48e0",
                                                                        "StatusCode": "RequestTimeout",
                                                                        "ReasonPhrase": "Request timed out"
                                                                    }
                                                                ]
                                                            },
                                                            "Point Operation Statistics": {
                                                                "Id": "PointOperationStatistics",
                                                                "ActivityId": "dd1168e3-0478-421a-a9ba-be0dcc7a48e0",
                                                                "ResponseTimeUtc": "2024-02-07T07:49:21.8197984Z",
                                                                "StatusCode": 408,
                                                                "SubStatusCode": 0,
                                                                "RequestCharge": 0,
                                                                "RequestUri": "HIDDEN/colls/events",
                                                                "ErrorMessage": "Microsoft.Azure.Documents.DocumentClientException: Message: Request timed out. More info: https://aka.ms/cosmosdb-tsg-request-timeout\r\nActivityId: dd1168e3-0478-421a-a9ba-be0dcc7a48e0, Request URI: /apps/31ceca64-9f75-4dd7-b8a6-cb20525e5aea/services/1b7ac9c1-45af-4e56-9b6e-bd4b7cb8b0f9/partitions/fac3efb5-f607-454f-8bd0-496f5069adff/replicas/133473354095365873p, RequestStats: Microsoft.Azure.Cosmos.Tracing.TraceData.ClientSideRequestStatisticsTraceDatum, SDK: Windows/10.0.20348 cosmos-netstandard-sdk/3.18.0, Microsoft.Azure.Cosmos.Tracing.TraceData.ClientSideRequestStatisticsTraceDatum, Linux/10 cosmos-netstandard-sdk/3.29.1\n at Microsoft.Azure.Cosmos.GatewayStoreClient.ParseResponseAsync(HttpResponseMessage responseMessage, JsonSerializerSettings serializerSettings, DocumentServiceRequest request)\n at Microsoft.Azure.Cosmos.GatewayStoreClient.InvokeAsync(DocumentServiceRequest request, ResourceType resourceType, Uri physicalAddress, CancellationToken cancellationToken)\n at Microsoft.Azure.Cosmos.GatewayStoreModel.ProcessMessageAsync(DocumentServiceRequest request, CancellationToken cancellationToken)\n at Microsoft.Azure.Cosmos.GatewayStoreModel.ProcessMessageAsync(DocumentServiceRequest request, CancellationToken cancellationToken)\n at Microsoft.Azure.Cosmos.Handlers.TransportHandler.ProcessMessageAsync(RequestMessage request, CancellationToken cancellationToken)\n at Microsoft.Azure.Cosmos.Handlers.TransportHandler.SendAsync(RequestMessage request, CancellationToken cancellationToken)",
                                                                "RequestSessionToken": null,
                                                                "ResponseSessionToken": null,
                                                                "BELatencyInMs": null
                                                            }
                                                        }
                                                    }
                                                ]
                                            }
                                        ]
                                    }
                                ]
                            }
                        ]
                    }
                ]
            },
            {
                "name": "Get Collection Cache",
                "id": "6e1d870b-22c7-4969-a1b0-dc9214de8ee3",
                "start time": "07:49:21:825",
                "duration in milliseconds": 0.004
            }
        ]
    }
    

    I don't understand whether the problem is related to the Cosmos DB resources or to the function app. Why is numberOfOpenTcpConnection 0? Do I need to increase the Cosmos DB RUs, or should I manage retries using the clientRetryOptions of the Event Hubs extension in host.json?
