Läsa och skriva Avro-direktuppspelningsdata
Apache Avro är ett vanligt data serialiseringssystem i strömningsvärlden. En vanlig lösning är att placera data i Avro-format i Apache Kafka, metadata i Confluent Schema Registry och sedan köra frågor med ett direktuppspelningsramverk som ansluter till både Kafka och Schema Registry.
Azure Databricks stöder from_avro
funktionerna och to_avro
för att skapa strömmande pipelines med Avro-data i Kafka och metadata i Schema Registry. Funktionen to_avro
kodar en kolumn som binär i Avro-format och from_avro
avkodar Binära Avro-data till en kolumn. Båda funktionerna transformerar en kolumn till en annan kolumn, och SQL-datatypen för indata/utdata kan vara en komplex typ eller en primitiv typ.
Kommentar
Funktionerna from_avro
och to_avro
:
- Finns i Python, Scala och Java.
- Kan skickas till SQL-funktioner i både batch- och strömningsfrågor.
Manuellt angivet schemaexempel
På samma sätt som from_json och to_json kan du använda from_avro
och to_avro
med valfri binär kolumn. Du kan ange Avro-schemat manuellt, som i följande exempel:
import org.apache.spark.sql.avro.functions._
import org.apache.avro.SchemaBuilder
// When reading the key and value of a Kafka topic, decode the
// binary (Avro) data into structured data.
// The schema of the resulting DataFrame is: <key: string, value: int>
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("subscribe", "t")
.load()
.select(
from_avro($"key", SchemaBuilder.builder().stringType()).as("key"),
from_avro($"value", SchemaBuilder.builder().intType()).as("value"))
// Convert structured data to binary from string (key column) and
// int (value column) and save to a Kafka topic.
dataDF
.select(
to_avro($"key").as("key"),
to_avro($"value").as("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.start()
jsonFormatSchema-exempel
Du kan också ange ett schema som en JSON-sträng. Om är till exempel /tmp/user.avsc
:
{
"namespace": "example.avro",
"type": "record",
"name": "User",
"fields": [
{"name": "name", "type": "string"},
{"name": "favorite_color", "type": ["string", "null"]}
]
}
Du kan skapa en JSON-sträng:
from pyspark.sql.avro.functions import from_avro, to_avro
jsonFormatSchema = open("/tmp/user.avsc", "r").read()
Använd sedan schemat i from_avro
:
# 1. Decode the Avro data into a struct.
# 2. Filter by column "favorite_color".
# 3. Encode the column "name" in Avro format.
output = df\
.select(from_avro("value", jsonFormatSchema).alias("user"))\
.where('user.favorite_color == "red"')\
.select(to_avro("user.name").alias("value"))
Exempel med Schema Registry
Om klustret har en Schema Registry-tjänst from_avro
kan du arbeta med den så att du inte behöver ange Avro-schemat manuellt.
Följande exempel visar läsning av ett Kafka-ämne "t", förutsatt att nyckeln och värdet redan är registrerade i Schema Registry som ämnen "t-key" och "t-value" av typer STRING
och INT
:
import org.apache.spark.sql.avro.functions._
val schemaRegistryAddr = "https://myhost:8081"
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("subscribe", "t")
.load()
.select(
from_avro($"key", "t-key", schemaRegistryAddr).as("key"),
from_avro($"value", "t-value", schemaRegistryAddr).as("value"))
För to_avro
kanske standardutdataschemat för Avro inte matchar schemat för målämnet i Schema Registry-tjänsten av följande skäl:
- Mappningen från Spark SQL-typ till Avro-schema är inte en-till-en. Se Typer som stöds för Spark SQL –> Avro-konvertering.
- Om avro-schemat för konverterade utdata är av posttyp är
topLevelRecord
postnamnet och det finns inget namnområde som standard.
Om standardutdataschemat to_avro
matchar schemat för målämnet kan du göra följande:
// The converted data is saved to Kafka as a Kafka topic "t".
dataDF
.select(
to_avro($"key", lit("t-key"), schemaRegistryAddr).as("key"),
to_avro($"value", lit("t-value"), schemaRegistryAddr).as("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.start()
Annars måste du ange schemat för målämnet i to_avro
funktionen:
// The Avro schema of subject "t-value" in JSON string format.
val avroSchema = ...
// The converted data is saved to Kafka as a Kafka topic "t".
dataDF
.select(
to_avro($"key", lit("t-key"), schemaRegistryAddr).as("key"),
to_avro($"value", lit("t-value"), schemaRegistryAddr, avroSchema).as("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.start()
Autentisera till ett externt Confluent-schemaregister
I Databricks Runtime 12.2 LTS och senare kan du autentisera till ett externt Confluent-schemaregister. Följande exempel visar hur du konfigurerar schemaregisteralternativen så att de innehåller autentiseringsuppgifter och API-nycklar.
Scala
import org.apache.spark.sql.avro.functions._
import scala.collection.JavaConverters._
val schemaRegistryAddr = "https://confluent-schema-registry-endpoint"
val schemaRegistryOptions = Map(
"confluent.schema.registry.basic.auth.credentials.source" -> "USER_INFO",
"confluent.schema.registry.basic.auth.user.info" -> "confluentApiKey:confluentApiSecret")
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("subscribe", "t")
.load()
.select(
from_avro($"key", "t-key", schemaRegistryAddr, schemaRegistryOptions.asJava).as("key"),
from_avro($"value", "t-value", schemaRegistryAddr, schemaRegistryOptions.asJava).as("value"))
// The converted data is saved to Kafka as a Kafka topic "t".
dataDF
.select(
to_avro($"key", lit("t-key"), schemaRegistryAddr, schemaRegistryOptions.asJava).as("key"),
to_avro($"value", lit("t-value"), schemaRegistryAddr, schemaRegistryOptions.asJava).as("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.save()
// The Avro schema of subject "t-value" in JSON string format.
val avroSchema = ...
// The converted data is saved to Kafka as a Kafka topic "t".
dataDF
.select(
to_avro($"key", lit("t-key"), schemaRegistryAddr, schemaRegistryOptions.asJava).as("key"),
to_avro($"value", lit("t-value"), schemaRegistryAddr, schemaRegistryOptions.asJava, avroSchema).as("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.save()
Python
from pyspark.sql.functions import col, lit
from pyspark.sql.avro.functions import from_avro, to_avro
schema_registry_address = "https://confluent-schema-registry-endpoint"
schema_registry_options = {
"confluent.schema.registry.basic.auth.credentials.source": 'USER_INFO',
"confluent.schema.registry.basic.auth.user.info": f"{key}:{secret}"
}
df = (spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("subscribe", "t")
.load()
.select(
from_avro(
data = col("key"),
options = schema_registry_options,
subject = "t-key",
schemaRegistryAddress = schema_registry_address
).alias("key"),
from_avro(
data = col("value"),
options = schema_registry_options,
subject = "t-value",
schemaRegistryAddress = schema_registry_address
).alias("value")
)
)
# The converted data is saved to Kafka as a Kafka topic "t".
data_df
.select(
to_avro(
data = col("key"),
subject = lit("t-key"),
schemaRegistryAddress = schema_registry_address,
options = schema_registry_options
).alias("key"),
to_avro(
data = col("value"),
subject = lit("t-value"),
schemaRegistryAddress = schema_registry_address,
options = schema_registry_options
).alias("value")
)
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.save()
# The Avro schema of subject "t-value" in JSON string format.
avro_schema = ...
# The converted data is saved to Kafka as a Kafka topic "t".
data_df
.select(
to_avro(
data = col("key"),
subject = lit("t-key"),
schemaRegistryAddress = schema_registry_address,
options = schema_registry_options
).alias("key"),
to_avro(
data = col("value"),
subject = lit("t-value"),
schemaRegistryAddress = schema_registry_address,
options = schema_registry_options,
jsonFormatSchema = avro_schema).alias("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.save()
Använda truststore- och keystore-filer i Unity Catalog-volymer
I Databricks Runtime 14.3 LTS och senare kan du använda säkerhetsarkiv- och nyckelarkivfiler i Unity Catalog-volymer för att autentisera till ett Confluent-schemaregister. Uppdatera konfigurationen i föregående exempel med hjälp av följande syntax:
val schemaRegistryAddr = "https://confluent-schema-registry-endpoint"
val schemaRegistryOptions = Map(
"confluent.schema.registry.ssl.truststore.location" -> "/Volumes/<catalog_name>/<schema_name>/<volume_name>/truststore.jks",
"confluent.schema.registry.ssl.truststore.password" -> "truststorePassword",
"confluent.schema.registry.ssl.keystore.location" -> "/Volumes/<catalog_name>/<schema_name>/<volume_name>/keystore.jks",
"confluent.schema.registry.ssl.truststore.password" -> "keystorePassword",
"confluent.schema.registry.ssl.key.password" -> "keyPassword")
Använda schemautvecklingsläge med from_avro
I Databricks Runtime 14.2 och senare kan du använda schemautvecklingsläget med from_avro
. Om du aktiverar schemautvecklingsläge genereras ett UnknownFieldException
jobb när schemautvecklingen har upptäckts. Databricks rekommenderar att du konfigurerar jobb med schemautvecklingsläge för att automatiskt starta om vid aktivitetsfel. Se Produktionsöverväganden för strukturerad direktuppspelning.
Schemautveckling är användbart om du förväntar dig att schemat för dina källdata ska utvecklas över tid och mata in alla fält från datakällan. Om dina frågor redan uttryckligen anger vilka fält som ska frågas i datakällan ignoreras tillagda fält oavsett schemautveckling.
Använd alternativet avroSchemaEvolutionMode
för att aktivera schemautveckling. I följande tabell beskrivs alternativen för schemautvecklingsläge:
Alternativ | Funktionssätt |
---|---|
none |
Standard. Ignorerar schemautvecklingen och jobbet fortsätter. |
restart |
Genererar en UnknownFieldException vid identifiering av schemautveckling. Kräver en omstart av jobbet. |
Kommentar
Du kan ändra den här konfigurationen mellan direktuppspelningsjobb och återanvända samma kontrollpunkt. Om du inaktiverar schemautvecklingen kan det resultera i borttagna kolumner.
Konfigurera parsningsläget
Du kan konfigurera parsningsläget för att avgöra om du vill misslyckas eller generera null-poster när schemautvecklingsläget är inaktiverat och schemat utvecklas på ett icke-bakåtkompatibelt sätt. Med standardinställningar from_avro
misslyckas när det observerar inkompatibla schemaändringar.
Använd alternativet mode
för att ange parsningsläge. I följande tabell beskrivs alternativet för parsningsläge:
Alternativ | Funktionssätt |
---|---|
FAILFAST |
Standard. Ett parsningsfel genererar en SparkException med en errorClass av MALFORMED_AVRO_MESSAGE . |
PERMISSIVE |
Ett parsningsfel ignoreras och en nullpost genereras. |
Kommentar
När schemautvecklingen är aktiverad FAILFAST
utlöser endast undantag om en post är skadad.
Exempel med schemautveckling och inställning av parsningsläge
I följande exempel visas hur du aktiverar schemautveckling och anger parsningsläge FAILFAST
med ett Confluent-schemaregister:
Scala
import org.apache.spark.sql.avro.functions._
import scala.collection.JavaConverters._
val schemaRegistryAddr = "https://confluent-schema-registry-endpoint"
val schemaRegistryOptions = Map(
"confluent.schema.registry.basic.auth.credentials.source" -> "USER_INFO",
"confluent.schema.registry.basic.auth.user.info" -> "confluentApiKey:confluentApiSecret",
"avroSchemaEvolutionMode" -> "restart",
"mode" -> "FAILFAST")
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("subscribe", "t")
.load()
.select(
// We read the "key" binary column from the subject "t-key" in the schema
// registry at schemaRegistryAddr. We provide schemaRegistryOptions,
// which has avroSchemaEvolutionMode -> "restart". This instructs from_avro
// to fail the query if the schema for the subject t-key evolves.
from_avro(
$"key",
"t-key",
schemaRegistryAddr,
schemaRegistryOptions.asJava).as("key"))
Python
from pyspark.sql.functions import col, lit
from pyspark.sql.avro.functions import from_avro, to_avro
schema_registry_address = "https://confluent-schema-registry-endpoint"
schema_registry_options = {
"confluent.schema.registry.basic.auth.credentials.source": 'USER_INFO',
"confluent.schema.registry.basic.auth.user.info": f"{key}:{secret}",
"avroSchemaEvolutionMode": "restart",
"mode": "FAILFAST",
}
df = (spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("subscribe", "t")
.load()
.select(
from_avro(
data = col("key"),
options = schema_registry_options,
subject = "t-key",
schemaRegistryAddress = schema_registry_address
).alias("key")
)
)