Ler e gravar dados de streaming do Avro

O Apache Avro é um sistema de serialização de dados comumente usado no mundo de streaming. Uma solução típica é colocar os dados no formato Avro no Apache Kafka, nos metadados no registro de esquema confluentee executar consultas com uma estrutura de streaming que se conecta ao Kafka e ao registro de esquema.

O Azure Databricks dá suporte às from_avrofunções to_avro e para criar pipelines de streaming com dados Avro em Kafka e metadados no registro de esquema. A função to_avro codifica uma coluna como Binary no formato Avro e from_avro decodifica dados binários Avro em uma coluna. Ambas as funções transformam uma coluna em outra, e o tipo de dados de entrada/saída SQL pode ser um tipo complexo ou primitivo.

Observação

As funções from_avro e to_avro:

  • Estão disponíveis em Python, Scala e Java.
  • pode ser passado para funções de SQL em consultas em lote e de streaming.

Consulte também fonte de dados de arquivo Avro.

Exemplo de esquema especificado manualmente

Semelhante a from_json e to_json, você pode usar from_avro e to_avro com qualquer coluna binária. Você pode especificar o esquema Avro manualmente, como no exemplo a seguir:

import org.apache.spark.sql.avro.functions._
import org.apache.avro.SchemaBuilder

// When reading the key and value of a Kafka topic, decode the
// binary (Avro) data into structured data.
// The schema of the resulting DataFrame is: <key: string, value: int>
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", servers)
  .option("subscribe", "t")
  .load()
  .select(
    from_avro($"key", SchemaBuilder.builder().stringType()).as("key"),
    from_avro($"value", SchemaBuilder.builder().intType()).as("value"))

// Convert structured data to binary from string (key column) and
// int (value column) and save to a Kafka topic.
dataDF
  .select(
    to_avro($"key").as("key"),
    to_avro($"value").as("value"))
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", servers)
  .option("topic", "t")
  .start()

Exemplo de jsonFormatSchema

Você também pode especificar um esquema como uma cadeia de caracteres JSON. Por exemplo, se /tmp/user.avsc for:

{
  "namespace": "example.avro",
  "type": "record",
  "name": "User",
  "fields": [
    {"name": "name", "type": "string"},
    {"name": "favorite_color", "type": ["string", "null"]}
  ]
}

Você pode criar uma cadeia de caracteres JSON:

from pyspark.sql.avro.functions import from_avro, to_avro

jsonFormatSchema = open("/tmp/user.avsc", "r").read()

Em seguida, use o esquema em from_avro:

# 1. Decode the Avro data into a struct.
# 2. Filter by column "favorite_color".
# 3. Encode the column "name" in Avro format.

output = df\
  .select(from_avro("value", jsonFormatSchema).alias("user"))\
  .where('user.favorite_color == "red"')\
  .select(to_avro("user.name").alias("value"))

Exemplo com registro de esquema

Se o cluster tiver um serviço de registro de esquema, from_avro o poderá trabalhar com ele para que você não precise especificar o esquema Avro manualmente.

O exemplo a seguir demonstra a leitura de um tópico do Kafka "t", supondo que a chave e o valor já estejam registrados no Registro de Esquema como tópicos "t-key" e "t-value" dos tipos STRING e INT:

import org.apache.spark.sql.avro.functions._

val schemaRegistryAddr = "https://myhost:8081"
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", servers)
  .option("subscribe", "t")
  .load()
  .select(
    from_avro($"key", "t-key", schemaRegistryAddr).as("key"),
    from_avro($"value", "t-value", schemaRegistryAddr).as("value"))

Para to_avro, o esquema de saída Avro padrão pode não corresponder ao esquema do assunto de destino no serviço de registro de esquema pelos seguintes motivos:

Se o esquema to_avro de saída padrão corresponder ao esquema do assunto de destino, você poderá fazer o seguinte:

// The converted data is saved to Kafka as a Kafka topic "t".
dataDF
  .select(
    to_avro($"key", lit("t-key"), schemaRegistryAddr).as("key"),
    to_avro($"value", lit("t-value"), schemaRegistryAddr).as("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.start()

Caso contrário, você deve fornecer o esquema do assunto de destino na função to_avro:

// The Avro schema of subject "t-value" in JSON string format.
val avroSchema = ...
// The converted data is saved to Kafka as a Kafka topic "t".
dataDF
  .select(
    to_avro($"key", lit("t-key"), schemaRegistryAddr).as("key"),
    to_avro($"value", lit("t-value"), schemaRegistryAddr, avroSchema).as("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.start()

Autenticar em um Registro de Esquema Confluent externo

No Databricks Runtime 12.2 LTS e versões posteriores, você pode autenticar em um registro de esquema de configuração externo. Os exemplos a seguir demonstram como configurar as opções de registro de esquema para incluir credenciais de autenticação e chaves de API.

Scala

import org.apache.spark.sql.avro.functions._
import scala.collection.JavaConverters._

val schemaRegistryAddr = "https://confluent-schema-registry-endpoint"
val schemaRegistryOptions = Map(
      "confluent.schema.registry.basic.auth.credentials.source" -> "USER_INFO",
      "confluent.schema.registry.basic.auth.user.info" -> "confluentApiKey:confluentApiSecret")

val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", servers)
  .option("subscribe", "t")
  .load()
  .select(
    from_avro($"key", "t-key", schemaRegistryAddr, schemaRegistryOptions.asJava).as("key"),
    from_avro($"value", "t-value", schemaRegistryAddr, schemaRegistryOptions.asJava).as("value"))

// The converted data is saved to Kafka as a Kafka topic "t".
dataDF
  .select(
    to_avro($"key", lit("t-key"), schemaRegistryAddr, schemaRegistryOptions.asJava).as("key"),
    to_avro($"value", lit("t-value"), schemaRegistryAddr, schemaRegistryOptions.asJava).as("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.save()

// The Avro schema of subject "t-value" in JSON string format.
val avroSchema = ...

// The converted data is saved to Kafka as a Kafka topic "t".
dataDF
  .select(
    to_avro($"key", lit("t-key"), schemaRegistryAddr, schemaRegistryOptions.asJava).as("key"),
    to_avro($"value", lit("t-value"), schemaRegistryAddr, schemaRegistryOptions.asJava, avroSchema).as("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.save()

Python

from pyspark.sql.functions import col, lit
from pyspark.sql.avro.functions import from_avro, to_avro

schema_registry_address = "https://confluent-schema-registry-endpoint"
schema_registry_options = {
  "confluent.schema.registry.basic.auth.credentials.source": 'USER_INFO',
  "confluent.schema.registry.basic.auth.user.info": f"{key}:{secret}"
}

df = (spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", servers)
  .option("subscribe", "t")
  .load()
  .select(
    from_avro(
      data = col("key"),
      options = schema_registry_options,
      subject = "t-key",
      schemaRegistryAddress = schema_registry_address
    ).alias("key"),
    from_avro(
      data = col("value"),
      options = schema_registry_options,
      subject = "t-value",
      schemaRegistryAddress = schema_registry_address
    ).alias("value")
  )
)

# The converted data is saved to Kafka as a Kafka topic "t".
data_df
  .select(
    to_avro(
      data = col("key"),
      subject = lit("t-key"),
      schemaRegistryAddress = schema_registry_address,
      options = schema_registry_options
    ).alias("key"),
    to_avro(
      data = col("value"),
      subject = lit("t-value"),
      schemaRegistryAddress = schema_registry_address,
      options = schema_registry_options
    ).alias("value")
  )
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.save()

# The Avro schema of subject "t-value" in JSON string format.
avro_schema = ...

# The converted data is saved to Kafka as a Kafka topic "t".
data_df
  .select(
    to_avro(
      data = col("key"),
      subject = lit("t-key"),
      schemaRegistryAddress = schema_registry_address,
      options = schema_registry_options
    ).alias("key"),
    to_avro(
      data = col("value"),
      subject = lit("t-value"),
      schemaRegistryAddress = schema_registry_address,
      options = schema_registry_options,
      jsonFormatSchema = avro_schema).alias("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.save()

Use arquivos de truststore e keystore em volumes do Unity Catalog

No Databricks Runtime 14.3 LTS e superior, você pode usar arquivos de armazenamento confiável e de armazenamento de chaves em volumes do Catálogo do Unity para autenticar em um Registro de Esquema Confluente. Atualize a configuração no exemplo anterior usando a seguinte sintaxe:

val schemaRegistryAddr = "https://confluent-schema-registry-endpoint"
val schemaRegistryOptions = Map(
      "confluent.schema.registry.ssl.truststore.location" -> "/Volumes/<catalog_name>/<schema_name>/<volume_name>/truststore.jks",
      "confluent.schema.registry.ssl.truststore.password" -> "truststorePassword",
      "confluent.schema.registry.ssl.keystore.location" -> "/Volumes/<catalog_name>/<schema_name>/<volume_name>/keystore.jks",
      "confluent.schema.registry.ssl.truststore.password" -> "keystorePassword",
      "confluent.schema.registry.ssl.key.password" -> "keyPassword")

Usar o modo de evolução do esquema com from_avro

No Databricks Runtime 14.2 e superior, você pode usar o modo de evolução do esquema com from_avro. A habilitação do modo de evolução do esquema faz com que o trabalho lance um UnknownFieldException após detectar a evolução do esquema. O Databricks recomenda configurar trabalhos com o modo de evolução do esquema para reiniciar automaticamente em caso de falha da tarefa. Confira Considerações de produção para o Streaming estruturado.

A evolução do esquema será útil se você espera que o esquema dos dados de origem evolua com o tempo e ingira todos os campos da fonte de dados. Se suas consultas já especificam explicitamente quais campos consultar na fonte de dados, os campos adicionados serão ignorados independentemente da evolução do esquema.

Use a opção avroSchemaEvolutionMode para habilitar a evolução do esquema. A tabela a seguir descreve as opções para o modo de evolução do esquema:

Opção Comportamento
none Default. Ignora a evolução do esquema e o trabalho continua.
restart Gera um UnknownFieldException ao detectar a evolução do esquema. Requer uma reinicialização do trabalho.

Observação

Você pode alterar essa configuração entre trabalhos de fluxo e reutilizar o mesmo ponto de verificação. Desabilitar a evolução do esquema pode resultar em colunas descartadas.

Configurar o modo de análise

Você pode configurar o modo de análise para determinar se quer falhar ou emitir registros nulos quando o modo de evolução do esquema está desabilitado e o esquema evolui de maneira não compatível com versões anteriores. Com as configurações padrão, from_avro falha quando observa alterações de esquema incompatíveis.

Use a opção mode para especificar o modo de análise. A tabela a seguir descreve a opção para o modo de análise:

Opção Comportamento
FAILFAST Default. Um erro de análise lança um SparkException com um errorClass de MALFORMED_AVRO_MESSAGE.
PERMISSIVE Um erro de análise é ignorado e um registro nulo é emitido.

Observação

Com a evolução do esquema habilitada, FAILFAST só gerará exceções se um registro estiver corrompido.

Exemplo usando a evolução do esquema e o modo de análise de configuração

O exemplo a seguir demonstra como habilitar a evolução do esquema e especificar o modo de análise FAILFAST com um Registro de Esquema de Configuração:

Scala

import org.apache.spark.sql.avro.functions._
import scala.collection.JavaConverters._

val schemaRegistryAddr = "https://confluent-schema-registry-endpoint"
val schemaRegistryOptions = Map(
      "confluent.schema.registry.basic.auth.credentials.source" -> "USER_INFO",
      "confluent.schema.registry.basic.auth.user.info" -> "confluentApiKey:confluentApiSecret",
      "avroSchemaEvolutionMode" -> "restart",
      "mode" -> "FAILFAST")

val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", servers)
  .option("subscribe", "t")
  .load()
  .select(
    // We read the "key" binary column from the subject "t-key" in the schema
    // registry at schemaRegistryAddr. We provide schemaRegistryOptions,
    // which has avroSchemaEvolutionMode -> "restart". This instructs from_avro
    // to fail the query if the schema for the subject t-key evolves.
    from_avro(
            $"key",
            "t-key",
            schemaRegistryAddr,
            schemaRegistryOptions.asJava).as("key"))

Python

from pyspark.sql.functions import col, lit
from pyspark.sql.avro.functions import from_avro, to_avro

schema_registry_address = "https://confluent-schema-registry-endpoint"
schema_registry_options = {
  "confluent.schema.registry.basic.auth.credentials.source": 'USER_INFO',
  "confluent.schema.registry.basic.auth.user.info": f"{key}:{secret}",
  "avroSchemaEvolutionMode": "restart",
  "mode": "FAILFAST",
}

df = (spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", servers)
  .option("subscribe", "t")
  .load()
  .select(
    from_avro(
      data = col("key"),
      options = schema_registry_options,
      subject = "t-key",
      schemaRegistryAddress = schema_registry_address
    ).alias("key")
  )
)