Esercitazione: Eseguire una pipeline di analisi end-to-end lakehouse

Questa esercitazione illustra come configurare una pipeline di analisi end-to-end per un lakehouse di Azure Databricks.

Importante

Questa esercitazione usa notebook interattivi per completare attività ETL comuni in Python su cluster abilitati del catalogo Unity. Se non si usa il catalogo Unity, vedere Eseguire il primo carico di lavoro ETL in Azure Databricks.

Attività in questa esercitazione

Al termine di questo articolo, si avrà familiarità con:

  1. Avvio di un cluster di elaborazione abilitato per catalogo Unity.
  2. Creazione di un notebook di Databricks.
  3. Scrittura e lettura di dati da una posizione esterna del catalogo Unity.
  4. Configurazione dell'inserimento incrementale dei dati in una tabella del catalogo Unity con il caricatore automatico.
  5. Esecuzione di celle del notebook per elaborare, esecuzione di query e visualizzazione dei dati in anteprima.
  6. Pianificazione di un notebook come processo di Databricks.
  7. Esecuzione di query sulle tabelle del catalogo Unity da Databricks SQL

Azure Databricks offre una suite di strumenti pronti per la produzione che consentono ai professionisti dei dati di sviluppare e distribuire rapidamente pipeline di estrazione, trasformazione e caricamento (ETL). Il catalogo Unity consente agli amministratori dei dati di configurare e proteggere le credenziali di archiviazione, i percorsi esterni e gli oggetti di database per gli utenti in un'organizzazione. Databricks SQL consente agli analisti di eseguire query SQL sulle stesse tabelle usate nei carichi di lavoro ETL di produzione, consentendo funzionalità di business intelligence in tempo reale su larga scala.

È anche possibile usare tabelle live Delta per compilare pipeline ETL. Databricks ha creato tabelle live Delta per ridurre la complessità della compilazione, della distribuzione e della gestione delle pipeline ETL di produzione. Consultare Esercitazione: Eseguire la prima pipeline di tabelle live Delta.

Requisiti

Nota

Se non si dispone dei privilegi di controllo del cluster, è comunque possibile completare la maggior parte dei passaggi seguenti purché si abbia accesso a un cluster.

Passaggio 1: Creare un cluster

Per eseguire l'analisi esplorativa dei dati e la progettazione dei dati, creare un cluster per fornire le risorse di calcolo necessarie per eseguire i comandi.

  1. Nella barra laterale fare clic su icona dell’ambiente di calcolo Ambiente di calcolo.
  2. Fare clic su Nuova icona Nuovo nella barra laterale e quindi selezionare Cluster. In questo modo si apre la pagina Nuovo cluster/Calcolo.
  3. Specificare un nome univoco per il cluster.
  4. Selezionare il pulsante di opzione Nodo singolo .
  5. Selezionare Utente singolo dall'elenco a discesa Modalità di accesso.
  6. Assicurarsi che l'indirizzo di posta elettronica sia visibile nel campo Utente singolo.
  7. Selezionare la versione desiderata del Databricks runtime , 11.1 o versione successiva per usare il catalogo Unity.
  8. Per creare il cluster fare clic su Crea elaborazione.

Per altre informazioni sui cluster Databricks, consultare Ambiente di calcolo.

Passaggio 2: Creare un notebook in Databricks

Per creare un Notebook nell'area di lavoro, fare clic su Nuova icona Nuovo nella barra laterale e quindi su Notebook. Viene aperto un Notebook vuoto nell'area di lavoro.

Per altre informazioni sulla creazione e la gestione dei Notebook, vedere Gestire i Notebook.

Passaggio 3: Scrivere e leggere dati da una posizione esterna gestita dal catalogo Unity

Databricks consiglia di usare il caricatore automatico per l'inserimento incrementale dei dati. Il caricatore automatico rileva ed elabora automaticamente nuovi file quando arrivano nell'archiviazione di oggetti cloud.

Usare il catalogo Unity per gestire l'accesso sicuro a posizioni esterne. Gli utenti o le entità servizio con READ FILES autorizzazioni per una posizione esterna possono usare il caricatore automatico per inserire i dati.

In genere, i dati arriveranno in una posizione esterna a causa delle scritture da altri sistemi. In questa demo è possibile simulare l'arrivo dei dati scrivendo file JSON in un percorso esterno.

Copiare il codice seguente in una cella del notebook. Sostituire il valore stringa per catalog con il nome di un catalogo con CREATE CATALOG le autorizzazioni e USE CATALOG . Sostituire il valore stringa per external_location con il percorso di un percorso esterno con READ FILESle autorizzazioni , WRITE FILESe CREATE EXTERNAL TABLE .

I percorsi esterni possono essere definiti come un intero contenitore di archiviazione, ma spesso puntano a una directory annidata in un contenitore.

Il formato corretto per un percorso esterno è "abfss://container_name@storage_account.dfs.core.windows.net/path/to/external_location".


 external_location = "<your-external-location>"
 catalog = "<your-catalog>"

 dbutils.fs.put(f"{external_location}/filename.txt", "Hello world!", True)
 display(dbutils.fs.head(f"{external_location}/filename.txt"))
 dbutils.fs.rm(f"{external_location}/filename.txt")

 display(spark.sql(f"SHOW SCHEMAS IN {catalog}"))

L'esecuzione di questa cella deve stampare una riga che legge 12 byte, stampare la stringa "Hello world!" e visualizzare tutti i database presenti nel catalogo fornito. Se non è possibile eseguire questa cella, verificare di entrare in un'area di lavoro abilitata per catalogo Unity e richiedere le autorizzazioni appropriate all'amministratore dell'area di lavoro per completare questa esercitazione.

Il codice Python seguente usa l'indirizzo di posta elettronica per creare un database univoco nel catalogo fornito e un percorso di archiviazione univoco nella posizione esterna fornita. L'esecuzione di questa cella rimuoverà tutti i dati associati a questa esercitazione, consentendo di eseguire questo esempio in modo idempotente. Viene definita e creata un'istanza di una classe che verrà usata per simulare batch di dati in arrivo da un sistema connesso alla posizione esterna di origine.

Copiare questo codice in una nuova cella del notebook ed eseguirlo per configurare l'ambiente.

Nota

Le variabili definite in questo codice devono consentire di eseguirla in modo sicuro senza rischiare conflitti con gli asset dell'area di lavoro esistenti o altri utenti. Le autorizzazioni di rete o archiviazione limitate genereranno errori durante l'esecuzione di questo codice; contattare l'amministratore dell'area di lavoro per risolvere questi problemi.


from pyspark.sql.functions import col

# Set parameters for isolation in workspace and reset demo
username = spark.sql("SELECT regexp_replace(current_user(), '[^a-zA-Z0-9]', '_')").first()[0]
database = f"{catalog}.e2e_lakehouse_{username}_db"
source = f"{external_location}/e2e-lakehouse-source"
table = f"{database}.target_table"
checkpoint_path = f"{external_location}/_checkpoint/e2e-lakehouse-demo"

spark.sql(f"SET c.username='{username}'")
spark.sql(f"SET c.database={database}")
spark.sql(f"SET c.source='{source}'")

spark.sql("DROP DATABASE IF EXISTS ${c.database} CASCADE")
spark.sql("CREATE DATABASE ${c.database}")
spark.sql("USE ${c.database}")

# Clear out data from previous demo execution
dbutils.fs.rm(source, True)
dbutils.fs.rm(checkpoint_path, True)

# Define a class to load batches of data to source
class LoadData:

    def __init__(self, source):
        self.source = source

    def get_date(self):
        try:
            df = spark.read.format("json").load(source)
        except:
            return "2016-01-01"
        batch_date = df.selectExpr("max(distinct(date(tpep_pickup_datetime))) + 1 day").first()[0]
        if batch_date.month == 3:
            raise Exception("Source data exhausted")
        return batch_date

    def get_batch(self, batch_date):
        return (
            spark.table("samples.nyctaxi.trips")
            .filter(col("tpep_pickup_datetime").cast("date") == batch_date)
        )

    def write_batch(self, batch):
        batch.write.format("json").mode("append").save(self.source)

    def land_batch(self):
        batch_date = self.get_date()
        batch = self.get_batch(batch_date)
        self.write_batch(batch)

RawData = LoadData(source)

È ora possibile inserire un batch di dati copiando il codice seguente in una cella ed eseguendolo. È possibile eseguire manualmente questa cella fino a 60 volte per attivare l'arrivo di nuovi dati.

RawData.land_batch()

Passaggio 4: Configurare il caricatore automatico per inserire dati nel catalogo Unity

Databricks consiglia di archiviare i dati con Delta Lake. Delta Lake è un livello di archiviazione open source che fornisce transazioni ACID e abilita data lakehouse. Delta Lake è il formato predefinito per le tabelle create in Databricks.

Per configurare il caricatore automatico per inserire i dati in una tabella del catalogo Unity, copiare e incollare il codice seguente nella cella vuota del notebook:

# Import functions
from pyspark.sql.functions import col, current_timestamp

# Configure Auto Loader to ingest JSON data to a Delta table
(spark.readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("cloudFiles.schemaLocation", checkpoint_path)
  .load(source)
  .select("*", col("_metadata.source").alias("source_file"), current_timestamp().alias("processing_time"))
  .writeStream
  .option("checkpointLocation", checkpoint_path)
  .trigger(availableNow=True)
  .option("mergeSchema", "true")
  .toTable(table))

Per altre informazioni sul caricatore automatico, consultare Che cos'è il caricatore automatico?.

Per altre informazioni sullo streaming strutturato con il catalogo Unity, vedere Uso del catalogo Unity con Structured Streaming.

Passaggio 5: Elaborare e interagire con i dati

I notebook eseguono la logica cell-by-cell. Seguire questi passaggi per eseguire la logica nella cella:

  1. Per eseguire la cella completata nel passaggio precedente, selezionare la cella e premere MAIUSC+INVIO.

  2. Per eseguire una query sulla tabella appena creata, copiare e incollare il seguente codice in una cella vuota, quindi premere MAIUSC+INVIO per eseguire la cella.

    df = spark.read.table(table)
    
  3. Per visualizzare in anteprima i dati nel dataframe, copiare e incollare il seguente codice in una cella vuota, quindi premere MAIUSC+INVIO per eseguire la cella.

    display(df)
    

Per altre informazioni sulle opzioni interattive per la visualizzazione dei dati, consultare Visualizzazioni nei notebook di Databricks.

Passaggio 6: Pianificare un lavoro

È possibile eseguire notebook di Databricks come script di produzione aggiungendoli come attività in un processo di Databricks. In questo passaggio verrà creato un nuovo processo che è possibile attivare manualmente.

Per pianificare il notebook come attività:

  1. Fare clic su Pianifica sul lato destro della barra di intestazione.
  2. Immettere un nome univoco in Nome del processo.
  3. Fare clic su Manuale.
  4. Nell'elenco a discesa Cluster selezionare il cluster creato nel passaggio 1.
  5. Cliccare su Crea.
  6. Nella finestra visualizzata fare clic su Esegui adesso.
  7. Per visualizzare i risultati dell'esecuzione del processo, fare clic sull'icona Collegamento esterno accanto al timestamp Ultima esecuzione.

Per altre informazioni sui processi, consultare Che cosa sono i processi di Databricks?.

Passaggio 7: Eseguire query sulla tabella da Databricks SQL

Chiunque disponga dell'autorizzazione USE CATALOG per il catalogo corrente, l'autorizzazione USE SCHEMA per lo schema corrente e SELECT le autorizzazioni per la tabella possono eseguire query sul contenuto della tabella dall'API Databricks preferita.

È necessario accedere a un'istanza di SQL Warehouse in esecuzione per eseguire query in Databricks SQL.

La tabella creata in precedenza in questa esercitazione ha il nome target_table. È possibile eseguire una query usando il catalogo fornito nella prima cella e il database con il padre e2e_lakehouse_<your-username>. È possibile usare Esplora cataloghi per trovare gli oggetti dati creati.

Integrazioni aggiuntive

Altre informazioni sulle integrazioni e sugli strumenti per la progettazione dei dati con Azure Databricks: