Avro-fil
Apache Avro är ett data serialiseringssystem. Avro tillhandahåller:
- Omfattande datastrukturer.
- Ett kompakt, snabbt, binärt dataformat.
- En containerfil för att lagra beständiga data.
- Fjärrproceduranrop (RPC).
- Enkel integrering med dynamiska språk. Kodgenerering krävs inte för att läsa eller skriva datafiler eller för att använda eller implementera RPC-protokoll. Kodgenerering som en valfri optimering, bara värt att implementera för statiskt skrivna språk.
- Schemakonvertering: Automatisk konvertering mellan Apache Spark SQL- och Avro-poster.
- Partitionering: Läs och skriv enkelt partitionerade data utan extra konfiguration.
- Komprimering: Komprimering som ska användas när Avro skrivs ut till disk. De typer som stöds är
uncompressed
,snappy
ochdeflate
. Du kan också ange deflate-nivån. - Postnamn: Postnamn och namnområde genom att skicka en karta över parametrar med
recordName
ochrecordNamespace
.
Se även Läsa och skriva strömmande Avro-data.
Konfiguration
Du kan ändra beteendet för en Avro-datakälla med hjälp av olika konfigurationsparametrar.
Om du vill ignorera filer utan .avro
tillägget när du läser kan du ange parametern avro.mapred.ignore.inputs.without.extension
i Hadoop-konfigurationen. Standardvärdet är false
.
spark
.sparkContext
.hadoopConfiguration
.set("avro.mapred.ignore.inputs.without.extension", "true")
Konfigurera komprimering när du skriver genom att ange följande Spark-egenskaper:
- Komprimeringskod:
spark.sql.avro.compression.codec
. Codecs som stöds ärsnappy
ochdeflate
. Standardkodcen ärsnappy
. - Om komprimeringskodc är
deflate
kan du ange komprimeringsnivån med:spark.sql.avro.deflate.level
. Standardnivån är-1
.
Du kan ange dessa egenskaper i Spark-klustrets konfiguration eller vid körning med hjälp av spark.conf.set()
. Till exempel:
spark.conf.set("spark.sql.avro.compression.codec", "deflate")
spark.conf.set("spark.sql.avro.deflate.level", "5")
För Databricks Runtime 9.1 LTS och senare kan du ändra standardbeteendet för schemainferens i Avro genom att ange mergeSchema
alternativet när du läser filer. true
Om du anger mergeSchema
till härleds ett schema från en uppsättning Avro-filer i målkatalogen och sammanfogar dem i stället för att härleda lässchemat från en enda fil.
Typer som stöds för Avro –> Spark SQL-konvertering
Det här biblioteket stöder läsning av alla Avro-typer. Den använder följande mappning från Avro-typer till Spark SQL-typer:
Avro-typ | Spark SQL-typ |
---|---|
boolean | BooleanType |
heltal | IntegerType |
lång | LongType |
flyttal | FloatType |
dubbel | DoubleType |
byte | BinaryType |
sträng | StringType |
rekord | StructType |
uppräkning | StringType |
matris | ArrayType |
map | MapType |
fast | BinaryType |
union | Se Union-typer. |
Union-typer
Avro-datakällan stöder lästyper union
. Avro anser att följande tre typer är union
typer:
union(int, long)
mappar tillLongType
.union(float, double)
mappar tillDoubleType
.union(something, null)
, därsomething
finns någon Avro-typ som stöds. Detta mappar till samma Spark SQL-typ som försomething
, mednullable
inställt påtrue
.
Alla andra union
typer är komplexa typer. De mappar till StructType
var fältnamnen är member0
, member1
och så vidare, i enlighet med medlemmarna union
i . Detta överensstämmer med beteendet vid konvertering mellan Avro och Parquet.
Logiska typer
Avro-datakällan stöder läsning av följande logiska Avro-typer:
Logisk avro-typ | Avro-typ | Spark SQL-typ |
---|---|---|
datum | heltal | DateType |
timestamp-millis | lång | Tidsstämpeltyp |
timestamp-micros | lång | Tidsstämpeltyp |
decimal | fast | Decimaltyp |
decimal | byte | Decimaltyp |
Kommentar
Avro-datakällan ignorerar dokument, alias och andra egenskaper som finns i Avro-filen.
Typer som stöds för Spark SQL –> Avro-konvertering
Det här biblioteket stöder skrivning av alla Spark SQL-typer till Avro. För de flesta typer är mappningen från Spark-typer till Avro-typer enkel (till exempel IntegerType
konverteras till int
); följande är en lista över de få specialfallen:
Spark SQL-typ | Avro-typ | Logisk avro-typ |
---|---|---|
ByteType | heltal | |
ShortType | heltal | |
BinaryType | byte | |
Decimaltyp | fast | decimal |
Tidsstämpeltyp | lång | timestamp-micros |
DateType | heltal | datum |
Du kan också ange hela Avro-schemat med alternativet avroSchema
, så att Spark SQL-typer kan konverteras till andra Avro-typer.
Följande konverteringar tillämpas inte som standard och kräver användarens angivna Avro-schema:
Spark SQL-typ | Avro-typ | Logisk avro-typ |
---|---|---|
ByteType | fast | |
StringType | uppräkning | |
Decimaltyp | byte | decimal |
Tidsstämpeltyp | lång | timestamp-millis |
Exempel
I de här exemplen används filen episodes.avro .
Scala
// The Avro records are converted to Spark types, filtered, and
// then written back out as Avro records
val df = spark.read.format("avro").load("/tmp/episodes.avro")
df.filter("doctor > 5").write.format("avro").save("/tmp/output")
Det här exemplet visar ett anpassat Avro-schema:
import org.apache.avro.Schema
val schema = new Schema.Parser().parse(new File("episode.avsc"))
spark
.read
.format("avro")
.option("avroSchema", schema.toString)
.load("/tmp/episodes.avro")
.show()
Det här exemplet visar Alternativ för Avro-komprimering:
// configuration to use deflate compression
spark.conf.set("spark.sql.avro.compression.codec", "deflate")
spark.conf.set("spark.sql.avro.deflate.level", "5")
val df = spark.read.format("avro").load("/tmp/episodes.avro")
// writes out compressed Avro records
df.write.format("avro").save("/tmp/output")
Det här exemplet visar partitionerade Avro-poster:
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder().master("local").getOrCreate()
val df = spark.createDataFrame(
Seq(
(2012, 8, "Batman", 9.8),
(2012, 8, "Hero", 8.7),
(2012, 7, "Robot", 5.5),
(2011, 7, "Git", 2.0))
).toDF("year", "month", "title", "rating")
df.toDF.write.format("avro").partitionBy("year", "month").save("/tmp/output")
Det här exemplet visar postnamnet och namnområdet:
val df = spark.read.format("avro").load("/tmp/episodes.avro")
val name = "AvroTest"
val namespace = "org.foo"
val parameters = Map("recordName" -> name, "recordNamespace" -> namespace)
df.write.options(parameters).format("avro").save("/tmp/output")
Python
# Create a DataFrame from a specified directory
df = spark.read.format("avro").load("/tmp/episodes.avro")
# Saves the subset of the Avro records read in
subset = df.where("doctor > 5")
subset.write.format("avro").save("/tmp/output")
SQL
Om du vill köra frågor mot Avro-data i SQL registrerar du datafilen som en tabell eller temporär vy:
CREATE TEMPORARY VIEW episodes
USING avro
OPTIONS (path "/tmp/episodes.avro")
SELECT * from episodes
Notebook-exempel: Läsa och skriva Avro-filer
Följande notebook-fil visar hur du läser och skriver Avro-filer.