如何通过 Java 使用队列存储

概述

本指南演示了如何为使用 Azure 队列存储服务的常见方案编写代码。 这些示例用 Java 编写并使用用于 Java 的 Azure 存储 SDK。 方案包括插入速览获取删除队列消息。 还介绍了用于创建删除队列的代码。 有关队列的详细信息,请参阅后续步骤部分。

什么是队列存储?

Azure 队列存储是一项可存储大量消息的服务,用户可以通过经验证的呼叫,使用 HTTP 或 HTTPS 从世界任何地方访问这些消息。 一条队列消息的大小最多可为 64 KB,一个队列中可以包含数百万条消息,直至达到存储帐户的总容量限值。 队列存储通常用于创建要异步处理的积压工作 (backlog)。

队列服务概念

Azure 队列服务包含以下组件:

Azure 队列服务组件

  • 存储帐户: 对 Azure 存储进行的所有访问都要通过存储帐户完成。 有关存储帐户的详细信息,请参阅存储帐户概述

  • 队列:一个队列包含一组消息。 所有消息必须位于相应的队列中。 请注意,队列名称必须全部小写。 有关命名队列的信息,请参阅 命名队列和元数据

  • 消息: 一条消息(无论哪种格式)的最大大小为 64 KB。 消息可以保留在队列中的最长时间为 7 天。 在 2017-07-29 或更高版本中,最大生存时间可以是任何正数,或者是 -1(表示消息不会过期)。 如果省略此参数,则默认的生存时间为 7 天。

  • URL 格式:使用以下 URL 格式对队列进行寻址:http://<storage account>.queue.core.windows.net/<queue>

    可使用以下 URL 访问示意图中的某个队列:

    http://myaccount.queue.core.windows.net/incoming-orders

创建 Azure 存储帐户

创建第一个 Azure 存储帐户的最简单方法是使用 Azure 门户。 若要了解更多信息,请参阅 创建存储帐户

还可使用 Azure PowerShellAzure CLI适用于 .NET 的 Azure 存储资源提供程序创建 Azure 存储帐户。

如果暂时不想在 Azure 中创建存储帐户,也可以使用 Azurite 存储模拟器在本地环境中运行和测试代码。 有关详细信息,请参阅使用 Azurite 模拟器进行本地 Azure 存储开发

创建 Java 应用程序

首先,验证你的开发系统是否满足用于 Java 的 Azure 队列存储客户端库 v12中列出的先决条件。

创建名为 queues-how-to-v12 的 Java 应用程序:

  1. 在控制台窗口(例如 cmd、PowerShell 或 Bash)中,使用 Maven 创建名为 blob-quickstart-v12 的新控制台应用queues-how-to-v12。 键入以下 mvn 命令,创建“hello world”Java 项目。

     mvn archetype:generate \
         --define interactiveMode=n \
         --define groupId=com.queues.howto \
         --define artifactId=queues-howto-v12 \
         --define archetypeArtifactId=maven-archetype-quickstart \
         --define archetypeVersion=1.4
    
    mvn archetype:generate `
        --define interactiveMode=n `
        --define groupId=com.queues.howto `
        --define artifactId=queues-howto-v12 `
        --define archetypeArtifactId=maven-archetype-quickstart `
        --define archetypeVersion=1.4
    
  2. 生成项目的输出应如下所示:

    [INFO] Scanning for projects...
    [INFO]
    [INFO] ------------------< org.apache.maven:standalone-pom >-------------------
    [INFO] Building Maven Stub Project (No POM) 1
    [INFO] --------------------------------[ pom ]---------------------------------
    [INFO]
    [INFO] >>> maven-archetype-plugin:3.1.2:generate (default-cli) > generate-sources @ standalone-pom >>>
    [INFO]
    [INFO] <<< maven-archetype-plugin:3.1.2:generate (default-cli) < generate-sources @ standalone-pom <<<
    [INFO]
    [INFO]
    [INFO] --- maven-archetype-plugin:3.1.2:generate (default-cli) @ standalone-pom ---
    [INFO] Generating project in Batch mode
    [INFO] ----------------------------------------------------------------------------
    [INFO] Using following parameters for creating project from Archetype: maven-archetype-quickstart:1.4
    [INFO] ----------------------------------------------------------------------------
    [INFO] Parameter: groupId, Value: com.queues.howto
    [INFO] Parameter: artifactId, Value: queues-howto-v12
    [INFO] Parameter: version, Value: 1.0-SNAPSHOT
    [INFO] Parameter: package, Value: com.queues.howto
    [INFO] Parameter: packageInPathFormat, Value: com/queues/howto
    [INFO] Parameter: version, Value: 1.0-SNAPSHOT
    [INFO] Parameter: package, Value: com.queues.howto
    [INFO] Parameter: groupId, Value: com.queues.howto
    [INFO] Parameter: artifactId, Value: queues-howto-v12
    [INFO] Project created from Archetype in dir: C:\queues\queues-howto-v12
    [INFO] ------------------------------------------------------------------------
    [INFO] BUILD SUCCESS
    [INFO] ------------------------------------------------------------------------
    [INFO] Total time:  6.775 s
    [INFO] Finished at: 2020-08-17T15:27:31-07:00
    [INFO] ------------------------------------------------------------------------
    
  3. 切换到新创建的 queues-howto-v12 目录。

    cd queues-howto-v12
    

安装包

在文本编辑器中打开 pom.xml 文件pom.xml。 将以下依赖项元素添加到依赖项组。

<dependency>
  <groupId>com.azure</groupId>
  <artifactId>azure-storage-queue</artifactId>
  <version>12.6.0</version>
</dependency>

配置应用程序以访问队列存储

将下列 import 语句添加到需要在其中使用 Azure 存储 API 来访问队列的 Java 文件的顶部:

// Include the following imports to use queue APIs
import com.azure.core.util.*;
import com.azure.storage.queue.*;
import com.azure.storage.queue.models.*;

设置 Azure 存储连接字符串

Azure 存储客户端使用存储连接字符串来访问数据管理服务。 获取 Azure 门户中列出的你的存储帐户的名称和主访问密钥。 将它们用作连接字符串中的 AccountNameAccountKey 值。 此示例演示如何声明一个静态字段以保存连接字符串:

// Define the connection-string with your values
final String connectStr = 
    "DefaultEndpointsProtocol=https;" +
    "AccountName=your_storage_account;" +
    "AccountKey=your_storage_account_key";

以下示例假设你有一个包含存储连接字符串的 String 对象。

如何:创建队列

QueueClient 对象包含用于与队列进行交互的操作。 以下代码创建 QueueClient 对象。 使用该 QueueClient 对象创建要使用的队列。

public static String createQueue(String connectStr)
{
    try
    {
        // Create a unique name for the queue
        String queueName = "queue-" + java.util.UUID.randomUUID();

        System.out.println("Creating queue: " + queueName);

        // Instantiate a QueueClient which will be
        // used to create and manipulate the queue
        QueueClient queue = new QueueClientBuilder()
                                .connectionString(connectStr)
                                .queueName(queueName)
                                .buildClient();

        // Create the queue
        queue.create();
        return queue.getQueueName();
    }
    catch (QueueStorageException e)
    {
        // Output the exception message and stack trace
        System.out.println("Error code: " + e.getErrorCode() + "Message: " + e.getMessage());
        return null;
    }
}

如何:向队列添加消息

若要在现有队列中插入消息,请调用 sendMessage 方法。 消息可以是字符串(UTF-8 格式)或字节数组。 下面是将字符串消息发送到队列的代码。

public static void addQueueMessage
    (String connectStr, String queueName, String messageText)
{
    try
    {
        // Instantiate a QueueClient which will be
        // used to create and manipulate the queue
        QueueClient queueClient = new QueueClientBuilder()
                                    .connectionString(connectStr)
                                    .queueName(queueName)
                                    .buildClient();

        System.out.println("Adding message to the queue: " + messageText);

        // Add a message to the queue
        queueClient.sendMessage(messageText);
    }
    catch (QueueStorageException e)
    {
        // Output the exception message and stack trace
        System.out.println(e.getMessage());
        e.printStackTrace();
    }
}

如何:扫视下一条消息

通过调用 peekMessage,可以速览队列前面的消息,而不会从队列中删除它。

public static void peekQueueMessage
    (String connectStr, String queueName)
{
    try
    {
        // Instantiate a QueueClient which will be
        // used to create and manipulate the queue
        QueueClient queueClient = new QueueClientBuilder()
                                    .connectionString(connectStr)
                                    .queueName(queueName)
                                    .buildClient();

        // Peek at the first message
        PeekedMessageItem peekedMessageItem = queueClient.peekMessage();
        System.out.println("Peeked message: " + peekedMessageItem.getMessageText());
    }
    catch (QueueStorageException e)
    {
        // Output the exception message and stack trace
        System.out.println(e.getMessage());
        e.printStackTrace();
    }
}

如何:更改已排队消息的内容

可以更改队列中现有消息的内容。 如果消息表示一个工作任务,则可以使用此功能来更新状态。 以下代码使用新内容更新队列消息,并将可见性超时设置为再延长 30 秒。 延长可见性超时会再给客户端 30 秒时间来继续处理该消息。 你还可以保留重试计数。 如果消息重试了 n 次以上,则你会将其删除。 此方案可避免每次处理某条消息时都触发应用程序错误。

下面的代码示例将在消息队列中进行搜索,查找与搜索字符串匹配的第一个消息内容,对消息内容进行修改,然后退出。

public static void updateQueueMessage
    (String connectStr, String queueName,
    String searchString, String updatedContents)
{
    try
    {
        // Instantiate a QueueClient which will be
        // used to create and manipulate the queue
        QueueClient queueClient = new QueueClientBuilder()
                                    .connectionString(connectStr)
                                    .queueName(queueName)
                                    .buildClient();

        // The maximum number of messages to retrieve is 32
        final int MAX_MESSAGES = 32;

        // Iterate through the queue messages
        for (QueueMessageItem message : queueClient.receiveMessages(MAX_MESSAGES))
        {
            // Check for a specific string
            if (message.getMessageText().equals(searchString))
            {
                // Update the message to be visible in 30 seconds
                queueClient.updateMessage(message.getMessageId(),
                                          message.getPopReceipt(),
                                          updatedContents,
                                          Duration.ofSeconds(30));
                System.out.println(
                    String.format("Found message: \'%s\' and updated it to \'%s\'",
                                    searchString,
                                    updatedContents)
                                  );
                break;
            }
        }
    }
    catch (QueueStorageException e)
    {
        // Output the exception message and stack trace
        System.out.println(e.getMessage());
        e.printStackTrace();
    }
}

以下代码示例只更新队列中的第一个可见消息。

public static void updateFirstQueueMessage
    (String connectStr, String queueName, String updatedContents)
{
    try
    {
        // Instantiate a QueueClient which will be
        // used to create and manipulate the queue
        QueueClient queueClient = new QueueClientBuilder()
                                    .connectionString(connectStr)
                                    .queueName(queueName)
                                    .buildClient();

        // Get the first queue message
        QueueMessageItem message = queueClient.receiveMessage();

        // Check for a specific string
        if (null != message)
        {
            // Update the message to be visible in 30 seconds
            UpdateMessageResult result = queueClient.updateMessage(message.getMessageId(),
                                                                   message.getPopReceipt(),
                                                                   updatedContents,
                                                                   Duration.ofSeconds(30));
            System.out.println("Updated the first message with the receipt: " +
                    result.getPopReceipt());
        }
    }
    catch (QueueStorageException e)
    {
        // Output the exception message and stack trace
        System.out.println(e.getMessage());
        e.printStackTrace();
    }
}

如何:获取队列长度

可以获取队列中消息的估计数。

getProperties 方法可返回多个值,包括队列中的当前消息数。 此计数仅为近似值,因为可能会在请求后添加或删除消息。 getApproximateMessageCount 方法可返回通过调用 getProperties 检索到的最后一个值,而不会调用队列存储。

public static void getQueueLength(String connectStr, String queueName)
{
    try
    {
        // Instantiate a QueueClient which will be
        // used to create and manipulate the queue
        QueueClient queueClient = new QueueClientBuilder()
                                    .connectionString(connectStr)
                                    .queueName(queueName)
                                    .buildClient();

        QueueProperties properties = queueClient.getProperties();
        long messageCount = properties.getApproximateMessagesCount();

        System.out.println(String.format("Queue length: %d", messageCount));
    }
    catch (QueueStorageException e)
    {
        // Output the exception message and stack trace
        System.out.println(e.getMessage());
        e.printStackTrace();
    }
}

如何:取消对下一条消息的排队

代码通过两个步骤来取消对队列中某条消息的排队。 调用 receiveMessage 时,会获得队列中的下一条消息。 从 receiveMessage 返回的消息对于从此队列读取消息的任何其他代码都是不可见的。 默认情况下,此消息持续 30 秒不可见。 若要完成从队列中删除消息,还必须调用 deleteMessage。 如果你的代码未能处理消息,此两步过程可确保你可以获取同一消息并重试。 代码在处理消息后会立即调用 deleteMessage

public static void dequeueMessage(String connectStr, String queueName)
{
    try
    {
        // Instantiate a QueueClient which will be
        // used to create and manipulate the queue
        QueueClient queueClient = new QueueClientBuilder()
                                    .connectionString(connectStr)
                                    .queueName(queueName)
                                    .buildClient();

        // Get the first queue message
        QueueMessageItem message = queueClient.receiveMessage();

        // Check for a specific string
        if (null != message)
        {
            System.out.println("Dequeing message: " + message.getMessageText());

            // Delete the message
            queueClient.deleteMessage(message.getMessageId(), message.getPopReceipt());
        }
        else
        {
            System.out.println("No visible messages in queue");
        }
    }
    catch (QueueStorageException e)
    {
        // Output the exception message and stack trace
        System.out.println(e.getMessage());
        e.printStackTrace();
    }
}

用于取消对消息进行排队的其他选项

可以通过两种方式自定义队列中的消息检索。 首先,获取一批消息(最多 32 条)。 其次,设置更长或更短的不可见超时时间,从而允许代码使用更多或更少的时间来完全处理每个消息。

以下代码示例使用 receiveMessages 方法在一个调用中获取 20 条消息。 然后,使用 for 循环处理每条消息。 它还将每条消息的不可见超时设置为 5 分钟(300 秒)。 超时同时针对所有消息启动。 自调用 receiveMessages 起经过五分钟后,未删除的任何消息都将再次变得可见。

public static void dequeueMessages(String connectStr, String queueName)
{
    try
    {
        // Instantiate a QueueClient which will be
        // used to create and manipulate the queue
        QueueClient queueClient = new QueueClientBuilder()
                                    .connectionString(connectStr)
                                    .queueName(queueName)
                                    .buildClient();

        // The maximum number of messages to retrieve is 20
        final int MAX_MESSAGES = 20;

        // Retrieve 20 messages from the queue with a
        // visibility timeout of 300 seconds (5 minutes)
        for (QueueMessageItem message : queueClient.receiveMessages(MAX_MESSAGES,
                Duration.ofSeconds(300), Duration.ofSeconds(1), new Context("key1", "value1")))
        {
            // Do processing for all messages in less than 5 minutes,
            // deleting each message after processing.
            System.out.println("Dequeing message: " + message.getMessageText());
            queueClient.deleteMessage(message.getMessageId(), message.getPopReceipt());
        }
    }
    catch (QueueStorageException e)
    {
        // Output the exception message and stack trace
        System.out.println(e.getMessage());
        e.printStackTrace();
    }
}

如何:列出队列

若要获取当前队列的列表,请调用 QueueServiceClient.listQueues() 方法,它将返回 QueueItem 对象的集合。

public static void listQueues(String connectStr)
{
    try
    {
        // Instantiate a QueueServiceClient which will be
        // used to list the queues
        QueueServiceClient queueServiceClient = new QueueServiceClientBuilder()
                                    .connectionString(connectStr)
                                    .buildClient();

        // Loop through the collection of queues.
        for (QueueItem queue : queueServiceClient.listQueues())
        {
            // Output each queue name.
            System.out.println(queue.getName());
        }
    }
    catch (QueueStorageException e)
    {
        // Output the exception message and stack trace
        System.out.println(e.getMessage());
        e.printStackTrace();
    }
}

如何:删除队列

若要删除队列及其包含的所有消息,请对 QueueClient 对象调用 delete 方法。

public static void deleteMessageQueue(String connectStr, String queueName)
{
    try
    {
        // Instantiate a QueueClient which will be
        // used to create and manipulate the queue
        QueueClient queueClient = new QueueClientBuilder()
                                    .connectionString(connectStr)
                                    .queueName(queueName)
                                    .buildClient();

        System.out.println("Deleting queue: " + queueClient.getQueueName());

        // Delete the queue
        queueClient.delete();
    }
    catch (QueueStorageException e)
    {
        // Output the exception message and stack trace
        System.out.println(e.getMessage());
        e.printStackTrace();
    }
}

提示

查看 Azure 存储代码示例存储库

如需易用且能够下载和运行的端到端 Azure 存储代码示例,请查看我们的 Azure 存储示例列表。

后续步骤

现在,你已了解了有关队列存储的基础知识,请单击下面的链接来了解更复杂的存储任务。

有关使用已弃用的 Java 版本 8 SDK 的相关代码示例,请参阅使用 Java 版本 8 的代码示例