Co je režim oznámení souboru automatického zavaděče?
V režimu oznámení souborů automaticky zavaděč nastaví službu oznámení a službu fronty, která se přihlásí k odběru událostí souborů ze vstupního adresáře. Oznámení o souborech můžete použít ke škálování automatického zavaděče na ingestování milionů souborů za hodinu. Ve srovnání s režimem výpisu adresářů je režim oznámení souboru výkonnější a škálovatelný pro velké vstupní adresáře nebo velký objem souborů, ale vyžaduje další cloudová oprávnění.
Mezi oznámeními o souborech a výpisem adresářů můžete kdykoli přepínat a přitom zachovat záruky zpracování dat přesně jednou.
Poznámka:
Režim oznámení souborů není pro účty Azure Premium Storage podporovaný, protože účty Premium nepodporují službu Queue Storage.
Upozorňující
Změna zdrojové cesty pro automatický zavaděč není podporována v režimu oznámení souboru. Pokud se použije režim oznámení souboru a cesta se změní, může se stát, že se nepodaří ingestovat soubory, které jsou již v novém adresáři v době aktualizace adresáře.
Režim oznámení o souboru se podporuje jenom na výpočetních prostředcích s jedním uživatelem.
Cloudové prostředky používané v režimu oznámení souborů automatického zavaděče
Důležité
K automatické konfiguraci cloudové infrastruktury pro režim oznámení souborů potřebujete zvýšená oprávnění. Obraťte se na správce cloudu nebo správce pracovního prostoru. Vidět:
Automatický zavaděč vám může automaticky nastavit oznámení o souborech, když nastavíte možnost cloudFiles.useNotifications
true
a poskytnete potřebná oprávnění k vytváření cloudových prostředků. Kromě toho možná budete muset poskytnout další možnosti , jak udělit autorizaci automatického zavaděče pro vytvoření těchto prostředků.
Následující tabulka shrnuje, které prostředky jsou vytvořeny automatickým zavaděčem.
Cloudové úložiště | Předplatná služba | Služba front | Předpona* | Limit** |
---|---|---|---|---|
AWS S3 | AWS SNS | AWS SQS | Automatické ingestování databricks | 100 na kbelík S3 |
ADLS Gen2 | Azure Event Grid | Azure Queue Storage | databricks | 500 na účet úložiště |
GCS | Google Pub/Sub | Google Pub/Sub | Automatické ingestování databricks | 100 na kbelík GCS |
Azure Blob Storage | Azure Event Grid | Azure Queue Storage | databricks | 500 na účet úložiště |
- Automaticky zavaděč pojmenuje prostředky s touto předponou.
** Kolik souběžných kanálů oznámení o souborech je možné spustit
Pokud potřebujete pro daný účet úložiště spustit více než omezený počet kanálů oznámení o souborech, můžete:
- Využijte službu, jako je AWS Lambda, Azure Functions nebo Google Cloud Functions, k rozdlouchejte oznámení z jedné fronty, která naslouchá celému kontejneru nebo kontejneru do front specifických pro adresář.
Události oznámení souboru
AWS S3 poskytuje ObjectCreated
událost při nahrání souboru do kontejneru S3 bez ohledu na to, jestli byl nahraný vložením nebo vícedílným nahráním.
ADLS Gen2 poskytuje různá oznámení událostí pro soubory, které se zobrazují v kontejneru Gen2.
- Automatické zavaděče naslouchá
FlushWithClose
události pro zpracování souboru. - Streamy automatického zavaděče
RenameFile
podporují akci zjišťování souborů.RenameFile
Akce vyžadují požadavek rozhraní API na systém úložiště, aby získaly velikost přejmenovaného souboru. - Automatické streamy zavaděče vytvořené pomocí Databricks Runtime 9.0 a po podpoře
RenameDirectory
akce zjišťování souborů.RenameDirectory
Akce vyžadují, aby požadavky rozhraní API na systém úložiště vypsaly obsah přejmenovaného adresáře.
Google Cloud Storage poskytuje OBJECT_FINALIZE
událost při nahrání souboru, který zahrnuje přepsání a kopie souborů. Neúspěšné nahrání negenerují tuto událost.
Poznámka:
Poskytovatelé cloudu nezaručují 100% doručení všech událostí souborů za velmi vzácných podmínek a neposkytují přísné smlouvy SLA na latenci událostí souborů. Databricks doporučuje aktivovat pravidelné zavaděče automatického zavaděče pomocí cloudFiles.backfillInterval
možnosti zaručit, že se všechny soubory v rámci dané smlouvy SLA zjistí, pokud je požadavek na dokončení dat. Aktivace pravidelných backfillů nezpůsobí duplicity.
Požadovaná oprávnění pro konfiguraci oznámení o souboru pro ADLS Gen2 a Azure Blob Storage
Pro vstupní adresář musíte mít oprávnění ke čtení. Viz Azure Blob Storage.
Pokud chcete použít režim oznámení souboru, musíte zadat přihlašovací údaje pro ověřování pro nastavení a přístup ke službám oznámení událostí. K ověřování potřebujete pouze instanční objekt.
Instanční objekt – použití předdefinovaných rolí Azure
Vytvořte aplikaci a instanční objekt Microsoft Entra ID (dříve Azure Active Directory) ve formě ID klienta a tajného klíče klienta.
Přiřaďte této aplikaci následující role k účtu úložiště, ve kterém se nachází vstupní cesta:
- Přispěvatel: Tato role slouží k nastavení prostředků ve vašem účtu úložiště, jako jsou fronty a odběry událostí.
- Přispěvatel dat fronty úložiště: Tato role slouží k provádění operací front, jako je načítání a odstraňování zpráv z front. Tato role se vyžaduje pouze v případě, že zadáte instanční objekt bez připojovací řetězec.
Přiřaďte této aplikaci následující roli ke související skupině prostředků:
- Přispěvatel EventGrid EventSubscription: Tato role slouží k provádění operací odběru event gridu, jako je vytváření nebo výpis odběrů událostí.
Další informace viz Přiřazení rolí Azure pomocí webu Azure Portal.
Instanční objekt – použití vlastní role
Pokud máte obavy o nadměrná oprávnění požadovaná pro předchozí role, můžete vytvořit vlastní roli s alespoň následujícími oprávněními, která jsou uvedená níže ve formátu JSON role Azure:
"permissions": [ { "actions": [ "Microsoft.EventGrid/eventSubscriptions/write", "Microsoft.EventGrid/eventSubscriptions/read", "Microsoft.EventGrid/eventSubscriptions/delete", "Microsoft.EventGrid/locations/eventSubscriptions/read", "Microsoft.Storage/storageAccounts/read", "Microsoft.Storage/storageAccounts/write", "Microsoft.Storage/storageAccounts/queueServices/read", "Microsoft.Storage/storageAccounts/queueServices/write", "Microsoft.Storage/storageAccounts/queueServices/queues/write", "Microsoft.Storage/storageAccounts/queueServices/queues/read", "Microsoft.Storage/storageAccounts/queueServices/queues/delete" ], "notActions": [], "dataActions": [ "Microsoft.Storage/storageAccounts/queueServices/queues/messages/delete", "Microsoft.Storage/storageAccounts/queueServices/queues/messages/read", "Microsoft.Storage/storageAccounts/queueServices/queues/messages/write", "Microsoft.Storage/storageAccounts/queueServices/queues/messages/process/action" ], "notDataActions": [] } ]
Potom můžete této vlastní roli přiřadit k aplikaci.
Další informace viz Přiřazení rolí Azure pomocí webu Azure Portal.
Řešení běžných chyb
Chyba:
java.lang.RuntimeException: Failed to create event grid subscription.
Pokud se tato chybová zpráva zobrazí při prvním spuštění automatického zavaděče, služba Event Grid není ve vašem předplatném Azure zaregistrovaná jako poskytovatel prostředků. Postup registrace na webu Azure Portal:
- Přejděte k předplatnému.
- V části Nastavení klikněte na Poskytovatelé prostředků.
- Zaregistrujte poskytovatele
Microsoft.EventGrid
.
Chyba:
403 Forbidden ... does not have authorization to perform action 'Microsoft.EventGrid/eventSubscriptions/[read|write]' over scope ...
Pokud se tato chybová zpráva zobrazí při prvním spuštění automatického zavaděče, ujistěte se, že jste svému instančnímu objektu pro Event Grid i vašemu účtu úložiště udělili roli Přispěvatel.
Požadovaná oprávnění pro konfiguraci oznámení o souboru pro AWS S3
Pro vstupní adresář musíte mít oprávnění ke čtení. Další podrobnosti najdete v podrobnostech o připojení S3.
Pokud chcete použít režim oznámení souboru, připojte k uživateli nebo roli IAM následující dokument zásad JSON.
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "DatabricksAutoLoaderSetup",
"Effect": "Allow",
"Action": [
"s3:GetBucketNotification",
"s3:PutBucketNotification",
"sns:ListSubscriptionsByTopic",
"sns:GetTopicAttributes",
"sns:SetTopicAttributes",
"sns:CreateTopic",
"sns:TagResource",
"sns:Publish",
"sns:Subscribe",
"sqs:CreateQueue",
"sqs:DeleteMessage",
"sqs:ReceiveMessage",
"sqs:SendMessage",
"sqs:GetQueueUrl",
"sqs:GetQueueAttributes",
"sqs:SetQueueAttributes",
"sqs:TagQueue",
"sqs:ChangeMessageVisibility"
],
"Resource": [
"arn:aws:s3:::<bucket-name>",
"arn:aws:sqs:<region>:<account-number>:databricks-auto-ingest-*",
"arn:aws:sns:<region>:<account-number>:databricks-auto-ingest-*"
]
},
{
"Sid": "DatabricksAutoLoaderList",
"Effect": "Allow",
"Action": [
"sqs:ListQueues",
"sqs:ListQueueTags",
"sns:ListTopics"
],
"Resource": "*"
},
{
"Sid": "DatabricksAutoLoaderTeardown",
"Effect": "Allow",
"Action": [
"sns:Unsubscribe",
"sns:DeleteTopic",
"sqs:DeleteQueue"
],
"Resource": [
"arn:aws:sqs:<region>:<account-number>:databricks-auto-ingest-*",
"arn:aws:sns:<region>:<account-number>:databricks-auto-ingest-*"
]
}
]
}
kde:
<bucket-name>
: Název kontejneru S3, kde stream bude číst soubory,auto-logs
například . Můžete použít*
napříkladdatabricks-*-logs
jako zástupný znak . Pokud chcete zjistit základní kontejner S3 pro cestu DBFS, můžete zobrazit seznam všech přípojných bodů DBFS v poznámkovém bloku spuštěním%fs mounts
.<region>
: Oblast AWS, ve které se nachází kontejner S3,us-west-2
například . Pokud nechcete zadávat oblast, použijte*
.<account-number>
: Číslo účtu AWS, které vlastní kbelík S3,123456789012
například . Pokud nechcete zadat číslo účtu, použijte*
.
Řetězec databricks-auto-ingest-*
ve specifikaci SQS a SNS ARN je předpona názvu, kterou cloudFiles
zdroj používá při vytváření služeb SQS a SNS. Vzhledem k tomu, že Azure Databricks nastaví služby oznámení v počátečním spuštění streamu, můžete po počátečním spuštění použít zásadu s omezenými oprávněními (například zastavit stream a poté ho restartovat).
Poznámka:
Předchozí zásada se zabývá pouze oprávněními potřebnými k nastavení služeb oznámení souborů, konkrétně služby S3 bucket notification, SNS a SQS a předpokládá, že už máte přístup pro čtení do kontejneru S3. Pokud potřebujete přidat oprávnění jen pro čtení S3, přidejte do seznamu v příkazu v DatabricksAutoLoaderSetup
dokumentu JSON následujícíAction
:
s3:ListBucket
s3:GetObject
Omezená oprávnění po počátečním nastavení
Výše popsaná oprávnění k nastavení prostředků se vyžadují pouze při počátečním spuštění datového proudu. Po prvním spuštění můžete přepnout na následující zásady IAM s omezenými oprávněními.
Důležité
S omezenými oprávněními nemůžete v případě selhání spustit nové dotazy streamování ani znovu vytvořit prostředky (například fronta SQS byla omylem odstraněna); Nemůžete také použít rozhraní API pro správu cloudových prostředků k výpisu nebo odstraňování prostředků.
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "DatabricksAutoLoaderUse",
"Effect": "Allow",
"Action": [
"s3:GetBucketNotification",
"sns:ListSubscriptionsByTopic",
"sns:GetTopicAttributes",
"sns:TagResource",
"sns:Publish",
"sqs:DeleteMessage",
"sqs:ReceiveMessage",
"sqs:SendMessage",
"sqs:GetQueueUrl",
"sqs:GetQueueAttributes",
"sqs:TagQueue",
"sqs:ChangeMessageVisibility"
],
"Resource": [
"arn:aws:sqs:<region>:<account-number>:<queue-name>",
"arn:aws:sns:<region>:<account-number>:<topic-name>",
"arn:aws:s3:::<bucket-name>"
]
},
{
"Effect": "Allow",
"Action": [
"s3:GetBucketLocation",
"s3:ListBucket"
],
"Resource": [
"arn:aws:s3:::<bucket-name>"
]
},
{
"Effect": "Allow",
"Action": [
"s3:PutObject",
"s3:PutObjectAcl",
"s3:GetObject",
"s3:DeleteObject"
],
"Resource": [
"arn:aws:s3:::<bucket-name>/*"
]
},
{
"Sid": "DatabricksAutoLoaderListTopics",
"Effect": "Allow",
"Action": [
"sqs:ListQueues",
"sqs:ListQueueTags",
"sns:ListTopics"
],
"Resource": "arn:aws:sns:<region>:<account-number>:*"
}
]
}
Požadovaná oprávnění ke konfiguraci oznámení o souboru pro službu GCS
Ke kontejneru GCS a ke všem objektům musíte mít list
oprávnění get
. Podrobnosti najdete v dokumentaci Google k oprávněním IAM.
Pokud chcete použít režim oznámení souborů, musíte přidat oprávnění pro účet služby GCS a účet použitý pro přístup k prostředkům Google Cloud Pub/Sub.
Pub/Sub Publisher
Přidejte roli do účtu služby GCS. Účet tak může publikovat zprávy oznámení o událostech z kontejnerů GCS do Google Cloud Pub/Sub.
Pokud jde o účet služby používaný pro prostředky Google Cloud Pub/Sub, musíte přidat následující oprávnění:
pubsub.subscriptions.consume
pubsub.subscriptions.create
pubsub.subscriptions.delete
pubsub.subscriptions.get
pubsub.subscriptions.list
pubsub.subscriptions.update
pubsub.topics.attachSubscription
pubsub.topics.create
pubsub.topics.delete
pubsub.topics.get
pubsub.topics.list
pubsub.topics.update
K tomu můžete buď vytvořit vlastní roli IAM s těmito oprávněními, nebo přiřadit existující role GCP k pokrytí těchto oprávnění.
Vyhledání účtu služby GCS
V konzole Google Cloud Console pro odpovídající projekt přejděte na Cloud Storage > Settings
.
Část "Účet služby cloudového úložiště" obsahuje e-mail účtu služby GCS.
Vytvoření vlastní role IAM cloudu Google pro režim oznámení souborů
V konzole Google Cloud pro odpovídající projekt přejděte na IAM & Admin > Roles
. Pak buď vytvořte roli v horní části, nebo aktualizujte existující roli. Na obrazovce pro vytvoření nebo úpravu role klikněte na Add Permissions
. Zobrazí se nabídka, ve které můžete přidat požadovaná oprávnění k roli.
Ruční konfigurace nebo správa prostředků oznámení o souborech
Privilegovaní uživatelé můžou ručně konfigurovat nebo spravovat prostředky oznámení o souborech.
- Službu oznámení souborů nastavte ručně prostřednictvím poskytovatele cloudu a ručně zadejte identifikátor fronty. Další podrobnosti najdete v tématu Možnosti oznámení o souboru.
- Rozhraní SCALA API slouží k vytvoření nebo správě oznámení a služeb řazení do front, jak je znázorněno v následujícím příkladu:
Poznámka:
Ke konfiguraci nebo úpravě cloudové infrastruktury musíte mít příslušná oprávnění. Prohlédni si dokumentaci k oprávněním pro Azure, S3 nebo GCS.
Python
# Databricks notebook source
# MAGIC %md ## Python bindings for CloudFiles Resource Managers for all 3 clouds
# COMMAND ----------
#####################################
## Creating a ResourceManager in AWS
#####################################
manager = spark._jvm.com.databricks.sql.CloudFilesAWSResourceManager \
.newManager() \
.option("cloudFiles.region", <region>) \
.option("path", <path-to-specific-bucket-and-folder>) \
.create()
#######################################
## Creating a ResourceManager in Azure
#######################################
manager = spark._jvm.com.databricks.sql.CloudFilesAzureResourceManager \
.newManager() \
.option("cloudFiles.connectionString", <connection-string>) \
.option("cloudFiles.resourceGroup", <resource-group>) \
.option("cloudFiles.subscriptionId", <subscription-id>) \
.option("cloudFiles.tenantId", <tenant-id>) \
.option("cloudFiles.clientId", <service-principal-client-id>) \
.option("cloudFiles.clientSecret", <service-principal-client-secret>) \
.option("path", <path-to-specific-container-and-folder>) \
.create()
#######################################
## Creating a ResourceManager in GCP
#######################################
manager = spark._jvm.com.databricks.sql.CloudFilesGCPResourceManager \
.newManager() \
.option("path", <path-to-specific-bucket-and-folder>) \
.create()
# Set up a queue and a topic subscribed to the path provided in the manager.
manager.setUpNotificationServices(<resource-suffix>)
# List notification services created by <AL>
from pyspark.sql import DataFrame
df = DataFrame(manager.listNotificationServices(), spark)
# Tear down the notification services created for a specific stream ID.
# Stream ID is a GUID string that you can find in the list result above.
manager.tearDownNotificationServices(<stream-id>)
Scala
/////////////////////////////////////
// Creating a ResourceManager in AWS
/////////////////////////////////////
import com.databricks.sql.CloudFilesAWSResourceManager
val manager = CloudFilesAWSResourceManager
.newManager
.option("cloudFiles.region", <region>) // optional, will use the region of the EC2 instances by default
.option("path", <path-to-specific-bucket-and-folder>) // required only for setUpNotificationServices
.create()
///////////////////////////////////////
// Creating a ResourceManager in Azure
///////////////////////////////////////
import com.databricks.sql.CloudFilesAzureResourceManager
val manager = CloudFilesAzureResourceManager
.newManager
.option("cloudFiles.connectionString", <connection-string>)
.option("cloudFiles.resourceGroup", <resource-group>)
.option("cloudFiles.subscriptionId", <subscription-id>)
.option("cloudFiles.tenantId", <tenant-id>)
.option("cloudFiles.clientId", <service-principal-client-id>)
.option("cloudFiles.clientSecret", <service-principal-client-secret>)
.option("path", <path-to-specific-container-and-folder>) // required only for setUpNotificationServices
.create()
///////////////////////////////////////
// Creating a ResourceManager in GCP
///////////////////////////////////////
import com.databricks.sql.CloudFilesGCPResourceManager
val manager = CloudFilesGCPResourceManager
.newManager
.option("path", <path-to-specific-bucket-and-folder>) // Required only for setUpNotificationServices.
.create()
// Set up a queue and a topic subscribed to the path provided in the manager.
manager.setUpNotificationServices(<resource-suffix>)
// List notification services created by <AL>
val df = manager.listNotificationServices()
// Tear down the notification services created for a specific stream ID.
// Stream ID is a GUID string that you can find in the list result above.
manager.tearDownNotificationServices(<stream-id>)
Slouží setUpNotificationServices(<resource-suffix>)
k vytvoření fronty a předplatného s názvem <prefix>-<resource-suffix>
(předpona závisí na systému úložiště shrnutého v cloudových prostředcích používaných v režimu oznámení souboru automatického zavaděče. Pokud existuje existující prostředek se stejným názvem, Azure Databricks znovu použije existující prostředek místo vytvoření nového prostředku. Tato funkce vrátí identifikátor fronty, který můžete předat cloudFiles
zdroji pomocí identifikátoru v možnostech oznámení souboru. To umožňuje zdrojovému cloudFiles
uživateli mít méně oprávnění než uživatel, který prostředky vytvoří.
Zadejte možnost newManager
pouze v případě, že volání setUpNotificationServices
není nutné nebo listNotificationServices
tearDownNotificationServices
."path"
To je stejné path
, jaké používáte při spouštění streamovacího dotazu.
Následující matice označuje, které metody rozhraní API jsou podporované v tom, ve kterých databricks Runtime pro každý typ úložiště:
Cloudové úložiště | Nastavení rozhraní API | Rozhraní API pro výpis | Roztrhání rozhraní API |
---|---|---|---|
AWS S3 | Všechny verze | Všechny verze | Všechny verze |
ADLS Gen2 | Všechny verze | Všechny verze | Všechny verze |
GCS | Databricks Runtime 9.1 a novější | Databricks Runtime 9.1 a novější | Databricks Runtime 9.1 a novější |
Azure Blob Storage | Všechny verze | Všechny verze | Všechny verze |
ADLS Gen1 | Nepodporované | Nepodporované | Nepodporované |