Arquivo Avro
O Apache Avro é um sistema de serialização de dados. O Avro fornece:
- Estruturas de dados avançadas.
- Um formato de dados binário, rápido e compacto.
- Um arquivo de contêiner para armazenar dados persistentes.
- Chamada de procedimento remoto (RPC).
- Integração simples com linguagens dinâmicas. A geração de código não é necessária para ler ou gravar arquivos de dados nem para usar ou implementar protocolos de RPC. Geração de código como uma otimização opcional vale a pena implementar apenas para linguagens de tipo estático.
A fonte de dados Avro dá suporte a:
- Conversão de esquema: conversão automática entre registros do Apache Spark SQL e do Avro.
- Particionamento: leitura e gravação fácil de dados particionados sem nenhuma configuração extra.
- Compactação: a compactação a ser usada ao gravar o Avro no disco. Os tipos com suporte são
uncompressed
,snappy
edeflate
. Também é possível especificar o nível de esvaziamento (deflate). - Nomes de registro: registre o nome e o namespace passando um mapa de parâmetros com
recordName
erecordNamespace
.
Veja também Ler e gravar dados de streaming do Avro.
Configuração
Você pode alterar o comportamento de uma fonte de dados Avro usando vários parâmetros de configuração.
Para ignorar arquivos sem a extensão .avro
durante a leitura, você pode definir o parâmetro avro.mapred.ignore.inputs.without.extension
na configuração do Hadoop. O padrão é false
.
spark
.sparkContext
.hadoopConfiguration
.set("avro.mapred.ignore.inputs.without.extension", "true")
Para configurar a compactação durante a gravação, defina as seguintes propriedades do Spark:
- Codec de compactação:
spark.sql.avro.compression.codec
. Os codecs com suporte sãosnappy
edeflate
. O codec padrão ésnappy
. - Se o codec de compactação for
deflate
, você poderá definir o nível de compactação com:spark.sql.avro.deflate.level
. O nível padrão é-1
.
Você pode definir essas propriedades na configuração do Spark do cluster ou em runtime usando spark.conf.set()
. Por exemplo:
spark.conf.set("spark.sql.avro.compression.codec", "deflate")
spark.conf.set("spark.sql.avro.deflate.level", "5")
Para o Databricks Runtime 9.1 LTS e superiores, você pode alterar o comportamento de inferência do esquema padrão no Avro fornecendo a opção mergeSchema
ao ler arquivos. Definir mergeSchema
como true
inferirá um esquema de um conjunto de arquivos Avro no diretório de destino e os mesclará em vez de inferir o esquema de leitura de um único arquivo.
Tipos com suporte para conversão de Spark SQL -> Avro
Esta biblioteca dá suporte à leitura de todos os tipos Avro. Ela usa o seguinte mapeamento de tipos Avro para tipos Spark SQL:
Tipos Avro | Tipos Spark SQL |
---|---|
booleano | BooleanType |
INT | IntegerType |
long | LongType |
FLOAT | FloatType |
double | DoubleType |
bytes | BinaryType |
string | StringType |
registro | StructType |
enum | StringType |
matriz | ArrayType |
map | MapType |
corrigido | BinaryType |
union | Confira Tipos de união. |
Tipos de união
A fonte de dados Avro dá suporte à leitura de tipos union
. O Avro considera os três tipos a seguir como tipos union
:
- O
union(int, long)
é mapeado paraLongType
. - O
union(float, double)
é mapeado paraDoubleType
. - O
union(something, null)
, ondesomething
é qualquer tipo Avro com suporte. Isso é mapeado para o mesmo tipo Spark SQL desomething
, comnullable
definido comotrue
.
Todos os outros tipos union
são tipos complexos. Eles são mapeados para StructType
onde os nomes de campo são member0
, member1
e assim por diante, de acordo com os membros do union
. Isso é consistente com o comportamento ao converter entre Avro e Parquet.
Tipos lógicos
A fonte de dados Avro dá suporte à leitura dos seguintes tipos lógicos Avro:
Tipos lógicos Avro | Tipos Avro | Tipos Spark SQL |
---|---|---|
date | INT | DateType |
timestamp-millis | long | TimestampType |
timestamp-micros | long | TimestampType |
decimal | corrigido | DecimalType |
decimal | bytes | DecimalType |
Observação
A fonte de dados Avro ignora documentos, aliases e outras propriedades presentes no arquivo Avro.
Tipos com suporte para conversão entre Spark SQL -> Avro
Esta biblioteca dá suporte à gravação de todos os tipos Spark SQL em Avro. Para a maioria dos tipos, o mapeamento de tipos Spark para tipos Avro é simples (por exemplo IntegerType
é convertido em int
). A seguir está uma lista dos alguns casos especiais:
Tipos Spark SQL | Tipos Avro | Tipos lógicos Avro |
---|---|---|
ByteType | INT | |
ShortType | INT | |
BinaryType | bytes | |
DecimalType | corrigido | decimal |
TimestampType | long | timestamp-micros |
DateType | INT | date |
Você também pode especificar todo o esquema Avro de saída com a opção avroSchema
, para que os tipos Spark SQL possam ser convertidos em outros tipos Avro.
As conversões a seguir não são aplicadas por padrão e exigem esquema Avro especificado pelo usuário:
Tipos Spark SQL | Tipos Avro | Tipos lógicos Avro |
---|---|---|
ByteType | corrigido | |
StringType | enum | |
DecimalType | bytes | decimal |
TimestampType | long | timestamp-millis |
Exemplos
Esses exemplos usam o arquivo episodes.avro.
Scala
// The Avro records are converted to Spark types, filtered, and
// then written back out as Avro records
val df = spark.read.format("avro").load("/tmp/episodes.avro")
df.filter("doctor > 5").write.format("avro").save("/tmp/output")
Este exemplo demonstra um esquema Avro personalizado:
import org.apache.avro.Schema
val schema = new Schema.Parser().parse(new File("episode.avsc"))
spark
.read
.format("avro")
.option("avroSchema", schema.toString)
.load("/tmp/episodes.avro")
.show()
Este exemplo demonstra as opções de compactação Avro:
// configuration to use deflate compression
spark.conf.set("spark.sql.avro.compression.codec", "deflate")
spark.conf.set("spark.sql.avro.deflate.level", "5")
val df = spark.read.format("avro").load("/tmp/episodes.avro")
// writes out compressed Avro records
df.write.format("avro").save("/tmp/output")
Este exemplo demonstra registros Avro particionados:
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder().master("local").getOrCreate()
val df = spark.createDataFrame(
Seq(
(2012, 8, "Batman", 9.8),
(2012, 8, "Hero", 8.7),
(2012, 7, "Robot", 5.5),
(2011, 7, "Git", 2.0))
).toDF("year", "month", "title", "rating")
df.toDF.write.format("avro").partitionBy("year", "month").save("/tmp/output")
Este exemplo demonstra o nome do registro e o namespace:
val df = spark.read.format("avro").load("/tmp/episodes.avro")
val name = "AvroTest"
val namespace = "org.foo"
val parameters = Map("recordName" -> name, "recordNamespace" -> namespace)
df.write.options(parameters).format("avro").save("/tmp/output")
Python
# Create a DataFrame from a specified directory
df = spark.read.format("avro").load("/tmp/episodes.avro")
# Saves the subset of the Avro records read in
subset = df.where("doctor > 5")
subset.write.format("avro").save("/tmp/output")
SQL
Para consultar dados Avro no SQL, registre o arquivo de dados como uma tabela ou exibição temporária:
CREATE TEMPORARY VIEW episodes
USING avro
OPTIONS (path "/tmp/episodes.avro")
SELECT * from episodes
Exemplo de notebook: ler e gravar arquivos Avro
O notebook a seguir demonstra como ler e gravar arquivos Avro.