Потоковые операции чтения и записи данных Avro
Apache Avro — это часто используемая система сериализации данных для их потоковой передачи. Типичным решением является размещение данных в формате Avro в Apache Kafka, а метаданных в реестре схемы Confluent и выполнение запросов с помощью платформы потоковой передачи, подключенной к Kafka и реестру схемы.
Azure Databricks поддерживает from_avro
и to_avro
функции для создания конвейеров потоковой передачи с данными Avro в Kafka и метаданных в реестре схем. Функция to_avro
преобразует столбец в двоичный формат Avro, а from_avro
декодирует двоичные данные Avro в столбец. Обе функции преобразуют один столбец в другой, а входные и выходные данные SQL могут быть представлены как сложным, так и простым типом.
Примечание.
Функции from_avro
и to_avro
:
- Доступны в Python, Scala и Java.
- Могут передаваться в функции SQL как в пакетных, так и в потоковых запросах.
См. также Источники данных – файлы Avro.
Пример схемы вручную
Аналогично from_json и to_json, можно использовать from_avro
и to_avro
с любым двоичным столбцом. Схему Avro можно указать вручную, как показано в следующем примере:
import org.apache.spark.sql.avro.functions._
import org.apache.avro.SchemaBuilder
// When reading the key and value of a Kafka topic, decode the
// binary (Avro) data into structured data.
// The schema of the resulting DataFrame is: <key: string, value: int>
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("subscribe", "t")
.load()
.select(
from_avro($"key", SchemaBuilder.builder().stringType()).as("key"),
from_avro($"value", SchemaBuilder.builder().intType()).as("value"))
// Convert structured data to binary from string (key column) and
// int (value column) and save to a Kafka topic.
dataDF
.select(
to_avro($"key").as("key"),
to_avro($"value").as("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.start()
Пример jsonFormatSchema
Схему также можно указать в виде строки JSON. Например, если /tmp/user.avsc
— это
{
"namespace": "example.avro",
"type": "record",
"name": "User",
"fields": [
{"name": "name", "type": "string"},
{"name": "favorite_color", "type": ["string", "null"]}
]
}
Вы можете создать строку JSON:
from pyspark.sql.avro.functions import from_avro, to_avro
jsonFormatSchema = open("/tmp/user.avsc", "r").read()
Затем используйте схему в from_avro
:
# 1. Decode the Avro data into a struct.
# 2. Filter by column "favorite_color".
# 3. Encode the column "name" in Avro format.
output = df\
.select(from_avro("value", jsonFormatSchema).alias("user"))\
.where('user.favorite_color == "red"')\
.select(to_avro("user.name").alias("value"))
Пример с реестром схемы
Если в кластере есть служба реестра схемы, from_avro
может работать с ней. В этом случае вам не нужно указывать схему Avro вручную.
В следующем примере показано чтение раздела Kafka "t", при условии, что ключ и значение уже зарегистрированы в реестре схем в качестве субъектов "t-key" и "t-value" типов STRING
и INT
:
import org.apache.spark.sql.avro.functions._
val schemaRegistryAddr = "https://myhost:8081"
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("subscribe", "t")
.load()
.select(
from_avro($"key", "t-key", schemaRegistryAddr).as("key"),
from_avro($"value", "t-value", schemaRegistryAddr).as("value"))
Для to_avro
выходная схема Avro по умолчанию может не соответствовать схеме целевого субъекта в службе реестра схемы по следующим причинам:
- Тип Spark SQL и схема Avro не сопоставлялись"один к одному". См. раздел Поддерживаемые типы для Spark SQL —> преобразование Avro.
- Если преобразованная исходящая схема Avro представлена записью с именем
topLevelRecord
и при этом отсутствует пространство имен по умолчанию.
Если у to_avro
выходная схема по умолчанию соответствует схеме целевого субъекта, можно выполнить следующие действия:
// The converted data is saved to Kafka as a Kafka topic "t".
dataDF
.select(
to_avro($"key", lit("t-key"), schemaRegistryAddr).as("key"),
to_avro($"value", lit("t-value"), schemaRegistryAddr).as("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.start()
В противном случае необходимо указать схему целевого субъекта в функции to_avro
:
// The Avro schema of subject "t-value" in JSON string format.
val avroSchema = ...
// The converted data is saved to Kafka as a Kafka topic "t".
dataDF
.select(
to_avro($"key", lit("t-key"), schemaRegistryAddr).as("key"),
to_avro($"value", lit("t-value"), schemaRegistryAddr, avroSchema).as("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.start()
Проверка подлинности во внешнем реестре схем Confluent
В Databricks Runtime 12.2 LTS и более поздних версиях можно пройти проверку подлинности во внешнем реестре схем Confluent. В следующих примерах показано, как настроить параметры реестра схем для включения учетных данных проверки подлинности и ключей API.
Scala
import org.apache.spark.sql.avro.functions._
import scala.collection.JavaConverters._
val schemaRegistryAddr = "https://confluent-schema-registry-endpoint"
val schemaRegistryOptions = Map(
"confluent.schema.registry.basic.auth.credentials.source" -> "USER_INFO",
"confluent.schema.registry.basic.auth.user.info" -> "confluentApiKey:confluentApiSecret")
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("subscribe", "t")
.load()
.select(
from_avro($"key", "t-key", schemaRegistryAddr, schemaRegistryOptions.asJava).as("key"),
from_avro($"value", "t-value", schemaRegistryAddr, schemaRegistryOptions.asJava).as("value"))
// The converted data is saved to Kafka as a Kafka topic "t".
dataDF
.select(
to_avro($"key", lit("t-key"), schemaRegistryAddr, schemaRegistryOptions.asJava).as("key"),
to_avro($"value", lit("t-value"), schemaRegistryAddr, schemaRegistryOptions.asJava).as("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.save()
// The Avro schema of subject "t-value" in JSON string format.
val avroSchema = ...
// The converted data is saved to Kafka as a Kafka topic "t".
dataDF
.select(
to_avro($"key", lit("t-key"), schemaRegistryAddr, schemaRegistryOptions.asJava).as("key"),
to_avro($"value", lit("t-value"), schemaRegistryAddr, schemaRegistryOptions.asJava, avroSchema).as("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.save()
Python
from pyspark.sql.functions import col, lit
from pyspark.sql.avro.functions import from_avro, to_avro
schema_registry_address = "https://confluent-schema-registry-endpoint"
schema_registry_options = {
"confluent.schema.registry.basic.auth.credentials.source": 'USER_INFO',
"confluent.schema.registry.basic.auth.user.info": f"{key}:{secret}"
}
df = (spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("subscribe", "t")
.load()
.select(
from_avro(
data = col("key"),
options = schema_registry_options,
subject = "t-key",
schemaRegistryAddress = schema_registry_address
).alias("key"),
from_avro(
data = col("value"),
options = schema_registry_options,
subject = "t-value",
schemaRegistryAddress = schema_registry_address
).alias("value")
)
)
# The converted data is saved to Kafka as a Kafka topic "t".
data_df
.select(
to_avro(
data = col("key"),
subject = lit("t-key"),
schemaRegistryAddress = schema_registry_address,
options = schema_registry_options
).alias("key"),
to_avro(
data = col("value"),
subject = lit("t-value"),
schemaRegistryAddress = schema_registry_address,
options = schema_registry_options
).alias("value")
)
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.save()
# The Avro schema of subject "t-value" in JSON string format.
avro_schema = ...
# The converted data is saved to Kafka as a Kafka topic "t".
data_df
.select(
to_avro(
data = col("key"),
subject = lit("t-key"),
schemaRegistryAddress = schema_registry_address,
options = schema_registry_options
).alias("key"),
to_avro(
data = col("value"),
subject = lit("t-value"),
schemaRegistryAddress = schema_registry_address,
options = schema_registry_options,
jsonFormatSchema = avro_schema).alias("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.save()
Использование файлов truststore и хранилища ключей в томах каталога Unity
В Databricks Runtime 14.3 LTS и более поздних версиях можно использовать файлы доверия и хранилища ключей в томах каталога Unity для проверки подлинности в реестре схем Confluent. Обновите конфигурацию в предыдущем примере с помощью следующего синтаксиса:
val schemaRegistryAddr = "https://confluent-schema-registry-endpoint"
val schemaRegistryOptions = Map(
"confluent.schema.registry.ssl.truststore.location" -> "/Volumes/<catalog_name>/<schema_name>/<volume_name>/truststore.jks",
"confluent.schema.registry.ssl.truststore.password" -> "truststorePassword",
"confluent.schema.registry.ssl.keystore.location" -> "/Volumes/<catalog_name>/<schema_name>/<volume_name>/keystore.jks",
"confluent.schema.registry.ssl.truststore.password" -> "keystorePassword",
"confluent.schema.registry.ssl.key.password" -> "keyPassword")
Использование режима эволюции схемы с from_avro
В Databricks Runtime 14.2 и более поздних версиях можно использовать режим эволюции схемы с from_avro
. Включение режима эволюции схемы приводит к возникновению UnknownFieldException
задания после обнаружения эволюции схемы. Databricks рекомендует настраивать задания с режимом эволюции схемы для автоматического перезапуска при сбое задачи. Сведения о структурированной потоковой передаче см. в разделе "Рекомендации по рабочей среде".
Эволюция схемы полезна, если вы ожидаете, что схема исходных данных будет развиваться с течением времени и принимает все поля из источника данных. Если запросы уже явно указывают поля для запроса в источнике данных, добавленные поля игнорируются независимо от эволюции схемы.
avroSchemaEvolutionMode
Используйте этот параметр, чтобы включить эволюцию схемы. В следующей таблице описаны параметры режима эволюции схемы:
Вариант | Поведение |
---|---|
none |
По умолчанию. Игнорирует эволюцию схемы и задание продолжается. |
restart |
UnknownFieldException Создает исключение при обнаружении эволюции схемы. Требуется перезапуск задания. |
Примечание.
Эту конфигурацию можно изменить между заданиями потоковой передачи и повторно использовать одну и ту же контрольную точку. Отключение эволюции схемы может привести к удаленным столбцам.
Настройка режима синтаксического анализа
Вы можете настроить режим синтаксического анализа, чтобы определить, требуется ли сбой или выводить пустые записи при отключении режима эволюции схемы, а схема развивается в обратном режиме совместимости. При использовании параметров по умолчанию происходит сбой при from_avro
наблюдении несовместимых изменений схемы.
mode
Используйте параметр для указания режима синтаксического анализа. В следующей таблице описывается параметр для режима синтаксического анализа:
Вариант | Поведение |
---|---|
FAILFAST |
По умолчанию. Ошибка синтаксического анализа создает SparkException исключение с помощью функции MALFORMED_AVRO_MESSAGE errorClass . |
PERMISSIVE |
Ошибка синтаксического анализа игнорируется, и создается пустая запись. |
Примечание.
При включенной эволюции схемы создается только исключение, FAILFAST
если запись повреждена.
Пример использования режима эволюции схемы и настройки режима синтаксического анализа
В следующем примере показано включение эволюции схемы и указание FAILFAST
режима синтаксического анализа с помощью реестра схем Confluent:
Scala
import org.apache.spark.sql.avro.functions._
import scala.collection.JavaConverters._
val schemaRegistryAddr = "https://confluent-schema-registry-endpoint"
val schemaRegistryOptions = Map(
"confluent.schema.registry.basic.auth.credentials.source" -> "USER_INFO",
"confluent.schema.registry.basic.auth.user.info" -> "confluentApiKey:confluentApiSecret",
"avroSchemaEvolutionMode" -> "restart",
"mode" -> "FAILFAST")
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("subscribe", "t")
.load()
.select(
// We read the "key" binary column from the subject "t-key" in the schema
// registry at schemaRegistryAddr. We provide schemaRegistryOptions,
// which has avroSchemaEvolutionMode -> "restart". This instructs from_avro
// to fail the query if the schema for the subject t-key evolves.
from_avro(
$"key",
"t-key",
schemaRegistryAddr,
schemaRegistryOptions.asJava).as("key"))
Python
from pyspark.sql.functions import col, lit
from pyspark.sql.avro.functions import from_avro, to_avro
schema_registry_address = "https://confluent-schema-registry-endpoint"
schema_registry_options = {
"confluent.schema.registry.basic.auth.credentials.source": 'USER_INFO',
"confluent.schema.registry.basic.auth.user.info": f"{key}:{secret}",
"avroSchemaEvolutionMode": "restart",
"mode": "FAILFAST",
}
df = (spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("subscribe", "t")
.load()
.select(
from_avro(
data = col("key"),
options = schema_registry_options,
subject = "t-key",
schemaRegistryAddress = schema_registry_address
).alias("key")
)
)