Skicka händelser till eller ta emot händelser från händelsehubbar med hjälp av Python
Den här snabbstarten visar hur du skickar händelser till och tar emot händelser från en händelsehubb med azure-eventhub Python-paketet.
Förutsättningar
Om du inte har använt Azure Event Hubs tidigare kan du läsa Översikt över Event Hubs innan du gör den här snabbstarten.
För att slutföra den här snabbstarten, behöver du följande förhandskrav:
- Microsoft Azure-prenumeration. Om du vill använda Azure-tjänster, inklusive Azure Event Hubs, behöver du en prenumeration. Om du inte har ett befintligt Azure-konto registrerar du dig för en kostnadsfri utvärderingsversion.
- Python 3.8 eller senare, med pip installerat och uppdaterat.
- Visual Studio Code (rekommenderas) eller någon annan integrerad utvecklingsmiljö (IDE).
- Skapa ett Event Hubs-namnområde och en händelsehubb. Det första steget är att använda Azure Portal för att skapa ett Event Hubs-namnområde och hämta de hanteringsuppgifter som programmet behöver för att kommunicera med händelsehubben. Om du behöver skapa ett namnområde och en händelsehubb följer du anvisningarna i den här artikeln.
Installera paketen för att skicka händelser
Om du vill installera Python-paketen för Event Hubs öppnar du en kommandotolk som har Python i sin sökväg. Ändra katalogen till den mapp där du vill behålla dina exempel.
pip install azure-eventhub
pip install azure-identity
pip install aiohttp
Autentisera appen till Azure
Den här snabbstarten visar två sätt att ansluta till Azure Event Hubs: lösenordslösa och anslutningssträng. Det första alternativet visar hur du använder ditt säkerhetsobjekt i Microsoft Entra-ID och rollbaserad åtkomstkontroll (RBAC) för att ansluta till ett Event Hubs-namnområde. Du behöver inte bekymra dig om att ha hårdkodade anslutningssträng i koden eller i en konfigurationsfil eller i en säker lagring som Azure Key Vault. Det andra alternativet visar hur du använder en anslutningssträng för att ansluta till ett Event Hubs-namnområde. Om du inte har använt Azure tidigare kan det vara lättare att följa anslutningssträng alternativet. Vi rekommenderar att du använder det lösenordslösa alternativet i verkliga program och produktionsmiljöer. Mer information finns i Autentisering och auktorisering. Du kan också läsa mer om lösenordslös autentisering på översiktssidan.
Tilldela roller till din Microsoft Entra-användare
När du utvecklar lokalt kontrollerar du att användarkontot som ansluter till Azure Event Hubs har rätt behörigheter. Du behöver rollen Azure Event Hubs-dataägare för att kunna skicka och ta emot meddelanden. Om du vill tilldela dig själv den här rollen behöver du rollen Administratör för användaråtkomst eller en annan roll som innehåller åtgärden Microsoft.Authorization/roleAssignments/write
. Du kan tilldela Azure RBAC-roller till en användare med hjälp av Azure Portal, Azure CLI eller Azure PowerShell. Läs mer om tillgängliga omfång för rolltilldelningar på översiktssidan för omfång .
I följande exempel tilldelas Azure Event Hubs Data Owner
rollen till ditt användarkonto, vilket ger fullständig åtkomst till Azure Event Hubs-resurser. I ett verkligt scenario följer du principen om lägsta behörighet för att ge användarna endast de minsta behörigheter som krävs för en säkrare produktionsmiljö.
Inbyggda Azure-roller för Azure Event Hubs
För Azure Event Hubs skyddas redan hanteringen av namnområden och alla relaterade resurser via Azure Portal och Azure-resurshanterings-API:et med hjälp av Azure RBAC-modellen. Azure tillhandahåller de inbyggda Azure-rollerna nedan för att auktorisera åtkomst till ett Event Hubs-namnområde:
- Azure Event Hubs-dataägare: Ger dataåtkomst till Event Hubs-namnområdet och dess entiteter (köer, ämnen, prenumerationer och filter)
- Azure Event Hubs Data Sender: Använd den här rollen för att ge avsändaren åtkomst till Event Hubs-namnområdet och dess entiteter.
- Azure Event Hubs-datamottagare: Använd den här rollen för att ge mottagaren åtkomst till Event Hubs-namnområdet och dess entiteter.
Om du vill skapa en anpassad roll läser du Rättigheter som krävs för Event Hubs-åtgärder.
Viktigt!
I de flesta fall tar det en minut eller två innan rolltilldelningen sprids i Azure. I sällsynta fall kan det ta upp till åtta minuter. Om du får autentiseringsfel när du först kör koden väntar du en stund och försöker igen.
Leta upp event hubs-namnområdet i Azure Portal med hjälp av huvudsökfältet eller det vänstra navigeringsfältet.
På översiktssidan väljer du Åtkomstkontroll (IAM) på den vänstra menyn.
På sidan Åtkomstkontroll (IAM) väljer du fliken Rolltilldelningar .
Välj + Lägg till på den översta menyn och sedan Lägg till rolltilldelning från den resulterande nedrullningsbara menyn.
Använd sökrutan för att filtrera resultatet till önskad roll. I det här exemplet söker
Azure Event Hubs Data Owner
du efter och väljer matchande resultat. Välj sedan Nästa.Under Tilldela åtkomst till väljer du Användare, grupp eller tjänstens huvudnamn och sedan + Välj medlemmar.
I dialogrutan söker du efter ditt Microsoft Entra-användarnamn (vanligtvis din user@domain e-postadress) och väljer sedan Välj längst ned i dialogrutan.
Välj Granska + tilldela för att gå till den sista sidan och sedan Granska + tilldela igen för att slutföra processen.
Skicka händelser
I det här avsnittet skapar du ett Python-skript för att skicka händelser till händelsehubben som du skapade tidigare.
Öppna din favoritredigerare för Python, till exempel Visual Studio Code.
Skapa ett skript med namnet send.py. Det här skriptet skickar en batch händelser till den händelsehubb som du skapade tidigare.
Klistra in följande kod i send.py:
I koden använder du verkliga värden för att ersätta följande platshållare:
EVENT_HUB_FULLY_QUALIFIED_NAMESPACE
EVENT_HUB_NAME
import asyncio from azure.eventhub import EventData from azure.eventhub.aio import EventHubProducerClient from azure.identity.aio import DefaultAzureCredential EVENT_HUB_FULLY_QUALIFIED_NAMESPACE = "EVENT_HUB_FULLY_QUALIFIED_NAMESPACE" EVENT_HUB_NAME = "EVENT_HUB_NAME" credential = DefaultAzureCredential() async def run(): # Create a producer client to send messages to the event hub. # Specify a credential that has correct role assigned to access # event hubs namespace and the event hub name. producer = EventHubProducerClient( fully_qualified_namespace=EVENT_HUB_FULLY_QUALIFIED_NAMESPACE, eventhub_name=EVENT_HUB_NAME, credential=credential, ) async with producer: # Create a batch. event_data_batch = await producer.create_batch() # Add events to the batch. event_data_batch.add(EventData("First event ")) event_data_batch.add(EventData("Second event")) event_data_batch.add(EventData("Third event")) # Send the batch of events to the event hub. await producer.send_batch(event_data_batch) # Close credential when no longer needed. await credential.close() asyncio.run(run())
Kommentar
Exempel på andra alternativ för att skicka händelser till händelsehubben asynkront med hjälp av en anslutningssträng finns på sidan GitHub send_async.py. De mönster som visas där gäller även för att skicka händelser utan lösenord.
Ta emot händelser
Den här snabbstarten använder Azure Blob Storage som kontrollpunktslager. Kontrollpunktsarkivet används för att bevara kontrollpunkter (d.s. de senast lästa positionerna).
Följ dessa rekommendationer när du använder Azure Blob Storage som kontrollpunktslager:
- Använd en separat container för varje konsumentgrupp. Du kan använda samma lagringskonto, men använda en container per grupp.
- Använd inte containern för något annat och använd inte lagringskontot för något annat.
- Lagringskontot ska finnas i samma region som det distribuerade programmet finns i. Om programmet är lokalt kan du försöka välja den region som är närmast.
På sidan Lagringskonto i Azure Portal i avsnittet Blob Service kontrollerar du att följande inställningar är inaktiverade.
- Hierarkisk namnrymd
- Mjuk borttagning av blob
- Versionshantering
Skapa ett Azure Storage-konto och en blobcontainer
Skapa ett Azure Storage-konto och en blobcontainer i det genom att göra följande:
- Skapa ett Azure Storage-konto
- Skapa en blobcontainer.
- Autentisera till blobcontainern.
Se till att registrera anslutningssträng och containernamnet för senare användning i mottagarkoden.
När du utvecklar lokalt kontrollerar du att användarkontot som har åtkomst till blobdata har rätt behörigheter. Du behöver Storage Blob Data-deltagare för att läsa och skriva blobdata. Om du vill tilldela dig själv den här rollen måste du tilldelas rollen Administratör för användaråtkomst eller en annan roll som innehåller åtgärden Microsoft.Authorization/roleAssignments/write . Du kan tilldela Azure RBAC-roller till en användare med hjälp av Azure Portal, Azure CLI eller Azure PowerShell. Du kan lära dig mer om tillgängliga omfång för rolltilldelningar på översiktssidan för omfång .
I det här scenariot tilldelar du behörigheter till ditt användarkonto, begränsat till lagringskontot, för att följa principen om lägsta behörighet. Den här metoden ger användarna endast de minsta behörigheter som krävs och skapar säkrare produktionsmiljöer.
I följande exempel tilldelas rollen Storage Blob Data Contributor till ditt användarkonto, vilket ger både läs- och skrivåtkomst till blobdata i ditt lagringskonto.
Viktigt!
I de flesta fall tar det en minut eller två för rolltilldelningen att spridas i Azure, men i sällsynta fall kan det ta upp till åtta minuter. Om du får autentiseringsfel när du först kör koden väntar du en stund och försöker igen.
Leta upp ditt lagringskonto i Azure Portal med hjälp av huvudsökfältet eller det vänstra navigeringsfältet.
På översiktssidan för lagringskontot väljer du Åtkomstkontroll (IAM) på den vänstra menyn.
På sidan Åtkomstkontroll (IAM) väljer du fliken Rolltilldelningar .
Välj + Lägg till på den översta menyn och sedan Lägg till rolltilldelning från den resulterande nedrullningsbara menyn.
Använd sökrutan för att filtrera resultatet till önskad roll. I det här exemplet söker du efter Storage Blob Data Contributor och väljer matchande resultat och väljer sedan Nästa.
Under Tilldela åtkomst till väljer du Användare, grupp eller tjänstens huvudnamn och sedan + Välj medlemmar.
I dialogrutan söker du efter ditt Microsoft Entra-användarnamn (vanligtvis din user@domain e-postadress) och väljer sedan Välj längst ned i dialogrutan.
Välj Granska + tilldela för att gå till den sista sidan och sedan Granska + tilldela igen för att slutföra processen.
Installera paketen för att ta emot händelser
För mottagarsidan måste du installera ett eller flera paket. I den här snabbstarten använder du Azure Blob Storage för att spara kontrollpunkter så att programmet inte läser de händelser som det redan har läst. Den utför metadatakontrollpunkter på mottagna meddelanden med jämna mellanrum i en blob. Den här metoden gör det enkelt att fortsätta ta emot meddelanden senare där du slutade.
pip install azure-eventhub-checkpointstoreblob-aio
pip install azure-identity
Skapa ett Python-skript för att ta emot händelser
I det här avsnittet skapar du ett Python-skript för att ta emot händelser från händelsehubben:
Öppna din favoritredigerare för Python, till exempel Visual Studio Code.
Skapa ett skript med namnet recv.py.
Klistra in följande kod i recv.py:
I koden använder du verkliga värden för att ersätta följande platshållare:
BLOB_STORAGE_ACCOUNT_URL
BLOB_CONTAINER_NAME
EVENT_HUB_FULLY_QUALIFIED_NAMESPACE
EVENT_HUB_NAME
import asyncio from azure.eventhub.aio import EventHubConsumerClient from azure.eventhub.extensions.checkpointstoreblobaio import ( BlobCheckpointStore, ) from azure.identity.aio import DefaultAzureCredential BLOB_STORAGE_ACCOUNT_URL = "BLOB_STORAGE_ACCOUNT_URL" BLOB_CONTAINER_NAME = "BLOB_CONTAINER_NAME" EVENT_HUB_FULLY_QUALIFIED_NAMESPACE = "EVENT_HUB_FULLY_QUALIFIED_NAMESPACE" EVENT_HUB_NAME = "EVENT_HUB_NAME" credential = DefaultAzureCredential() async def on_event(partition_context, event): # Print the event data. print( 'Received the event: "{}" from the partition with ID: "{}"'.format( event.body_as_str(encoding="UTF-8"), partition_context.partition_id ) ) # Update the checkpoint so that the program doesn't read the events # that it has already read when you run it next time. await partition_context.update_checkpoint(event) async def main(): # Create an Azure blob checkpoint store to store the checkpoints. checkpoint_store = BlobCheckpointStore( blob_account_url=BLOB_STORAGE_ACCOUNT_URL, container_name=BLOB_CONTAINER_NAME, credential=credential, ) # Create a consumer client for the event hub. client = EventHubConsumerClient( fully_qualified_namespace=EVENT_HUB_FULLY_QUALIFIED_NAMESPACE, eventhub_name=EVENT_HUB_NAME, consumer_group="$Default", checkpoint_store=checkpoint_store, credential=credential, ) async with client: # Call the receive method. Read from the beginning of the partition # (starting_position: "-1") await client.receive(on_event=on_event, starting_position="-1") # Close credential when no longer needed. await credential.close() if __name__ == "__main__": # Run the main method. asyncio.run(main())
Kommentar
Exempel på andra alternativ för att ta emot händelser från Händelsehubb asynkront med hjälp av en anslutningssträng finns på sidan GitHub recv_with_checkpoint_store_async.py. De mönster som visas där gäller även för att ta emot händelser utan lösenord.
Kör mottagarappen
Om du vill köra skriptet öppnar du en kommandotolk som har Python i sökvägen och kör sedan det här kommandot:
python recv.py
Kör avsändarappen
Om du vill köra skriptet öppnar du en kommandotolk som har Python i sökvägen och kör sedan det här kommandot:
python send.py
Mottagarfönstret ska visa de meddelanden som skickades till händelsehubben.
Felsökning
Om du inte ser händelser i mottagarfönstret eller om koden rapporterar ett fel kan du prova följande felsökningstips:
Om du inte ser resultat från recy.py kör du send.py flera gånger.
Om du ser fel om "coroutine" när du använder den lösenordslösa koden (med autentiseringsuppgifter) kontrollerar du att du använder import från
azure.identity.aio
.Om du ser "Avsluten klientsession" med lösenordslös kod (med autentiseringsuppgifter) kontrollerar du att du stänger autentiseringsuppgifterna när du är klar. Mer information finns i Async-autentiseringsuppgifter.
Om du ser auktoriseringsfel med recv.py när du kommer åt lagringen kontrollerar du att du har följt stegen i Skapa ett Azure-lagringskonto och en blobcontainer och tilldelat rollen Storage Blob Data Contributor till tjänstens huvudnamn.
Om du får händelser med olika partitions-ID:t förväntas det här resultatet. Partitioner är en mekanism för organisering av data som har att göra med vilken underordnad parallellitet som krävs i de program som används. Antalet partitioner i en händelsehubb är direkt kopplat till antalet samtidiga läsare som du förväntar dig. Mer information finns i Läs mer om partitioner.
Nästa steg
I den här snabbstarten har du skickat och tagit emot händelser asynkront. Om du vill veta hur du skickar och tar emot händelser synkront går du till GitHub-sync_samples-sidan.
För alla exempel (både synkrona och asynkrona) på GitHub går du till Azure Event Hubs-klientbiblioteket för Python-exempel.