Azure 队列存储入门(使用 .NET)

概述

Azure 队列存储用于在应用程序组件之间进行云消息传送。 设计可缩放的应用程序时,应用程序组件通常是分离的,各组件可以独立缩放。 队列存储在应用程序组件之间提供异步消息传送,无论这些组件是运行在云中、桌面上、本地服务器上还是移动设备上。 队列存储还支持管理异步任务以及构建过程工作流。

关于本教程

本教程展示了如何针对某些使用 Azure 队列存储的常见情形编写 .NET 代码。 涉及的方案包括创建和删除队列、添加、读取和删除队列消息。

估计完成时间: 45 分钟

先决条件

什么是队列存储?

Azure 队列存储是一项可存储大量消息的服务,用户可以通过经验证的呼叫,使用 HTTP 或 HTTPS 从世界任何地方访问这些消息。 一条队列消息的大小最多可为 64 KB,一个队列中可以包含数百万条消息,直至达到存储帐户的总容量限值。 队列存储通常用于创建要异步处理的积压工作 (backlog)。

队列服务概念

Azure 队列服务包含以下组件:

Azure 队列服务组件

  • 存储帐户: 对 Azure 存储进行的所有访问都要通过存储帐户完成。 有关存储帐户的详细信息,请参阅存储帐户概述

  • 队列:一个队列包含一组消息。 所有消息必须位于相应的队列中。 请注意,队列名称必须全部小写。 有关命名队列的信息,请参阅 命名队列和元数据

  • 消息: 一条消息(无论哪种格式)的最大大小为 64 KB。 消息可以保留在队列中的最长时间为 7 天。 在 2017-07-29 或更高版本中,最大生存时间可以是任何正数,或者是 -1(表示消息不会过期)。 如果省略此参数,则默认的生存时间为 7 天。

  • URL 格式:使用以下 URL 格式对队列进行寻址:http://<storage account>.queue.core.windows.net/<queue>

    可使用以下 URL 访问示意图中的某个队列:

    http://myaccount.queue.core.windows.net/incoming-orders

创建 Azure 存储帐户

创建第一个 Azure 存储帐户的最简单方法是使用 Azure 门户。 若要了解更多信息,请参阅 创建存储帐户

还可使用 Azure PowerShellAzure CLI适用于 .NET 的 Azure 存储资源提供程序创建 Azure 存储帐户。

如果暂时不想在 Azure 中创建存储帐户,也可以使用 Azurite 存储模拟器在本地环境中运行和测试代码。 有关详细信息,请参阅使用 Azurite 模拟器进行本地 Azure 存储开发

设置开发环境

接下来在 Visual Studio 中设置开发环境,即可试用本指南中的代码示例。

创建 Windows 控制台应用程序项目

在 Visual Studio 中创建新的 Windows 控制台应用程序。 以下步骤演示了如何在 Visual Studio 2019 中创建控制台应用程序。 在其他版本的 Visual Studio 中,这些步骤是类似的。

  1. 选择“文件”>“新建”>“项目”
  2. 选择“平台”>“Windows”
  3. 选择“控制台应用(.NET Framework)”
  4. 选择“下一步”
  5. 在“项目名称”字段中输入应用程序的名称
  6. 选择“创建”

本教程中的所有代码示例都可以添加到控制台应用程序的 Program.cs 文件的 Main() 方法。

可以在任意类型的 .NET 应用程序(包括 Azure 云服务或 Web 应用,以及桌面和移动应用程序)中使用 Azure 存储客户端库。 为简单起见,我们在本指南中使用控制台应用程序。

使用 NuGet 安装所需包

为完成此教程,需要在项目中引用下述四个包:

可以使用 NuGet 获取这些包。 执行以下步骤:

  1. 在“解决方案资源管理器”中,右键单击项目并选择“管理 NuGet 包”。
  2. 选择“浏览”
  3. 在线搜索 Azure.Storage.Queues,然后选择“安装”以安装 Azure 存储客户端库及其依赖项。 这还会安装 Azure.Storage.Common 和 Azure.Core 库,它们是队列库的依赖项。
  4. 在线搜索 System.Configuration.ConfigurationManager,然后选择“安装”以安装 Configuration Manager。

确定目标环境

可从两个环境中选择用于运行本指南中示例的环境:

  • 可针对云中的 Azure 存储帐户运行代码。
  • 可针对 Azurite 存储仿真器运行代码。 Azurite 是仿真云中 Azure 存储帐户的本地环境。 当应用程序正在开发时,Azurite 是用于测试和调试代码的免费选项。 模拟器使用已知帐户和密钥。 有关详细信息,请参阅使用 Azurite 仿真器进行本地 Azure 存储开发和测试

注意

可以指向存储模拟器以避免引发与 Azure 存储有关的任何费用。 但是,如果你确实选择以云中的 Azure 存储帐户为目标,则执行此教程的费用可以忽略不计。

获取存储连接字符串

用于 .NET 的 Azure 存储客户端库支持使用存储连接字符串来配置终结点和用于访问存储服务的凭据。 有关详细信息,请参阅管理存储帐户访问密钥

从 Azure 门户复制凭据

此示例代码需要对存储帐户访问进行授权。 若要授权,请以连接字符串的形式向应用程序提供存储帐户凭据。 若要查看存储帐户凭据,请执行以下操作:

  1. 导航到 Azure 门户

  2. 找到自己的存储帐户。

  3. 在存储帐户概述的“设置”部分,选择“访问密钥”。 此时会显示帐户访问密钥,以及每个密钥的完整连接字符串。

  4. 找到“密钥 1”下面的“连接字符串”值,单击“复制”按钮复制该连接字符串。 下一步需将此连接字符串值添加到某个环境变量。

    显示如何从 Azure 门户复制连接字符串的屏幕截图

有关连接字符串的详细信息,请参阅配置 Azure 存储的连接字符串

注意

存储帐户密钥类似于存储帐户的根密码。 始终要小心保护存储帐户密钥。 避免将其分发给其他用户、对其进行硬编码或将其保存在其他人可以访问的纯文本文件中。 如果认为密钥可能已泄漏,请使用 Azure 门户重新生成密钥。

维护存储连接字符串的最佳方法在配置文件中。 若要配置连接字符串,请从 Visual Studio 中的解决方案资源管理器打开 app.config 文件。 添加 <appSettings> 元素的内容,如此处所示。 将 connection-string 替换为你从门户中的存储帐户复制的值:

<configuration>
    <startup>
        <supportedRuntime version="v4.0" sku=".NETFramework,Version=v4.7.2" />
    </startup>
    <appSettings>
        <add key="StorageConnectionString" value="connection-string" />
    </appSettings>
</configuration>

例如,配置设置看起来类似于:

<add key="StorageConnectionString" value="DefaultEndpointsProtocol=https;AccountName=storagesample;AccountKey=GMuzNHjlB3S9itqZJHHCnRkrokLkcSyW7yK9BRbGp0ENePunLPwBgpxV1Z/pVo9zpem/2xSHXkMqTHHLcx8XRA==EndpointSuffix=core.windows.net" />

若要以 Azurite 存储仿真器为目标,可使用映射到已知帐户名称和密钥的快捷方式。 在这种情况下,连接字符串设置如下所示:

<add key="StorageConnectionString" value="UseDevelopmentStorage=true" />

添加 using 指令

将以下 using 指令添加到 Program.cs 文件顶部:

using System; // Namespace for Console output
using System.Configuration; // Namespace for ConfigurationManager
using System.Threading.Tasks; // Namespace for Task
using Azure.Identity;
using Azure.Storage.Queues; // Namespace for Queue storage types
using Azure.Storage.Queues.Models; // Namespace for PeekedMessage

创建队列存储客户端

使用 QueueClient 类可以检索存储在队列存储中的队列。 下面是创建服务客户端的一种方法:

//-------------------------------------------------
// Create the queue service client
//-------------------------------------------------
public void CreateQueueClient(string queueName)
{
    // Get the connection string from app settings
    string connectionString = ConfigurationManager.AppSettings["StorageConnectionString"];

    // Instantiate a QueueClient which will be used to create and manipulate the queue
    QueueClient queueClient = new QueueClient(connectionString, queueName);
}

提示

使用 QueueClient 类发送的消息必须采用可包含在具有 UTF-8 编码的 XML 请求中的格式。 或者,可以将 MessageEncoding 选项设置为 Base64 来处理不兼容的消息。

现在,你已准备好编写从队列存储读取数据并将数据写入队列存储的代码。

创建队列

此示例演示如何创建队列:

//-------------------------------------------------
// Create a message queue
//-------------------------------------------------
public bool CreateQueue(string queueName)
{
    try
    {
        // Get the connection string from app settings
        string connectionString = ConfigurationManager.AppSettings["StorageConnectionString"];

        // Instantiate a QueueClient which will be used to create and manipulate the queue
        QueueClient queueClient = new QueueClient(connectionString, queueName);

        // Create the queue
        queueClient.CreateIfNotExists();

        if (queueClient.Exists())
        {
            Console.WriteLine($"Queue created: '{queueClient.Name}'");
            return true;
        }
        else
        {
            Console.WriteLine($"Make sure the Azurite storage emulator running and try again.");
            return false;
        }
    }
    catch (Exception ex)
    {
        Console.WriteLine($"Exception: {ex.Message}\n\n");
        Console.WriteLine($"Make sure the Azurite storage emulator running and try again.");
        return false;
    }
}

在队列中插入消息

若要在现有队列中插入消息,请调用 SendMessage 方法。 消息可以是字符串(UTF-8 格式)或字节数组。 下面的代码将创建一个队列(如果该队列不存在)并插入一条消息:

//-------------------------------------------------
// Insert a message into a queue
//-------------------------------------------------
public void InsertMessage(string queueName, string message)
{
    // Get the connection string from app settings
    string connectionString = ConfigurationManager.AppSettings["StorageConnectionString"];

    // Instantiate a QueueClient which will be used to create and manipulate the queue
    QueueClient queueClient = new QueueClient(connectionString, queueName);

    // Create the queue if it doesn't already exist
    queueClient.CreateIfNotExists();

    if (queueClient.Exists())
    {
        // Send a message to the queue
        queueClient.SendMessage(message);
    }

    Console.WriteLine($"Inserted: {message}");
}

扫视下一条消息

通过调用 PeekMessages 方法,可以速览队列中的消息,不将其从队列中删除。 如果没有为 maxMessages 参数传递值,则默认设置是速览一条消息。

//-------------------------------------------------
// Peek at a message in the queue
//-------------------------------------------------
public void PeekMessage(string queueName)
{
    // Get the connection string from app settings
    string connectionString = ConfigurationManager.AppSettings["StorageConnectionString"];

    // Instantiate a QueueClient which will be used to manipulate the queue
    QueueClient queueClient = new QueueClient(connectionString, queueName);

    if (queueClient.Exists())
    { 
        // Peek at the next message
        PeekedMessage[] peekedMessage = queueClient.PeekMessages();

        // Display the message
        Console.WriteLine($"Peeked message: '{peekedMessage[0].Body}'");
    }
}

更改已排队消息的内容

可以更改队列中现有消息的内容。 如果消息表示工作任务,可使用此功能来更新该工作任务的状态。 以下代码使用新内容更新队列消息,并将可见性超时设置为再延长 60 秒。 这会保存与消息关联的工作的状态,并额外为客户端提供一分钟的时间来继续处理消息。 可使用此方法跟踪队列消息上的多步骤工作流,即使处理步骤因硬件或软件故障而失败,也无需从头开始操作。 通常同时保留重试计数,当消息重试次数超过 n 时再删除该消息。 这可避免每次处理某条消息时都触发应用程序错误。

//-------------------------------------------------
// Update an existing message in the queue
//-------------------------------------------------
public void UpdateMessage(string queueName)
{
    // Get the connection string from app settings
    string connectionString = ConfigurationManager.AppSettings["StorageConnectionString"];

    // Instantiate a QueueClient which will be used to manipulate the queue
    QueueClient queueClient = new QueueClient(connectionString, queueName);

    if (queueClient.Exists())
    {
        // Get the message from the queue
        QueueMessage[] message = queueClient.ReceiveMessages();

        // Update the message contents
        queueClient.UpdateMessage(message[0].MessageId, 
                message[0].PopReceipt, 
                "Updated contents",
                TimeSpan.FromSeconds(60.0)  // Make it invisible for another 60 seconds
            );
    }
}

取消下一条消息的排队

可通过两个步骤取消消息在队列中的排队。 调用 ReceiveMessages 时,会获得队列中的下一条消息。 从 ReceiveMessages 返回的消息对于从此队列读取消息的任何其他代码都是不可见的。 默认情况下,此消息持续 30 秒不可见。 若要完成从队列中删除消息,还必须调用 DeleteMessage。 此删除消息的两步过程可确保,如果代码因硬件或软件故障而无法处理消息,则代码的其他实例可以获取相同消息并重试。 代码在处理消息后会立即调用 DeleteMessage

//-------------------------------------------------
// Process and remove a message from the queue
//-------------------------------------------------
public void DequeueMessage(string queueName)
{
    // Get the connection string from app settings
    string connectionString = ConfigurationManager.AppSettings["StorageConnectionString"];

    // Instantiate a QueueClient which will be used to manipulate the queue
    QueueClient queueClient = new QueueClient(connectionString, queueName);

    if (queueClient.Exists())
    {
        // Get the next message
        QueueMessage[] retrievedMessage = queueClient.ReceiveMessages();

        // Process (i.e. print) the message in less than 30 seconds
        Console.WriteLine($"Dequeued message: '{retrievedMessage[0].Body}'");

        // Delete the message
        queueClient.DeleteMessage(retrievedMessage[0].MessageId, retrievedMessage[0].PopReceipt);
    }
}

将 Async-Await 模式与公用队列存储 API 配合使用

此示例展示了如何将 Async-Await 模式与公用队列存储 API 配合使用。 示例调用每个给定方法的异步版本,如每个方法的 Async 后缀所示。 使用异步方法时,Async-Await 模式会暂停本地执行,直到调用完成。 此行为允许当前的线程执行其他工作,这有助于避免性能瓶颈并提高应用程序的整体响应能力。 有关在 .NET 中使用 Async-Await 模式的详细信息,请参阅 Async 和 Await(C# 和 Visual Basic)

//-------------------------------------------------
// Perform queue operations asynchronously
//-------------------------------------------------
public async Task QueueAsync(string queueName)
{
    // Get the connection string from app settings
    string connectionString = ConfigurationManager.AppSettings["StorageConnectionString"];

    // Instantiate a QueueClient which will be used to manipulate the queue
    QueueClient queueClient = new QueueClient(connectionString, queueName);

    // Create the queue if it doesn't already exist
    await queueClient.CreateIfNotExistsAsync();

    if (await queueClient.ExistsAsync())
    {
        Console.WriteLine($"Queue '{queueClient.Name}' created");
    }
    else
    {
        Console.WriteLine($"Queue '{queueClient.Name}' exists");
    }

    // Async enqueue the message
    await queueClient.SendMessageAsync("Hello, World");
    Console.WriteLine($"Message added");

    // Async receive the message
    QueueMessage[] retrievedMessage = await queueClient.ReceiveMessagesAsync();
    Console.WriteLine($"Retrieved message with content '{retrievedMessage[0].Body}'");

    // Async delete the message
    await queueClient.DeleteMessageAsync(retrievedMessage[0].MessageId, retrievedMessage[0].PopReceipt);
    Console.WriteLine($"Deleted message: '{retrievedMessage[0].Body}'");

    // Async delete the queue
    await queueClient.DeleteAsync();
    Console.WriteLine($"Deleted queue: '{queueClient.Name}'");
}

使用用于取消消息排队的其他选项

可通过两种方式自定义队列中消息的检索。 首先,可获取一批消息(最多 32 条)。 其次,可以设置更长或更短的不可见超时时间,从而允许代码使用更多或更少时间来完全处理每个消息。

以下代码示例使用 ReceiveMessages 方法在一个调用中获取 20 条消息。 然后,使用 foreach 循环处理每条消息。 它还将每条消息的不可见超时时间设置为 5 分钟。 请注意,5 分钟超时时间对于所有消息都是同时开始的,因此在调用 ReceiveMessages 5 分钟后,尚未删除的任何消息都会再次变得可见。

//-----------------------------------------------------
// Process and remove multiple messages from the queue
//-----------------------------------------------------
public void DequeueMessages(string queueName)
{
    // Get the connection string from app settings
    string connectionString = ConfigurationManager.AppSettings["StorageConnectionString"];

    // Instantiate a QueueClient which will be used to manipulate the queue
    QueueClient queueClient = new QueueClient(connectionString, queueName);

    if (queueClient.Exists())
    {
        // Receive and process 20 messages
        QueueMessage[] receivedMessages = queueClient.ReceiveMessages(20, TimeSpan.FromMinutes(5));

        foreach (QueueMessage message in receivedMessages)
        {
            // Process (i.e. print) the messages in less than 5 minutes
            Console.WriteLine($"De-queued message: '{message.Body}'");

            // Delete the message
            queueClient.DeleteMessage(message.MessageId, message.PopReceipt);
        }
    }
}

获取队列长度

可以获取队列中消息的估计数。 GetProperties 方法返回包括消息计数在内的队列属性。 ApproximateMessagesCount 属性包含队列中的大致消息数。 此数字不低于队列中的实际消息数,但可能会更高。

//-----------------------------------------------------
// Get the approximate number of messages in the queue
//-----------------------------------------------------
public void GetQueueLength(string queueName)
{
    // Get the connection string from app settings
    string connectionString = ConfigurationManager.AppSettings["StorageConnectionString"];

    // Instantiate a QueueClient which will be used to manipulate the queue
    QueueClient queueClient = new QueueClient(connectionString, queueName);

    if (queueClient.Exists())
    {
        QueueProperties properties = queueClient.GetProperties();

        // Retrieve the cached approximate message count.
        int cachedMessagesCount = properties.ApproximateMessagesCount;

        // Display number of messages.
        Console.WriteLine($"Number of messages in queue: {cachedMessagesCount}");
    }
}

删除队列

若要删除队列及其包含的所有消息,请对队列对象调用 Delete 方法。

//-------------------------------------------------
// Delete the queue
//-------------------------------------------------
public void DeleteQueue(string queueName)
{
    // Get the connection string from app settings
    string connectionString = ConfigurationManager.AppSettings["StorageConnectionString"];

    // Instantiate a QueueClient which will be used to manipulate the queue
    QueueClient queueClient = new QueueClient(connectionString, queueName);

    if (queueClient.Exists())
    {
        // Delete the queue
        queueClient.Delete();
    }

    Console.WriteLine($"Queue deleted: '{queueClient.Name}'");
}

后续步骤

现在,你已了解了有关队列存储的基础知识,请单击下面的链接来了解更复杂的存储任务。

有关使用已弃用的 .NET 版本 11.x SDK 的相关代码示例,请参阅使用 .NET 版本 11.x 的代码示例