Arbeiten mit Daten in einem Spark-Dataframe

Abgeschlossen

In der vorherigen Lerneinheit haben Sie erfahren, wie Sie eine Verbindung mit einer Datenquelle herstellen, Daten in einen Dataframe laden und optional den Dataframe als Datei oder Tabelle in einem Lakehouse speichern. Sehen wir uns nun den Dataframe etwas ausführlicher an.

Spark verwendet nativ eine Datenstruktur, die als resilientes verteiltes Dataset (Resilient Distributed Dataset, RDD) bezeichnet wird. Sie können zwar Code schreiben, der direkt mit RDDs interagiert, aber die am häufigsten verwendete Datenstruktur für die Arbeit mit strukturierten Daten in Spark ist der Dataframe, der als Teil der Spark SQL-Bibliothek bereitgestellt wird. Dataframes in Spark ähneln denen der allgegenwärtigen Python-Bibliothek Pandas, sind aber für die verteilte Verarbeitungsumgebung von Spark optimiert.

Hinweis

Zusätzlich zur Dataframe-API bietet Spark SQL eine stark typisierte Dataset-API, die in Java und Scala unterstützt wird. In diesem Modul werden wir uns auf die Dataframe-API konzentrieren.

Laden von Daten in einen Dataframe

Lassen Sie uns an einem hypothetischen Beispiel untersuchen, wie Sie einen Dataframe für die Arbeit mit Daten verwenden können. Nehmen Sie an, dass die folgenden Daten in einer durch Trennzeichen getrennten Textdatei namens products.csv im Ordner Files/data Ihres Lakehouse vorhanden sind:

ProductID,ProductName,Category,ListPrice
771,"Mountain-100 Silver, 38",Mountain Bikes,3399.9900
772,"Mountain-100 Silver, 42",Mountain Bikes,3399.9900
773,"Mountain-100 Silver, 44",Mountain Bikes,3399.9900
...

Ableiten eines Schemas

In einem Spark-Notebook könnten Sie den folgenden PySpark-Code verwenden, um die Dateidaten in einen Dataframe zu laden und die ersten zehn Zeilen anzuzeigen:

%%pyspark
df = spark.read.load('Files/data/products.csv',
    format='csv',
    header=True
)
display(df.limit(10))

Wie Sie bereits erfahren haben, wird die Zeile %%pyspark am Anfang als Magic-Befehl bezeichnet und teilt Spark mit, dass die in dieser Zelle verwendete Sprache PySpark ist. In den meisten Fällen ist PySpark die Standardsprache und wir behalten das in den Beispielen in diesem Modul in der Regel bei. Der Vollständigkeit halber ist hier aber z. B. der entsprechende Scala-Code für das Beispiel mit den Produktdaten:

%%spark
val df = spark.read.format("csv").option("header", "true").load("Files/data/products.csv")
display(df.limit(10))

Der Magic-Befehl %%spark wird verwendet, um Scala anzugeben. Beachten Sie, dass sich die Scala-Implementierung des Dataframes ähnlich wie die PySpark-Version verhält.

Beide Codebeispiele würden eine Ausgabe wie die folgende erzeugen:

ProductID ProductName Kategorie ListPrice
771 Mountain-100 Silver, 38 Mountainbikes 3399.9900
772 Mountain-100 Silver, 42 Mountainbikes 3399.9900
773 Mountain-100 Silver, 44 Mountainbikes 3399.9900
... ... ... ...

Angeben eines expliziten Schemas

Im vorherigen Beispiel enthielt die erste Zeile der CSV-Datei die Spaltennamen, und Spark war in der Lage, den Datentyp jeder Spalte aus den darin enthaltenen Daten abzuleiten. Sie können auch ein explizites Schema für die Daten angeben, was nützlich ist, wenn die Spaltennamen nicht in der Datendatei enthalten sind, wie in diesem CSV-Beispiel:

771,"Mountain-100 Silver, 38",Mountain Bikes,3399.9900
772,"Mountain-100 Silver, 42",Mountain Bikes,3399.9900
773,"Mountain-100 Silver, 44",Mountain Bikes,3399.9900
...

Das folgende PySpark-Beispiel zeigt, wie Sie ein Schema für den Dataframe angeben, der aus einer Datei namens product-data.csv in diesem Format geladen werden soll:

from pyspark.sql.types import *
from pyspark.sql.functions import *

productSchema = StructType([
    StructField("ProductID", IntegerType()),
    StructField("ProductName", StringType()),
    StructField("Category", StringType()),
    StructField("ListPrice", FloatType())
    ])

df = spark.read.load('Files/data/product-data.csv',
    format='csv',
    schema=productSchema,
    header=False)
display(df.limit(10))

Die Ergebnisse wären wieder ähnlich zu:

ProductID ProductName Kategorie ListPrice
771 Mountain-100 Silver, 38 Mountainbikes 3399.9900
772 Mountain-100 Silver, 42 Mountainbikes 3399.9900
773 Mountain-100 Silver, 44 Mountainbikes 3399.9900
... ... ... ...

Tipp

Die Angabe eines expliziten Schemas verbessert auch die Leistung.

Filtern und Gruppieren von Dataframes

Sie können die Methoden der Dataframe-Klasse verwenden, um die darin enthaltenen Daten zu filtern, zu sortieren, zu gruppieren und anderweitig zu bearbeiten. Das folgende Codebeispiel verwendet z. B. die select-Methode, um die Spalten ProductID und ListPrice aus dem df-Dataframe mit den Produktdaten des vorherigen Beispiels abzurufen:

pricelist_df = df.select("ProductID", "ListPrice")

Die Ergebnisse dieses Codebeispiels würden etwa wie folgt aussehen:

ProductID ListPrice
771 3399.9900
772 3399.9900
773 3399.9900
... ...

Wie die meisten Methoden zur Datenbearbeitung gibt select ein neues Dataframe-Objekt zurück.

Tipp

Die Auswahl einer Teilmenge von Spalten aus einem Dataframe ist ein gängiger Vorgang, der auch mithilfe der folgenden kürzeren Syntax erreicht werden kann:

pricelist_df = df["ProductID", "ListPrice"]

Sie können Methoden miteinander „verketten“, um eine Reihe von Bearbeitungen durchzuführen, die zu einem transformierten Dataframe führen. In diesem Beispielcode werden z. B. die Methoden select und where miteinander verknüpft, um einen neuen Dataframe zu erstellen, der die Spalten ProductName und ListPrice für Produkte mit der Kategorie Mountain Bikes oder Road Bikes enthält:

bikes_df = df.select("ProductName", "Category", "ListPrice").where((df["Category"]=="Mountain Bikes") | (df["Category"]=="Road Bikes"))
display(bikes_df)

Die Ergebnisse dieses Codebeispiels würden etwa wie folgt aussehen:

ProductName Kategorie ListPrice
Mountain-100 Silver, 38 Mountainbikes 3399.9900
Road-750 Black, 52 Rennräder 539,9900
... ... ...

Um Daten zu gruppieren und zu aggregieren, können Sie die groupBy-Methode und Aggregatfunktionen verwenden. Der folgende PySpark-Code zählt z. B. die Anzahl der Produkte für jede Kategorie:

counts_df = df.select("ProductID", "Category").groupBy("Category").count()
display(counts_df)

Die Ergebnisse dieses Codebeispiels würden etwa wie folgt aussehen:

Category count
Lenkköpfe 3
Räder 14
Mountainbikes 32
... ...