ストリーミング Avro データの読み取りと書き込み

Apache Avro は、ストリーミング環境で一般的に使用されるデータ シリアル化システムです。 一般的なソリューションは、Avro 形式のデータを Apache Kafka に、メタデータを Confluent スキーマ レジストリに配置し、Kafka とスキーマ レジストリの両方に接続できるストリーミング フレームワークを使用してクエリを実行することです。

Azure Databricks は、from_avroto_avro 関数をサポートしており、Kafka 内のデータとスキーマ レジストリ内のメタデータを使用してストリーミング パイプラインを構築できます。 to_avro 関数は列を Avro 形式のバイナリとしてエンコードし、from_avro は Avro のバイナリ データを列にデコードします。 どちらの関数も 1 つの列を別の列に変換するもので、入力と出力の SQL データ型は複合型またはプリミティブ型にすることができます。

Note

from_avro および to_avro 関数は:

  • Python、Scala、Java で使用できます。
  • バッチとストリーミングの両方のクエリで SQL 関数に渡すことができます。

Avro ファイルのデータ ソースに関するページも参照してください。

手動で指定したスキーマの例

from_jsonto_json と同様に、任意のバイナリ列で from_avroto_avro を使用できます。 Avro スキーマは、次の例のように手動で指定できます。

import org.apache.spark.sql.avro.functions._
import org.apache.avro.SchemaBuilder

// When reading the key and value of a Kafka topic, decode the
// binary (Avro) data into structured data.
// The schema of the resulting DataFrame is: <key: string, value: int>
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", servers)
  .option("subscribe", "t")
  .load()
  .select(
    from_avro($"key", SchemaBuilder.builder().stringType()).as("key"),
    from_avro($"value", SchemaBuilder.builder().intType()).as("value"))

// Convert structured data to binary from string (key column) and
// int (value column) and save to a Kafka topic.
dataDF
  .select(
    to_avro($"key").as("key"),
    to_avro($"value").as("value"))
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", servers)
  .option("topic", "t")
  .start()

jsonFormatSchema の例

スキーマを JSON 文字列として指定することもできます。 たとえば、/tmp/user.avsc が次の値で、

{
  "namespace": "example.avro",
  "type": "record",
  "name": "User",
  "fields": [
    {"name": "name", "type": "string"},
    {"name": "favorite_color", "type": ["string", "null"]}
  ]
}

次の JSON 文字列を作成できます。

from pyspark.sql.avro.functions import from_avro, to_avro

jsonFormatSchema = open("/tmp/user.avsc", "r").read()

その後、from_avro でそのスキーマを使用します。

# 1. Decode the Avro data into a struct.
# 2. Filter by column "favorite_color".
# 3. Encode the column "name" in Avro format.

output = df\
  .select(from_avro("value", jsonFormatSchema).alias("user"))\
  .where('user.favorite_color == "red"')\
  .select(to_avro("user.name").alias("value"))

スキーマ レジストリを使用した例

クラスターでスキーマ レジストリ サービスを使用している場合、from_avro はそれに対応できるため、Avro スキーマを手動で指定する必要がありません。

次の例では、Kafka トピック "t" を読み取る方法を示します。キーと値が、STRING 型のサブジェクト "t-key" および INT 型の "t-value" としてスキーマ レジストリに既に登録されていると仮定しています。

import org.apache.spark.sql.avro.functions._

val schemaRegistryAddr = "https://myhost:8081"
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", servers)
  .option("subscribe", "t")
  .load()
  .select(
    from_avro($"key", "t-key", schemaRegistryAddr).as("key"),
    from_avro($"value", "t-value", schemaRegistryAddr).as("value"))

to_avro では、次の理由により、既定の出力 Avro スキーマがスキーマ レジストリ サービスのターゲット サブジェクトのスキーマと一致しない可能性があります。

  • Spark SQL 型から Avro スキーマへのマッピングは 1 対 1 ではありません。 「Spark SQL でサポートされる型 -> Avro 変換」を参照してください。
  • 変換された出力 Avro スキーマがレコード型である場合、レコード名は topLevelRecord であり、既定では名前空間は存在しません。

to_avro の既定の出力スキーマがターゲット サブジェクトのスキーマと一致する場合は、次のようにすることができます。

// The converted data is saved to Kafka as a Kafka topic "t".
dataDF
  .select(
    to_avro($"key", lit("t-key"), schemaRegistryAddr).as("key"),
    to_avro($"value", lit("t-value"), schemaRegistryAddr).as("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.start()

それ以外の場合は、to_avro 関数でターゲット サブジェクトのスキーマを指定する必要があります。

// The Avro schema of subject "t-value" in JSON string format.
val avroSchema = ...
// The converted data is saved to Kafka as a Kafka topic "t".
dataDF
  .select(
    to_avro($"key", lit("t-key"), schemaRegistryAddr).as("key"),
    to_avro($"value", lit("t-value"), schemaRegistryAddr, avroSchema).as("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.start()

外部 Confluent スキーマ レジストリに対して認証する

Databricks Runtime 12.2 LTS 以降では、外部 Confluent スキーマ レジストリに対して認証できます。 認証資格情報と API キーを含むようにスキーマ レジストリ オプションを構成する方法を次の例に示します。

Scala

import org.apache.spark.sql.avro.functions._
import scala.collection.JavaConverters._

val schemaRegistryAddr = "https://confluent-schema-registry-endpoint"
val schemaRegistryOptions = Map(
      "confluent.schema.registry.basic.auth.credentials.source" -> "USER_INFO",
      "confluent.schema.registry.basic.auth.user.info" -> "confluentApiKey:confluentApiSecret")

val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", servers)
  .option("subscribe", "t")
  .load()
  .select(
    from_avro($"key", "t-key", schemaRegistryAddr, schemaRegistryOptions.asJava).as("key"),
    from_avro($"value", "t-value", schemaRegistryAddr, schemaRegistryOptions.asJava).as("value"))

// The converted data is saved to Kafka as a Kafka topic "t".
dataDF
  .select(
    to_avro($"key", lit("t-key"), schemaRegistryAddr, schemaRegistryOptions.asJava).as("key"),
    to_avro($"value", lit("t-value"), schemaRegistryAddr, schemaRegistryOptions.asJava).as("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.save()

// The Avro schema of subject "t-value" in JSON string format.
val avroSchema = ...

// The converted data is saved to Kafka as a Kafka topic "t".
dataDF
  .select(
    to_avro($"key", lit("t-key"), schemaRegistryAddr, schemaRegistryOptions.asJava).as("key"),
    to_avro($"value", lit("t-value"), schemaRegistryAddr, schemaRegistryOptions.asJava, avroSchema).as("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.save()

Python

from pyspark.sql.functions import col, lit
from pyspark.sql.avro.functions import from_avro, to_avro

schema_registry_address = "https://confluent-schema-registry-endpoint"
schema_registry_options = {
  "confluent.schema.registry.basic.auth.credentials.source": 'USER_INFO',
  "confluent.schema.registry.basic.auth.user.info": f"{key}:{secret}"
}

df = (spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", servers)
  .option("subscribe", "t")
  .load()
  .select(
    from_avro(
      data = col("key"),
      options = schema_registry_options,
      subject = "t-key",
      schemaRegistryAddress = schema_registry_address
    ).alias("key"),
    from_avro(
      data = col("value"),
      options = schema_registry_options,
      subject = "t-value",
      schemaRegistryAddress = schema_registry_address
    ).alias("value")
  )
)

# The converted data is saved to Kafka as a Kafka topic "t".
data_df
  .select(
    to_avro(
      data = col("key"),
      subject = lit("t-key"),
      schemaRegistryAddress = schema_registry_address,
      options = schema_registry_options
    ).alias("key"),
    to_avro(
      data = col("value"),
      subject = lit("t-value"),
      schemaRegistryAddress = schema_registry_address,
      options = schema_registry_options
    ).alias("value")
  )
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.save()

# The Avro schema of subject "t-value" in JSON string format.
avro_schema = ...

# The converted data is saved to Kafka as a Kafka topic "t".
data_df
  .select(
    to_avro(
      data = col("key"),
      subject = lit("t-key"),
      schemaRegistryAddress = schema_registry_address,
      options = schema_registry_options
    ).alias("key"),
    to_avro(
      data = col("value"),
      subject = lit("t-value"),
      schemaRegistryAddress = schema_registry_address,
      options = schema_registry_options,
      jsonFormatSchema = avro_schema).alias("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.save()

Unity Catalog ボリュームでトラストストアおよびキーストア ファイルを使用する

Databricks Runtime 14.3 LTS 以降では、Unity Catalog ボリュームのトラストストアおよびキーストア ファイルを使用して、Confluent スキーマ レジストリに対する認証を行うことができます。 次の構文を使用して、前の例の構成を更新します。

val schemaRegistryAddr = "https://confluent-schema-registry-endpoint"
val schemaRegistryOptions = Map(
      "confluent.schema.registry.ssl.truststore.location" -> "/Volumes/<catalog_name>/<schema_name>/<volume_name>/truststore.jks",
      "confluent.schema.registry.ssl.truststore.password" -> "truststorePassword",
      "confluent.schema.registry.ssl.keystore.location" -> "/Volumes/<catalog_name>/<schema_name>/<volume_name>/keystore.jks",
      "confluent.schema.registry.ssl.truststore.password" -> "keystorePassword",
      "confluent.schema.registry.ssl.key.password" -> "keyPassword")

from_avro でスキーマ進化モードを使用する

Databricks Runtime 14.2 以降では from_avro でスキーマ進化モードを使用できます。 スキーマ進化モードを有効にすると、スキーマの進化を検出した後にジョブで UnknownFieldException がスローされます。 Databricks では、タスクの失敗時に自動的に再起動するように、スキーマ進化モードでジョブを構成することをお勧めします。 「障害時にストリーミング クエリを再起動するように、構造化ストリーミング ジョブを構成する」を参照してください。

スキーマの進化は、お使いのソース データのスキーマが時間の経過とともに進化し、データ ソースからすべてのフィールドを取り込む場合に便利です。 クエリでデータ ソースでクエリを実行するフィールドが既に明示的に指定されている場合、スキーマの進化に関係なく、追加されたフィールドは無視されます。

スキーマの進化を有効にするには、avroSchemaEvolutionMode オプションを使用してください。 次の表では、スキーマ進化モードのオプションについて説明します。

オプション Behavior
none Default。 スキーマの進化を無視し、ジョブを続行します。
restart スキーマの進化を検出するときにUnknownFieldException をスローします。 ジョブの再起動が必要です。

Note

ストリーミング ジョブ間でこの構成を変更し、同じチェックポイントを再利用できます。 スキーマの進化を無効にすると、列が削除される場合があります。

解析モードを構成する

解析モードを構成して、スキーマ進化モードが無効であり、スキーマが下位互換性のない方法で進化したときに、失敗するか null レコードを出力するかを決定できます。 既定の設定では、 換性のないスキーマの変更が観察されると from_avro は失敗します。

解析モードを指定するには、mode オプションを使用します。 次の表では、解析モードのオプションについて説明します。

オプション Behavior
FAILFAST Default。 解析エラーは、MALFORMED_AVRO_MESSAGE という errorClass を持つ SparkException がスローされます。
PERMISSIVE 解析エラーは無視され、null レコードが出力されます。

Note

スキーマの進化を有効にすると、レコードが破損している場合にのみ FAILFAST により例外がスローされます。

スキーマの進化と解析モードの設定の使用例

次の例では、スキーマの進化を有効にし、Confluent スキーマ レジストリを使用して FAILFAST 解析モードを指定する方法を示します。

Scala

import org.apache.spark.sql.avro.functions._
import scala.collection.JavaConverters._

val schemaRegistryAddr = "https://confluent-schema-registry-endpoint"
val schemaRegistryOptions = Map(
      "confluent.schema.registry.basic.auth.credentials.source" -> "USER_INFO",
      "confluent.schema.registry.basic.auth.user.info" -> "confluentApiKey:confluentApiSecret",
      "avroSchemaEvolutionMode" -> "restart",
      "mode" -> "FAILFAST")

val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", servers)
  .option("subscribe", "t")
  .load()
  .select(
    // We read the "key" binary column from the subject "t-key" in the schema
    // registry at schemaRegistryAddr. We provide schemaRegistryOptions,
    // which has avroSchemaEvolutionMode -> "restart". This instructs from_avro
    // to fail the query if the schema for the subject t-key evolves.
    from_avro(
            $"key",
            "t-key",
            schemaRegistryAddr,
            schemaRegistryOptions.asJava).as("key"))

Python

from pyspark.sql.functions import col, lit
from pyspark.sql.avro.functions import from_avro, to_avro

schema_registry_address = "https://confluent-schema-registry-endpoint"
schema_registry_options = {
  "confluent.schema.registry.basic.auth.credentials.source": 'USER_INFO',
  "confluent.schema.registry.basic.auth.user.info": f"{key}:{secret}",
  "avroSchemaEvolutionMode": "restart",
  "mode": "FAILFAST",
}

df = (spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", servers)
  .option("subscribe", "t")
  .load()
  .select(
    from_avro(
      data = col("key"),
      options = schema_registry_options,
      subject = "t-key",
      schemaRegistryAddress = schema_registry_address
    ).alias("key")
  )
)