Carregue um blob de bloco com Python

Este artigo mostra como carregar um blob usando a biblioteca de cliente do Armazenamento do Azure para Python. Você pode carregar dados para um blob de bloco a partir de um caminho de arquivo, um fluxo, um objeto binário ou uma cadeia de caracteres de texto. Você também pode carregar blobs com tags de índice.

Para saber mais sobre como carregar blobs usando APIs assíncronas, consulte Carregar blobs de forma assíncrona.

Pré-requisitos

Configurar o ambiente

Se você não tiver um projeto existente, esta seção mostra como configurar um projeto para trabalhar com a biblioteca de cliente do Armazenamento de Blobs do Azure para Python. Para obter mais detalhes, consulte Introdução ao Armazenamento de Blobs do Azure e Python.

Para trabalhar com os exemplos de código neste artigo, siga estas etapas para configurar seu projeto.

Instalar pacotes

Instale os seguintes pacotes usando pip install:

pip install azure-storage-blob azure-identity

Adicionar instruções de importação

Adicione as seguintes instruções import:

import io
import os
import uuid
from azure.identity import DefaultAzureCredential
from azure.storage.blob import BlobServiceClient, ContainerClient, BlobBlock, BlobClient, StandardBlobTier

Autorização

O mecanismo de autorização deve ter as permissões necessárias para carregar um blob. Para autorização com o Microsoft Entra ID (recomendado), você precisa da função interna do RBAC do Azure RBAC Storage Blob Data Contributor ou superior. Para saber mais, consulte as diretrizes de autorização para Put Blob (REST API) e Put Block (REST API).

Criar um objeto cliente

Para conectar um aplicativo ao Armazenamento de Blob, crie uma instância de BlobServiceClient. O exemplo a seguir mostra como criar um objeto cliente usando DefaultAzureCredential para autorização:

# TODO: Replace <storage-account-name> with your actual storage account name
account_url = "https://<storage-account-name>.blob.core.windows.net"
credential = DefaultAzureCredential()

# Create the BlobServiceClient object
blob_service_client = BlobServiceClient(account_url, credential=credential)

Você também pode criar objetos de cliente para contêineres ou blobs específicos, diretamente ou a partir do BlobServiceClient objeto. Para saber mais sobre como criar e gerenciar objetos de cliente, consulte Criar e gerenciar objetos de cliente que interagem com recursos de dados.

Carregar dados para um blob de bloco

Para carregar um blob usando um fluxo ou um objeto binário, use o seguinte método:

Esse método cria um novo blob a partir de uma fonte de dados com fragmentação automática, o que significa que a fonte de dados pode ser dividida em partes menores e carregada. Para executar o upload, a biblioteca do cliente pode usar Put Blob ou uma série de chamadas Put Block seguidas por Put Block List. Esse comportamento depende do tamanho geral do objeto e como as opções de transferência de dados são definidas.

Carregar um blob de bloco a partir de um caminho de arquivo local

O exemplo a seguir carrega um arquivo para um blob de bloco usando um BlobClient objeto:

def upload_blob_file(self, blob_service_client: BlobServiceClient, container_name: str):
    container_client = blob_service_client.get_container_client(container=container_name)
    with open(file=os.path.join('filepath', 'filename'), mode="rb") as data:
        blob_client = container_client.upload_blob(name="sample-blob.txt", data=data, overwrite=True)

Carregar um blob de bloco a partir de um fluxo

O exemplo a seguir cria bytes aleatórios de dados e carrega um BytesIO objeto para um blob de bloco usando um BlobClient objeto:

def upload_blob_stream(self, blob_service_client: BlobServiceClient, container_name: str):
    blob_client = blob_service_client.get_blob_client(container=container_name, blob="sample-blob.txt")
    input_stream = io.BytesIO(os.urandom(15))
    blob_client.upload_blob(input_stream, blob_type="BlockBlob")

Carregar dados binários para um blob de bloco

O exemplo a seguir carrega dados binários para um blob de bloco usando um BlobClient objeto:

def upload_blob_data(self, blob_service_client: BlobServiceClient, container_name: str):
    blob_client = blob_service_client.get_blob_client(container=container_name, blob="sample-blob.txt")
    data = b"Sample data for blob"

    # Upload the blob data - default blob type is BlockBlob
    blob_client.upload_blob(data, blob_type="BlockBlob")

Carregar um blob de bloco com tags de índice

O exemplo a seguir carrega um blob de bloco com tags de índice:

def upload_blob_tags(self, blob_service_client: BlobServiceClient, container_name: str):
    container_client = blob_service_client.get_container_client(container=container_name)
    sample_tags = {"Content": "image", "Date": "2022-01-01"}
    with open(file=os.path.join('filepath', 'filename'), mode="rb") as data:
        blob_client = container_client.upload_blob(name="sample-blob.txt", data=data, tags=sample_tags)

Carregar um blob de bloco com opções de configuração

Você pode definir opções de configuração da biblioteca do cliente ao carregar um blob. Essas opções podem ser ajustadas para melhorar o desempenho, aumentar a confiabilidade e otimizar os custos. Os exemplos de código a seguir mostram como definir opções de configuração para um carregamento no nível do método e no nível do cliente ao instanciar BlobClient. Essas opções também podem ser configuradas para uma instância ContainerClient ou uma instância BlobServiceClient .

Especificar opções de transferência de dados para upload

Você pode definir opções de configuração ao instanciar um cliente para otimizar o desempenho para operações de transferência de dados. Você pode passar os seguintes argumentos de palavra-chave ao construir um objeto cliente em Python:

  • max_block_size - O tamanho máximo do bloco para carregar um blob de bloco em pedaços. O padrão é 4 MiB.
  • max_single_put_size - Se o tamanho do blob for menor ou igual a max_single_put_size, o blob é carregado com uma única Put Blob solicitação. Se o tamanho do blob for maior ou max_single_put_size desconhecido, o blob será carregado em partes usando Put Block e confirmado usando Put Block List. O padrão é 64 MiB.

Para obter mais informações sobre limites de tamanho de transferência para armazenamento de Blob, consulte Dimensionar destinos para armazenamento de Blob.

Para operações de upload, você também pode passar o max_concurrency argumento ao chamar upload_blob. Esse argumento define o número máximo de conexões paralelas a serem usadas quando o tamanho do blob exceder 64 MiB.

O exemplo de código a seguir mostra como especificar opções de transferência de dados ao criar um BlobClient objeto e como carregar dados usando esse objeto cliente. Os valores fornecidos neste exemplo não pretendem ser uma recomendação. Para ajustar corretamente esses valores, você precisa considerar as necessidades específicas do seu aplicativo.

def upload_blob_transfer_options(self, account_url: str, container_name: str, blob_name: str):
    # Create a BlobClient object with data transfer options for upload
    blob_client = BlobClient(
        account_url=account_url, 
        container_name=container_name, 
        blob_name=blob_name,
        credential=DefaultAzureCredential(),
        max_block_size=1024*1024*4, # 4 MiB
        max_single_put_size=1024*1024*8 # 8 MiB
    )
    
    with open(file=os.path.join(r'file_path', blob_name), mode="rb") as data:
        blob_client = blob_client.upload_blob(data=data, overwrite=True, max_concurrency=2)

Para saber mais sobre como ajustar as opções de transferência de dados, consulte Ajuste de desempenho para uploads e downloads com Python.

Definir a camada de acesso de um blob ao carregar

Você pode definir a camada de acesso de um blob no upload passando o argumento da standard_blob_tier palavra-chave para upload_blob. O Armazenamento do Azure oferece diferentes camadas de acesso para lhe permitir armazenar os seus dados de blob da maneira mais económica possível com base na utilização.

O exemplo de código a seguir mostra como definir a camada de acesso ao carregar um blob:

def upload_blob_access_tier(self, blob_service_client: BlobServiceClient, container_name: str, blob_name: str):
    blob_client = blob_service_client.get_blob_client(container=container_name, blob=blob_name)
    
    #Upload blob to the cool tier
    with open(file=os.path.join(r'file_path', blob_name), mode="rb") as data:
        blob_client = blob_client.upload_blob(data=data, overwrite=True, standard_blob_tier=StandardBlobTier.COOL)

A definição da camada de acesso só é permitida para blobs de bloco. Você pode definir a camada de acesso para um blob de bloco como Hot, Cool, Coldou Archive. Para definir a camada de acesso como Cold, você deve usar uma versão mínima da biblioteca de cliente 12.15.0.

Para saber mais sobre as camadas de acesso, consulte Visão geral das camadas de acesso.

Carregue um blob de bloco preparando blocos e confirmando

Você pode ter maior controle sobre como dividir os uploads em blocos preparando manualmente blocos individuais de dados. Quando todos os blocos que compõem um blob são preparados, você pode confirmá-los no Armazenamento de Blobs.

Use o seguinte método para criar um novo bloco a ser confirmado como parte de um blob:

Use o seguinte método para escrever um blob especificando a lista de IDs de bloco que compõem o blob:

O exemplo a seguir lê dados de um arquivo e prepara blocos a serem confirmados como parte de um blob:

def upload_blocks(self, blob_container_client: ContainerClient, local_file_path: str, block_size: int):
    file_name = os.path.basename(local_file_path)
    blob_client = blob_container_client.get_blob_client(file_name)

    with open(file=local_file_path, mode="rb") as file_stream:
        block_id_list = []

        while True:
            buffer = file_stream.read(block_size)
            if not buffer:
                break

            block_id = uuid.uuid4().hex
            block_id_list.append(BlobBlock(block_id=block_id))

            blob_client.stage_block(block_id=block_id, data=buffer, length=len(buffer))

        blob_client.commit_block_list(block_id_list)

Carregar blobs de forma assíncrona

A biblioteca de cliente do Armazenamento de Blobs do Azure para Python dá suporte ao carregamento de blobs de forma assíncrona. Para saber mais sobre os requisitos de configuração do projeto, consulte Programação assíncrona.

Siga estas etapas para carregar um blob usando APIs assíncronas:

  1. Adicione as seguintes instruções de importação:

    import asyncio
    
    from azure.identity.aio import DefaultAzureCredential
    from azure.storage.blob.aio import BlobServiceClient, BlobClient, ContainerClient
    
  2. Adicione código para executar o programa usando asyncio.runo . Essa função executa a co-rotina passada, main() em nosso exemplo, e gerencia o loop de asyncio eventos. As co-rotinas são declaradas com a sintaxe async/await. Neste exemplo, a main() co-rotina primeiro cria o nível BlobServiceClient superior usando e, em async withseguida, chama o método que carrega o blob. Observe que apenas o cliente de nível superior precisa usar async witho , pois outros clientes criados a partir dele compartilham o mesmo pool de conexões.

    async def main():
        sample = BlobSamples()
    
        # TODO: Replace <storage-account-name> with your actual storage account name
        account_url = "https://<storage-account-name>.blob.core.windows.net"
        credential = DefaultAzureCredential()
    
        async with BlobServiceClient(account_url, credential=credential) as blob_service_client:
            await sample.upload_blob_file(blob_service_client, "sample-container")
    
    if __name__ == '__main__':
        asyncio.run(main())
    
  3. Adicione código para carregar o blob. O exemplo a seguir carrega um blob de um caminho de arquivo local usando um ContainerClient objeto. O código é o mesmo que o exemplo síncrono, exceto que o método é declarado com a async palavra-chave e a await palavra-chave é usada ao chamar o upload_blob método.

    async def upload_blob_file(self, blob_service_client: BlobServiceClient, container_name: str):
        container_client = blob_service_client.get_container_client(container=container_name)
        with open(file=os.path.join('filepath', 'filename'), mode="rb") as data:
            blob_client = await container_client.upload_blob(name="sample-blob.txt", data=data, overwrite=True)
    

Com essa configuração básica em vigor, você pode implementar outros exemplos neste artigo como co-rotinas usando a sintaxe async/await.

Recursos

Para saber mais sobre como carregar blobs usando a biblioteca de cliente do Armazenamento de Blobs do Azure para Python, consulte os recursos a seguir.

Amostras de código

Operações da API REST

O SDK do Azure para Python contém bibliotecas que se baseiam na API REST do Azure, permitindo que você interaja com operações da API REST por meio de paradigmas Python familiares. Os métodos da biblioteca de cliente para carregar blobs usam as seguintes operações de API REST:

Recursos da biblioteca do cliente

Consulte também

  • Este artigo faz parte do guia do desenvolvedor do Blob Storage para Python. Para saber mais, consulte a lista completa de artigos do guia do desenvolvedor em Build your Python app.