Arquivo Avro

O Apache Avro é um sistema de serialização de dados. O Avro fornece:

  • Estruturas de dados avançadas.
  • Um formato de dados binário, rápido e compacto.
  • Um arquivo de contêiner para armazenar dados persistentes.
  • Chamada de procedimento remoto (RPC).
  • Integração simples com linguagens dinâmicas. A geração de código não é necessária para ler ou gravar arquivos de dados nem para usar ou implementar protocolos de RPC. Geração de código como uma otimização opcional vale a pena implementar apenas para linguagens de tipo estático.

A fonte de dados Avro dá suporte a:

  • Conversão de esquema: conversão automática entre registros do Apache Spark SQL e do Avro.
  • Particionamento: leitura e gravação fácil de dados particionados sem nenhuma configuração extra.
  • Compactação: a compactação a ser usada ao gravar o Avro no disco. Os tipos com suporte são uncompressed, snappy e deflate. Também é possível especificar o nível de esvaziamento (deflate).
  • Nomes de registro: registre o nome e o namespace passando um mapa de parâmetros com recordName e recordNamespace.

Veja também Ler e gravar dados de streaming do Avro.

Configuração

Você pode alterar o comportamento de uma fonte de dados Avro usando vários parâmetros de configuração.

Para ignorar arquivos sem a extensão .avro durante a leitura, você pode definir o parâmetro avro.mapred.ignore.inputs.without.extension na configuração do Hadoop. O padrão é false.

spark
  .sparkContext
  .hadoopConfiguration
  .set("avro.mapred.ignore.inputs.without.extension", "true")

Para configurar a compactação durante a gravação, defina as seguintes propriedades do Spark:

  • Codec de compactação: spark.sql.avro.compression.codec. Os codecs com suporte são snappy e deflate. O codec padrão é snappy.
  • Se o codec de compactação for deflate, você poderá definir o nível de compactação com: spark.sql.avro.deflate.level. O nível padrão é -1.

Você pode definir essas propriedades na configuração do Spark do cluster ou em runtime usando spark.conf.set(). Por exemplo:

spark.conf.set("spark.sql.avro.compression.codec", "deflate")
spark.conf.set("spark.sql.avro.deflate.level", "5")

Para o Databricks Runtime 9.1 LTS e superiores, você pode alterar o comportamento de inferência do esquema padrão no Avro fornecendo a opção mergeSchema ao ler arquivos. Definir mergeSchema como true inferirá um esquema de um conjunto de arquivos Avro no diretório de destino e os mesclará em vez de inferir o esquema de leitura de um único arquivo.

Tipos com suporte para conversão de Spark SQL -> Avro

Esta biblioteca dá suporte à leitura de todos os tipos Avro. Ela usa o seguinte mapeamento de tipos Avro para tipos Spark SQL:

Tipos Avro Tipos Spark SQL
booleano BooleanType
INT IntegerType
long LongType
FLOAT FloatType
double DoubleType
bytes BinaryType
string StringType
registro StructType
enum StringType
matriz ArrayType
map MapType
corrigido BinaryType
union Confira Tipos de união.

Tipos de união

A fonte de dados Avro dá suporte à leitura de tipos union. O Avro considera os três tipos a seguir como tipos union:

  • O union(int, long) é mapeado para LongType.
  • O union(float, double) é mapeado para DoubleType.
  • O union(something, null), onde something é qualquer tipo Avro com suporte. Isso é mapeado para o mesmo tipo Spark SQL de something, com nullable definido como true.

Todos os outros tipos union são tipos complexos. Eles são mapeados para StructType onde os nomes de campo são member0, member1 e assim por diante, de acordo com os membros do union. Isso é consistente com o comportamento ao converter entre Avro e Parquet.

Tipos lógicos

A fonte de dados Avro dá suporte à leitura dos seguintes tipos lógicos Avro:

Tipos lógicos Avro Tipos Avro Tipos Spark SQL
date INT DateType
timestamp-millis long TimestampType
timestamp-micros long TimestampType
decimal corrigido DecimalType
decimal bytes DecimalType

Observação

A fonte de dados Avro ignora documentos, aliases e outras propriedades presentes no arquivo Avro.

Tipos com suporte para conversão entre Spark SQL -> Avro

Esta biblioteca dá suporte à gravação de todos os tipos Spark SQL em Avro. Para a maioria dos tipos, o mapeamento de tipos Spark para tipos Avro é simples (por exemplo IntegerType é convertido em int). A seguir está uma lista dos alguns casos especiais:

Tipos Spark SQL Tipos Avro Tipos lógicos Avro
ByteType INT
ShortType INT
BinaryType bytes
DecimalType corrigido decimal
TimestampType long timestamp-micros
DateType INT date

Você também pode especificar todo o esquema Avro de saída com a opção avroSchema, para que os tipos Spark SQL possam ser convertidos em outros tipos Avro. As conversões a seguir não são aplicadas por padrão e exigem esquema Avro especificado pelo usuário:

Tipos Spark SQL Tipos Avro Tipos lógicos Avro
ByteType corrigido
StringType enum
DecimalType bytes decimal
TimestampType long timestamp-millis

Exemplos

Esses exemplos usam o arquivo episodes.avro.

Scala

// The Avro records are converted to Spark types, filtered, and
// then written back out as Avro records

val df = spark.read.format("avro").load("/tmp/episodes.avro")
df.filter("doctor > 5").write.format("avro").save("/tmp/output")

Este exemplo demonstra um esquema Avro personalizado:

import org.apache.avro.Schema

val schema = new Schema.Parser().parse(new File("episode.avsc"))

spark
  .read
  .format("avro")
  .option("avroSchema", schema.toString)
  .load("/tmp/episodes.avro")
  .show()

Este exemplo demonstra as opções de compactação Avro:

// configuration to use deflate compression
spark.conf.set("spark.sql.avro.compression.codec", "deflate")
spark.conf.set("spark.sql.avro.deflate.level", "5")

val df = spark.read.format("avro").load("/tmp/episodes.avro")

// writes out compressed Avro records
df.write.format("avro").save("/tmp/output")

Este exemplo demonstra registros Avro particionados:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local").getOrCreate()

val df = spark.createDataFrame(
  Seq(
    (2012, 8, "Batman", 9.8),
    (2012, 8, "Hero", 8.7),
    (2012, 7, "Robot", 5.5),
    (2011, 7, "Git", 2.0))
  ).toDF("year", "month", "title", "rating")

df.toDF.write.format("avro").partitionBy("year", "month").save("/tmp/output")

Este exemplo demonstra o nome do registro e o namespace:

val df = spark.read.format("avro").load("/tmp/episodes.avro")

val name = "AvroTest"
val namespace = "org.foo"
val parameters = Map("recordName" -> name, "recordNamespace" -> namespace)

df.write.options(parameters).format("avro").save("/tmp/output")

Python

# Create a DataFrame from a specified directory
df = spark.read.format("avro").load("/tmp/episodes.avro")

#  Saves the subset of the Avro records read in
subset = df.where("doctor > 5")
subset.write.format("avro").save("/tmp/output")

SQL

Para consultar dados Avro no SQL, registre o arquivo de dados como uma tabela ou exibição temporária:

CREATE TEMPORARY VIEW episodes
USING avro
OPTIONS (path "/tmp/episodes.avro")

SELECT * from episodes

Exemplo de notebook: ler e gravar arquivos Avro

O notebook a seguir demonstra como ler e gravar arquivos Avro.

Ler e gravar o notebook de arquivos Avro

Obter notebook