Python 用 Azure Storage Queues クライアント ライブラリ - バージョン 12.9.0

Azure キュー ストレージは、HTTP または HTTPS を使用した認証された呼び出しを介して世界中のどこからでもアクセスできる大量のメッセージを格納するためのサービスです。 1 つのキュー メッセージのサイズは最大 64 KiB で、キューにはストレージ アカウントの合計容量制限まで、数百万のメッセージを含めることができます。

キュー ストレージの一般的な用途には、次のようなものがあります。

  • 非同期に処理する作業のバックログを作成する
  • 分散アプリケーションのさまざまな部分間でメッセージを渡す

ソースコード | パッケージ (PyPI) | パッケージ (Conda) | API リファレンス ドキュメント | 製品ドキュメント | サンプル

作業の開始

前提条件

パッケージをインストールする

pip を使用して Python 用 Azure Storage Queues クライアント ライブラリをインストールします。

pip install azure-storage-queue

ストレージ アカウントの作成

新しいストレージ アカウントを作成する場合は、Azure PortalAzure PowerShell、または Azure CLI を使用できます。

# Create a new resource group to hold the storage account -
# if using an existing resource group, skip this step
az group create --name my-resource-group --location westus2

# Create the storage account
az storage account create -n my-storage-account-name -g my-resource-group

クライアントを作成する

Python 用 Azure Storage Queues クライアント ライブラリを使用すると、ストレージ アカウント自体、キュー、メッセージの 3 種類のリソースを操作できます。 これらのリソースとの対話は、 クライアントのインスタンスから始まります。 クライアント オブジェクトを作成するには、ストレージ アカウントのキュー サービス エンドポイント URL と、ストレージ アカウントにアクセスできる資格情報が必要です。

from azure.storage.queue import QueueServiceClient

service = QueueServiceClient(account_url="https://<my-storage-account-name>.queue.core.windows.net/", credential=credential)

アカウントの URL を検索する

ストレージ アカウントのキュー サービス URL は、Azure PortalAzure PowerShell、または Azure CLI を使用して確認できます。

# Get the queue service URL for the storage account
az storage account show -n my-storage-account-name -g my-resource-group --query "primaryEndpoints.queue"

資格情報の種類

パラメーターは credential 、使用する 承認 の種類に応じて、さまざまな形式で指定できます。

  1. Shared Access Signature (SAS) トークンを使用するには、トークンを文字列として指定します。 アカウント URL に SAS トークンが含まれている場合は、credential パラメーターを省略します。 Azure Portal の [Shared Access Signature]\(共有アクセス署名\) で SAS トークンを生成するか、いずれかの関数を generate_sas() 使用してストレージ アカウントまたはキューの sas トークンを作成できます。

    from datetime import datetime, timedelta
    from azure.storage.queue import QueueServiceClient, generate_account_sas, ResourceTypes, AccountSasPermissions
    
    sas_token = generate_account_sas(
        account_name="<storage-account-name>",
        account_key="<account-access-key>",
        resource_types=ResourceTypes(service=True),
        permission=AccountSasPermissions(read=True),
        start=datetime.utcnow(),
        expiry=datetime.utcnow() + timedelta(hours=1)
    )
    
    queue_service_client = QueueServiceClient(account_url="https://<my_account_name>.queue.core.windows.net", credential=sas_token)
    
  2. ストレージ アカウント 共有キー (アカウント キーまたはアクセス キー) を使用するには、キーを文字列として指定します。 これは、Azure Portal の [アクセス キー] セクションで、または次の Azure CLI コマンドを実行して確認できます。

    az storage account keys list -g MyResourceGroup -n MyStorageAccount

    資格情報パラメーターとしてキーを使用して、クライアントを認証します。

    from azure.storage.queue import QueueServiceClient
    service = QueueServiceClient(account_url="https://<my_account_name>.queue.core.windows.net", credential="<account_access_key>")
    
  3. Azure Active Directory (AAD) トークン資格情報を使用するには、azure-identity ライブラリから取得した目的の資格情報の種類のインスタンスを指定します。 たとえば、 DefaultAzureCredential を使用してクライアントを認証できます。

    これには、いくつかの初期セットアップが必要です。

    返されたトークン資格情報を使用して、クライアントを認証します。

        from azure.identity import DefaultAzureCredential
        from azure.storage.queue import QueueServiceClient
        token_credential = DefaultAzureCredential()
    
        queue_service_client = QueueServiceClient(
            account_url="https://<my_account_name>.queue.core.windows.net",
            credential=token_credential
        )
    

接続文字列からクライアントを作成する

ユース ケースと承認方法によっては、アカウントの URL と資格情報を個別に指定するのではなく、ストレージ 接続文字列を使用してクライアント インスタンスを初期化することをお勧めします。 これを行うには、ストレージ 接続文字列をクライアントのfrom_connection_stringクラス メソッドに渡します。

from azure.storage.queue import QueueServiceClient

connection_string = "DefaultEndpointsProtocol=https;AccountName=xxxx;AccountKey=xxxx;EndpointSuffix=core.windows.net"
service = QueueServiceClient.from_connection_string(conn_str=connection_string)

ストレージ アカウントへの接続文字列は、Azure Portal の [アクセス キー] セクションで、または次の CLI コマンドを実行することによって確認できます。

az storage account show-connection-string -g MyResourceGroup -n MyStorageAccount

主要な概念

Azure Queue Service を構成するコンポーネントは次のとおりです。

  • ストレージ アカウント自体
  • メッセージのセットを含むストレージ アカウント内のキュー
  • キュー内の最大 64 KiB の任意の形式のメッセージ

Python 用 Azure Storage Queues クライアント ライブラリを使用すると、専用のクライアント オブジェクトを使用して、これらの各コンポーネントと対話できます。

非同期クライアント

このライブラリには、Python 3.5 以降でサポートされている完全な非同期 API が含まれています。 これを使用するには、まず、 aiohttp などの非同期トランスポートをインストールする必要があります。 詳細については、 azure-core のドキュメント を参照してください。

非同期クライアントと資格情報は、不要になったら閉じる必要があります。 これらのオブジェクトは非同期コンテキスト マネージャーであり、非同期 close メソッドを定義します。

クライアント

Queue Service のさまざまなコンポーネントと対話するために、次の 2 つの異なるクライアントが用意されています。

  1. QueueServiceClient - このクライアントは Azure ストレージ アカウント自体との対話を表し、構成済みのクライアント インスタンスを取得して、内のキューにアクセスできるようにします。 アカウントのプロパティを取得および構成する操作と、アカウント内のキューの一覧表示、作成、削除を行う操作が提供されます。 特定のキューに対して操作を実行するには、 メソッドを使用してクライアントを get_queue_client 取得します。
  2. QueueClient - このクライアントは、特定のキューとの対話を表します (まだ存在する必要はありません)。 キューを作成、削除、または構成する操作が提供され、キュー内のメッセージの送受信、ピーク、削除、更新を行う操作が含まれます。

メッセージ

  • 送信 - メッセージをキューに追加し、必要に応じてメッセージの可視性タイムアウトを設定します。
  • 受信 - キューからメッセージを取得し、他のコンシューマーに対して非表示にします。
  • プレビュー - メッセージの可視性を変更せずに、キューの前面からメッセージを取得します。
  • Update - メッセージやメッセージの内容の可視性タイムアウトを更新します。
  • Delete - 指定したメッセージをキューから削除します。
  • [クリア] - キューからのすべてのメッセージをクリアします。

次のセクションでは、次のような最も一般的なストレージ キュー タスクの一部をカバーするいくつかのコード スニペットを示します。

キューの作成

ストレージ アカウントにキューを作成する

from azure.storage.queue import QueueClient

queue = QueueClient.from_connection_string(conn_str="<connection_string>", queue_name="myqueue")
queue.create_queue()

非同期クライアントを使用してキューを作成する

from azure.storage.queue.aio import QueueClient

queue = QueueClient.from_connection_string(conn_str="<connection_string>", queue_name="myqueue")
await queue.create_queue()

メッセージを送信する

キューにメッセージを送信する

from azure.storage.queue import QueueClient

queue = QueueClient.from_connection_string(conn_str="<connection_string>", queue_name="myqueue")
queue.send_message("I'm using queues!")
queue.send_message("This is my second message")

メッセージを非同期に送信する

import asyncio
from azure.storage.queue.aio import QueueClient

queue = QueueClient.from_connection_string(conn_str="<connection_string>", queue_name="myqueue")
await asyncio.gather(
    queue.send_message("I'm using queues!"),
    queue.send_message("This is my second message")
)

メッセージの受信

キューからのメッセージの受信と処理

from azure.storage.queue import QueueClient

queue = QueueClient.from_connection_string(conn_str="<connection_string>", queue_name="myqueue")
response = queue.receive_messages()

for message in response:
    print(message.content)
    queue.delete_message(message)

# Printed messages from the front of the queue:
# >> I'm using queues!
# >> This is my second message

メッセージをバッチで受信して処理する

from azure.storage.queue import QueueClient

queue = QueueClient.from_connection_string(conn_str="<connection_string>", queue_name="myqueue")
response = queue.receive_messages(messages_per_page=10)

for message_batch in response.by_page():
    for message in message_batch:
        print(message.content)
        queue.delete_message(message)

メッセージを非同期的に受信および処理する

from azure.storage.queue.aio import QueueClient

queue = QueueClient.from_connection_string(conn_str="<connection_string>", queue_name="myqueue")
response = queue.receive_messages()

async for message in response:
    print(message.content)
    await queue.delete_message(message)

オプションの構成

クライアントおよび操作ごとのレベルで渡すことができる省略可能なキーワード (keyword)引数。

再試行ポリシーの構成

再試行ポリシーを構成するには、クライアントをインスタンス化するときに、次のキーワード (keyword)引数を使用します。

  • retry_total (int): 許可する再試行の合計数。 他のカウントよりも優先されます。 要求時に retry_total=0 再試行しない場合は、 を渡します。 既定値は 10 です。
  • retry_connect (int): 再試行する接続関連エラーの数。 既定値は 3 です。
  • retry_read (int): 読み取りエラーで再試行する回数。 既定値は 3 です。
  • retry_status (int): 無効な状態コードで再試行する回数。 既定値は 3 です。
  • retry_to_secondary (bool): 要求をセカンダリに再試行する必要があるかどうか (可能な場合)。 これは、RA-GRS アカウントのみが使用され、古いデータを処理できる可能性がある場合にのみ有効にする必要があります。 既定値は False です。

その他のクライアント/操作ごとの構成

その他の省略可能な構成キーワード (keyword)クライアントまたは操作ごとに指定できる引数です。

クライアント キーワード (keyword)引数:

  • connection_timeout (int): クライアントがサーバーへの接続の確立を待機する秒数。 既定値は 20 秒です。
  • read_timeout (int): サーバーからの応答について、クライアントが連続する読み取り操作の間に待機する秒数。 これはソケット レベルのタイムアウトであり、全体的なデータ サイズの影響を受けません。 クライアント側の読み取りタイムアウトは自動的に再試行されます。 既定値は 60 秒です。
  • transport (Any): HTTP 要求を送信するためのユーザー指定のトランスポート。

操作ごとのキーワード (keyword)引数:

  • raw_response_hook (呼び出し可能): 指定されたコールバックは、サービスから返される応答を使用します。
  • raw_request_hook (呼び出し可能): 指定されたコールバックは、サービスに送信される前に要求を使用します。
  • client_request_id (str): 要求の省略可能なユーザー指定 ID。
  • user_agent (str): 要求と共に送信するユーザー エージェント ヘッダーにカスタム値を追加します。
  • logging_enable (bool): DEBUG レベルでログ記録を有効にします。 既定値は False です。 クライアント レベルで渡して、すべての要求に対して有効にすることもできます。
  • logging_body (bool): 要求本文と応答本文のログ記録を有効にします。 既定値は False です。 クライアント レベルで渡して、すべての要求に対して有効にすることもできます。
  • headers (dict): カスタム ヘッダーをキー、値のペアとして渡します。 例: headers={'CustomValue': value}

トラブルシューティング

全般

ストレージ キュー クライアントは 、Azure Core で定義されている例外を発生させます。

このリストは、スローされた例外をキャッチするための参照に使用できます。 例外の特定のエラー コードを取得するには、 属性 ( error_code つまり) exception.error_codeを使用します。

ログの記録

このライブラリでは、ログ記録に標準 のログ ライブラリが使用されます。 HTTP セッションに関する基本情報 (URL、ヘッダーなど) は INFO レベルでログに記録されます。

要求/応答本文、未変換ヘッダーなど、詳細な DEBUG レベルのログは、 引数を使用してクライアントで logging_enable 有効にすることができます。

import sys
import logging
from azure.storage.queue import QueueServiceClient

# Create a logger for the 'azure.storage.queue' SDK
logger = logging.getLogger('azure.storage.queue')
logger.setLevel(logging.DEBUG)

# Configure a console output
handler = logging.StreamHandler(stream=sys.stdout)
logger.addHandler(handler)

# This client will log detailed information about its HTTP sessions, at DEBUG level
service_client = QueueServiceClient.from_connection_string("your_connection_string", logging_enable=True)

同様に、logging_enable は、詳細なログ記録がクライアントで有効になっていない場合でも、1 回の操作のために有効にすることができます。

service_client.get_service_stats(logging_enable=True)

次のステップ

その他のサンプル コード

Queue サンプルの使用を開始します。

SDK の GitHub リポジトリでは、いくつかの Storage Queues Python SDK サンプルを使用できます。 これらのサンプルでは、ストレージ キューの操作中に一般的に発生するその他のシナリオのコード例を示します。

  • queue_samples_hello_world.py (非同期バージョン) - この記事の例:

    • クライアントの作成
    • キューを作成する
    • メッセージを送信する
    • メッセージを受信する
  • queue_samples_authentication.py (非同期バージョン) - クライアントの認証と作成の例:

    • 接続文字列から
    • 共有アクセス キーから
    • 共有アクセス署名トークンから
    • Azure Active Directory から
  • queue_samples_service.py (非同期バージョン) - キュー サービスとの対話の例:

    • サービス プロパティを取得および設定する
    • ストレージ アカウント内のキューを一覧表示する
    • サービスからキューを作成および削除する
    • QueueClient を取得する
  • queue_samples_message.py (非同期バージョン) - キューとメッセージを操作する例:

    • アクセス ポリシーを設定する
    • キュー メタデータを取得して設定する
    • メッセージを送受信する
    • 指定したメッセージを削除し、すべてのメッセージをクリアする
    • メッセージのプレビューと更新

その他のドキュメント

Azure Queue Storage の詳細なドキュメントについては、docs.microsoft.com に関する Azure Queue Storage のドキュメントを参照してください

共同作成

このプロジェクトでは、共同作成と提案を歓迎しています。 ほとんどの共同作成では、共同作成者使用許諾契約書 (CLA) にご同意いただき、ご自身の共同作成内容を使用する権利を Microsoft に供与する権利をお持ちであり、かつ実際に供与することを宣言していただく必要があります。 詳細については、 https://cla.microsoft.com を参照してください。

pull request を送信すると、CLA を提供して PR (ラベル、コメントなど) を適宜装飾する必要があるかどうかを CLA ボットが自動的に決定します。 ボットによって提供される手順にそのまま従ってください。 この操作は、Microsoft の CLA を使用するすべてのリポジトリについて、1 回だけ行う必要があります。

このプロジェクトでは、Microsoft オープン ソースの倫理規定を採用しています。 詳しくは、「Code of Conduct FAQ (倫理規定についてよくある質問)」を参照するか、opencode@microsoft.com 宛てに質問またはコメントをお送りください。