Arbeiten mit Daten in einem Spark-Dataframe
Spark verwendet nativ eine Datenstruktur, die als resilientes verteiltes Dataset (Resilient Distributed Dataset, RDD) bezeichnet wird. Sie können zwar Code schreiben, der direkt mit RDDs arbeitet, aber die am häufigsten verwendete Datenstruktur für die Arbeit mit strukturierten Daten in Spark ist der Dataframe, der als Teil der Spark SQL-Bibliothek bereitgestellt wird. Dataframes in Spark ähneln denen der allgegenwärtigen Python-Bibliothek Pandas, sind aber für die verteilte Verarbeitungsumgebung von Spark optimiert.
Hinweis
Zusätzlich zur Dataframe-API bietet Spark SQL eine stark typisierte Dataset-API, die in Java und Scala unterstützt wird. In diesem Modul werden wir uns auf die Dataframe-API konzentrieren.
Laden von Daten in einen Dataframe
Lassen Sie uns an einem hypothetischen Beispiel untersuchen, wie Sie einen Dataframe für die Arbeit mit Daten verwenden können. Nehmen Sie an, dass die folgenden Daten in einer durch Trennzeichen getrennten Textdatei namens products.csv im Ordner Files/data Ihres Lakehouse vorhanden sind:
ProductID,ProductName,Category,ListPrice
771,"Mountain-100 Silver, 38",Mountain Bikes,3399.9900
772,"Mountain-100 Silver, 42",Mountain Bikes,3399.9900
773,"Mountain-100 Silver, 44",Mountain Bikes,3399.9900
...
Ableiten eines Schemas
In einem Spark-Notebook könnten Sie den folgenden PySpark-Code verwenden, um die Dateidaten in einen Dataframe zu laden und die ersten zehn Zeilen anzuzeigen:
%%pyspark
df = spark.read.load('Files/data/products.csv',
format='csv',
header=True
)
display(df.limit(10))
Die Zeile %%pyspark
am Anfang wird als Magic-Befehl bezeichnet und teilt Spark mit, dass die in dieser Zelle verwendete Sprache PySpark ist. Sie können die Sprache, die Sie standardmäßig verwenden möchten, in der Symbolleiste der Notebook-Benutzeroberfläche auswählen und dann einen Magic-Befehl verwenden, um diese Auswahl für eine bestimmte Zelle außer Kraft zu setzen. Hier ist z. B. der entsprechende Scala-Code für das Beispiel mit den Produktdaten:
%%spark
val df = spark.read.format("csv").option("header", "true").load("Files/data/products.csv")
display(df.limit(10))
Der Magic-Befehl %%spark
wird verwendet, um Scala anzugeben.
Beide Codebeispiele würden eine Ausgabe wie die folgende erzeugen:
ProductID | ProductName | Kategorie | ListPrice |
---|---|---|---|
771 | Mountain-100 Silver, 38 | Mountainbikes | 3399.9900 |
772 | Mountain-100 Silver, 42 | Mountainbikes | 3399.9900 |
773 | Mountain-100 Silver, 44 | Mountainbikes | 3399.9900 |
... | ... | ... | ... |
Angeben eines expliziten Schemas
Im vorherigen Beispiel enthielt die erste Zeile der CSV-Datei die Spaltennamen, und Spark war in der Lage, den Datentyp jeder Spalte aus den darin enthaltenen Daten abzuleiten. Sie können auch ein explizites Schema für die Daten angeben, was nützlich ist, wenn die Spaltennamen nicht in der Datendatei enthalten sind, wie in diesem CSV-Beispiel:
771,"Mountain-100 Silver, 38",Mountain Bikes,3399.9900
772,"Mountain-100 Silver, 42",Mountain Bikes,3399.9900
773,"Mountain-100 Silver, 44",Mountain Bikes,3399.9900
...
Das folgende PySpark-Beispiel zeigt, wie Sie ein Schema für den Dataframe angeben, der aus einer Datei namens product-data.csv in diesem Format geladen werden soll:
from pyspark.sql.types import *
from pyspark.sql.functions import *
productSchema = StructType([
StructField("ProductID", IntegerType()),
StructField("ProductName", StringType()),
StructField("Category", StringType()),
StructField("ListPrice", FloatType())
])
df = spark.read.load('Files/data/product-data.csv',
format='csv',
schema=productSchema,
header=False)
display(df.limit(10))
Die Ergebnisse wären wieder ähnlich zu:
ProductID | ProductName | Kategorie | ListPrice |
---|---|---|---|
771 | Mountain-100 Silver, 38 | Mountainbikes | 3399.9900 |
772 | Mountain-100 Silver, 42 | Mountainbikes | 3399.9900 |
773 | Mountain-100 Silver, 44 | Mountainbikes | 3399.9900 |
... | ... | ... | ... |
Tipp
Die Angabe eines expliziten Schemas verbessert auch die Leistung.
Filtern und Gruppieren von Dataframes
Sie können die Methoden der Dataframe-Klasse verwenden, um die darin enthaltenen Daten zu filtern, zu sortieren, zu gruppieren und anderweitig zu bearbeiten. Das folgende Codebeispiel verwendet z. B. die select-Methode, um die Spalten ProductID und ListPrice aus dem df-Dataframe mit den Produktdaten des vorherigen Beispiels abzurufen:
pricelist_df = df.select("ProductID", "ListPrice")
Die Ergebnisse dieses Codebeispiels würden etwa wie folgt aussehen:
ProductID | ListPrice |
---|---|
771 | 3399.9900 |
772 | 3399.9900 |
773 | 3399.9900 |
... | ... |
Wie die meisten Methoden zur Datenbearbeitung gibt select ein neues Dataframe-Objekt zurück.
Tipp
Die Auswahl einer Teilmenge von Spalten aus einem Dataframe ist ein gängiger Vorgang, der auch mithilfe der folgenden kürzeren Syntax erreicht werden kann:
pricelist_df = df["ProductID", "ListPrice"]
Sie können Methoden miteinander „verketten“, um eine Reihe von Bearbeitungen durchzuführen, die zu einem transformierten Dataframe führen. In diesem Beispielcode werden z. B. die Methoden select und where miteinander verknüpft, um einen neuen Dataframe zu erstellen, der die Spalten ProductName und ListPrice für Produkte mit der Kategorie Mountain Bikes oder Road Bikes enthält:
bikes_df = df.select("ProductName", "Category", "ListPrice").where((df["Category"]=="Mountain Bikes") | (df["Category"]=="Road Bikes"))
display(bikes_df)
Die Ergebnisse dieses Codebeispiels würden etwa wie folgt aussehen:
ProductName | Kategorie | ListPrice |
---|---|---|
Mountain-100 Silver, 38 | Mountainbikes | 3399.9900 |
Road-750 Black, 52 | Rennräder | 539,9900 |
... | ... | ... |
Um Daten zu gruppieren und zu aggregieren, können Sie die groupBy-Methode und Aggregatfunktionen verwenden. Der folgende PySpark-Code zählt z. B. die Anzahl der Produkte für jede Kategorie:
counts_df = df.select("ProductID", "Category").groupBy("Category").count()
display(counts_df)
Die Ergebnisse dieses Codebeispiels würden etwa wie folgt aussehen:
Category | count |
---|---|
Lenkköpfe | 3 |
Räder | 14 |
Mountainbikes | 32 |
... | ... |
Speichern eines Dataframes
Die Verwendung von Spark empfiehlt sich häufig, um Rohdaten zu transformieren und die Ergebnisse für die weitere Analyse oder nachgelagerte Verarbeitung zu speichern. Im folgenden Codebeispiel wird der DataFrame in einer Parquet-Datei im Data Lake gespeichert, wobei jede vorhandene Datei mit demselben Namen ersetzt wird.
bikes_df.write.mode("overwrite").parquet('Files/product_data/bikes.parquet')
Hinweis
Das Parquet-Format wird in der Regel für Datendateien verwendet, die Sie für die weitere Analyse oder Erfassung in einem Analysespeicher verwenden. Parquet ist ein sehr effizientes Format, das von den meisten umfangreichen Datenanalysesystemen unterstützt wird. Tatsächlich kann Ihre Datentransformation manchmal einfach darin bestehen, Daten aus einem anderen Format (z. B. CSV) in Parquet zu konvertieren!
Partitionieren der Ausgabedatei
Die Partitionierung ist eine Optimierungsmethode, mit der Spark die Leistung auf den Workerknoten maximieren kann. Beim Filtern von Daten in Abfragen können weitere Leistungssteigerungen erzielt werden, indem unnötige Datenträger-E/A-Vorgänge eliminiert werden.
Um einen Dataframe als partitionierte Gruppe von Dateien zu speichern, verwenden Sie beim Schreiben der Daten die partitionBy-Methode. Im folgenden Beispiel wird der Dataframe bikes_df (der die Produktdaten für die Kategorien Mountain Bikes und Road Bikes enthält) gespeichert, und die Daten werden nach Kategorie partitioniert:
bikes_df.write.partitionBy("Category").mode("overwrite").parquet("Files/bike_data")
Die Ordnernamen, die beim Partitionieren eines Dataframes generiert werden, enthalten den Namen und Wert der Partitionierungsspalte im Format Spalte=Wert. Im Codebeispiel wird also ein Ordner namens bike_data erstellt, der die folgenden Unterordner enthält:
- Category=Mountain Bikes
- Category=Road Bikes
Jeder Unterordner enthält mindestens eine Parquet-Datei mit den Produktdaten für die entsprechende Kategorie.
Hinweis
Sie können die Daten nach mehreren Spalten partitionieren, wodurch eine Hierarchie von Ordnern für jeden Partitionsschlüssel entsteht. Beispielsweise könnten Sie die Bestelldaten nach Jahr und Monat partitionieren, sodass die Ordnerhierarchie einen Ordner für jeden Jahreswert enthält, der wiederum einen Unterordner für jeden Monatswert enthält.
Laden partitionierter Daten
Beim Lesen partitionierter Daten in einen Dataframe können Sie die Daten aus jedem beliebigen Ordner innerhalb der Hierarchie laden, indem Sie explizite Werte oder Platzhalter für die partitionierten Felder angeben. Im folgenden Beispiel werden Daten für Produkte in der Kategorie Road Bikes geladen:
road_bikes_df = spark.read.parquet('Files/bike_data/Category=Road Bikes')
display(road_bikes_df.limit(5))
Hinweis
Die im Dateipfad angegebenen Partitionierungsspalten werden im resultierenden Dataframe nicht angegeben. Die von der Beispielabfrage zurückgegebenen Ergebnisse enthalten keine Category-Spalte. Die Kategorie aller Zeilen würde Road Bikes lauten.