O que é o modo de notificação de arquivo do Carregador Automático?

No modo de notificação de arquivo, o Carregador Automático configura automaticamente um serviço de notificação e um serviço de fila que assina eventos de arquivo do diretório de entrada. Você pode usar as notificações de arquivo para escalar o Carregador Automático para ingerir milhões de arquivos por hora. Quando comparado ao modo de listagem de diretório, o modo de notificação de arquivos é mais eficaz e escalonável para grandes diretórios de entrada ou alto volume de arquivos, mas requer permissões adicionais de nuvem.

Alterne entre as notificações de arquivo e a listagem de diretório a qualquer momento, mantendo as garantias de processamento de dados de apenas uma vez.

Observação

O modo de notificação de arquivo não tem suporte em contas de armazenamento Premium do Azure porque essas contas não dão suporte ao armazenamento de filas.

Aviso

Não há suporte para a alteração do caminho de origem para o Carregador Automático no modo de notificação de arquivo. Se o modo de notificação de arquivo for usado e o caminho for alterado, você poderá falhar ao ingerir arquivos que já estão presentes no novo diretório no momento da atualização do diretório.

Só há suporte ao modo de notificação de arquivo em computação de usuário único.

Recursos de nuvem usados no modo de notificação de arquivo do Carregador Automático

Importante

Você precisa de permissões elevadas para configurar automaticamente a infraestrutura de nuvem para o modo de notificação de arquivo. Entre em contato com o administrador de nuvem ou o administrador de workspace. Ver:

O Carregador automático pode configurar notificações de arquivos automaticamente quando se define a opção cloudFiles.useNotifications como true e fornece as permissões para criar os recursos de nuvem. Além disso, talvez seja necessário fornecer as opções adicionais para conceder autorização ao Carregador Automático para criar esses recursos.

A tabela a seguir resume quais recursos são criados pelo Carregador automático.

Armazenamento em nuvem Serviço da assinatura Serviço Fila Prefixo * Limite **
AWS S3 AWS SNS AWS SQS ingestão automática do Databricks 100 por bucket S3
ADLS Gen2 Grade de Eventos do Azure Armazenamento de Filas do Azure databricks 500 por conta de armazenamento
GCS Google Pub/Sub Google Pub/Sub ingestão automática do Databricks 100 por bucket GCS
Armazenamento do Blobs do Azure Grade de Eventos do Azure Armazenamento de Filas do Azure databricks 500 por conta de armazenamento
  • O Carregador Automático nomeia os recursos com esse prefixo.

** Quantos pipelines simultâneos de notificação de arquivo podem ser inicializados

Se você precisar executar mais do que o número limitado de pipelines de notificação de arquivos para uma determinada conta de armazenamento, siga as orientações:

  • Aproveite um serviço como a AWS Lambda, o Azure Functions ou o Google Cloud Functions para dispersar as notificações de uma única fila que escuta um contêiner inteiro ou um bucket nas filas específicas do diretório.

Eventos de notificação de arquivos

A AWS S3 fornece um evento ObjectCreated quando um arquivo é carregado em um bucket S3, independentemente de ele ter sido carregado por um upload de várias partes ou por put.

O ADLS Gen2 fornece notificações de eventos diferentes para arquivos que aparecem no contêiner do Gen2.

  • O Carregador automático escuta o evento FlushWithClose para processar um arquivo.
  • Os fluxos do Carregador Automático dão suporte à ação RenameFile para descobrir arquivos. As ações RenameFile exigem uma solicitação de API para o sistema de armazenamento a fim de obter o tamanho do arquivo renomeado.
  • Os fluxos do Carregador automático criados com Databricks Runtime 9.0 e versões posteriores suportam a ação RenameDirectory para descoberta de arquivos. As ações RenameDirectory exigem solicitações de API para o sistema de armazenamento a fim de listar o conteúdo do diretório renomeado.

O Google Cloud Armazenamento fornece um evento OBJECT_FINALIZE quando um arquivo é carregado, que inclui substituições e cópias de arquivo. Os uploads com falha não geram esse evento.

Observação

Os provedores de nuvem não garantem a entrega de 100% de todos os eventos de arquivo em condições muito raras e não fornecem SLAs rigorosos da latência dos eventos de arquivo. O Databricks recomenda disparar os provisionamentos regulares com o Carregador automático ao usar a opção cloudFiles.backfillInterval para garantir que todos os arquivos sejam descobertos em um determinado SLA, se a conclusão dos dados for um requisito. O disparo de provisionamentos regulares não causa duplicações.

Permissões necessárias para configurar a notificação de arquivo para o ADLS Gen2 e Armazenamento de Blobs do Azure

Você deve ter permissões de leitura para o diretório de entrada. Confira Armazenamento de Blobs do Azure.

Para usar o modo de notificação de arquivo, você deve fornecer credenciais de autenticação para configurar e acessar os serviços de notificação de eventos. Você só precisa de uma entidade de serviço para autenticação.

  • Entidade de serviço – usando funções internas do Azure

    Crie um aplicativo e uma entidade de serviço do Microsoft Entra ID (antigo Azure Active Directory) na forma de ID do cliente e segredo do cliente.

    Atribua a este aplicativo as seguintes funções à conta de armazenamento na qual o caminho de entrada reside:

    • Colaborador: esta função é usada para configurar os recursos na conta de armazenamento, como filas e assinaturas de evento.
    • Colaborador de dados da fila de armazenamento: essa função é usada para executar operações de fila, como recuperar e excluir mensagens das filas. Esta função é necessária somente quando você fornece uma entidade de serviço sem uma cadeia de conexão.

    Atribua a este aplicativo a seguinte função ao grupo de recursos relacionados:

    Para obter mais informações, confira Atribuir funções do Azure usando o portal do Azure.

  • Entidade de serviço – usando uma função personalizada

    Se você estiver preocupado com as permissões excessivas necessárias para as funções anteriores, crie uma Função Personalizada com, pelo menos, as seguintes permissões listadas abaixo no formato JSON da função do Azure:

    "permissions": [
      {
        "actions": [
          "Microsoft.EventGrid/eventSubscriptions/write",
          "Microsoft.EventGrid/eventSubscriptions/read",
          "Microsoft.EventGrid/eventSubscriptions/delete",
          "Microsoft.EventGrid/locations/eventSubscriptions/read",
          "Microsoft.Storage/storageAccounts/read",
          "Microsoft.Storage/storageAccounts/write",
          "Microsoft.Storage/storageAccounts/queueServices/read",
          "Microsoft.Storage/storageAccounts/queueServices/write",
          "Microsoft.Storage/storageAccounts/queueServices/queues/write",
          "Microsoft.Storage/storageAccounts/queueServices/queues/read",
          "Microsoft.Storage/storageAccounts/queueServices/queues/delete"
      ],
        "notActions": [],
        "dataActions": [
          "Microsoft.Storage/storageAccounts/queueServices/queues/messages/delete",
          "Microsoft.Storage/storageAccounts/queueServices/queues/messages/read",
          "Microsoft.Storage/storageAccounts/queueServices/queues/messages/write",
          "Microsoft.Storage/storageAccounts/queueServices/queues/messages/process/action"
        ],
        "notDataActions": []
      }
    ]
    

    Em seguida, você poderá atribuir essa função personalizada ao aplicativo.

    Para obter mais informações, confira Atribuir funções do Azure usando o portal do Azure.

Permissões do carregador automático

Solucionar erros comuns

Erro:

java.lang.RuntimeException: Failed to create event grid subscription.

Se você vir essa mensagem de erro ao executar o Carregador automático pela primeira vez, a Grade de Eventos não será registrada como um Provedor de recursos em sua assinatura do Azure. Para registrar isso no portal do Azure:

  1. Acesse sua assinatura.
  2. Clique em Provedores de recursos na seção Configurações.
  3. Registre o provedor Microsoft.EventGrid.

Erro:

403 Forbidden ... does not have authorization to perform action 'Microsoft.EventGrid/eventSubscriptions/[read|write]' over scope ...

Se você vir essa mensagem de erro ao executar o Carregador Automático pela primeira vez, certifique-se de ter atribuído a função Colaborador à entidade de serviço na Grade de Eventos, bem como à conta de armazenamento.

Permissões necessárias para configurar a notificação de arquivo para o AWS S3

Você deve ter permissões de leitura para o diretório de entrada. Consulte os detalhes sobre a conexão S3 para obter mais informações.

Para usar o modo de notificação de arquivo, anexe o seguinte documento de política JSON ao usuário ou função do IAM.

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "DatabricksAutoLoaderSetup",
      "Effect": "Allow",
      "Action": [
        "s3:GetBucketNotification",
        "s3:PutBucketNotification",
        "sns:ListSubscriptionsByTopic",
        "sns:GetTopicAttributes",
        "sns:SetTopicAttributes",
        "sns:CreateTopic",
        "sns:TagResource",
        "sns:Publish",
        "sns:Subscribe",
        "sqs:CreateQueue",
        "sqs:DeleteMessage",
        "sqs:ReceiveMessage",
        "sqs:SendMessage",
        "sqs:GetQueueUrl",
        "sqs:GetQueueAttributes",
        "sqs:SetQueueAttributes",
        "sqs:TagQueue",
        "sqs:ChangeMessageVisibility"
      ],
      "Resource": [
        "arn:aws:s3:::<bucket-name>",
        "arn:aws:sqs:<region>:<account-number>:databricks-auto-ingest-*",
        "arn:aws:sns:<region>:<account-number>:databricks-auto-ingest-*"
      ]
    },
    {
      "Sid": "DatabricksAutoLoaderList",
      "Effect": "Allow",
      "Action": [
        "sqs:ListQueues",
        "sqs:ListQueueTags",
        "sns:ListTopics"
      ],
      "Resource": "*"
    },
    {
      "Sid": "DatabricksAutoLoaderTeardown",
      "Effect": "Allow",
      "Action": [
        "sns:Unsubscribe",
        "sns:DeleteTopic",
        "sqs:DeleteQueue"
      ],
      "Resource": [
        "arn:aws:sqs:<region>:<account-number>:databricks-auto-ingest-*",
        "arn:aws:sns:<region>:<account-number>:databricks-auto-ingest-*"
      ]
    }
  ]
}

onde:

  • <bucket-name>: nome do bucket S3 em que o fluxo lerá arquivos, por exemplo, auto-logs. Você pode usar * como um curinga, por exemplo, databricks-*-logs. Para descobrir o bucket S3 subjacente para o caminho DBFS, liste todos os pontos de montagem do DBFS em um notebook executando %fs mounts.
  • <region>: região da AWS em que o bucket S3 reside, por exemplo, us-west-2. Se você não quiser especificar a região, use *.
  • <account-number>: número da conta da AWS que possui o bucket S3, por exemplo, 123456789012. Se não quiser especificar o número da conta, use *.

A cadeia de caracteres databricks-auto-ingest-* na especificação do ARN SQS e SNS é o prefixo do nome que a origem cloudFiles usa ao criar serviços do SQS e SNS. Como o Azure Databricks configura os serviços de notificação na execução inicial do fluxo, você pode usar uma política com permissões reduzidas após a execução inicial (por exemplo, parar o fluxo e reiniciá-lo).

Observação

A política anterior se preocupa apenas com as permissões necessárias para configurar os serviços de notificação de arquivo, ou seja, notificação de bucket S3, serviços do SNS e SQS, e presume-se que você já tenha acesso de leitura ao bucket S3. Se precisar adicionar permissões de somente leitura ao S3, adicione o seguinte à lista Action na instrução DatabricksAutoLoaderSetup no documento JSON:

  • s3:ListBucket
  • s3:GetObject

Permissões reduzidas após a configuração inicial

As permissões de configuração de recurso descritas acima são necessárias apenas durante a execução inicial do fluxo. Após a primeira execução, você pode alternar para a seguinte política de IAM com permissões reduzidas.

Importante

Com as permissões reduzidas, você não conseguirá iniciar novas consultas de fluxo nem recriar recursos em caso de falhas (por exemplo, fila SQS excluída acidentalmente) e não poderá usar a API de gerenciamento de recursos de nuvem para listar ou destruir os recursos.

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "DatabricksAutoLoaderUse",
      "Effect": "Allow",
      "Action": [
       "s3:GetBucketNotification",
       "sns:ListSubscriptionsByTopic",
       "sns:GetTopicAttributes",
       "sns:TagResource",
       "sns:Publish",
       "sqs:DeleteMessage",
       "sqs:ReceiveMessage",
       "sqs:SendMessage",
       "sqs:GetQueueUrl",
       "sqs:GetQueueAttributes",
       "sqs:TagQueue",
       "sqs:ChangeMessageVisibility"
      ],
      "Resource": [
       "arn:aws:sqs:<region>:<account-number>:<queue-name>",
       "arn:aws:sns:<region>:<account-number>:<topic-name>",
       "arn:aws:s3:::<bucket-name>"
      ]
    },
    {
      "Effect": "Allow",
      "Action": [
       "s3:GetBucketLocation",
       "s3:ListBucket"
      ],
      "Resource": [
       "arn:aws:s3:::<bucket-name>"
      ]
    },
    {
      "Effect": "Allow",
      "Action": [
       "s3:PutObject",
       "s3:PutObjectAcl",
       "s3:GetObject",
       "s3:DeleteObject"
      ],
      "Resource": [
       "arn:aws:s3:::<bucket-name>/*"
      ]
    },
    {
      "Sid": "DatabricksAutoLoaderListTopics",
      "Effect": "Allow",
      "Action": [
       "sqs:ListQueues",
       "sqs:ListQueueTags",
       "sns:ListTopics"
      ],
      "Resource": "arn:aws:sns:<region>:<account-number>:*"
    }
  ]
}

Permissões necessárias para configurar a notificação de arquivo para o GCS

Você precisa ter as permissões list e get no bucket do GCS e em todos os objetos. Para obter detalhes, confira a documentação do Google sobre permissões do IAM.

Para usar o modo de notificação de arquivo, você precisará adicionar permissões da conta de serviço do GCS e da conta usada para acessar os recursos de Pub/Sub do Google Cloud.

Adicione a função Pub/Sub Publisher à conta de serviço do GCS. Isso permite que a conta publique mensagens de notificação de eventos dos buckets do GCS na Pub/Sub do Google Cloud.

Quanto à conta de serviço usada para os recursos de Pub/Sub do Google Cloud, você precisa adicionar as seguintes permissões:

pubsub.subscriptions.consume
pubsub.subscriptions.create
pubsub.subscriptions.delete
pubsub.subscriptions.get
pubsub.subscriptions.list
pubsub.subscriptions.update
pubsub.topics.attachSubscription
pubsub.topics.create
pubsub.topics.delete
pubsub.topics.get
pubsub.topics.list
pubsub.topics.update

Para isso, você pode criar uma função personalizada do IAM com essas permissões ou atribuir funções do GCP pré-existentes para cobrir essas permissões.

Localizar a conta de serviço do GCS

No console do Google Cloud do projeto correspondente, navegue até Cloud Storage > Settings. A seção chamada “Conta do serviço de armazenamento em nuvem” contém o email da conta de serviço do GCS.

Conta de serviço do GCS

Criar uma função do IAM do Google Cloud personalizada para o modo de notificação de arquivo

No console do Google Cloud do projeto correspondente, navegue até IAM & Admin > Roles. Crie uma função na parte superior ou atualize uma função existente. Na tela para criação ou edição da função, clique em Add Permissions. Um menu será exibido, no qual você poderá adicionar as permissões à função.

Funções Personalizadas de IAM do GCP

Configurar ou gerenciar manualmente os recursos de notificação de arquivo

Usuários privilegiados podem configurar ou gerenciar manualmente os recursos de notificação de arquivo.

  • Configure os serviços de notificação de arquivo manualmente por meio do provedor de nuvem e especifique manualmente o identificador de fila. Confira Opções de notificação de arquivos para obter mais detalhes.
  • Use as APIs do Scala para criar ou gerenciar as notificações e serviços de enfileiramento, conforme mostrado no exemplo a seguir:

Observação

Você deve ter permissões apropriadas para configurar ou modificar a infraestrutura de nuvem. Consulte a documentação de permissões para o Azure, S3 ou GCS.

Python

# Databricks notebook source
# MAGIC %md ## Python bindings for CloudFiles Resource Managers for all 3 clouds

# COMMAND ----------

#####################################
## Creating a ResourceManager in AWS
#####################################

manager = spark._jvm.com.databricks.sql.CloudFilesAWSResourceManager \
  .newManager() \
  .option("cloudFiles.region", <region>) \
  .option("path", <path-to-specific-bucket-and-folder>) \
  .create()

#######################################
## Creating a ResourceManager in Azure
#######################################

manager = spark._jvm.com.databricks.sql.CloudFilesAzureResourceManager \
  .newManager() \
  .option("cloudFiles.connectionString", <connection-string>) \
  .option("cloudFiles.resourceGroup", <resource-group>) \
  .option("cloudFiles.subscriptionId", <subscription-id>) \
  .option("cloudFiles.tenantId", <tenant-id>) \
  .option("cloudFiles.clientId", <service-principal-client-id>) \
  .option("cloudFiles.clientSecret", <service-principal-client-secret>) \
  .option("path", <path-to-specific-container-and-folder>) \
  .create()

#######################################
## Creating a ResourceManager in GCP
#######################################
manager = spark._jvm.com.databricks.sql.CloudFilesGCPResourceManager \
  .newManager() \
  .option("path", <path-to-specific-bucket-and-folder>) \
  .create()

# Set up a queue and a topic subscribed to the path provided in the manager.
manager.setUpNotificationServices(<resource-suffix>)

# List notification services created by <AL>
from pyspark.sql import DataFrame
df = DataFrame(manager.listNotificationServices(), spark)

# Tear down the notification services created for a specific stream ID.
# Stream ID is a GUID string that you can find in the list result above.
manager.tearDownNotificationServices(<stream-id>)

Scala

/////////////////////////////////////
// Creating a ResourceManager in AWS
/////////////////////////////////////

import com.databricks.sql.CloudFilesAWSResourceManager
val manager = CloudFilesAWSResourceManager
    .newManager
    .option("cloudFiles.region", <region>) // optional, will use the region of the EC2 instances by default
    .option("path", <path-to-specific-bucket-and-folder>) // required only for setUpNotificationServices
    .create()

///////////////////////////////////////
// Creating a ResourceManager in Azure
///////////////////////////////////////

import com.databricks.sql.CloudFilesAzureResourceManager
val manager = CloudFilesAzureResourceManager
  .newManager
  .option("cloudFiles.connectionString", <connection-string>)
  .option("cloudFiles.resourceGroup", <resource-group>)
  .option("cloudFiles.subscriptionId", <subscription-id>)
  .option("cloudFiles.tenantId", <tenant-id>)
  .option("cloudFiles.clientId", <service-principal-client-id>)
  .option("cloudFiles.clientSecret", <service-principal-client-secret>)
  .option("path", <path-to-specific-container-and-folder>) // required only for setUpNotificationServices
  .create()

///////////////////////////////////////
// Creating a ResourceManager in GCP
///////////////////////////////////////

import com.databricks.sql.CloudFilesGCPResourceManager
val manager = CloudFilesGCPResourceManager
    .newManager
    .option("path", <path-to-specific-bucket-and-folder>) // Required only for setUpNotificationServices.
    .create()

// Set up a queue and a topic subscribed to the path provided in the manager.
manager.setUpNotificationServices(<resource-suffix>)

// List notification services created by <AL>
val df = manager.listNotificationServices()

// Tear down the notification services created for a specific stream ID.
// Stream ID is a GUID string that you can find in the list result above.
manager.tearDownNotificationServices(<stream-id>)

Use setUpNotificationServices(<resource-suffix>) para criar uma fila e uma assinatura com o nome <prefix>-<resource-suffix> (o prefixo depende do sistema de armazenamento resumido em Recursos de nuvem usados no modo de notificação de arquivo do Carregador Automático. Se houver um recurso com o mesmo nome, o Azure Databricks reutilizará o recurso existente em vez de criar um. Essa função retorna um identificador de fila que pode ser passado para a origem cloudFiles usando o identificador em Opções de notificação de arquivos. Isso permite que o usuário da origem cloudFiles tenha menos permissões do que o usuário que cria os recursos.

Forneça a opção "path" para newManager somente se estiver chamando setUpNotificationServices; ela não é necessária para listNotificationServices ou tearDownNotificationServices. Isso é o mesmo path que você usa ao executar uma consulta de fluxo.

A matriz a seguir indica quais métodos de API têm suporte em qual Databricks Runtime para cada tipo de armazenamento:

Armazenamento em nuvem Instalação de API API de lista Desmontar a API
AWS S3 Todas as versões Todas as versões Todas as versões
ADLS Gen2 Todas as versões Todas as versões Todas as versões
GCS Databricks Runtime 9.1 e superiores Databricks Runtime 9.1 e superiores Databricks Runtime 9.1 e superiores
Armazenamento do Blobs do Azure Todas as versões Todas as versões Todas as versões
ADLS Gen1 Sem suporte Sem suporte Sem suporte