Avro-Datei
Apache Avro ist ein Datenserialisierungssystem. Avro bietet:
- Umfangreiche Datenstrukturen.
- Ein kompaktes, schnelles binäres Datenformat.
- Eine Containerdatei zum Speichern persistenter Daten.
- Remoteprozeduraufruf (RPC).
- Einfache Integration in dynamische Sprachen. Die Codegenerierung ist weder zum Lesen oder Schreiben von Datendateien noch zum Verwenden oder Implementieren von RPC-Protokollen erforderlich. Codegenerierung als optionale Optimierung, deren Implementierung nur für statisch typisierte Sprachen sinnvoll ist.
Die Avro-Datenquelle unterstützt Folgendes:
- Schemakonvertierung: Automatische Konvertierung zwischen Apache Spark SQL- und Avro-Datensätzen.
- Partitionierung: Einfaches Lesen und Schreiben partitionierter Daten ohne zusätzliche Konfiguration.
- Komprimierung: Komprimierung, die beim Schreiben von Avro auf den Datenträger verwendet werden soll. Die unterstützten Typen sind
uncompressed
,snappy
unddeflate
. Sie können auch die Deflate-Ebene angeben. - Datensatznamen: Datensatzname und Namespace durch Übergabe einer Zuordnung von Parametern mit
recordName
undrecordNamespace
.
Weitere Informationen finden Sie auch unter Lesen und Schreiben von Avro-Streamingdaten.
Konfiguration
Sie können das Verhalten einer Avro-Datenquelle mithilfe verschiedener Konfigurationsparameter ändern.
Um Dateien ohne die Erweiterung .avro
beim Lesen zu ignorieren, können Sie den Parameter avro.mapred.ignore.inputs.without.extension
in der Hadoop-Konfiguration festlegen. Der Standardwert lautet false
.
spark
.sparkContext
.hadoopConfiguration
.set("avro.mapred.ignore.inputs.without.extension", "true")
Legen Sie die folgenden Spark-Eigenschaften fest, um die Komprimierung beim Schreiben zu konfigurieren:
- Komprimierungscodec:
spark.sql.avro.compression.codec
. Unterstützte Codecs sindsnappy
unddeflate
. Der Standardcodec istsnappy
. - Wenn der Komprimierungscodec
deflate
ist, können Sie die Komprimierungsebene mitspark.sql.avro.deflate.level
festlegen. Die Standardebene ist-1
.
Sie können diese Eigenschaften in der Spark-Konfiguration des Clusters oder zur Laufzeit mit spark.conf.set()
festlegen. Beispiele:
spark.conf.set("spark.sql.avro.compression.codec", "deflate")
spark.conf.set("spark.sql.avro.deflate.level", "5")
Für Databricks Runtime 9.1 LTS und höher können Sie das standardmäßige Schemarückschlussverhalten in Avro ändern, indem Sie die Option mergeSchema
beim Lesen von Dateien bereitstellen. Wenn Sie mergeSchema
auf true
festlegen, wird ein Schema aus einem Satz von Avro-Dateien im Zielverzeichnis abgeleitet und zusammengeführt, anstatt das Leseschema aus einer einzelnen Datei abzuleiten.
Unterstützte Typen für Avro –> Spark-SQL-Konvertierung
Diese Bibliothek unterstützt das Lesen aller Avro-Typen. Dabei wird die folgende Zuordnung von Avro-Typen zu Spark-SQL-Typen verwendet:
Avro-Typ | Spark SQL-Typ |
---|---|
boolean | BooleanType |
int | IntegerType |
lang | LongType |
float | FloatType |
double | DoubleType |
Byte | BinaryType |
Zeichenfolge | StringType |
record | StructType |
enum | StringType |
array | ArrayType |
map | MapType |
behoben | BinaryType |
union | Siehe Union-Typen. |
Union-Datentypen
Die Avro-Datenquelle unterstützt das Lesen von union
-Typen. Avro betrachtet die folgenden drei Typen als union
-Typen:
union(int, long)
istLongType
zugeordnet.union(float, double)
istDoubleType
zugeordnet.union(something, null)
, wobeisomething
ein beliebiger unterstützter Avro-Typ ist. Dies wird demselben Spark SQL-Typ zugeordnet wie der vonsomething
, wobeinullable
auftrue
festgelegt ist.
Alle anderen union
-Typen sind komplexe Typen. Sie werden StructType
zugeordnet, wobei die Feldnamen in Übereinstimmung mit den Mitgliedern von union
member0
, member1
usw. lauten. Dies entspricht dem Verhalten beim Konvertieren zwischen Avro und Parquet.
Logische Typen
Die Avro-Datenquelle unterstützt das Lesen der folgenden logischen Avro-Typen:
Logischer Avro-Typ | Avro-Typ | Spark SQL-Typ |
---|---|---|
date | int | DateType |
timestamp-millis | lang | TimestampType |
timestamp-micros | lang | TimestampType |
Decimal | behoben | DecimalType |
Decimal | Byte | DecimalType |
Hinweis
Die Avro-Datenquelle ignoriert Dokumente, Aliase und andere Eigenschaften, die in der Avro-Datei vorhanden sind.
Unterstützte Typen für Spark SQL – Konvertierung von > Avro
Diese Bibliothek unterstützt das Schreiben aller Spark SQL-Typen in Avro. Bei den meisten Typen ist die Zuordnung von Spark-Typen zu Avro-Typen unkompliziert (z. B. wird IntegerType
in int
konvertiert). Im Folgenden finden Sie eine Liste der wenigen Sonderfälle:
Spark SQL-Typ | Avro-Typ | Logischer Avro-Typ |
---|---|---|
ByteType | int | |
ShortType | int | |
BinaryType | Byte | |
DecimalType | behoben | Decimal |
TimestampType | lang | timestamp-micros |
DateType | int | date |
Sie können auch das gesamte Avro-Ausgabeschema mit der Option avroSchema
festlegen, damit Spark SQL-Typen in andere Avro-Typen konvertiert werden können.
Die folgenden Konvertierungen werden standardmäßig nicht angewendet und erfordern ein vom Benutzer angegebenes Avro-Schema:
Spark SQL-Typ | Avro-Typ | Logischer Avro-Typ |
---|---|---|
ByteType | behoben | |
StringType | enum | |
DecimalType | Byte | Decimal |
TimestampType | lang | timestamp-millis |
Beispiele
In diesen Beispielen wird die Datei episodes.avro verwendet.
Scala
// The Avro records are converted to Spark types, filtered, and
// then written back out as Avro records
val df = spark.read.format("avro").load("/tmp/episodes.avro")
df.filter("doctor > 5").write.format("avro").save("/tmp/output")
Dieses Beispiel veranschaulicht ein benutzerdefiniertes Avro-Schema:
import org.apache.avro.Schema
val schema = new Schema.Parser().parse(new File("episode.avsc"))
spark
.read
.format("avro")
.option("avroSchema", schema.toString)
.load("/tmp/episodes.avro")
.show()
Dieses Beispiel veranschaulicht Avro-Komprimierungsoptionen:
// configuration to use deflate compression
spark.conf.set("spark.sql.avro.compression.codec", "deflate")
spark.conf.set("spark.sql.avro.deflate.level", "5")
val df = spark.read.format("avro").load("/tmp/episodes.avro")
// writes out compressed Avro records
df.write.format("avro").save("/tmp/output")
Dieses Beispiel veranschaulicht partitionierte Avro-Datensätze:
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder().master("local").getOrCreate()
val df = spark.createDataFrame(
Seq(
(2012, 8, "Batman", 9.8),
(2012, 8, "Hero", 8.7),
(2012, 7, "Robot", 5.5),
(2011, 7, "Git", 2.0))
).toDF("year", "month", "title", "rating")
df.toDF.write.format("avro").partitionBy("year", "month").save("/tmp/output")
Dieses Beispiel veranschaulicht den Datensatznamen und den Namespace:
val df = spark.read.format("avro").load("/tmp/episodes.avro")
val name = "AvroTest"
val namespace = "org.foo"
val parameters = Map("recordName" -> name, "recordNamespace" -> namespace)
df.write.options(parameters).format("avro").save("/tmp/output")
Python
# Create a DataFrame from a specified directory
df = spark.read.format("avro").load("/tmp/episodes.avro")
# Saves the subset of the Avro records read in
subset = df.where("doctor > 5")
subset.write.format("avro").save("/tmp/output")
SQL
Um Avro-Daten in SQL abfragen zu können, registrieren Sie die Datendatei als Tabelle oder temporäre Ansicht:
CREATE TEMPORARY VIEW episodes
USING avro
OPTIONS (path "/tmp/episodes.avro")
SELECT * from episodes
Notebookbeispiel: Lesen und Schreiben von Avro-Dateien
Das folgende Notebook veranschaulicht das Lesen und Schreiben von Avro-Dateien.