Usar os Hubs de Eventos do Azure como uma fonte de dados das Tabelas Dinâmicas Delta

Este artigo explica como usar as Tabelas Dinâmicas Delta para processar mensagens dos Hubs de Eventos do Azure. Não é possível usar o conector dos Hubs de Eventos de Fluxo Estruturado porque essa biblioteca não está disponível como parte do Databricks Runtime, além disso, as Tabelas Dinâmicas Delta não permitem que você use bibliotecas JVM de terceiros.

Como as Tabelas Dinâmicas Delta se conectam aos Hubs de Eventos do Azure?

Os Hubs de Eventos do Azure fornecem um ponto de extremidade compatível com o Apache Kafka que você pode usar com o Conector Kafka de streaming estruturado, disponível no Databricks Runtime, para processar mensagens dos Hubs de Eventos do Azure. Para obter mais informações sobre os Hubs de Eventos do Azure e a compatibilidade do Apache Kafka, consulte Usar Hubs de Eventos do Azure de aplicativos do Apache Kafka.

As etapas a seguir descrevem como conectar um pipeline do Delta Live Tables a uma instância existente dos Hubs de Eventos e consumir eventos de um tópico. Para concluir estas etapas, você precisa dos seguintes valores de conexão dos Hubs de Eventos:

  • O nome do namespace dos Hubs de Eventos.
  • O nome da instância do Hub de Eventos no namespace dos Hubs de Eventos.
  • Um nome da política de acesso e chave da política compartilhados para os Hubs de Eventos. Por padrão, uma política RootManageSharedAccessKey é criada para cada namespace dos Hubs de Eventos. Essa política tem permissões manage, send e listen. Se o pipeline fizer leitura apenas dos Hubs de Eventos, o Databricks recomenda a criação de uma nova política com permissão de escuta somente.

Para obter mais informações sobre a cadeia de conexão dos Hubs de Eventos, consulte Obter uma cadeia de conexão dos Hubs de Eventos.

Observação

  • Os Hubs de Eventos do Azure fornecem opções OAuth 2.0 e SAS (assinatura de acesso compartilhado) para autorizar o acesso aos seus recursos seguros. Essas instruções usam a autenticação baseada em SAS.
  • Se você receber a cadeia de conexão dos Hubs de Eventos do portal do Azure, ela poderá não conter o valor EntityPath. O valor EntityPath é necessário somente ao usar o conector dos Hubs de Eventos de Streaming Estruturado. Usar o Conector Kafka de streaming estruturado exige fornecer apenas o nome do tópico.

Armazenar a chave de política em um segredo do Azure Databricks

Como a chave de política é uma informação confidencial, o Databricks recomenda não codificar o valor em seu código de pipeline. Em vez disso, use os segredos do Azure Databricks para armazenar e gerenciar o acesso à chave.

O exemplo a seguir usa a CLI do Databricks para criar um escopo secreto e armazenar a chave nele. No código do pipeline, use a função dbutils.secrets.get() com o scope-name e shared-policy-name para recuperar o valor da chave.

databricks --profile <profile-name> secrets create-scope <scope-name>

databricks --profile <profile-name> secrets put-secret <scope-name> <shared-policy-name> --string-value <shared-policy-key>

Para obter mais informações sobre segredos do Azure Databricks, consulte Gerenciamento de segredos.

Criar um notebook e adicionar o código do pipeline para consumir eventos

O exemplo a seguir faz a leitura de eventos IoT de um tópico, mas você pode adaptar o exemplo para os requisitos do aplicativo. Como melhor prática, o Databricks recomenda usar as configurações de pipeline do Delta Live Tables para configurar variáveis de aplicativo. Em seguida, o código do pipeline usa a função spark.conf.get() para recuperar valores. Para obter mais informações sobre como usar configurações de pipeline para parametrizar seu pipeline, consulte Usar parâmetros com pipelines do Delta Live Tables.

import dlt
import pyspark.sql.types as T
from pyspark.sql.functions import *

# Event Hubs configuration
EH_NAMESPACE                    = spark.conf.get("iot.ingestion.eh.namespace")
EH_NAME                         = spark.conf.get("iot.ingestion.eh.name")

EH_CONN_SHARED_ACCESS_KEY_NAME  = spark.conf.get("iot.ingestion.eh.accessKeyName")
SECRET_SCOPE                    = spark.conf.get("io.ingestion.eh.secretsScopeName")
EH_CONN_SHARED_ACCESS_KEY_VALUE = dbutils.secrets.get(scope = SECRET_SCOPE, key = EH_CONN_SHARED_ACCESS_KEY_NAME)

EH_CONN_STR                     = f"Endpoint=sb://{EH_NAMESPACE}.servicebus.windows.net/;SharedAccessKeyName={EH_CONN_SHARED_ACCESS_KEY_NAME};SharedAccessKey={EH_CONN_SHARED_ACCESS_KEY_VALUE}"
# Kafka Consumer configuration

KAFKA_OPTIONS = {
  "kafka.bootstrap.servers"  : f"{EH_NAMESPACE}.servicebus.windows.net:9093",
  "subscribe"                : EH_NAME,
  "kafka.sasl.mechanism"     : "PLAIN",
  "kafka.security.protocol"  : "SASL_SSL",
  "kafka.sasl.jaas.config"   : f"kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$ConnectionString\" password=\"{EH_CONN_STR}\";",
  "kafka.request.timeout.ms" : spark.conf.get("iot.ingestion.kafka.requestTimeout"),
  "kafka.session.timeout.ms" : spark.conf.get("iot.ingestion.kafka.sessionTimeout"),
  "maxOffsetsPerTrigger"     : spark.conf.get("iot.ingestion.spark.maxOffsetsPerTrigger"),
  "failOnDataLoss"           : spark.conf.get("iot.ingestion.spark.failOnDataLoss"),
  "startingOffsets"          : spark.conf.get("iot.ingestion.spark.startingOffsets")
}

# PAYLOAD SCHEMA
payload_ddl = """battery_level BIGINT, c02_level BIGINT, cca2 STRING, cca3 STRING, cn STRING, device_id BIGINT, device_name STRING, humidity BIGINT, ip STRING, latitude DOUBLE, lcd STRING, longitude DOUBLE, scale STRING, temp  BIGINT, timestamp BIGINT"""
payload_schema = T._parse_datatype_string(payload_ddl)

# Basic record parsing and adding ETL audit columns
def parse(df):
  return (df
    .withColumn("records", col("value").cast("string"))
    .withColumn("parsed_records", from_json(col("records"), payload_schema))
    .withColumn("iot_event_timestamp", expr("cast(from_unixtime(parsed_records.timestamp / 1000) as timestamp)"))
    .withColumn("eh_enqueued_timestamp", expr("timestamp"))
    .withColumn("eh_enqueued_date", expr("to_date(timestamp)"))
    .withColumn("etl_processed_timestamp", col("current_timestamp"))
    .withColumn("etl_rec_uuid", expr("uuid()"))
    .drop("records", "value", "key")
  )

@dlt.create_table(
  comment="Raw IOT Events",
  table_properties={
    "quality": "bronze",
    "pipelines.reset.allowed": "false" # preserves the data in the delta table if you do full refresh
  },
  partition_cols=["eh_enqueued_date"]
)
@dlt.expect("valid_topic", "topic IS NOT NULL")
@dlt.expect("valid records", "parsed_records IS NOT NULL")
def iot_raw():
  return (
   spark.readStream
    .format("kafka")
    .options(**KAFKA_OPTIONS)
    .load()
    .transform(parse)
  )

Criar o pipeline

Crie um pipeline com as configurações a seguir, substituindo os valores do espaço reservado por valores apropriados para seu ambiente.

{
  "clusters": [
    {
      "spark_conf": {
        "spark.hadoop.fs.azure.account.key.<storage-account-name>.dfs.core.windows.net": "{{secrets/<scope-name>/<secret-name>}}"
      },
      "num_workers": 4
    }
  ],
  "development": true,
  "continuous": false,
  "channel": "CURRENT",
  "edition": "ADVANCED",
  "photon": false,
  "libraries": [
    {
      "notebook": {
        "path": "<path-to-notebook>"
      }
    }
  ],
  "name": "dlt_eventhub_ingestion_using_kafka",
  "storage": "abfss://<container-name>@<storage-account-name>.dfs.core.windows.net/iot/",
  "configuration": {
    "iot.ingestion.eh.namespace": "<eh-namespace>",
    "iot.ingestion.eh.accessKeyName": "<eh-policy-name>",
    "iot.ingestion.eh.name": "<eventhub>",
    "io.ingestion.eh.secretsScopeName": "<secret-scope-name>",
    "iot.ingestion.spark.maxOffsetsPerTrigger": "50000",
    "iot.ingestion.spark.startingOffsets": "latest",
    "iot.ingestion.spark.failOnDataLoss": "false",
    "iot.ingestion.kafka.requestTimeout": "60000",
    "iot.ingestion.kafka.sessionTimeout": "30000"
  },
  "target": "<target-database-name>"
}

Substitua

  • <container-name> pelo nome de um contêiner de conta de armazenamento do Azure.
  • <storage-account-name> pelo nome de uma conta de armazenamento do ADLS Gen2.
  • <eh-namespace> com o nome do namespace dos Hubs de Eventos.
  • <eh-policy-name> pela chave de escopo do segredo para a chave de política dos Hubs de Eventos.
  • <eventhub> pelo nome da sua instância dos Hubs de Eventos.
  • <secret-scope-name> pelo nome do escopo do segredo do Azure Databricks que contém a chave de política dos Hubs de Eventos.

Como melhor prática, esse pipeline não usa o caminho de armazenamento DBFS padrão, mas usa uma conta de armazenamento do ADLS Gen2 (Azure Data Lake Storage Gen2). Para obter mais informações sobre como configurar a autenticação para uma conta de armazenamento do ADLS Gen2, consulte Acessar com segurança credenciais com segredos em um pipeline.