Arbeiten mit Daten in einem Spark-Dataframe
In der vorherigen Lerneinheit haben Sie erfahren, wie Sie eine Verbindung mit einer Datenquelle herstellen, Daten in einen Dataframe laden und optional den Dataframe als Datei oder Tabelle in einem Lakehouse speichern. Sehen wir uns nun den Dataframe etwas ausführlicher an.
Spark verwendet nativ eine Datenstruktur, die als resilientes verteiltes Dataset (Resilient Distributed Dataset, RDD) bezeichnet wird. Sie können zwar Code schreiben, der direkt mit RDDs interagiert, aber die am häufigsten verwendete Datenstruktur für die Arbeit mit strukturierten Daten in Spark ist der Dataframe, der als Teil der Spark SQL-Bibliothek bereitgestellt wird. Dataframes in Spark ähneln denen der allgegenwärtigen Python-Bibliothek Pandas, sind aber für die verteilte Verarbeitungsumgebung von Spark optimiert.
Hinweis
Zusätzlich zur Dataframe-API bietet Spark SQL eine stark typisierte Dataset-API, die in Java und Scala unterstützt wird. In diesem Modul werden wir uns auf die Dataframe-API konzentrieren.
Laden von Daten in einen Dataframe
Lassen Sie uns an einem hypothetischen Beispiel untersuchen, wie Sie einen Dataframe für die Arbeit mit Daten verwenden können. Nehmen Sie an, dass die folgenden Daten in einer durch Trennzeichen getrennten Textdatei namens products.csv im Ordner Files/data Ihres Lakehouse vorhanden sind:
ProductID,ProductName,Category,ListPrice
771,"Mountain-100 Silver, 38",Mountain Bikes,3399.9900
772,"Mountain-100 Silver, 42",Mountain Bikes,3399.9900
773,"Mountain-100 Silver, 44",Mountain Bikes,3399.9900
...
Ableiten eines Schemas
In einem Spark-Notebook könnten Sie den folgenden PySpark-Code verwenden, um die Dateidaten in einen Dataframe zu laden und die ersten zehn Zeilen anzuzeigen:
%%pyspark
df = spark.read.load('Files/data/products.csv',
format='csv',
header=True
)
display(df.limit(10))
Wie Sie bereits erfahren haben, wird die Zeile %%pyspark
am Anfang als Magic-Befehl bezeichnet und teilt Spark mit, dass die in dieser Zelle verwendete Sprache PySpark ist. In den meisten Fällen ist PySpark die Standardsprache und wir behalten das in den Beispielen in diesem Modul in der Regel bei. Der Vollständigkeit halber ist hier aber z. B. der entsprechende Scala-Code für das Beispiel mit den Produktdaten:
%%spark
val df = spark.read.format("csv").option("header", "true").load("Files/data/products.csv")
display(df.limit(10))
Der Magic-Befehl %%spark
wird verwendet, um Scala anzugeben. Beachten Sie, dass sich die Scala-Implementierung des Dataframes ähnlich wie die PySpark-Version verhält.
Beide Codebeispiele würden eine Ausgabe wie die folgende erzeugen:
ProductID | ProductName | Kategorie | ListPrice |
---|---|---|---|
771 | Mountain-100 Silver, 38 | Mountainbikes | 3399.9900 |
772 | Mountain-100 Silver, 42 | Mountainbikes | 3399.9900 |
773 | Mountain-100 Silver, 44 | Mountainbikes | 3399.9900 |
... | ... | ... | ... |
Angeben eines expliziten Schemas
Im vorherigen Beispiel enthielt die erste Zeile der CSV-Datei die Spaltennamen, und Spark war in der Lage, den Datentyp jeder Spalte aus den darin enthaltenen Daten abzuleiten. Sie können auch ein explizites Schema für die Daten angeben, was nützlich ist, wenn die Spaltennamen nicht in der Datendatei enthalten sind, wie in diesem CSV-Beispiel:
771,"Mountain-100 Silver, 38",Mountain Bikes,3399.9900
772,"Mountain-100 Silver, 42",Mountain Bikes,3399.9900
773,"Mountain-100 Silver, 44",Mountain Bikes,3399.9900
...
Das folgende PySpark-Beispiel zeigt, wie Sie ein Schema für den Dataframe angeben, der aus einer Datei namens product-data.csv in diesem Format geladen werden soll:
from pyspark.sql.types import *
from pyspark.sql.functions import *
productSchema = StructType([
StructField("ProductID", IntegerType()),
StructField("ProductName", StringType()),
StructField("Category", StringType()),
StructField("ListPrice", FloatType())
])
df = spark.read.load('Files/data/product-data.csv',
format='csv',
schema=productSchema,
header=False)
display(df.limit(10))
Die Ergebnisse wären wieder ähnlich zu:
ProductID | ProductName | Kategorie | ListPrice |
---|---|---|---|
771 | Mountain-100 Silver, 38 | Mountainbikes | 3399.9900 |
772 | Mountain-100 Silver, 42 | Mountainbikes | 3399.9900 |
773 | Mountain-100 Silver, 44 | Mountainbikes | 3399.9900 |
... | ... | ... | ... |
Tipp
Die Angabe eines expliziten Schemas verbessert auch die Leistung.
Filtern und Gruppieren von Dataframes
Sie können die Methoden der Dataframe-Klasse verwenden, um die darin enthaltenen Daten zu filtern, zu sortieren, zu gruppieren und anderweitig zu bearbeiten. Das folgende Codebeispiel verwendet z. B. die select-Methode, um die Spalten ProductID und ListPrice aus dem df-Dataframe mit den Produktdaten des vorherigen Beispiels abzurufen:
pricelist_df = df.select("ProductID", "ListPrice")
Die Ergebnisse dieses Codebeispiels würden etwa wie folgt aussehen:
ProductID | ListPrice |
---|---|
771 | 3399.9900 |
772 | 3399.9900 |
773 | 3399.9900 |
... | ... |
Wie die meisten Methoden zur Datenbearbeitung gibt select ein neues Dataframe-Objekt zurück.
Tipp
Die Auswahl einer Teilmenge von Spalten aus einem Dataframe ist ein gängiger Vorgang, der auch mithilfe der folgenden kürzeren Syntax erreicht werden kann:
pricelist_df = df["ProductID", "ListPrice"]
Sie können Methoden miteinander „verketten“, um eine Reihe von Bearbeitungen durchzuführen, die zu einem transformierten Dataframe führen. In diesem Beispielcode werden z. B. die Methoden select und where miteinander verknüpft, um einen neuen Dataframe zu erstellen, der die Spalten ProductName und ListPrice für Produkte mit der Kategorie Mountain Bikes oder Road Bikes enthält:
bikes_df = df.select("ProductName", "Category", "ListPrice").where((df["Category"]=="Mountain Bikes") | (df["Category"]=="Road Bikes"))
display(bikes_df)
Die Ergebnisse dieses Codebeispiels würden etwa wie folgt aussehen:
ProductName | Kategorie | ListPrice |
---|---|---|
Mountain-100 Silver, 38 | Mountainbikes | 3399.9900 |
Road-750 Black, 52 | Rennräder | 539,9900 |
... | ... | ... |
Um Daten zu gruppieren und zu aggregieren, können Sie die groupBy-Methode und Aggregatfunktionen verwenden. Der folgende PySpark-Code zählt z. B. die Anzahl der Produkte für jede Kategorie:
counts_df = df.select("ProductID", "Category").groupBy("Category").count()
display(counts_df)
Die Ergebnisse dieses Codebeispiels würden etwa wie folgt aussehen:
Category | count |
---|---|
Lenkköpfe | 3 |
Räder | 14 |
Mountainbikes | 32 |
... | ... |