Självstudie: Kör din första Delta Live Tables-pipeline
Den här självstudien visar hur du konfigurerar en Delta Live Tables-pipeline från kod i en Databricks-notebook-fil och kör pipelinen genom att utlösa en pipelineuppdatering. Den här självstudien innehåller en exempelpipeline för att mata in och bearbeta en exempeldatauppsättning med exempelkod med hjälp av Python- och SQL-gränssnitten. Du kan också använda anvisningarna i den här självstudien för att skapa en pipeline med alla notebook-filer med korrekt definierad Delta Live Tables-syntax.
Du kan konfigurera Delta Live Tables-pipelines och utlösa uppdateringar med hjälp av användargränssnittet för Azure Databricks-arbetsytan eller automatiserade verktygsalternativ som API, CLI, Databricks Asset Bundles eller som en uppgift i ett Databricks-arbetsflöde. För att bekanta dig med funktionerna och funktionerna i Delta Live Tables rekommenderar Databricks att du först använder användargränssnittet för att skapa och köra pipelines. När du konfigurerar en pipeline i användargränssnittet genererar Delta Live Tables dessutom en JSON-konfiguration för din pipeline som kan användas för att implementera dina programmatiska arbetsflöden.
För att demonstrera Delta Live Tables-funktioner hämtar exemplen i den här självstudien en offentligt tillgänglig datauppsättning. Databricks har dock flera sätt att ansluta till datakällor och mata in data som pipelines som implementerar verkliga användningsfall använder. Se Mata in data med Delta Live Tables.
Krav
Om du vill starta en pipeline måste du ha behörighet att skapa kluster eller komma åt en klusterprincip som definierar ett Delta Live Tables-kluster. Delta Live Tables-körningen skapar ett kluster innan din pipeline körs och misslyckas om du inte har rätt behörighet.
Om du vill använda exemplen i den här självstudien måste din arbetsyta ha Unity Catalog aktiverat.
Du måste ha följande behörigheter i Unity Catalog:
READ VOLUME
ochWRITE VOLUME
, ellerALL PRIVILEGES
, förmy-volume
volymen.USE SCHEMA
ellerALL PRIVILEGES
fördefault
schemat.USE CATALOG
ellerALL PRIVILEGES
förmain
katalogen.
Information om hur du anger dessa behörigheter finns i Behörigheter för Databricks-administratören eller Unity Catalog och skyddsbara objekt.
Exemplen i den här självstudien använder en Unity Catalog-volym för att lagra exempeldata. Om du vill använda dessa exempel skapar du en volym och använder volymens katalog-, schema- och volymnamn för att ange den volymsökväg som används av exemplen.
Kommentar
Om din arbetsyta inte har Unity Catalog aktiverat bifogas notebook-filer med exempel som inte kräver Unity Catalog i den här artikeln. Om du vill använda de här exemplen väljer du Hive metastore
som lagringsalternativ när du skapar pipelinen.
Var kör du Delta Live Tables-frågor?
Delta Live Tables-frågor implementeras främst i Databricks-notebook-filer, men Delta Live Tables är inte utformat för att köras interaktivt i notebook-celler. Om du kör en cell som innehåller Delta Live Tables-syntax i en Databricks-notebook-fil visas ett felmeddelande. Om du vill köra dina frågor måste du konfigurera dina notebook-filer som en del av en pipeline.
Viktigt!
- Du kan inte förlita dig på körningsordningen cell för cell för notebook-filer när du skriver frågor för Delta Live Tables. Delta Live Tables utvärderar och kör all kod som definierats i notebook-filer men har en annan körningsmodell än kommandot Kör alla i notebook-filer.
- Du kan inte blanda språk i en enda Delta Live Tables-källkodsfil. En notebook-fil kan till exempel bara innehålla Python-frågor eller SQL-frågor. Om du måste använda flera språk i en pipeline använder du flera språkspecifika notebook-filer eller filer i pipelinen.
Du kan också använda Python-kod som lagras i filer. Du kan till exempel skapa en Python-modul som kan importeras till dina Python-pipelines eller definiera användardefinierade Python-funktioner (UDF: er) som ska användas i SQL-frågor. Mer information om hur du importerar Python-moduler finns i Importera Python-moduler från Git-mappar eller arbetsytefiler. Mer information om hur du använder Python-UDF:er finns i Användardefinierade skalärfunktioner – Python.
Exempel: Mata in och bearbeta data om babynamn i New York
Exemplet i den här artikeln använder en offentligt tillgänglig datauppsättning som innehåller poster med babynamn i New York State. De här exemplen visar hur du använder en Delta Live Tables-pipeline för att:
- Läs rådata från en offentligt tillgänglig datauppsättning i en tabell.
- Läs posterna från rådatatabellen och använd Delta Live Tables-förväntningar för att skapa en ny tabell som innehåller rensade data.
- Använd de rensade posterna som indata till Delta Live Tables-frågor som skapar härledda datauppsättningar.
Den här koden visar ett förenklat exempel på medaljongarkitekturen. Se Vad är medallion lakehouse arkitektur?.
Implementeringar av det här exemplet tillhandahålls för Python - och SQL-gränssnitten . Du kan följa stegen för att skapa nya notebook-filer som innehåller exempelkoden, eller så kan du gå vidare till Skapa en pipeline och använda någon av de anteckningsböcker som finns på den här sidan.
Implementera en Delta Live Tables-pipeline med Python
Python-kod som skapar Delta Live Tables-datauppsättningar måste returnera DataFrames. För användare som inte är bekanta med Python och DataFrames rekommenderar Databricks att du använder SQL-gränssnittet. Se Implementera en Delta Live Tables-pipeline med SQL.
Alla Delta Live Tables Python-API:er implementeras i modulen dlt
. Din Delta Live Tables-pipelinekod som implementeras med Python måste uttryckligen importera modulen dlt
överst i Python-notebook-filer och -filer. Delta Live Tables skiljer sig från många Python-skript på ett viktigt sätt: du anropar inte de funktioner som utför datainmatning och transformering för att skapa Delta Live Tables-datauppsättningar. I stället tolkar Delta Live Tables dekoratörsfunktionerna från modulen dlt
i alla filer som läses in i en pipeline och skapar ett dataflödesdiagram.
Om du vill implementera exemplet i den här självstudien kopierar och klistrar du in följande Python-kod i en ny Python-anteckningsbok. Lägg till varje exempelkodfragment i sin egen cell i notebook-filen i den ordning som beskrivs. Mer information om hur du skapar notebook-filer finns i Skapa en notebook-fil.
När du skapar en pipeline med Python-gränssnittet definieras tabellnamn som standard av funktionsnamn. I följande Python-exempel skapas till exempel tre tabeller med namnet baby_names_raw
, baby_names_prepared
och top_baby_names_2021
. Du kan åsidosätta tabellnamnet med hjälp av parametern name
. Se Skapa en materialiserad vy eller en strömmande tabell i Delta Live Tables.
Viktigt!
Om du vill undvika oväntat beteende när pipelinen körs ska du inte inkludera kod som kan ha biverkningar i dina funktioner som definierar datauppsättningar. Mer information finns i Python-referensen.
Importera Delta Live Tables-modulen
Alla Delta Live Tables Python-API:er implementeras i modulen dlt
. Importera uttryckligen modulen dlt
överst i Python-notebook-filer och -filer.
I följande exempel visas den här importen, tillsammans med importinstruktioner för pyspark.sql.functions
.
import dlt
from pyspark.sql.functions import *
Ladda ned data
Om du vill hämta data för det här exemplet laddar du ned en CSV-fil och lagrar den på volymen enligt följande:
import os
os.environ["UNITY_CATALOG_VOLUME_PATH"] = "/Volumes/<catalog-name>/<schema-name>/<volume-name>/"
os.environ["DATASET_DOWNLOAD_URL"] = "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv"
os.environ["DATASET_DOWNLOAD_FILENAME"] = "rows.csv"
dbutils.fs.cp(f"{os.environ.get('DATASET_DOWNLOAD_URL')}", f"{os.environ.get('UNITY_CATALOG_VOLUME_PATH')}{os.environ.get('DATASET_DOWNLOAD_FILENAME')}")
Ersätt <catalog-name>
, <schema-name>
och <volume-name>
med katalog-, schema- och volymnamnen för en Unity Catalog-volym.
Skapa en tabell från filer i objektlagring
Delta Live Tables stöder inläsning av data från alla format som stöds av Azure Databricks. Se Alternativ för dataformat.
Dekoratören @dlt.table
uppmanar Delta Live Tables att skapa en tabell som innehåller resultatet av en DataFrame
returnerad av en funktion. Lägg till dekoratören @dlt.table
före en Python-funktionsdefinition som returnerar en Spark DataFrame för att registrera en ny tabell i Delta Live Tables. I följande exempel visas hur du använder funktionsnamnet som tabellnamn och lägger till en beskrivande kommentar i tabellen:
@dlt.table(
comment="Popular baby first names in New York. This data was ingested from the New York State Department of Health."
)
def baby_names_raw():
df = spark.read.csv(f"{os.environ.get('UNITY_CATALOG_VOLUME_PATH')}{os.environ.get('DATASET_DOWNLOAD_FILENAME')}", header=True, inferSchema=True)
df_renamed_column = df.withColumnRenamed("First Name", "First_Name")
return df_renamed_column
Lägga till en tabell från en överordnad datauppsättning i pipelinen
Du kan använda dlt.read()
för att läsa data från andra datauppsättningar som deklarerats i din aktuella Delta Live Tables-pipeline. Om du deklarerar nya tabeller på det här sättet skapas ett beroende som Delta Live Tables automatiskt löser innan uppdateringar körs. Följande kod innehåller också exempel på övervakning och framtvingande av datakvalitet med förväntningar. Se Hantera datakvalitet med Delta Live Tables.
@dlt.table(
comment="New York popular baby first name data cleaned and prepared for analysis."
)
@dlt.expect("valid_first_name", "First_Name IS NOT NULL")
@dlt.expect_or_fail("valid_count", "Count > 0")
def baby_names_prepared():
return (
dlt.read("baby_names_raw")
.withColumnRenamed("Year", "Year_Of_Birth")
.select("Year_Of_Birth", "First_Name", "Count")
)
Skapa en tabell med berikade datavyer
Eftersom Delta Live Tables bearbetar uppdateringar av pipelines som en serie beroendediagram kan du deklarera mycket berikade vyer som driver instrumentpaneler, BI och analys genom att deklarera tabeller med specifik affärslogik.
Tabeller i Delta Live Tables motsvarar konceptuellt materialiserade vyer. Till skillnad från traditionella vyer på Spark som kör logik varje gång vyn efterfrågas, lagrar en Delta Live Tables-tabell den senaste versionen av frågeresultatet i datafiler. Eftersom Delta Live Tables hanterar uppdateringar för alla datauppsättningar i en pipeline kan du schemalägga pipelineuppdateringar för att matcha svarstidskraven för materialiserade vyer och veta att frågor mot dessa tabeller innehåller den senaste tillgängliga versionen av data.
Tabellen som definieras av följande kod visar den konceptuella likheten med en materialiserad vy som härleds från överordnade data i pipelinen:
@dlt.table(
comment="A table summarizing counts of the top baby names for New York for 2021."
)
def top_baby_names_2021():
return (
dlt.read("baby_names_prepared")
.filter(expr("Year_Of_Birth == 2021"))
.groupBy("First_Name")
.agg(sum("Count").alias("Total_Count"))
.sort(desc("Total_Count"))
.limit(10)
)
Information om hur du konfigurerar en pipeline som använder notebook-filen finns i Skapa en pipeline.
Implementera en Delta Live Tables-pipeline med SQL
Databricks rekommenderar Delta Live Tables med SQL som det bästa sättet för SQL-användare att skapa nya ETL-, inmatnings- och transformeringspipelines på Azure Databricks. SQL-gränssnittet för Delta Live Tables utökar Standard Spark SQL med många nya nyckelord, konstruktioner och tabellvärdesfunktioner. Dessa tillägg till standard-SQL gör det möjligt för användare att deklarera beroenden mellan datauppsättningar och distribuera infrastruktur i produktionsklass utan att lära sig nya verktyg eller ytterligare begrepp.
För användare som är bekanta med Spark DataFrames och som behöver stöd för mer omfattande testning och åtgärder som är svåra att implementera med SQL, till exempel metaprogramåtgärder, rekommenderar Databricks att använda Python-gränssnittet. Se Implementera en Delta Live Tables-pipeline med Python.
Ladda ned data
Om du vill hämta data för det här exemplet kopierar du följande kod, klistrar in den i en ny notebook-fil och kör sedan notebook-filen. Mer information om hur du skapar notebook-filer finns i Skapa en notebook-fil.
%sh
wget -O "/Volumes/<catalog-name>/<schema-name>/<volume-name>/babynames.csv" "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv"
Ersätt <catalog-name>
, <schema-name>
och <volume-name>
med katalog-, schema- och volymnamnen för en Unity Catalog-volym.
Skapa en tabell från filer i Unity Catalog
I resten av det här exemplet kopierar du följande SQL-kodfragment och klistrar in dem i en ny SQL-notebook-fil, separat från notebook-filen i föregående avsnitt. Lägg till varje SQL-exempelfragment i sin egen cell i notebook-filen i den ordning som beskrivs.
Delta Live Tables stöder inläsning av data från alla format som stöds av Azure Databricks. Se Alternativ för dataformat.
Alla Delta Live Tables SQL-instruktioner använder CREATE OR REFRESH
syntax och semantik. När du uppdaterar en pipeline avgör Delta Live Tables om det logiskt korrekta resultatet för tabellen kan uppnås genom inkrementell bearbetning eller om fullständig omberäkning krävs.
I följande exempel skapas en tabell genom att data läses in från CSV-filen som lagras i Unity Catalog-volymen:
CREATE OR REFRESH MATERIALIZED VIEW baby_names_sql_raw
COMMENT "Popular baby first names in New York. This data was ingested from the New York State Department of Health."
AS SELECT Year, `First Name` AS First_Name, County, Sex, Count FROM read_files(
'/Volumes/<catalog-name>/<schema-name>/<volume-name>/babynames.csv',
format => 'csv',
header => true,
mode => 'FAILFAST')
Ersätt <catalog-name>
, <schema-name>
och <volume-name>
med katalog-, schema- och volymnamnen för en Unity Catalog-volym.
Lägga till en tabell från en överordnad datauppsättning i pipelinen
Du kan använda det live
virtuella schemat för att fråga efter data från andra datauppsättningar som deklarerats i din aktuella Delta Live Tables-pipeline. Om du deklarerar nya tabeller på det här sättet skapas ett beroende som Delta Live Tables automatiskt löser innan uppdateringar körs. Schemat live
är ett anpassat nyckelord implementerat i Delta Live Tables som kan ersättas med ett målschema om du vill publicera dina datauppsättningar. Se Använda Unity Catalog med dina Delta Live Tables-pipelines och Använd Delta Live Tables-pipelines med äldre Hive-metaarkiv.
Följande kod innehåller också exempel på övervakning och framtvingande av datakvalitet med förväntningar. Se Hantera datakvalitet med Delta Live Tables.
CREATE OR REFRESH MATERIALIZED VIEW baby_names_sql_prepared(
CONSTRAINT valid_first_name EXPECT (First_Name IS NOT NULL),
CONSTRAINT valid_count EXPECT (Count > 0) ON VIOLATION FAIL UPDATE
)
COMMENT "New York popular baby first name data cleaned and prepared for analysis."
AS SELECT
Year AS Year_Of_Birth,
First_Name,
Count
FROM live.baby_names_sql_raw;
Skapa en utökad datavy
Eftersom Delta Live Tables bearbetar uppdateringar av pipelines som en serie beroendediagram kan du deklarera mycket berikade vyer som driver instrumentpaneler, BI och analys genom att deklarera tabeller med specifik affärslogik.
Följande fråga använder en materialiserad vy för att skapa en berikad vy från uppströmsdata. Till skillnad från traditionella vyer på Spark som kör logik varje gång vyn efterfrågas lagrar materialiserade vyer den senaste versionen av frågeresultatet i datafiler. Eftersom Delta Live Tables hanterar uppdateringar för alla datauppsättningar i en pipeline kan du schemalägga pipelineuppdateringar för att matcha svarstidskraven för materialiserade vyer och veta att frågor mot dessa tabeller innehåller den senaste tillgängliga versionen av data.
CREATE OR REFRESH MATERIALIZED VIEW top_baby_names_sql_2021
COMMENT "A table summarizing counts of the top baby names for New York for 2021."
AS SELECT
First_Name,
SUM(Count) AS Total_Count
FROM live.baby_names_sql_prepared
WHERE Year_Of_Birth = 2021
GROUP BY First_Name
ORDER BY Total_Count DESC
LIMIT 10;
Om du vill konfigurera en pipeline som använder notebook-filen fortsätter du till Skapa en pipeline.
Skapa en pipeline
Kommentar
- Eftersom beräkningsresurser hanteras fullständigt för serverlösa DLT-pipelines är beräkningsinställningarna inte tillgängliga när du väljer Serverlös för en pipeline.
- Information om berättigande och aktivering för serverlösa DLT-pipelines finns i Aktivera serverlös beräkning.
Delta Live Tables skapar pipelines genom att matcha beroenden som definierats i notebook-filer (kallas källkod) med hjälp av Delta Live Tables-syntax. Varje källkodsfil kan bara innehålla ett språk, men du kan blanda källkoden för olika språk i pipelinen.
- Klicka på Delta Live Tables (Delta Live Tables ) i sidofältet och klicka på Skapa pipeline.
- Ge pipelinen ett namn.
- (Valfritt) Om du vill köra pipelinen med serverlösa DLT-pipelines markerar du kryssrutan Serverlös . När du väljer Serverlös tas beräkningsinställningarna bort från användargränssnittet. Se Konfigurera en serverlös Delta Live Tables-pipeline.
- (Valfritt) Välj en produktutgåva.
- Välj Utlös för Pipeline-läge.
- Konfigurera en eller flera notebook-filer som innehåller källkoden för pipelinen. I textrutan Sökvägar anger du sökvägen till en anteckningsbok eller klickar på för att välja en notebook-fil.
- Välj ett mål för datauppsättningar som publiceras av pipelinen, antingen Hive-metaarkivet eller Unity-katalogen. Se Publicera datauppsättningar.
- Hive-metaarkiv:
- (Valfritt) Ange en lagringsplats för utdata från pipelinen. Systemet använder en standardplats om du lämnar lagringsplatsen tom.
- (Valfritt) Ange ett målschema för att publicera datamängden till Hive-metaarkivet.
- Unity Catalog: Ange en katalog och ett målschema för att publicera datamängden till Unity Catalog.
- Hive-metaarkiv:
- (Valfritt) Om du inte har valt Serverlös kan du konfigurera beräkningsinställningar för pipelinen. Mer information om alternativ för beräkningsinställningar finns i Konfigurera beräkning för en Delta Live Tables-pipeline.
- (Valfritt) Klicka på Lägg till meddelande för att konfigurera en eller flera e-postadresser för att ta emot meddelanden om pipelinehändelser. Se Lägga till e-postaviseringar för pipelinehändelser.
- (Valfritt) Konfigurera avancerade inställningar för pipelinen. Mer information om alternativ för avancerade inställningar finns i Konfigurera en Delta Live Tables-pipeline.
- Klicka på Skapa.
Sidan Pipelineinformation visas när du har klickat på Skapa. Du kan också komma åt din pipeline genom att klicka på pipelinenamnet på fliken Delta Live Tables (Delta Live Tables ).
Starta en pipelineuppdatering
Om du vill starta en uppdatering för en pipeline klickar du på knappen i den övre panelen. Systemet returnerar ett meddelande som bekräftar att pipelinen startas.
När uppdateringen har startats är Delta Live Tables-systemet:
- Startar ett kluster med en klusterkonfiguration som skapats av Delta Live Tables-systemet. Du kan också ange en anpassad klusterkonfiguration.
- Skapar tabeller som inte finns och ser till att schemat är korrekt för alla befintliga tabeller.
- Uppdaterar tabeller med de senaste tillgängliga data.
- Stänger av klustret när uppdateringen är klar.
Kommentar
Körningsläget är inställt på Produktion som standard, vilket distribuerar tillfälliga beräkningsresurser för varje uppdatering. Du kan använda utvecklingsläget för att ändra det här beteendet så att samma beräkningsresurser kan användas för flera pipelineuppdateringar under utveckling och testning. Se Utvecklings- och produktionslägen.
Publicera datauppsättningar
Du kan göra Delta Live Tables-datauppsättningar tillgängliga för frågor genom att publicera tabeller till Hive-metaarkivet eller Unity Catalog. Om du inte anger ett mål för publicering av data kan tabeller som skapats i Delta Live Tables-pipelines endast nås av andra åtgärder i samma pipeline. Se Använda Delta Live Tables-pipelines med äldre Hive-metaarkiv och Använd Unity Catalog med dina Delta Live Tables-pipelines.
Exempel på notebook-filer för källkod
Du kan importera dessa notebook-filer till en Azure Databricks-arbetsyta och använda dem för att distribuera en Delta Live Tables-pipeline. Se Skapa en pipeline.
Kom igång med Delta Live Tables Python Notebook
Kom igång med Delta Live Tables SQL Notebook
Exempel på källkodsanteckningsböcker för arbetsytor utan Unity Catalog
Du kan importera dessa notebook-filer till en Azure Databricks-arbetsyta utan att Unity Catalog är aktiverat och använda dem för att distribuera en Delta Live Tables-pipeline. Se Skapa en pipeline.