Analizzare i dati con Spark
Uno dei vantaggi dell'uso di Spark consiste nella possibilità di scrivere ed eseguire codice in vari linguaggi di programmazione, consentendo quindi di usare le competenze di programmazione già disponibili e di usare il linguaggio più appropriato per una determinata attività. Il linguaggio predefinito in un nuovo notebook Spark di Azure Synapse Analytics è PySpark, una versione di Python ottimizzata per Spark, che viene comunemente usata da data scientist e analisti grazie al supporto avanzato per la manipolazione e la visualizzazione dei dati. È inoltre possibile usare linguaggi quali Scala, un linguaggio derivato da Java che può essere usato in modo interattivo, e SQL, una variante del linguaggio SQL comunemente usato inclusa nella libreria Spark SQL per l'uso di strutture dei dati relazionali. Gli ingegneri software possono anche creare soluzioni compilate in esecuzione su Spark con framework quali Java e Microsoft .NET.
Esplorazione dei dati con dataframe
Spark usa in modalità nativa una struttura dei dati denominata RDD (Resilient Distributed Dataset), ma anche se è possibile scrivere codice che funziona direttamente con RDD, la struttura dei dati usata più comunemente per lavorare con dati strutturati in Spark è il dataframe, che viene fornito come parte della libreria Spark SQL. I dataframe in Spark sono simili a quelli disponibili nella libreria Python Pandas molto diffusa, ma sono ottimizzati per l'uso nell'ambiente di elaborazione distribuita di Spark.
Nota
Oltre all'API Dataframe, Spark SQL fornisce un'API Dataset fortemente tipizzata che è supportata in Java e Scala. In questo modulo ci si concentrerà sull'API Dataframe.
Caricamento dei dati in un dataframe
Verrà ora esaminato un esempio ipotetico per illustrare l'uso di un dataframe per lavorare con i dati. Si supponga di avere i dati seguenti in un file di testo delimitato da virgole denominato products.csv nell'account di archiviazione primario per un'area di lavoro di Azure Synapse Analytics:
ProductID,ProductName,Category,ListPrice
771,"Mountain-100 Silver, 38",Mountain Bikes,3399.9900
772,"Mountain-100 Silver, 42",Mountain Bikes,3399.9900
773,"Mountain-100 Silver, 44",Mountain Bikes,3399.9900
...
In un notebook Spark è possibile usare il codice PySpark seguente per caricare i dati in un dataframe e visualizzare le prime 10 righe:
%%pyspark
df = spark.read.load('abfss://container@store.dfs.core.windows.net/products.csv',
format='csv',
header=True
)
display(df.limit(10))
La riga %%pyspark
all'inizio è detta magic e indica a Spark che il linguaggio usato in questa cella è PySpark. È possibile selezionare il linguaggio da usare come predefinito nella barra degli strumenti dell'interfaccia del notebook e quindi usare una riga magic per eseguire l'override di tale scelta per una cella specifica. Di seguito è riportato, ad esempio, il codice Scala equivalente per i dati dei prodotti:
%%spark
val df = spark.read.format("csv").option("header", "true").load("abfss://container@store.dfs.core.windows.net/products.csv")
display(df.limit(10))
La riga magic %%spark
viene usata per specificare Scala.
Entrambi questi esempi di codice generano un output simile al seguente:
ProductID | ProductName | Categoria | ListPrice |
---|---|---|---|
771 | Mountain-100 argento, 38 | Mountain Bikes | 3399,9900 |
772 | Mountain-100 Silver, 42 | Mountain Bikes | 3399,9900 |
773 | Mountain-100 argento, 44 | Mountain Bikes | 3399,9900 |
... | ... | ... | ... |
Specificare uno schema del dataframe
Nell'esempio precedente la prima riga del file CSV contiene i nomi delle colonne e Spark è riuscito a dedurre il tipo di dati di ogni colonna dai dati contenuti. È anche possibile specificare uno schema esplicito per i dati, che risulta utile quando i nomi delle colonne non sono inclusi nel file di dati, come nell'esempio di file CSV seguente:
771,"Mountain-100 Silver, 38",Mountain Bikes,3399.9900
772,"Mountain-100 Silver, 42",Mountain Bikes,3399.9900
773,"Mountain-100 Silver, 44",Mountain Bikes,3399.9900
...
Nell'esempio di PySpark seguente viene illustrato come specificare uno schema per il dataframe da caricare da un file denominato product-data.csv in questo formato:
from pyspark.sql.types import *
from pyspark.sql.functions import *
productSchema = StructType([
StructField("ProductID", IntegerType()),
StructField("ProductName", StringType()),
StructField("Category", StringType()),
StructField("ListPrice", FloatType())
])
df = spark.read.load('abfss://container@store.dfs.core.windows.net/product-data.csv',
format='csv',
schema=productSchema,
header=False)
display(df.limit(10))
I risultati saranno ancora una volta simili ai seguenti:
ProductID | ProductName | Categoria | ListPrice |
---|---|---|---|
771 | Mountain-100 argento, 38 | Mountain Bikes | 3399,9900 |
772 | Mountain-100 Silver, 42 | Mountain Bikes | 3399,9900 |
773 | Mountain-100 argento, 44 | Mountain Bikes | 3399,9900 |
... | ... | ... | ... |
Filtro e raggruppamento di dataframe
È possibile usare i metodi della classe Dataframe per filtrare, ordinare, raggruppare e modificare in altro modo i dati inclusi. L'esempio di codice seguente, ad esempio, usa il metodo select per recuperare le colonne ProductName e ListPrice dal dataframe df contenente dati relativi ai prodotti nell'esempio precedente:
pricelist_df = df.select("ProductID", "ListPrice")
I risultati di questo esempio di codice avranno un aspetto simile al seguente:
ProductID | ListPrice |
---|---|
771 | 3399,9900 |
772 | 3399,9900 |
773 | 3399,9900 |
... | ... |
Analogamente alla maggior parte dei metodi di manipolazione dei dati, select restituisce un nuovo oggetto dataframe.
Suggerimento
La selezione di un subset di colonne da un dataframe è un'operazione comune, che può essere completata anche usando la sintassi più breve seguente:
pricelist_df = df["ProductID", "ListPrice"]
È possibile "concatenare" i metodi per eseguire una serie di manipolazioni che generano un dataframe trasformato. Questo codice di esempio concatena ad esempio i metodi select e where per creare un nuovo dataframe contenente le colonne ProductName e ListPrice per i prodotti con categoria Mountain Bikes o Road Bikes:
bikes_df = df.select("ProductName", "ListPrice").where((df["Category"]=="Mountain Bikes") | (df["Category"]=="Road Bikes"))
display(bikes_df)
I risultati di questo esempio di codice avranno un aspetto simile al seguente:
ProductName | ListPrice |
---|---|
Mountain-100 argento, 38 | 3399,9900 |
Road-750 Black, 52 | 539.9900 |
... | ... |
Per raggruppare e aggregare i dati, è possibile usare il metodo groupBy e le funzioni di aggregazione. Il codice PySpark seguente conta ad esempio il numero di prodotti per ogni categoria:
counts_df = df.select("ProductID", "Category").groupBy("Category").count()
display(counts_df)
I risultati di questo esempio di codice avranno un aspetto simile al seguente:
Category | numero. |
---|---|
Headsets | 3 |
Ruote | 14 |
Mountain Bikes | 32 |
... | ... |
Uso delle espressioni SQL in Spark
L'API Dataframe fa parte di una libreria Spark denominata Spark SQL, che consente agli analisti dei dati di usare espressioni SQL per eseguire query e modificare i dati.
Creazione di oggetti di database nel catalogo Spark
Il catalogo Spark è un metastore per oggetti dati relazionali, ad esempio viste e tabelle. Il runtime di Spark può usare il catalogo per integrare facilmente il codice scritto in qualsiasi linguaggio supportato da Spark con espressioni SQL che potrebbero risultare più naturali per alcuni analisti o sviluppatori di dati.
Uno dei modi più semplici per rendere disponibili i dati in un dataframe per l'esecuzione di query nel catalogo Spark consiste nel creare una visualizzazione temporanea, come illustrato nell'esempio di codice seguente:
df.createOrReplaceTempView("products")
Una vista è temporanea, ovvero viene eliminata automaticamente alla fine della sessione corrente. È anche possibile creare tabelle che vengono salvate in modo permanente nel catalogo per definire un database su cui è possibile eseguire query usando Spark SQL.
Nota
Le tabelle del catalogo Spark non verranno esaminate in modo approfondito in questo modulo, ma vale la pena evidenziare alcuni punti chiave:
- È possibile creare una tabella vuota usando il metodo
spark.catalog.createTable
. Le tabelle sono strutture di metadati che archiviano i dati sottostanti nel percorso di archiviazione associato al catalogo. L'eliminazione di una tabella comporta anche l'eliminazione dei dati sottostanti. - È possibile salvare un dataframe come tabella usando il relativo metodo
saveAsTable
. - È possibile creare una tabella esterna usando il metodo
spark.catalog.createExternalTable
. Le tabelle esterne definiscono i metadati nel catalogo, ma ottengono i dati sottostanti da una posizione di archiviazione esterna; in genere una cartella in un data lake. L'eliminazione di una tabella esterna non comporta l'eliminazione dei dati sottostanti.
Uso dell'API Spark SQL per l'esecuzione di query sui dati
È possibile usare l'API Spark SQL nel codice scritto in qualsiasi linguaggio per eseguire query sui dati nel catalogo. Il codice PySpark seguente usa ad esempio una query SQL per restituire dati dalla vista products come dataframe.
bikes_df = spark.sql("SELECT ProductID, ProductName, ListPrice \
FROM products \
WHERE Category IN ('Mountain Bikes', 'Road Bikes')")
display(bikes_df)
I risultati dell'esempio di codice sono simili alla tabella seguente:
ProductID | ProductName | ListPrice |
---|---|---|
38 | Mountain-100 argento, 38 | 3399,9900 |
52 | Road-750 Black, 52 | 539.9900 |
... | ... | ... |
Uso di codice SQL
L'esempio precedente ha illustrato come usare l'API Spark SQL per incorporare espressioni SQL nel codice Spark. In un notebook è anche possibile usare la riga magic %%sql
per eseguire codice SQL che esegue query sugli oggetti nel catalogo, come illustrato di seguito:
%%sql
SELECT Category, COUNT(ProductID) AS ProductCount
FROM products
GROUP BY Category
ORDER BY Category
L'esempio di codice SQL restituisce un set di risultati visualizzato automaticamente nel notebook come tabella, analogamente a quanto riportato di seguito:
Categoria | ProductCount |
---|---|
Bib-Shorts | 3 |
Bike Racks | 1 |
Bike Stands | 1 |
... | ... |