Eventi
Ottieni gratuitamente la certificazione in Microsoft Fabric.
19 nov, 23 - 10 dic, 23
Per un periodo di tempo limitato, il team della community di Microsoft Fabric offre buoni per esami DP-600 gratuiti.
Prepara oraQuesto browser non è più supportato.
Esegui l'aggiornamento a Microsoft Edge per sfruttare i vantaggi di funzionalità più recenti, aggiornamenti della sicurezza e supporto tecnico.
Questa esercitazione mostra come caricare e trasformare i dati usando API DataFrame di Apache Spark Python (PySpark), API DataFrame di Apache Scala o API SparkR SparkDataFrame.
Al termine di questa esercitazione si comprenderà che cos'è un DataFrame e si avrà familiarità con i task seguenti:
Vedere anche le Informazioni di riferimento sulle API PySpark per Apache Spark.
Vedere anche le Informazioni di riferimento sulle API Scala per Apache Spark.
Vedere anche le Informazioni di riferimento sulle API SparkR per Apache Spark.
Il DataFrame è una struttura di dati bidimensionale etichettata con colonne di tipi potenzialmente diversi. Si può pensare a un DataFrame come a un foglio di calcolo, una tabella SQL o un dizionario di oggetti serie. I DataFrame Apache Spark offrono una set completo di funzioni (selezione colonne, filtro, join, aggregazione) che consentono di risolvere in modo efficiente i problemi più comuni di analisi dei dati.
I DataFrames di Apache Spark sono un'astrazione basata su Set di dati distribuiti resilienti (RDD). I DataFrame Spark e Spark SQL usano un motore di pianificazione e ottimizzazione unificato, che consente di ottenere prestazioni quasi identiche in tutti i linguaggi supportati in Azure Databricks (Python, SQL, Scala e R).
Per completare l’esercitazione seguente, è necessario soddisfare questi requisiti:
Per usare gli esempi in questa esercitazione, l'area di lavoro deve avere Unity Catalog abilitato.
Gli esempi in questa esercitazione usano un volume di Unity Catalog per archiviare i dati di esempio. Per usare questi esempi, creare un volume e usare i nomi di catalogo, schema e volume per impostare il percorso del volume usato dagli esempi.
È necessario disporre delle autorizzazioni seguenti in Unity Catalog:
READ VOLUME
e WRITE VOLUME
o ALL PRIVILEGES
per il volume usato per questa esercitazione.USE SCHEMA
oppure ALL PRIVILEGES
per lo schema usato per questa esercitazione.USE CATALOG
oppure ALL PRIVILEGES
per il catalogo usato per questa esercitazione.Per impostare queste autorizzazioni, vedere l'amministratore di Databricks o i privilegi di Unity Catalog e gli oggetti a protezione diretta.
Suggerimento
Per un Notebook completo per questo articolo, vedere Notebook dell'esercitazione sul DataFrame.
Questo passaggio definisce le variabili da usare in questa esercitazione e quindi carica un file CSV contenente i dati dei nomi dei bambini da health.data.ny.gov nel volume di Unity Catalog.
Aprire un nuovo Notebook facendo clic sull'icona . Per informazioni su come esplorare i Notebook di Azure Databricks, vedere Interfaccia e controlli dei Notebook di Databricks.
Copiare e incollare il codice seguente nella nuova cella vuota del Notebook. Sostituire <catalog-name>
, <schema-name>
e <volume-name>
con i nomi di catalogo, schema e volume per un volume di Unity Catalog. Sostituire <table_name>
con il nome tabella desiderato. I dati relativi al nome del bambino verranno caricati in questa tabella più avanti in questa esercitazione.
catalog = "<catalog_name>"
schema = "<schema_name>"
volume = "<volume_name>"
download_url = "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv"
file_name = "rows.csv"
table_name = "<table_name>"
path_volume = "/Volumes/" + catalog + "/" + schema + "/" + volume
path_table = catalog + "." + schema
print(path_table) # Show the complete path
print(path_volume) # Show the complete path
val catalog = "<catalog_name>"
val schema = "<schema_name>"
val volume = "<volume_name>"
val downloadUrl = "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv"
val fileName = "rows.csv"
val tableName = "<table_name>"
val pathVolume = s"/Volumes/$catalog/$schema/$volume"
val pathTable = s"$catalog.$schema"
print(pathVolume) // Show the complete path
print(pathTable) // Show the complete path
catalog <- "<catalog_name>"
schema <- "<schema_name>"
volume <- "<volume_name>"
download_url <- "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv"
file_name <- "rows.csv"
table_name <- "<table_name>"
path_volume <- paste("/Volumes/", catalog, "/", schema, "/", volume, sep = "")
path_table <- paste(catalog, ".", schema, sep = "")
print(path_volume) # Show the complete path
print(path_table) # Show the complete path
Premere Shift+Enter
per eseguire la cella e creare una nuova cella vuota.
Copiare e incollare il codice seguente nella nuova cella vuota del Notebook. Questo codice copia il file rows.csv
da health.data.ny.gov nel volume di Unity Catalog usando il comando Databricks dbutuils.
dbutils.fs.cp(f"{download_url}", f"{path_volume}/{file_name}")
dbutils.fs.cp(downloadUrl, s"$pathVolume/$fileName")
dbutils.fs.cp(download_url, paste(path_volume, "/", file_name, sep = ""))
Premere Shift+Enter
per eseguire la cella e quindi passare alla cella successiva.
Questo passaggio crea un DataFrame denominato df1
con dati di test e quindi ne visualizza il contenuto.
Copiare e incollare il codice seguente nella nuova cella vuota del Notebook. Questo codice crea il DataFrame con i dati di test e quindi visualizza il contenuto e lo schema del DataFrame.
data = [[2021, "test", "Albany", "M", 42]]
columns = ["Year", "First_Name", "County", "Sex", "Count"]
df1 = spark.createDataFrame(data, schema="Year int, First_Name STRING, County STRING, Sex STRING, Count int")
display(df1) # The display() method is specific to Databricks notebooks and provides a richer visualization.
# df1.show() The show() method is a part of the Apache Spark DataFrame API and provides basic visualization.
val data = Seq((2021, "test", "Albany", "M", 42))
val columns = Seq("Year", "First_Name", "County", "Sex", "Count")
val df1 = data.toDF(columns: _*)
display(df1) // The display() method is specific to Databricks notebooks and provides a richer visualization.
// df1.show() The show() method is a part of the Apache Spark DataFrame API and provides basic visualization.
# Load the SparkR package that is already preinstalled on the cluster.
library(SparkR)
data <- data.frame(
Year = as.integer(c(2021)),
First_Name = c("test"),
County = c("Albany"),
Sex = c("M"),
Count = as.integer(c(42))
)
df1 <- createDataFrame(data)
display(df1) # The display() method is specific to Databricks notebooks and provides a richer visualization.
# head(df1) The head() method is a part of the Apache SparkR DataFrame API and provides basic visualization.
Premere Shift+Enter
per eseguire la cella e quindi passare alla cella successiva.
Questo passaggio crea un DataFrame denominato df_csv
dal file CSV caricato in precedenza nel volume di Unity Catalog. Vedere spark.read.csv.
Copiare e incollare il codice seguente nella nuova cella vuota del Notebook. Questo codice carica i dati del nome del bambino nel DataFrame df_csv
dal file CSV e quindi visualizza il contenuto del DataFrame.
df_csv = spark.read.csv(f"{path_volume}/{file_name}",
header=True,
inferSchema=True,
sep=",")
display(df_csv)
val dfCsv = spark.read
.option("header", "true")
.option("inferSchema", "true")
.option("delimiter", ",")
.csv(s"$pathVolume/$fileName")
display(dfCsv)
df_csv <- read.df(paste(path_volume, "/", file_name, sep=""),
source="csv",
header = TRUE,
inferSchema = TRUE,
delimiter = ",")
display(df_csv)
Premere Shift+Enter
per eseguire la cella e quindi passare alla cella successiva.
È possibile caricare i dati da molti formati di file supportati.
Visualizzare e interagire con i DataFrame dei bambini usando i metodi seguenti.
Informazioni su come visualizzare lo schema di un DataFrame Apache Spark. Apache Spark usa il termine schema per fare riferimento ai nomi e ai tipi di dati delle colonne nel DataFrame.
Nota
Anche Azure Databricks usa il termine schema per descrivere una raccolta di tabelle registrate in un catalogo.
Copiare e incollare il codice seguente in una nuova cella vuota di codice del Notebook. Questo codice mostra lo schema dei DataFrame con il metodo .printSchema()
per visualizzare gli schemi dei due DataFrame, per prepararsi all'unione dei due DataFrame.
df_csv.printSchema()
df1.printSchema()
dfCsv.printSchema()
df1.printSchema()
printSchema(df_csv)
printSchema(df1)
Premere Shift+Enter
per eseguire la cella e quindi passare alla cella successiva.
Informazioni su come rinominare una colonna in un DataFrame.
Copiare e incollare il codice seguente in una nuova cella vuota di codice del Notebook. Questo codice rinomina una colonna nel DataFrame df1_csv
in modo che corrisponda alla rispettiva colonna nel DataFrame df1
. Questo codice usa il metodo Apache Spark withColumnRenamed()
.
df_csv = df_csv.withColumnRenamed("First Name", "First_Name")
df_csv.printSchema
val dfCsvRenamed = dfCsv.withColumnRenamed("First Name", "First_Name")
// when modifying a DataFrame in Scala, you must assign it to a new variable
dfCsvRenamed.printSchema()
df_csv <- withColumnRenamed(df_csv, "First Name", "First_Name")
printSchema(df_csv)
Premere Shift+Enter
per eseguire la cella e quindi passare alla cella successiva.
Informazioni su come creare un nuovo DataFrame che aggiunge le righe di un DataFrame a un altro.
Copiare e incollare il codice seguente in una nuova cella vuota di codice del Notebook. Questo codice usa il metodo Apache Spark union()
per combinare il contenuto del primo DataFrame df
con il DataFrame df_csv
contenente i dati dei nomi dei bambini caricati dal file CSV.
df = df1.union(df_csv)
display(df)
val df = df1.union(dfCsvRenamed)
display(df)
display(df <- union(df1, df_csv))
Premere Shift+Enter
per eseguire la cella e quindi passare alla cella successiva.
Individuare i nomi dei bambini più diffusi nel set di dati filtrando le righe usando i metodi Apache Spark .filter()
o .where()
. Usare il filtro per selezionare un sottoinsieme di righe da restituire o modificare in un DataFrame. Non esiste alcuna differenza nelle prestazioni o nella sintassi, come illustrato negli esempi seguenti.
Copiare e incollare il codice seguente in una nuova cella vuota di codice del Notebook. Questo codice usa il metodo Apache Spark .filter()
per visualizzare tali righe nel DataFrame con un conteggio di più di 50.
display(df.filter(df["Count"] > 50))
display(df.filter(df("Count") > 50))
display(filteredDF <- filter(df, df$Count > 50))
Premere Shift+Enter
per eseguire la cella e quindi passare alla cella successiva.
Copiare e incollare il codice seguente in una nuova cella vuota di codice del Notebook. Questo codice usa il metodo Apache Spark .where()
per visualizzare tali righe nel DataFrame con un conteggio di più di 50.
display(df.where(df["Count"] > 50))
display(df.where(df("Count") > 50))
display(filtered_df <- where(df, df$Count > 50))
Premere Shift+Enter
per eseguire la cella e quindi passare alla cella successiva.
Informazioni sulla frequenza dei nomi dei bambini con il metodo select()
per specificare le colonne del DataFrame da restituire. Usare le funzioni Apache Spark orderby
e desc
per ordinare i risultati.
Il modulo pyspark.sql per Apache Spark fornisce il supporto per le funzioni SQL. Tra queste funzioni usate in questa esercitazione sono le funzioni Apache Spark orderBy()
, desc()
e expr()
. Per abilitare l'uso di queste funzioni, importarle nella sessione in base alle esigenze.
Copiare e incollare il codice seguente in una nuova cella vuota di codice del Notebook. Questo codice importa la funzione desc()
e quindi usa il metodo Apache Spark select()
e Apache Spark orderBy()
e le funzioni desc()
per visualizzare i nomi più comuni e i relativi conteggi in ordine decrescente.
from pyspark.sql.functions import desc
display(df.select("First_Name", "Count").orderBy(desc("Count")))
import org.apache.spark.sql.functions.desc
display(df.select("First_Name", "Count").orderBy(desc("Count")))
display(arrange(select(df, df$First_Name, df$Count), desc(df$Count)))
Premere Shift+Enter
per eseguire la cella e quindi passare alla cella successiva.
Informazioni su come creare un sottoinsieme di DataFrame da un DataFrame esistente.
Copiare e incollare il codice seguente in una nuova cella vuota di codice del Notebook. Questo codice usa il metodo Apache Spark filter
per creare un nuovo SataFrame limitando i dati per anno, conteggio e sesso. Usare il metodo Apache Spark select()
per limitare le colonne. Usare anche Apache Spark orderBy()
e le funzioni desc()
per ordinare il nuovo DataFrame in base al conteggio.
subsetDF = df.filter((df["Year"] == 2009) & (df["Count"] > 100) & (df["Sex"] == "F")).select("First_Name", "County", "Count").orderBy(desc("Count"))
display(subsetDF)
val subsetDF = df.filter((df("Year") === 2009) && (df("Count") > 100) && (df("Sex") === "F")).select("First_Name", "County", "Count").orderBy(desc("Count"))
display(subsetDF)
subsetDF <- select(filter(df, (df$Count > 100) & (df$year == 2009) & df["Sex"] == "F")), "First_Name", "County", "Count")
display(subsetDF)
Premere Shift+Enter
per eseguire la cella e quindi passare alla cella successiva.
Informazioni su come salvare un DataFrame. È possibile salvare il DataFrame in una tabella o scrivere il DataFrame in un file o in più file.
Azure Databricks usa il formato Delta Lake per tutte le tabelle per impostazione predefinita. Per salvare il DataFrame, è necessario disporre dei privilegi di tabella CREATE
per il catalogo e lo schema.
Copiare e incollare il codice seguente in una nuova cella vuota di codice del Notebook. Questo codice salva il contenuto del DaFaframe in una tabella usando la variabile definita all'inizio di questa esercitazione.
df.write.mode("overwrite").saveAsTable(f"{path_table}.{table_name}")
df.write.mode("overwrite").saveAsTable(s"$pathTable" + "." + s"$tableName")
saveAsTable(df, paste(path_table, ".", table_name), mode = "overwrite")
Premere Shift+Enter
per eseguire la cella e quindi passare alla cella successiva.
La maggior parte delle applicazioni Apache Spark funziona su set di dati di grandi dimensioni e in modo distribuito. Apache Spark scrive una directory di file anziché un singolo file. Delta Lake suddivide le cartelle e i file Parquet. Molti sistemi dati possono leggere queste directory di file. Azure Databricks consiglia di usare tabelle su percorsi di file per la maggior parte delle applicazioni.
Copiare e incollare il codice seguente in una nuova cella vuota di codice del Notebook. Questo codice salva il DataFrame in una directory di file JSON.
df.write.format("json").mode("overwrite").save("/tmp/json_data")
df.write.format("json").mode("overwrite").save("/tmp/json_data")
write.df(df, path = "/tmp/json_data", source = "json", mode = "overwrite")
Premere Shift+Enter
per eseguire la cella e quindi passare alla cella successiva.
Informazioni su come usare il metodo Apache Spark spark.read.format()
per leggere i dati JSON da una directory in un DataFrame.
Copiare e incollare il codice seguente in una nuova cella vuota di codice del Notebook. Questo codice visualizza i file JSON salvati nell'esempio precedente.
display(spark.read.format("json").json("/tmp/json_data"))
display(spark.read.format("json").json("/tmp/json_data"))
display(read.json("/tmp/json_data"))
Premere Shift+Enter
per eseguire la cella e quindi passare alla cella successiva.
I DataFrame Apache Spark offrono le opzioni seguenti per combinare SQL con PySpark, Scala e R. È possibile eseguire il codice seguente nello stesso Notebook creato per questa esercitazione.
Informazioni su come usare il metodo Apache Spark selectExpr()
. Si tratta di una variante del metodo select()
che accetta espressioni SQL e restituisce un DataFrame aggiornato. Questo metodo consente di usare un'espressione SQL, ad esempio upper
.
Copiare e incollare il codice seguente in una nuova cella vuota di codice del Notebook. Questo codice usa il metodo Apache Spark selectExpr()
e l'espressione SQL upper
per convertire una colonna stringa in lettere maiuscole (e rinominare la colonna).
display(df.selectExpr("Count", "upper(County) as big_name"))
display(df.selectExpr("Count", "upper(County) as big_name"))
display(df_selected <- selectExpr(df, "Count", "upper(County) as big_name"))
Premere Shift+Enter
per eseguire la cella e quindi passare alla cella successiva.
Informazioni su come importare e usare la funzione Apache Spark expr()
per usare la sintassi SQL in qualsiasi punto in cui sarebbe specificata una colonna.
Copiare e incollare il codice seguente in una nuova cella vuota di codice del Notebook. Questo codice importa la funzione expr()
e quindi usa la funzione Apache Spark expr()
e l'espressione SQL lower
per convertire una colonna stringa in lettere minuscole (e rinominare la colonna).
from pyspark.sql.functions import expr
display(df.select("Count", expr("lower(County) as little_name")))
import org.apache.spark.sql.functions.{col, expr}
// Scala requires us to import the col() function as well as the expr() function
display(df.select(col("Count"), expr("lower(County) as little_name")))
display(df_selected <- selectExpr(df, "Count", "lower(County) as little_name"))
# expr() function is not supported in R, selectExpr in SparkR replicates this functionality
Premere Shift+Enter
per eseguire la cella e quindi passare alla cella successiva.
Informazioni su come usare la funzione Apache Spark spark.sql()
per eseguire query SQL arbitrarie.
Copiare e incollare il codice seguente in una nuova cella vuota di codice del Notebook. Questo codice usa la funzione Apache Spark spark.sql()
per eseguire query su una tabella SQL usando la sintassi SQL.
display(spark.sql(f"SELECT * FROM {path_table}.{table_name}"))
display(spark.sql(s"SELECT * FROM $pathTable.$tableName"))
display(sql(paste("SELECT * FROM", path_table, ".", table_name)))
Premere Shift+Enter
per eseguire la cella e quindi passare alla cella successiva.
I Notebook seguenti includono le query di esempio di questa esercitazione.
Eventi
Ottieni gratuitamente la certificazione in Microsoft Fabric.
19 nov, 23 - 10 dic, 23
Per un periodo di tempo limitato, il team della community di Microsoft Fabric offre buoni per esami DP-600 gratuiti.
Prepara ora