Läsa och skriva Avro-direktuppspelningsdata

Apache Avro är ett vanligt data serialiseringssystem i strömningsvärlden. En vanlig lösning är att placera data i Avro-format i Apache Kafka, metadata i Confluent Schema Registry och sedan köra frågor med ett direktuppspelningsramverk som ansluter till både Kafka och Schema Registry.

Azure Databricks stöder from_avro funktionerna och to_avro för att skapa strömmande pipelines med Avro-data i Kafka och metadata i Schema Registry. Funktionen to_avro kodar en kolumn som binär i Avro-format och from_avro avkodar Binära Avro-data till en kolumn. Båda funktionerna transformerar en kolumn till en annan kolumn, och SQL-datatypen för indata/utdata kan vara en komplex typ eller en primitiv typ.

Kommentar

Funktionerna from_avro och to_avro :

  • Finns i Python, Scala och Java.
  • Kan skickas till SQL-funktioner i både batch- och strömningsfrågor.

Se även Avro-fildatakälla.

Manuellt angivet schemaexempel

På samma sätt som from_json och to_json kan du använda from_avro och to_avro med valfri binär kolumn. Du kan ange Avro-schemat manuellt, som i följande exempel:

import org.apache.spark.sql.avro.functions._
import org.apache.avro.SchemaBuilder

// When reading the key and value of a Kafka topic, decode the
// binary (Avro) data into structured data.
// The schema of the resulting DataFrame is: <key: string, value: int>
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", servers)
  .option("subscribe", "t")
  .load()
  .select(
    from_avro($"key", SchemaBuilder.builder().stringType()).as("key"),
    from_avro($"value", SchemaBuilder.builder().intType()).as("value"))

// Convert structured data to binary from string (key column) and
// int (value column) and save to a Kafka topic.
dataDF
  .select(
    to_avro($"key").as("key"),
    to_avro($"value").as("value"))
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", servers)
  .option("topic", "t")
  .start()

jsonFormatSchema-exempel

Du kan också ange ett schema som en JSON-sträng. Om är till exempel /tmp/user.avsc :

{
  "namespace": "example.avro",
  "type": "record",
  "name": "User",
  "fields": [
    {"name": "name", "type": "string"},
    {"name": "favorite_color", "type": ["string", "null"]}
  ]
}

Du kan skapa en JSON-sträng:

from pyspark.sql.avro.functions import from_avro, to_avro

jsonFormatSchema = open("/tmp/user.avsc", "r").read()

Använd sedan schemat i from_avro:

# 1. Decode the Avro data into a struct.
# 2. Filter by column "favorite_color".
# 3. Encode the column "name" in Avro format.

output = df\
  .select(from_avro("value", jsonFormatSchema).alias("user"))\
  .where('user.favorite_color == "red"')\
  .select(to_avro("user.name").alias("value"))

Exempel med Schema Registry

Om klustret har en Schema Registry-tjänst from_avro kan du arbeta med den så att du inte behöver ange Avro-schemat manuellt.

Följande exempel visar läsning av ett Kafka-ämne "t", förutsatt att nyckeln och värdet redan är registrerade i Schema Registry som ämnen "t-key" och "t-value" av typer STRING och INT:

import org.apache.spark.sql.avro.functions._

val schemaRegistryAddr = "https://myhost:8081"
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", servers)
  .option("subscribe", "t")
  .load()
  .select(
    from_avro($"key", "t-key", schemaRegistryAddr).as("key"),
    from_avro($"value", "t-value", schemaRegistryAddr).as("value"))

För to_avrokanske standardutdataschemat för Avro inte matchar schemat för målämnet i Schema Registry-tjänsten av följande skäl:

Om standardutdataschemat to_avro matchar schemat för målämnet kan du göra följande:

// The converted data is saved to Kafka as a Kafka topic "t".
dataDF
  .select(
    to_avro($"key", lit("t-key"), schemaRegistryAddr).as("key"),
    to_avro($"value", lit("t-value"), schemaRegistryAddr).as("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.start()

Annars måste du ange schemat för målämnet i to_avro funktionen:

// The Avro schema of subject "t-value" in JSON string format.
val avroSchema = ...
// The converted data is saved to Kafka as a Kafka topic "t".
dataDF
  .select(
    to_avro($"key", lit("t-key"), schemaRegistryAddr).as("key"),
    to_avro($"value", lit("t-value"), schemaRegistryAddr, avroSchema).as("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.start()

Autentisera till ett externt Confluent-schemaregister

I Databricks Runtime 12.2 LTS och senare kan du autentisera till ett externt Confluent-schemaregister. Följande exempel visar hur du konfigurerar schemaregisteralternativen så att de innehåller autentiseringsuppgifter och API-nycklar.

Scala

import org.apache.spark.sql.avro.functions._
import scala.collection.JavaConverters._

val schemaRegistryAddr = "https://confluent-schema-registry-endpoint"
val schemaRegistryOptions = Map(
      "confluent.schema.registry.basic.auth.credentials.source" -> "USER_INFO",
      "confluent.schema.registry.basic.auth.user.info" -> "confluentApiKey:confluentApiSecret")

val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", servers)
  .option("subscribe", "t")
  .load()
  .select(
    from_avro($"key", "t-key", schemaRegistryAddr, schemaRegistryOptions.asJava).as("key"),
    from_avro($"value", "t-value", schemaRegistryAddr, schemaRegistryOptions.asJava).as("value"))

// The converted data is saved to Kafka as a Kafka topic "t".
dataDF
  .select(
    to_avro($"key", lit("t-key"), schemaRegistryAddr, schemaRegistryOptions.asJava).as("key"),
    to_avro($"value", lit("t-value"), schemaRegistryAddr, schemaRegistryOptions.asJava).as("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.save()

// The Avro schema of subject "t-value" in JSON string format.
val avroSchema = ...

// The converted data is saved to Kafka as a Kafka topic "t".
dataDF
  .select(
    to_avro($"key", lit("t-key"), schemaRegistryAddr, schemaRegistryOptions.asJava).as("key"),
    to_avro($"value", lit("t-value"), schemaRegistryAddr, schemaRegistryOptions.asJava, avroSchema).as("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.save()

Python

from pyspark.sql.functions import col, lit
from pyspark.sql.avro.functions import from_avro, to_avro

schema_registry_address = "https://confluent-schema-registry-endpoint"
schema_registry_options = {
  "confluent.schema.registry.basic.auth.credentials.source": 'USER_INFO',
  "confluent.schema.registry.basic.auth.user.info": f"{key}:{secret}"
}

df = (spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", servers)
  .option("subscribe", "t")
  .load()
  .select(
    from_avro(
      data = col("key"),
      options = schema_registry_options,
      subject = "t-key",
      schemaRegistryAddress = schema_registry_address
    ).alias("key"),
    from_avro(
      data = col("value"),
      options = schema_registry_options,
      subject = "t-value",
      schemaRegistryAddress = schema_registry_address
    ).alias("value")
  )
)

# The converted data is saved to Kafka as a Kafka topic "t".
data_df
  .select(
    to_avro(
      data = col("key"),
      subject = lit("t-key"),
      schemaRegistryAddress = schema_registry_address,
      options = schema_registry_options
    ).alias("key"),
    to_avro(
      data = col("value"),
      subject = lit("t-value"),
      schemaRegistryAddress = schema_registry_address,
      options = schema_registry_options
    ).alias("value")
  )
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.save()

# The Avro schema of subject "t-value" in JSON string format.
avro_schema = ...

# The converted data is saved to Kafka as a Kafka topic "t".
data_df
  .select(
    to_avro(
      data = col("key"),
      subject = lit("t-key"),
      schemaRegistryAddress = schema_registry_address,
      options = schema_registry_options
    ).alias("key"),
    to_avro(
      data = col("value"),
      subject = lit("t-value"),
      schemaRegistryAddress = schema_registry_address,
      options = schema_registry_options,
      jsonFormatSchema = avro_schema).alias("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.save()

Använda truststore- och keystore-filer i Unity Catalog-volymer

I Databricks Runtime 14.3 LTS och senare kan du använda säkerhetsarkiv- och nyckelarkivfiler i Unity Catalog-volymer för att autentisera till ett Confluent-schemaregister. Uppdatera konfigurationen i föregående exempel med hjälp av följande syntax:

val schemaRegistryAddr = "https://confluent-schema-registry-endpoint"
val schemaRegistryOptions = Map(
      "confluent.schema.registry.ssl.truststore.location" -> "/Volumes/<catalog_name>/<schema_name>/<volume_name>/truststore.jks",
      "confluent.schema.registry.ssl.truststore.password" -> "truststorePassword",
      "confluent.schema.registry.ssl.keystore.location" -> "/Volumes/<catalog_name>/<schema_name>/<volume_name>/keystore.jks",
      "confluent.schema.registry.ssl.truststore.password" -> "keystorePassword",
      "confluent.schema.registry.ssl.key.password" -> "keyPassword")

Använda schemautvecklingsläge med from_avro

I Databricks Runtime 14.2 och senare kan du använda schemautvecklingsläget med from_avro. Om du aktiverar schemautvecklingsläge genereras ett UnknownFieldException jobb när schemautvecklingen har upptäckts. Databricks rekommenderar att du konfigurerar jobb med schemautvecklingsläge för att automatiskt starta om vid aktivitetsfel. Se Produktionsöverväganden för strukturerad direktuppspelning.

Schemautveckling är användbart om du förväntar dig att schemat för dina källdata ska utvecklas över tid och mata in alla fält från datakällan. Om dina frågor redan uttryckligen anger vilka fält som ska frågas i datakällan ignoreras tillagda fält oavsett schemautveckling.

Använd alternativet avroSchemaEvolutionMode för att aktivera schemautveckling. I följande tabell beskrivs alternativen för schemautvecklingsläge:

Alternativ Funktionssätt
none Standard. Ignorerar schemautvecklingen och jobbet fortsätter.
restart Genererar en UnknownFieldException vid identifiering av schemautveckling. Kräver en omstart av jobbet.

Kommentar

Du kan ändra den här konfigurationen mellan direktuppspelningsjobb och återanvända samma kontrollpunkt. Om du inaktiverar schemautvecklingen kan det resultera i borttagna kolumner.

Konfigurera parsningsläget

Du kan konfigurera parsningsläget för att avgöra om du vill misslyckas eller generera null-poster när schemautvecklingsläget är inaktiverat och schemat utvecklas på ett icke-bakåtkompatibelt sätt. Med standardinställningar from_avro misslyckas när det observerar inkompatibla schemaändringar.

Använd alternativet mode för att ange parsningsläge. I följande tabell beskrivs alternativet för parsningsläge:

Alternativ Funktionssätt
FAILFAST Standard. Ett parsningsfel genererar en SparkException med en errorClass av MALFORMED_AVRO_MESSAGE.
PERMISSIVE Ett parsningsfel ignoreras och en nullpost genereras.

Kommentar

När schemautvecklingen är aktiverad FAILFAST utlöser endast undantag om en post är skadad.

Exempel med schemautveckling och inställning av parsningsläge

I följande exempel visas hur du aktiverar schemautveckling och anger parsningsläge FAILFAST med ett Confluent-schemaregister:

Scala

import org.apache.spark.sql.avro.functions._
import scala.collection.JavaConverters._

val schemaRegistryAddr = "https://confluent-schema-registry-endpoint"
val schemaRegistryOptions = Map(
      "confluent.schema.registry.basic.auth.credentials.source" -> "USER_INFO",
      "confluent.schema.registry.basic.auth.user.info" -> "confluentApiKey:confluentApiSecret",
      "avroSchemaEvolutionMode" -> "restart",
      "mode" -> "FAILFAST")

val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", servers)
  .option("subscribe", "t")
  .load()
  .select(
    // We read the "key" binary column from the subject "t-key" in the schema
    // registry at schemaRegistryAddr. We provide schemaRegistryOptions,
    // which has avroSchemaEvolutionMode -> "restart". This instructs from_avro
    // to fail the query if the schema for the subject t-key evolves.
    from_avro(
            $"key",
            "t-key",
            schemaRegistryAddr,
            schemaRegistryOptions.asJava).as("key"))

Python

from pyspark.sql.functions import col, lit
from pyspark.sql.avro.functions import from_avro, to_avro

schema_registry_address = "https://confluent-schema-registry-endpoint"
schema_registry_options = {
  "confluent.schema.registry.basic.auth.credentials.source": 'USER_INFO',
  "confluent.schema.registry.basic.auth.user.info": f"{key}:{secret}",
  "avroSchemaEvolutionMode": "restart",
  "mode": "FAILFAST",
}

df = (spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", servers)
  .option("subscribe", "t")
  .load()
  .select(
    from_avro(
      data = col("key"),
      options = schema_registry_options,
      subject = "t-key",
      schemaRegistryAddress = schema_registry_address
    ).alias("key")
  )
)