Arbeiten mit Daten in einem Spark-Dataframe

Abgeschlossen

Spark verwendet nativ eine Datenstruktur, die als resilientes verteiltes Dataset (Resilient Distributed Dataset, RDD) bezeichnet wird. Sie können zwar Code schreiben, der direkt mit RDDs arbeitet, aber die am häufigsten verwendete Datenstruktur für die Arbeit mit strukturierten Daten in Spark ist der Dataframe, der als Teil der Spark SQL-Bibliothek bereitgestellt wird. Dataframes in Spark ähneln denen der allgegenwärtigen Python-Bibliothek Pandas, sind aber für die verteilte Verarbeitungsumgebung von Spark optimiert.

Hinweis

Zusätzlich zur Dataframe-API bietet Spark SQL eine stark typisierte Dataset-API, die in Java und Scala unterstützt wird. In diesem Modul werden wir uns auf die Dataframe-API konzentrieren.

Laden von Daten in einen Dataframe

Lassen Sie uns an einem hypothetischen Beispiel untersuchen, wie Sie einen Dataframe für die Arbeit mit Daten verwenden können. Nehmen Sie an, dass die folgenden Daten in einer durch Trennzeichen getrennten Textdatei namens products.csv im Ordner Files/data Ihres Lakehouse vorhanden sind:

ProductID,ProductName,Category,ListPrice
771,"Mountain-100 Silver, 38",Mountain Bikes,3399.9900
772,"Mountain-100 Silver, 42",Mountain Bikes,3399.9900
773,"Mountain-100 Silver, 44",Mountain Bikes,3399.9900
...

Ableiten eines Schemas

In einem Spark-Notebook könnten Sie den folgenden PySpark-Code verwenden, um die Dateidaten in einen Dataframe zu laden und die ersten zehn Zeilen anzuzeigen:

%%pyspark
df = spark.read.load('Files/data/products.csv',
    format='csv',
    header=True
)
display(df.limit(10))

Die Zeile %%pyspark am Anfang wird als Magic-Befehl bezeichnet und teilt Spark mit, dass die in dieser Zelle verwendete Sprache PySpark ist. Sie können die Sprache, die Sie standardmäßig verwenden möchten, in der Symbolleiste der Notebook-Benutzeroberfläche auswählen und dann einen Magic-Befehl verwenden, um diese Auswahl für eine bestimmte Zelle außer Kraft zu setzen. Hier ist z. B. der entsprechende Scala-Code für das Beispiel mit den Produktdaten:

%%spark
val df = spark.read.format("csv").option("header", "true").load("Files/data/products.csv")
display(df.limit(10))

Der Magic-Befehl %%spark wird verwendet, um Scala anzugeben.

Beide Codebeispiele würden eine Ausgabe wie die folgende erzeugen:

ProductID ProductName Kategorie ListPrice
771 Mountain-100 Silver, 38 Mountainbikes 3399.9900
772 Mountain-100 Silver, 42 Mountainbikes 3399.9900
773 Mountain-100 Silver, 44 Mountainbikes 3399.9900
... ... ... ...

Angeben eines expliziten Schemas

Im vorherigen Beispiel enthielt die erste Zeile der CSV-Datei die Spaltennamen, und Spark war in der Lage, den Datentyp jeder Spalte aus den darin enthaltenen Daten abzuleiten. Sie können auch ein explizites Schema für die Daten angeben, was nützlich ist, wenn die Spaltennamen nicht in der Datendatei enthalten sind, wie in diesem CSV-Beispiel:

771,"Mountain-100 Silver, 38",Mountain Bikes,3399.9900
772,"Mountain-100 Silver, 42",Mountain Bikes,3399.9900
773,"Mountain-100 Silver, 44",Mountain Bikes,3399.9900
...

Das folgende PySpark-Beispiel zeigt, wie Sie ein Schema für den Dataframe angeben, der aus einer Datei namens product-data.csv in diesem Format geladen werden soll:

from pyspark.sql.types import *
from pyspark.sql.functions import *

productSchema = StructType([
    StructField("ProductID", IntegerType()),
    StructField("ProductName", StringType()),
    StructField("Category", StringType()),
    StructField("ListPrice", FloatType())
    ])

df = spark.read.load('Files/data/product-data.csv',
    format='csv',
    schema=productSchema,
    header=False)
display(df.limit(10))

Die Ergebnisse wären wieder ähnlich zu:

ProductID ProductName Kategorie ListPrice
771 Mountain-100 Silver, 38 Mountainbikes 3399.9900
772 Mountain-100 Silver, 42 Mountainbikes 3399.9900
773 Mountain-100 Silver, 44 Mountainbikes 3399.9900
... ... ... ...

Tipp

Die Angabe eines expliziten Schemas verbessert auch die Leistung.

Filtern und Gruppieren von Dataframes

Sie können die Methoden der Dataframe-Klasse verwenden, um die darin enthaltenen Daten zu filtern, zu sortieren, zu gruppieren und anderweitig zu bearbeiten. Das folgende Codebeispiel verwendet z. B. die select-Methode, um die Spalten ProductID und ListPrice aus dem df-Dataframe mit den Produktdaten des vorherigen Beispiels abzurufen:

pricelist_df = df.select("ProductID", "ListPrice")

Die Ergebnisse dieses Codebeispiels würden etwa wie folgt aussehen:

ProductID ListPrice
771 3399.9900
772 3399.9900
773 3399.9900
... ...

Wie die meisten Methoden zur Datenbearbeitung gibt select ein neues Dataframe-Objekt zurück.

Tipp

Die Auswahl einer Teilmenge von Spalten aus einem Dataframe ist ein gängiger Vorgang, der auch mithilfe der folgenden kürzeren Syntax erreicht werden kann:

pricelist_df = df["ProductID", "ListPrice"]

Sie können Methoden miteinander „verketten“, um eine Reihe von Bearbeitungen durchzuführen, die zu einem transformierten Dataframe führen. In diesem Beispielcode werden z. B. die Methoden select und where miteinander verknüpft, um einen neuen Dataframe zu erstellen, der die Spalten ProductName und ListPrice für Produkte mit der Kategorie Mountain Bikes oder Road Bikes enthält:

bikes_df = df.select("ProductName", "Category", "ListPrice").where((df["Category"]=="Mountain Bikes") | (df["Category"]=="Road Bikes"))
display(bikes_df)

Die Ergebnisse dieses Codebeispiels würden etwa wie folgt aussehen:

ProductName Kategorie ListPrice
Mountain-100 Silver, 38 Mountainbikes 3399.9900
Road-750 Black, 52 Rennräder 539,9900
... ... ...

Um Daten zu gruppieren und zu aggregieren, können Sie die groupBy-Methode und Aggregatfunktionen verwenden. Der folgende PySpark-Code zählt z. B. die Anzahl der Produkte für jede Kategorie:

counts_df = df.select("ProductID", "Category").groupBy("Category").count()
display(counts_df)

Die Ergebnisse dieses Codebeispiels würden etwa wie folgt aussehen:

Category count
Lenkköpfe 3
Räder 14
Mountainbikes 32
... ...

Speichern eines Dataframes

Die Verwendung von Spark empfiehlt sich häufig, um Rohdaten zu transformieren und die Ergebnisse für die weitere Analyse oder nachgelagerte Verarbeitung zu speichern. Im folgenden Codebeispiel wird der DataFrame in einer Parquet-Datei im Data Lake gespeichert, wobei jede vorhandene Datei mit demselben Namen ersetzt wird.

bikes_df.write.mode("overwrite").parquet('Files/product_data/bikes.parquet')

Hinweis

Das Parquet-Format wird in der Regel für Datendateien verwendet, die Sie für die weitere Analyse oder Erfassung in einem Analysespeicher verwenden. Parquet ist ein sehr effizientes Format, das von den meisten umfangreichen Datenanalysesystemen unterstützt wird. Tatsächlich kann Ihre Datentransformation manchmal einfach darin bestehen, Daten aus einem anderen Format (z. B. CSV) in Parquet zu konvertieren!

Partitionieren der Ausgabedatei

Die Partitionierung ist eine Optimierungsmethode, mit der Spark die Leistung auf den Workerknoten maximieren kann. Beim Filtern von Daten in Abfragen können weitere Leistungssteigerungen erzielt werden, indem unnötige Datenträger-E/A-Vorgänge eliminiert werden.

Um einen Dataframe als partitionierte Gruppe von Dateien zu speichern, verwenden Sie beim Schreiben der Daten die partitionBy-Methode. Im folgenden Beispiel wird der Dataframe bikes_df (der die Produktdaten für die Kategorien Mountain Bikes und Road Bikes enthält) gespeichert, und die Daten werden nach Kategorie partitioniert:

bikes_df.write.partitionBy("Category").mode("overwrite").parquet("Files/bike_data")

Die Ordnernamen, die beim Partitionieren eines Dataframes generiert werden, enthalten den Namen und Wert der Partitionierungsspalte im Format Spalte=Wert. Im Codebeispiel wird also ein Ordner namens bike_data erstellt, der die folgenden Unterordner enthält:

  • Category=Mountain Bikes
  • Category=Road Bikes

Jeder Unterordner enthält mindestens eine Parquet-Datei mit den Produktdaten für die entsprechende Kategorie.

Hinweis

Sie können die Daten nach mehreren Spalten partitionieren, wodurch eine Hierarchie von Ordnern für jeden Partitionsschlüssel entsteht. Beispielsweise könnten Sie die Bestelldaten nach Jahr und Monat partitionieren, sodass die Ordnerhierarchie einen Ordner für jeden Jahreswert enthält, der wiederum einen Unterordner für jeden Monatswert enthält.

Laden partitionierter Daten

Beim Lesen partitionierter Daten in einen Dataframe können Sie die Daten aus jedem beliebigen Ordner innerhalb der Hierarchie laden, indem Sie explizite Werte oder Platzhalter für die partitionierten Felder angeben. Im folgenden Beispiel werden Daten für Produkte in der Kategorie Road Bikes geladen:

road_bikes_df = spark.read.parquet('Files/bike_data/Category=Road Bikes')
display(road_bikes_df.limit(5))

Hinweis

Die im Dateipfad angegebenen Partitionierungsspalten werden im resultierenden Dataframe nicht angegeben. Die von der Beispielabfrage zurückgegebenen Ergebnisse enthalten keine Category-Spalte. Die Kategorie aller Zeilen würde Road Bikes lauten.