搭配使用提取傳遞與 JAVA 來接收事件

本文提供使用 [事件方格] 提取傳遞接收 CloudEvent 的快速逐步指南。 它提供範例程式碼以接收、認可 (從 [事件方格] 刪除事件)。

必要條件

在繼續之前,您需要具備的必要條件如下:

  • 了解什麼是提取傳遞。 如需詳細資訊,請參閱提取傳遞概念 (部分機器翻譯) 和提取傳遞概觀 (部分機器翻譯)。

  • 命名空間、主題和事件訂用帳戶。

  • 最新的「Beta」SDK 套件。 如果您使用 maven,可以參閱 maven 中央存放庫(英文)。

    重要

    可於「Beta」套件中取得提取傳遞資料平面 SDK 支援。 您應該在專案中使用最新的 Beta 套件。

  • 支援 JAVA 的 IDE,如 IntelliJ IDEA、Eclipse IDE 或 Visual Studio Code。

  • 執行 JAVA 8 語言層級的 JAVA JRE。

  • 您應該有關於某個主題的事件。 請參閱發佈事件至命名空間主題。

使用提取傳遞接收事件

您可以透過使用「接收」作業來指定命名空間主題和「佇列」事件訂用帳戶,以從 [事件方格] 讀取事件。 事件訂用帳戶是一種資源,可有效定義取用者用戶端可讀取的 CloudEvents 集合。 此範例程式碼會使用金鑰型驗證,因為它提供快速且簡單的驗證方法。 針對生產案例,您應該使用 Microsoft Entry ID 驗證,因為它提供更健全的驗證機制。

package com.azure.messaging.eventgrid.samples;

import com.azure.core.credential.AzureKeyCredential;
import com.azure.core.http.HttpClient;
import com.azure.core.models.CloudEvent;
import com.azure.messaging.eventgrid.EventGridClient;
import com.azure.messaging.eventgrid.EventGridClientBuilder;
import com.azure.messaging.eventgrid.EventGridMessagingServiceVersion;
import com.azure.messaging.eventgrid.models.*;

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

/**
 * <p>Simple demo consumer app of CloudEvents from queue event subscriptions created for namespace topics.
 * This code samples should use Java 1.8 level or above to avoid compilation errors.
 * You should consult the resources below to use the client SDK and set up your project using maven.
 * @see <a href="https://github.com/Azure/azure-sdk-for-java/tree/main/sdk/eventgrid/azure-messaging-eventgrid">Event Grid data plane client SDK documentation</a>
 * @see <a href="https://github.com/Azure/azure-sdk-for-java/blob/main/sdk/boms/azure-sdk-bom/README.md">Azure BOM for client libraries</a>
 * @see <a href="https://aka.ms/spring/versions">Spring Version Mapping</a> if you are using Spring.
 * @see <a href="https://aka.ms/azsdk">Tool with links to control plane and data plane SDKs across all languages supported</a>.
 *</p>
 */
public class NamespaceTopicConsumer {
    private static final String TOPIC_NAME = "<yourNamespaceTopicName>";
    public static final String EVENT_SUBSCRIPTION_NAME = "<yourEventSusbcriptionName>";
    public static final String ENDPOINT =  "<yourFullHttpsUrlToTheNamespaceEndpoint>";
    public static final int MAX_NUMBER_OF_EVENTS_TO_RECEIVE = 10;
    public static final Duration MAX_WAIT_TIME_FOR_EVENTS = Duration.ofSeconds(10);

    private static EventGridClient eventGridClient;
    private static List<String> receivedCloudEventLockTokens = new ArrayList<>();
    private static List<CloudEvent> receivedCloudEvents = new ArrayList<>();

    //TODO  Do NOT include keys in source code. This code's objective is to give you a succinct sample about using Event Grid, not to provide an authoritative example for handling secrets in applications.
    /**
     * For security concerns, you should not have keys or any other secret in any part of the application code.
     * You should use services like Azure Key Vault for managing your keys.
     */
    public static final AzureKeyCredential CREDENTIAL = new AzureKeyCredential("<namespace key>");
    public static void main(String[] args) {
        //TODO Update Event Grid version number to your desired version. You can find more information on data plane APIs here:
        //https://video2.skills-academy.com/en-us/rest/api/eventgrid/.
        eventGridClient = new EventGridClientBuilder()
                .httpClient(HttpClient.createDefault())  // Requires Java 1.8 level
                .endpoint(ENDPOINT)
                .serviceVersion(EventGridMessagingServiceVersion.V2023_06_01_PREVIEW)
                .credential(CREDENTIAL).buildClient();   // you may want to use .buildAsyncClient() for an asynchronous (project reactor) client.

        System.out.println("Waiting " +  MAX_WAIT_TIME_FOR_EVENTS.toSecondsPart() + " seconds for events to be read...");
        List<ReceiveDetails> receiveDetails = eventGridClient.receiveCloudEvents(TOPIC_NAME, EVENT_SUBSCRIPTION_NAME,
                MAX_NUMBER_OF_EVENTS_TO_RECEIVE, MAX_WAIT_TIME_FOR_EVENTS).getValue();

        for (ReceiveDetails detail : receiveDetails) {
            // Add order message received to a tracking list
            CloudEvent orderCloudEvent = detail.getEvent();
            receivedCloudEvents.add(orderCloudEvent);
            // Add lock token to a tracking list. Lock token functions like an identifier to a cloudEvent
            BrokerProperties metadataForCloudEventReceived = detail.getBrokerProperties();
            String lockToken = metadataForCloudEventReceived.getLockToken();
            receivedCloudEventLockTokens.add(lockToken);
        }
        System.out.println("<-- Number of events received: " + receivedCloudEvents.size());

認可事件

若要認可事件,請使用用於接收事件的相同程式碼,並新增下列幾行以呼叫認可私人方法:

        // Acknowledge (i.e. delete from Event Grid the) events
        acknowledge(receivedCloudEventLockTokens);

認可方法的範例實作以及用來列印失敗鎖定權杖相關資訊的公用程式方法如下:

    private static void acknowledge(List<String> lockTokens) {
        AcknowledgeResult acknowledgeResult = eventGridClient.acknowledgeCloudEvents(TOPIC_NAME, EVENT_SUBSCRIPTION_NAME, new AcknowledgeOptions(lockTokens));
        List<String> succeededLockTokens = acknowledgeResult.getSucceededLockTokens();
        if (succeededLockTokens != null && lockTokens.size() >= 1)
            System.out.println("@@@ " + succeededLockTokens.size() + " events were successfully acknowledged:");
        for (String lockToken : succeededLockTokens) {
            System.out.println("    Acknowledged event lock token: " + lockToken);
        }
        // Print the information about failed lock tokens
        if (succeededLockTokens.size() < lockTokens.size()) {
            System.out.println("    At least one event was not acknowledged (deleted from Event Grid)");
            writeFailedLockTokens(acknowledgeResult.getFailedLockTokens());
        }
    }

    private static void writeFailedLockTokens(List<FailedLockToken> failedLockTokens) {
        for (FailedLockToken failedLockToken : failedLockTokens) {
            System.out.println("    Failed lock token: " + failedLockToken.getLockToken());
            System.out.println("    Error code: " + failedLockToken.getErrorCode());
            System.out.println("    Error description: " + failedLockToken.getErrorDescription());
        }
    }

釋放事件

釋放事件,使其可供重新傳遞。 與認可事件類似,您可以新增下列靜態方法和一行進行叫用,藉此釋放由作為引數傳遞的鎖定權杖識別的事件。 您需要 writeFailedLockTokens 方法,才能編譯此方法。

   private static void release(List<String> lockTokens) {
        ReleaseResult releaseResult = eventGridClient.releaseCloudEvents(TOPIC_NAME, EVENT_SUBSCRIPTION_NAME, new ReleaseOptions(lockTokens));
        List<String> succeededLockTokens = releaseResult.getSucceededLockTokens();
        if (succeededLockTokens != null && lockTokens.size() >= 1)
            System.out.println("^^^ " + succeededLockTokens.size() + " events were successfully released:");
        for (String lockToken : succeededLockTokens) {
            System.out.println("    Released event lock token: " + lockToken);
        }
        // Print the information about failed lock tokens
        if (succeededLockTokens.size() < lockTokens.size()) {
            System.out.println("    At least one event was not released back to Event Grid.");
            writeFailedLockTokens(releaseResult.getFailedLockTokens());
        }
    }

拒絕事件

拒絕取用者應用程式無法處理的事件。 您拒絕事件的條件包括無法剖析的格式錯誤事件或處理事件的應用程式發生問題。

    private static void reject(List<String> lockTokens) {
        RejectResult rejectResult = eventGridClient.rejectCloudEvents(TOPIC_NAME, EVENT_SUBSCRIPTION_NAME, new RejectOptions(lockTokens));
        List<String> succeededLockTokens = rejectResult.getSucceededLockTokens();
        if (succeededLockTokens != null && lockTokens.size() >= 1)
            System.out.println("--- " + succeededLockTokens.size() + " events were successfully rejected:");
        for (String lockToken : succeededLockTokens) {
            System.out.println("    Rejected event lock token: " + lockToken);
        }
        // Print the information about failed lock tokens
        if (succeededLockTokens.size() < lockTokens.size()) {
            System.out.println("    At least one event was not rejected.");
            writeFailedLockTokens(rejectResult.getFailedLockTokens());
        }
    }

下一步