Utiliser Java pour envoyer ou recevoir des événements d’Azure Event Hubs

Ce guide de démarrage rapide montre comment recevoir des événements d’un hub d’événements et lui en envoyer en utilisant le package Java azure-messaging-eventhubs.

Conseil

Si vous utilisez des ressources Azure Event Hubs dans une application Spring, nous vous recommandons de considérer Spring Cloud Azure comme alternative. Azure Spring Cloud est un projet open source qui fournit une intégration de Spring fluide aux services Azure. Pour en savoir plus sur Spring Cloud Azure et pour voir un exemple d’utilisation d’Event Hubs, consultez Spring Cloud Stream avec Azure Event Hubs.

Prérequis

Si vous débutez avec Azure Event Hubs, consultez la vue d’ensemble d’Event Hubs avant de suivre ce guide de démarrage rapide.

Pour effectuer ce démarrage rapide, vous avez besoin de ce qui suit :

  • Abonnement Microsoft Azure. Pour utiliser les services Azure, y compris Azure Event Hubs, vous avez besoin d’un abonnement. Si vous n’avez pas de compte Azure, vous pouvez vous inscrire à un essai gratuit ou utiliser les avantages de votre abonnement MSDN quand vous créez un compte.
  • Un environnement de développement Java. Ce guide de démarrage rapide utilise Eclipse. Le Kit de développement Java (JDK) avec version 8 ou ultérieure est nécessaire.
  • Créez un espace de noms Event Hubs et un Event Hub. La première étape consiste à utiliser le portail Azure pour créer un espace de noms de type Event Hubs et obtenir les informations de gestion nécessaires à votre application pour communiquer avec le concentrateur d’événements. Pour créer un espace de noms et un hub d’événements, suivez la procédure décrite dans cet article. Ensuite, obtenez la chaîne de connexion de l’espace de noms Event Hubs en suivant les instructions à partir de l’article : Obtenir la chaîne de connexion. Vous utiliserez la chaîne de connexion plus loin dans ce guide de démarrage rapide.

Envoyer des événements

Cette section montre comment créer une application Java pour envoyer des événements à un hub d’événements.

Ajouter une référence à la bibliothèque Azure Event Hubs

Créez tout d’abord un nouveau projet Maven pour une application de console/shell dans votre environnement de développement Java favori. Mettez le fichier pom.xml à jour comme suit. La bibliothèque cliente de Java pour Event Hubs est disponible dans le Référentiel central Maven.

		<dependency>
		    <groupId>com.azure</groupId>
		    <artifactId>azure-messaging-eventhubs</artifactId>
		    <version>5.18.0</version>
		</dependency>
		<dependency>
		    <groupId>com.azure</groupId>
		    <artifactId>azure-identity</artifactId>
		    <version>1.11.2</version>
		    <scope>compile</scope>
		</dependency>

Notes

Procédez à une mise à jour vers la dernière version publiée sur le référentiel Maven.

Authentifier l’application sur Azure

Ce guide de démarrage rapide présente deux façons de vous connecter à Azure Event Hubs : sans mot de passe et avec une chaîne de connexion. La première option vous explique comment utiliser votre principal de sécurité dans Microsoft Entra ID et le contrôle d’accès en fonction du rôle (RBAC) pour vous connecter à un espace de noms Event Hubs. Vous n’avez pas à vous soucier d’avoir des chaînes de connexion codées en dur dans votre code, dans un fichier config ni dans un stockage sécurisé comme Azure Key Vault. La deuxième option consiste à se servir d’une chaîne de connexion pour se connecter à un espace de noms Event Hubs. Si vous débutez avec Azure, vous trouverez peut-être l’option de chaîne de connexion plus facile à suivre. Nous vous recommandons d’utiliser l’option sans mot de passe dans les applications réelles et les environnements de production. Pour plus d’informations, consultez Authentification et autorisation. Pour en savoir plus sur l’authentification sans mot de passe, reportez-vous à la page de présentation.

Attribuer des rôles à votre utilisateur Microsoft Entra

Lors du développement localement, vous devez vérifier que le compte d’utilisateur qui se connecte à Azure Event Hubs dispose des autorisations appropriées. Vous aurez besoin du rôle Propriétaire de données Azure Event Hubs pour envoyer et recevoir des messages. Pour vous attribuer ce rôle, vous aurez besoin du rôle Administrateur de l’accès utilisateur ou d’un autre rôle qui inclut l’action Microsoft.Authorization/roleAssignments/write. Vous pouvez attribuer des rôles RBAC Azure à un utilisateur à l’aide du Portail Azure, Azure CLI ou Azure PowerShell. Découvrez les étendues disponibles pour les attributions de rôles dans la page vue d’ensemble de l’étendue.

L’exemple suivant attribue le rôle Azure Event Hubs Data Owner à votre compte d’utilisateur, qui fournit un accès complet aux ressources Azure Event Hubs. Dans un scénario réel, suivez le principe des privilèges minimum pour accorder aux utilisateurs uniquement les autorisations minimales nécessaires à un environnement de production plus sécurisé.

Rôles intégrés Azure pour Azure Event Hubs

Pour Azure Event Hubs, la gestion des espaces de noms et de toutes les ressources associées via le portail Azure et l’API de gestion des ressources Azure est déjà protégée à l’aide du modèle RBAC Azure. Azure fournit les rôles Azure intégrés ci-dessous pour autoriser l’accès à un espace de noms Event Hubs :

Si vous souhaitez créer un rôle personnalisé, consultez Droits requis pour les opérations Service Bus.

Important

Dans la plupart des cas, la propagation de l’attribution de rôle dans Azure peut prendre une ou deux minutes. Dans de rares cas, cela peut prendre jusqu’à huit minutes. Si vous recevez des erreurs d’authentification lorsque vous exécutez votre code pour la première fois, patientez quelques instants et réessayez.

  1. Dans le portail Azure, recherchez votre espace de noms Event Hubs à l’aide de la barre de recherche principale ou de la navigation gauche.

  2. Dans la page vue d’ensemble, sélectionnez Contrôle d’accès (IAM) dans le menu de gauche.

  3. Sur la page Contrôle d’accès (IAM), sélectionnez l’onglet Attributions de rôles.

  4. Sélectionnez + Ajouter dans le menu supérieur, puis Ajouter une attribution de rôle dans le menu déroulant résultant.

    A screenshot showing how to assign a role.

  5. Utilisez la zone de recherche pour filtrer les résultats sur le rôle souhaité. Pour cet exemple, recherchez Azure Event Hubs Data Owner et sélectionnez le résultat correspondant. Ensuite, choisissez Suivant.

  6. Sous Attribuer l’accès à, sélectionnez Utilisateur, groupe ou principal de service, puis sélectionnez + Sélectionner des membres.

  7. Dans la boîte de dialogue, recherchez votre nom d’utilisateur Microsoft Entra (généralement votre adresse e-mail utilisateur@domaine), puis choisissez Sélectionner en bas de la boîte de dialogue.

  8. Sélectionnez Vérifier + affecter pour accéder à la page finale, puis Vérifier + attribuer à nouveau pour terminer le processus.

Écriture de code pour envoyer des messages à un hub d’événements

Ajoutez une classe nommée Sender, et ajoutez-y le code suivant :

Important

  • Mettez à jour <NAMESPACE NAME> avec le nom de votre espace de noms Event Hub.
  • Mettez à jour <EVENT HUB NAME> avec le nom de votre hub d’événements.
package ehubquickstart;

import com.azure.messaging.eventhubs.*;
import java.util.Arrays;
import java.util.List;

import com.azure.identity.*;

public class SenderAAD {

    // replace <NAMESPACE NAME> with the name of your Event Hubs namespace.
    // Example: private static final String namespaceName = "contosons.servicebus.windows.net";
    private static final String namespaceName = "<NAMESPACE NAME>.servicebus.windows.net";

    // Replace <EVENT HUB NAME> with the name of your event hub. 
    // Example: private static final String eventHubName = "ordersehub";
    private static final String eventHubName = "<EVENT HUB NAME>";

    public static void main(String[] args) {
        publishEvents();
    }
    /**
     * Code sample for publishing events.
     * @throws IllegalArgumentException if the EventData is bigger than the max batch size.
     */
    public static void publishEvents() {
        // create a token using the default Azure credential        
        DefaultAzureCredential credential = new DefaultAzureCredentialBuilder()
                .authorityHost(AzureAuthorityHosts.AZURE_PUBLIC_CLOUD)
                .build();

        // create a producer client        
        EventHubProducerClient producer = new EventHubClientBuilder()        
            .fullyQualifiedNamespace(namespaceName)
            .eventHubName(eventHubName)
            .credential(credential)
            .buildProducerClient();

        // sample events in an array
        List<EventData> allEvents = Arrays.asList(new EventData("Foo"), new EventData("Bar"));

        // create a batch
        EventDataBatch eventDataBatch = producer.createBatch();

        for (EventData eventData : allEvents) {
            // try to add the event from the array to the batch
            if (!eventDataBatch.tryAdd(eventData)) {
                // if the batch is full, send it and then create a new batch
                producer.send(eventDataBatch);
                eventDataBatch = producer.createBatch();

                // Try to add that event that couldn't fit before.
                if (!eventDataBatch.tryAdd(eventData)) {
                    throw new IllegalArgumentException("Event is too large for an empty batch. Max size: "
                        + eventDataBatch.getMaxSizeInBytes());
                }
            }
        }
        // send the last batch of remaining events
        if (eventDataBatch.getCount() > 0) {
            producer.send(eventDataBatch);
        }
        producer.close();
    }   
}

Générez le programme et assurez-vous qu’il n’y a aucune erreur. Vous exécuterez ce programme après avoir exécuté le programme récepteur.

Recevoir des événements

Le code de ce tutoriel est basé sur l’exemple EventProcessorClient sur GitHub, que vous pouvez analyser pour voir la version complète de l’application en cours.

Suivez les recommandations ci-dessous quand vous utilisez Stockage Blob Azure comme magasin de points de contrôle :

  • Utilisez un conteneur distinct pour chaque groupe de consommateurs. Vous pouvez utiliser le même compte de stockage, mais utiliser un conteneur par groupe.
  • N’utilisez pas le conteneur et le compte de stockage pour quoi que ce soit d’autre.
  • Le compte de stockage doit se trouver dans la même région que l’application déployée. Si l’application est locale, essayez de choisir la région la plus proche possible.

Sur la page Compte de stockage du Portail Azure, dans la section Service BLOB, vérifiez que les paramètres suivants sont désactivés.

  • Espace de noms hiérarchique
  • Suppression réversible de blob
  • Contrôle de version

Créer un compte Stockage Azure et un conteneur de blobs

Dans ce guide de démarrage rapide, vous utilisez Stockage Azure (plus particulièrement, Stockage Blob) comme magasin de points de contrôle. Le marquage de point de contrôle est un processus par lequel un processeur d’événements marque ou valide la position du dernier événement correctement traité dans une partition. Le marquage d’un point de contrôle s’effectue généralement au sein de la fonction qui traite les événements. Pour en savoir plus sur le marquage de point de contrôle, consultez Processeur d’événements.

Suivez les étapes ci-dessous pour créer un compte Stockage Azure.

  1. Création d’un compte de stockage Azure
  2. Créer un conteneur d’objets blob
  3. S’authentifier auprès du conteneur d’objets blob

Lors du développement local, assurez-vous que le compte d’utilisateur qui accède aux données blob dispose des autorisations appropriées. Vous aurez besoin du Contributeur aux données Blob de stockage pour lire et écrire des données blob. Pour vous attribuer ce rôle, vous aurez besoin du rôle Administrateur de l’accès utilisateur ou d’un autre rôle qui inclut l’action Microsoft.Authorization/roleAssignments/write. Vous pouvez attribuer des rôles RBAC Azure à un utilisateur à l’aide du Portail Azure, Azure CLI ou Azure PowerShell. Vous pouvez en savoir plus sur les étendues disponibles pour les attributions de rôles dans la page vue d’ensemble de l’étendue .

Dans ce scénario, vous allez attribuer des autorisations à votre compte d’utilisateur, étendues au compte de stockage, pour suivre le Principe des privilèges minimum. Cette pratique offre aux utilisateurs uniquement les autorisations minimales nécessaires et crée des environnements de production plus sécurisés.

L’exemple suivant affecte le rôle Contributeur aux données Blob du stockage à votre compte d’utilisateur, qui fournit à la fois un accès en lecture et en écriture aux données d’objet blob dans votre compte de stockage.

Important

Dans la plupart des cas, la propagation de l’attribution de rôle dans Azure peut prendre une ou deux minutes, mais dans de rares cas, cela peut prendre jusqu’à huit minutes. Si vous recevez des erreurs d’authentification lorsque vous exécutez votre code pour la première fois, patientez quelques instants et réessayez.

  1. Dans le Portail Azure, recherchez votre compte de stockage à l’aide de la barre de recherche principale ou de la navigation gauche.

  2. Dans la page vue d’ensemble du compte de stockage, sélectionnez Contrôle d’accès (IAM) dans le menu de gauche.

  3. Sur la page Contrôle d’accès (IAM), sélectionnez l’onglet Attributions de rôles.

  4. Sélectionnez + Ajouter dans le menu supérieur, puis Ajouter une attribution de rôle dans le menu déroulant résultant.

    A screenshot showing how to assign a storage account role.

  5. Utilisez la zone de recherche pour filtrer les résultats sur le rôle souhaité. Pour cet exemple, recherchez Contributeur aux données Blob du stockage, sélectionnez le résultat correspondant, puis choisissez Suivant.

  6. Sous Attribuer l’accès à, sélectionnez Utilisateur, groupe ou principal de service, puis sélectionnez + Sélectionner des membres.

  7. Dans la boîte de dialogue, recherchez votre nom d’utilisateur Microsoft Entra (généralement votre adresse e-mail utilisateur@domaine), puis choisissez Sélectionner en bas de la boîte de dialogue.

  8. Sélectionnez Vérifier + affecter pour accéder à la page finale, puis Vérifier + attribuer à nouveau pour terminer le processus.

Ajouter des bibliothèques Event Hubs à votre projet Java

Ajoutez les dépendances suivantes dans le fichier pom.xml.

	<dependencies>
		<dependency>
		    <groupId>com.azure</groupId>
		    <artifactId>azure-messaging-eventhubs</artifactId>
		    <version>5.15.0</version>
		</dependency>
		<dependency>
		    <groupId>com.azure</groupId>
		    <artifactId>azure-messaging-eventhubs-checkpointstore-blob</artifactId>
		    <version>1.16.1</version>
		</dependency>
		<dependency>
		    <groupId>com.azure</groupId>
		    <artifactId>azure-identity</artifactId>
		    <version>1.8.0</version>
		    <scope>compile</scope>
		</dependency>	
	</dependencies>
  1. Ajoutez les instructions import suivantes au début du fichier Java.

    import com.azure.messaging.eventhubs.*;
    import com.azure.messaging.eventhubs.checkpointstore.blob.BlobCheckpointStore;
    import com.azure.messaging.eventhubs.models.*;
    import com.azure.storage.blob.*;
    import java.util.function.Consumer;
    
    import com.azure.identity.*;
    
  2. Créez une classe nommée Receiver, puis ajoutez les variables de chaîne suivantes à la classe. Remplacez les espaces réservés par les valeurs correctes.

    Important

    Remplacez les espaces réservés par les valeurs correctes.

    • <NAMESPACE NAME> par le nom de votre espace de noms Event Hubs.
    • <EVENT HUB NAME> par le nom de votre instance Event Hub dans l'espace de noms.
    private static final String namespaceName = "<NAMESPACE NAME>.servicebus.windows.net";
    private static final String eventHubName = "<EVENT HUB NAME>";
    
  3. Ajoutez la méthode main suivante à la classe.

    Important

    Remplacez les espaces réservés par les valeurs correctes.

    • <STORAGE ACCOUNT NAME> par le nom de votre compte de stockage Azure.
    • <CONTAINER NAME> par le nom du conteneur d’objets blob dans le compte de stockage.
    // create a token using the default Azure credential
    DefaultAzureCredential credential = new DefaultAzureCredentialBuilder()
            .authorityHost(AzureAuthorityHosts.AZURE_PUBLIC_CLOUD)
            .build();
    
    // Create a blob container client that you use later to build an event processor client to receive and process events
    BlobContainerAsyncClient blobContainerAsyncClient = new BlobContainerClientBuilder()
            .credential(credential)
            .endpoint("https://<STORAGE ACCOUNT NAME>.blob.core.windows.net")
            .containerName("<CONTAINER NAME>")
            .buildAsyncClient();
    
    // Create an event processor client to receive and process events and errors.
    EventProcessorClient eventProcessorClient = new EventProcessorClientBuilder()
        .fullyQualifiedNamespace(namespaceName)
        .eventHubName(eventHubName)
        .consumerGroup(EventHubClientBuilder.DEFAULT_CONSUMER_GROUP_NAME)
        .processEvent(PARTITION_PROCESSOR)
        .processError(ERROR_HANDLER)
        .checkpointStore(new BlobCheckpointStore(blobContainerAsyncClient))            
        .credential(credential)
        .buildEventProcessorClient();
    
    System.out.println("Starting event processor");
    eventProcessorClient.start();
    
    System.out.println("Press enter to stop.");
    System.in.read();
    
    System.out.println("Stopping event processor");
    eventProcessorClient.stop();
    System.out.println("Event processor stopped.");
    
    System.out.println("Exiting process");  
    
  1. Ajoutez les deux méthodes d’assistance (PARTITION_PROCESSOR et ERROR_HANDLER) qui traitent les événements et les erreurs à la classe Receiver.

    public static final Consumer<EventContext> PARTITION_PROCESSOR = eventContext -> {
        PartitionContext partitionContext = eventContext.getPartitionContext();
        EventData eventData = eventContext.getEventData();
    
        System.out.printf("Processing event from partition %s with sequence number %d with body: %s%n",
            partitionContext.getPartitionId(), eventData.getSequenceNumber(), eventData.getBodyAsString());
    
        // Every 10 events received, it will update the checkpoint stored in Azure Blob Storage.
        if (eventData.getSequenceNumber() % 10 == 0) {
            eventContext.updateCheckpoint();
        }
    };
    
    public static final Consumer<ErrorContext> ERROR_HANDLER = errorContext -> {
        System.out.printf("Error occurred in partition processor for partition %s, %s.%n",
            errorContext.getPartitionContext().getPartitionId(),
            errorContext.getThrowable());
    };
    
  2. Générez le programme et assurez-vous qu’il n’y a aucune erreur.

Exécution des applications

  1. Exécutez d’abord l’application réceptrice.

  2. Ensuite, exécutez l’application émettrice.

  3. Dans la fenêtre de l’application réceptrice, vérifiez que vous voyez les événements qui ont été publiés par l’application émettrice.

    Starting event processor
    Press enter to stop.
    Processing event from partition 0 with sequence number 331 with body: Foo
    Processing event from partition 0 with sequence number 332 with body: Bar
    
  4. Appuyez sur Entrée dans la fenêtre d’application réceptrice pour arrêter l’application.

    Starting event processor
    Press enter to stop.
    Processing event from partition 0 with sequence number 331 with body: Foo
    Processing event from partition 0 with sequence number 332 with body: Bar
    
    Stopping event processor
    Event processor stopped.
    Exiting process
    

Étapes suivantes

Consultez les exemples suivants sur GitHub :