Delta Live Tables Python-språkreferens
Den här artikeln innehåller information om Programmeringsgränssnittet för Delta Live Tables Python.
Information om SQL-API:et finns i sql-språkreferensen Delta Live Tables.
Mer information om hur du konfigurerar automatisk inläsning finns i Vad är automatisk inläsning?.
Innan du börjar
Följande är viktiga överväganden när du implementerar pipelines med Delta Live Tables Python-gränssnittet:
- Eftersom Python
table()
ochview()
funktionerna anropas flera gånger under planeringen och körningen av en pipelineuppdatering ska du inte inkludera kod i någon av dessa funktioner som kan ha biverkningar (till exempel kod som ändrar data eller skickar ett e-postmeddelande). För att undvika oväntat beteende bör dina Python-funktioner som definierar datauppsättningar endast innehålla den kod som krävs för att definiera tabellen eller vyn. - Om du vill utföra åtgärder som att skicka e-post eller integrera med en extern övervakningstjänst, särskilt i funktioner som definierar datauppsättningar, använder du händelsekrokar. Om du implementerar dessa åtgärder i de funktioner som definierar dina datauppsättningar kan det orsaka oväntat beteende.
- Python
table
ochview
funktionerna måste returnera en DataFrame. Vissa funktioner som körs på DataFrames returnerar inte DataFrames och bör inte användas. Dessa åtgärder omfattar funktioner somcollect()
,count()
,toPandas()
,save()
ochsaveAsTable()
. Eftersom DataFrame-transformeringar körs efter att det fullständiga dataflödesdiagrammet har lösts kan användning av sådana åtgärder ha oavsiktliga biverkningar.
dlt
Importera Python-modulen
Python-funktioner för Delta Live Tables definieras i modulen dlt
. Dina pipelines som implementeras med Python-API:et måste importera den här modulen:
import dlt
Skapa en materialiserad vy eller en strömmande tabell för Delta Live Tables
I Python avgör Delta Live Tables om en datauppsättning ska uppdateras som en materialiserad vy eller en strömmande tabell baserat på den definierande frågan. Dekoratören @table
kan användas för att definiera både materialiserade vyer och strömmande tabeller.
Om du vill definiera en materialiserad vy i Python gäller du @table
för en fråga som utför en statisk läsning mot en datakälla. Om du vill definiera en strömmande tabell gäller du @table
för en fråga som utför en direktuppspelningsläsning mot en datakälla eller använder funktionen create_streaming_table(). Båda datauppsättningstyperna har samma syntaxspecifikation enligt följande:
Kommentar
Om du vill använda cluster_by
argumentet för att aktivera flytande klustring måste din pipeline konfigureras för att använda förhandsgranskningskanalen.
import dlt
@dlt.table(
name="<name>",
comment="<comment>",
spark_conf={"<key>" : "<value>", "<key>" : "<value>"},
table_properties={"<key>" : "<value>", "<key>" : "<value>"},
path="<storage-location-path>",
partition_cols=["<partition-column>", "<partition-column>"],
cluster_by = ["<clustering-column>", "<clustering-column>"],
schema="schema-definition",
row_filter = "row-filter-clause",
temporary=False)
@dlt.expect
@dlt.expect_or_fail
@dlt.expect_or_drop
@dlt.expect_all
@dlt.expect_all_or_drop
@dlt.expect_all_or_fail
def <function-name>():
return (<query>)
Skapa en Delta Live Tables-vy
Om du vill definiera en vy i Python använder du dekoratören @view
. Precis som dekoratören @table
kan du använda vyer i Delta Live Tables för antingen statiska eller strömmande datauppsättningar. Följande är syntaxen för att definiera vyer med Python:
import dlt
@dlt.view(
name="<name>",
comment="<comment>")
@dlt.expect
@dlt.expect_or_fail
@dlt.expect_or_drop
@dlt.expect_all
@dlt.expect_all_or_drop
@dlt.expect_all_or_fail
def <function-name>():
return (<query>)
Exempel: Definiera tabeller och vyer
Om du vill definiera en tabell eller vy i Python använder du dekoratören @dlt.view
eller @dlt.table
på en funktion. Du kan använda funktionsnamnet eller parametern name
för att tilldela tabellen eller visningsnamnet. I följande exempel definieras två olika datauppsättningar: en vy med namnet taxi_raw
som tar en JSON-fil som indatakälla och en tabell med namnet filtered_data
som tar taxi_raw
vyn som indata:
import dlt
@dlt.view
def taxi_raw():
return spark.read.format("json").load("/databricks-datasets/nyctaxi/sample/json/")
# Use the function name as the table name
@dlt.table
def filtered_data():
return dlt.read("taxi_raw").where(...)
# Use the name parameter as the table name
@dlt.table(
name="filtered_data")
def create_filtered_data():
return dlt.read("taxi_raw").where(...)
Exempel: Få åtkomst till en datauppsättning som definierats i samma pipeline
Förutom att läsa från externa datakällor kan du komma åt datauppsättningar som definierats i samma pipeline med funktionen Delta Live Tables read()
. I följande exempel visas hur du skapar en customers_filtered
datauppsättning med hjälp av read()
funktionen:
@dlt.table
def customers_raw():
return spark.read.format("csv").load("/data/customers.csv")
@dlt.table
def customers_filteredA():
return dlt.read("customers_raw").where(...)
Du kan också använda spark.table()
funktionen för att komma åt en datauppsättning som definierats i samma pipeline. När du använder spark.table()
funktionen för att komma åt en datauppsättning som definierats i pipelinen, förbereder nyckelordet i funktionsargumentet LIVE
till datamängdens namn:
@dlt.table
def customers_raw():
return spark.read.format("csv").load("/data/customers.csv")
@dlt.table
def customers_filteredB():
return spark.table("LIVE.customers_raw").where(...)
Exempel: Läsa från en tabell som är registrerad i ett metaarkiv
Om du vill läsa data från en tabell som är registrerad i Hive-metaarkivet utelämnar du nyckelordet i funktionsargumentet LIVE
och kvalificerar eventuellt tabellnamnet med databasnamnet:
@dlt.table
def customers():
return spark.table("sales.customers").where(...)
Ett exempel på läsning från en Unity Catalog-tabell finns i Mata in data i en Unity Catalog-pipeline.
Exempel: Få åtkomst till en datauppsättning med hjälp av spark.sql
Du kan också returnera en datauppsättning med ett spark.sql
uttryck i en frågefunktion. Om du vill läsa från en intern datauppsättning förbereder du LIVE.
till datamängdens namn:
@dlt.table
def chicago_customers():
return spark.sql("SELECT * FROM LIVE.customers_cleaned WHERE city = 'Chicago'")
Skapa en tabell som ska användas som mål för strömningsåtgärder
create_streaming_table()
Använd funktionen för att skapa en måltabell för poster som matas ut av strömningsåtgärder, inklusive apply_changes(), apply_changes_from_snapshot()och @append_flow utdataposter.
Kommentar
Funktionerna create_target_table()
och create_streaming_live_table()
är inaktuella. Databricks rekommenderar att du uppdaterar befintlig kod för att använda create_streaming_table()
funktionen.
Kommentar
Om du vill använda cluster_by
argumentet för att aktivera flytande klustring måste din pipeline konfigureras för att använda förhandsgranskningskanalen.
create_streaming_table(
name = "<table-name>",
comment = "<comment>"
spark_conf={"<key>" : "<value", "<key" : "<value>"},
table_properties={"<key>" : "<value>", "<key>" : "<value>"},
partition_cols=["<partition-column>", "<partition-column>"],
cluster_by = ["<clustering-column>", "<clustering-column>"],
path="<storage-location-path>",
schema="schema-definition",
expect_all = {"<key>" : "<value", "<key" : "<value>"},
expect_all_or_drop = {"<key>" : "<value", "<key" : "<value>"},
expect_all_or_fail = {"<key>" : "<value", "<key" : "<value>"},
row_filter = "row-filter-clause"
)
Argument |
---|
name Typ: str Tabellnamnet. Den här parametern krävs. |
comment Typ: str En valfri beskrivning för tabellen. |
spark_conf Typ: dict En valfri lista över Spark-konfigurationer för körning av den här frågan. |
table_properties Typ: dict En valfri lista över tabellegenskaper för tabellen. |
partition_cols Typ: array En valfri lista över en eller flera kolumner som ska användas för partitionering av tabellen. |
cluster_by Typ: array Du kan också aktivera flytande klustring i tabellen och definiera de kolumner som ska användas som klustringsnycklar. Se Använda flytande klustring för Delta-tabeller. |
path Typ: str En valfri lagringsplats för tabelldata. Om det inte anges är systemet standard för lagringsplatsen för pipelinen. |
schema Typ: str eller StructType En valfri schemadefinition för tabellen. Scheman kan definieras som en SQL DDL-sträng eller med en Python StructType . |
expect_all expect_all_or_drop expect_all_or_fail Typ: dict Valfria datakvalitetsbegränsningar för tabellen. Se flera förväntningar. |
row_filter (Offentlig förhandsversion)Typ: str En valfri radfiltersats för tabellen. Se Publicera tabeller med radfilter och kolumnmasker. |
Kontrollera hur tabeller materialiseras
Tabeller ger också ytterligare kontroll över materialiseringen:
- Ange hur tabeller partitioneras med .
partition_cols
Du kan använda partitionering för att påskynda frågor. - Du kan ange tabellegenskaper när du definierar en vy eller tabell. Se Tabellegenskaper för Delta Live Tables.
- Ange en lagringsplats för tabelldata med hjälp av inställningen
path
. Som standard lagras tabelldata på lagringsplatsen för pipelinen ompath
de inte har angetts. - Du kan använda genererade kolumner i schemadefinitionen. Se Exempel: Ange ett schema och partitionskolumner.
Kommentar
För tabeller som är mindre än 1 TB i storlek rekommenderar Databricks att Delta Live Tables kan styra dataorganisationen. Du bör inte ange partitionskolumner om du inte förväntar dig att tabellen ska växa mer än en terabyte.
Exempel: Ange ett schema och partitionskolumner
Du kan också ange ett tabellschema med hjälp av en Python StructType
- eller SQL DDL-sträng. När den anges med en DDL-sträng kan definitionen innehålla genererade kolumner.
I följande exempel skapas en tabell med namnet sales
med ett schema som angetts med hjälp av en Python StructType
:
sales_schema = StructType([
StructField("customer_id", StringType(), True),
StructField("customer_name", StringType(), True),
StructField("number_of_line_items", StringType(), True),
StructField("order_datetime", StringType(), True),
StructField("order_number", LongType(), True)]
)
@dlt.table(
comment="Raw data on sales",
schema=sales_schema)
def sales():
return ("...")
I följande exempel anges schemat för en tabell med hjälp av en DDL-sträng, definierar en genererad kolumn och definierar en partitionskolumn:
@dlt.table(
comment="Raw data on sales",
schema="""
customer_id STRING,
customer_name STRING,
number_of_line_items STRING,
order_datetime STRING,
order_number LONG,
order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
""",
partition_cols = ["order_day_of_week"])
def sales():
return ("...")
Som standard härleder Delta Live Tables schemat från table
definitionen om du inte anger något schema.
Konfigurera en strömmande tabell för att ignorera ändringar i en källströmningstabell
Kommentar
- Flaggan
skipChangeCommits
fungerar bara medspark.readStream
funktionenoption()
. Du kan inte använda den här flaggan i endlt.read_stream()
funktion. - Du kan inte använda
skipChangeCommits
flaggan när källuppspelningstabellen definieras som mål för en apply_changes() -funktion.
Som standard kräver strömmande tabeller tilläggskällor. När en strömmande tabell använder en annan strömmande tabell som källa, och källströmningstabellen kräver uppdateringar eller borttagningar, till exempel GDPR-bearbetningen skipChangeCommits
"rätt att bli bortglömd", kan flaggan anges när du läser källströmningstabellen för att ignorera dessa ändringar. Mer information om den här flaggan finns i Ignorera uppdateringar och borttagningar.
@table
def b():
return spark.readStream.option("skipChangeCommits", "true").table("LIVE.A")
Exempel: Definiera tabellbegränsningar
Viktigt!
Tabellbegränsningar finns i offentlig förhandsversion.
När du anger ett schema kan du definiera primära och externa nycklar. Begränsningarna är informationsmässiga och tillämpas inte. Se BEGRÄNSNINGssatsen i SQL-språkreferensen.
I följande exempel definieras en tabell med en primär och sekundär nyckelbegränsning:
@dlt.table(
schema="""
customer_id STRING NOT NULL PRIMARY KEY,
customer_name STRING,
number_of_line_items STRING,
order_datetime STRING,
order_number LONG,
order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime)),
CONSTRAINT fk_customer_id FOREIGN KEY (customer_id) REFERENCES main.default.customers(customer_id)
"""
def sales():
return ("...")
Exempel: Definiera ett radfilter och en kolumnmask
Viktigt!
Radfilter och kolumnmasker finns i offentlig förhandsversion.
Om du vill skapa en materialiserad vy eller en strömmande tabell med ett radfilter och en kolumnmask använder du ROW FILTER-satsen och MASK-satsen. I följande exempel visas hur du definierar en materialiserad vy och en strömmande tabell med både ett radfilter och en kolumnmask:
@dlt.table(
schema="""
id int COMMENT 'This is the customer ID',
name string COMMENT 'This is the customer full name',
region string,
ssn string MASK catalog.schema.ssn_mask_fn USING COLUMNS (region)
""",
row_filter = "ROW FILTER catalog.schema.us_filter_fn ON (region, name)"
def sales():
return ("...")
Mer information om radfilter och kolumnmasker finns i Publicera tabeller med radfilter och kolumnmasker.
Egenskaper för Python Delta Live Tables
Följande tabeller beskriver de alternativ och egenskaper som du kan ange när du definierar tabeller och vyer med Delta Live Tables:
Kommentar
Om du vill använda cluster_by
argumentet för att aktivera flytande klustring måste din pipeline konfigureras för att använda förhandsgranskningskanalen.
@table eller @view |
---|
name Typ: str Ett valfritt namn för tabellen eller vyn. Om det inte har definierats används funktionsnamnet som tabell- eller vynamn. |
comment Typ: str En valfri beskrivning för tabellen. |
spark_conf Typ: dict En valfri lista över Spark-konfigurationer för körning av den här frågan. |
table_properties Typ: dict En valfri lista över tabellegenskaper för tabellen. |
path Typ: str En valfri lagringsplats för tabelldata. Om det inte anges är systemet standard för lagringsplatsen för pipelinen. |
partition_cols Typ: a collection of str En valfri samling, till exempel en list av en eller flera kolumner som ska användas för partitionering av tabellen. |
cluster_by Typ: array Du kan också aktivera flytande klustring i tabellen och definiera de kolumner som ska användas som klustringsnycklar. Se Använda flytande klustring för Delta-tabeller. |
schema Typ: str eller StructType En valfri schemadefinition för tabellen. Scheman kan definieras som en SQL DDL-sträng eller med en Python StructType . |
temporary Typ: bool Skapa en tabell, men publicera inte metadata för tabellen. Nyckelordet temporary instruerar Delta Live Tables att skapa en tabell som är tillgänglig för pipelinen men som inte ska nås utanför pipelinen. För att minska bearbetningstiden bevaras en tillfällig tabell under pipelinens livslängd som skapar den, och inte bara en enda uppdatering.Standardvärdet är "False". |
row_filter (Offentlig förhandsversion)Typ: str En valfri radfiltersats för tabellen. Se Publicera tabeller med radfilter och kolumnmasker. |
Tabell- eller vydefinition |
---|
def <function-name>() En Python-funktion som definierar datauppsättningen. Om parametern name inte har angetts <function-name> används den som måldatauppsättningens namn. |
query En Spark SQL-instruktion som returnerar en Spark Dataset eller Koalas DataFrame. Använd dlt.read() eller spark.table() för att utföra en fullständig läsning från en datauppsättning som definierats i samma pipeline. När du använder spark.table() funktionen för att läsa från en datauppsättning som definierats i samma pipeline, förbereder du nyckelordet LIVE till datamängdens namn i funktionsargumentet. Om du till exempel vill läsa från en datauppsättning med namnet customers :spark.table("LIVE.customers") Du kan också använda spark.table() funktionen för att läsa från en tabell som är registrerad i metaarkivet genom att utelämna nyckelordet LIVE och eventuellt kvalificera tabellnamnet med databasnamnet:spark.table("sales.customers") Använd dlt.read_stream() för att utföra en direktuppspelningsläsning från en datauppsättning som definierats i samma pipeline.spark.sql Använd funktionen för att definiera en SQL-fråga för att skapa returdatauppsättningen.Använd PySpark-syntax för att definiera Delta Live Tables-frågor med Python. |
Förväntningar |
---|
@expect("description", "constraint") Deklarera en datakvalitetsbegränsning som identifieras av description . Om en rad bryter mot förväntningarna inkluderar du raden i måldatauppsättningen. |
@expect_or_drop("description", "constraint") Deklarera en datakvalitetsbegränsning som identifieras av description . Om en rad bryter mot förväntningarna släpper du raden från måldatauppsättningen. |
@expect_or_fail("description", "constraint") Deklarera en datakvalitetsbegränsning som identifieras av description . Om en rad bryter mot förväntningarna stoppar du omedelbart körningen. |
@expect_all(expectations) Deklarera en eller flera datakvalitetsbegränsningar. expectations är en Python-ordlista, där nyckeln är förväntansbeskrivningen och värdet är förväntansbegränsningen. Om en rad bryter mot någon av förväntningarna ska du inkludera raden i måldatauppsättningen. |
@expect_all_or_drop(expectations) Deklarera en eller flera datakvalitetsbegränsningar. expectations är en Python-ordlista, där nyckeln är förväntansbeskrivningen och värdet är förväntansbegränsningen. Om en rad bryter mot någon av förväntningarna släpper du raden från måldatauppsättningen. |
@expect_all_or_fail(expectations) Deklarera en eller flera datakvalitetsbegränsningar. expectations är en Python-ordlista, där nyckeln är förväntansbeskrivningen och värdet är förväntansbegränsningen. Om en rad bryter mot någon av förväntningarna stoppar du omedelbart körningen. |
Ändra datainsamling från en ändringsfeed med Python i Delta Live Tables
apply_changes()
Använd funktionen i Python-API:et för att använda funktionen Delta Live Tables change data capture (CDC) för att bearbeta källdata från ett ändringsdataflöde (CDF).
Viktigt!
Du måste deklarera en måluppspelningstabell för att tillämpa ändringar i. Du kan också ange schemat för måltabellen. När du anger schemat för måltabellen apply_changes()
måste du inkludera kolumnerna __START_AT
och __END_AT
med samma datatyp som fälten sequence_by
.
Om du vill skapa den obligatoriska måltabellen kan du använda funktionen create_streaming_table() i Python-gränssnittet Delta Live Tables.
apply_changes(
target = "<target-table>",
source = "<data-source>",
keys = ["key1", "key2", "keyN"],
sequence_by = "<sequence-column>",
ignore_null_updates = False,
apply_as_deletes = None,
apply_as_truncates = None,
column_list = None,
except_column_list = None,
stored_as_scd_type = <type>,
track_history_column_list = None,
track_history_except_column_list = None
)
Kommentar
För APPLY CHANGES
bearbetning är standardbeteendet för INSERT
och UPDATE
händelser till upsert CDC-händelser från källan: uppdatera alla rader i måltabellen som matchar de angivna nycklarna eller infoga en ny rad när en matchande post inte finns i måltabellen. Hantering av DELETE
händelser kan anges med villkoret APPLY AS DELETE WHEN
.
Mer information om CDC-bearbetning med ett ändringsflöde finns i API:er för TILLÄMPA ÄNDRINGAR: Förenkla insamling av ändringsdata med Delta Live Tables. Ett exempel på hur du använder apply_changes()
funktionen finns i Exempel: SCD-typ 1 och SCD typ 2-bearbetning med CDF-källdata.
Viktigt!
Du måste deklarera en måluppspelningstabell för att tillämpa ändringar i. Du kan också ange schemat för måltabellen. När du anger apply_changes
måltabellschemat måste du inkludera kolumnerna __START_AT
och __END_AT
med samma datatyp som fältet sequence_by
.
Se API:er för TILLÄMPA ÄNDRINGAR: Förenkla insamling av ändringsdata med Delta Live Tables.
Argument |
---|
target Typ: str Namnet på tabellen som ska uppdateras. Du kan använda funktionen create_streaming_table() för att skapa måltabellen innan du apply_changes() kör funktionen.Den här parametern krävs. |
source Typ: str Datakällan som innehåller CDC-poster. Den här parametern krävs. |
keys Typ: list Kolumnen eller kombinationen av kolumner som unikt identifierar en rad i källdata. Detta används för att identifiera vilka CDC-händelser som gäller för specifika poster i måltabellen. Du kan ange något av följande: – En lista med strängar: ["userId", "orderId"] – En lista över Spark SQL-funktioner col() : [col("userId"), col("orderId"] Argument till col() funktioner kan inte innehålla kvalificerare. Du kan till exempel använda col(userId) , men du kan inte använda col(source.userId) .Den här parametern krävs. |
sequence_by Typ: str eller col() Kolumnnamnet som anger den logiska ordningen för CDC-händelser i källdata. Delta Live Tables använder den här sekvenseringen för att hantera ändringshändelser som kommer i fel ordning. Du kan ange något av följande: – En sträng: "sequenceNum" – En Spark SQL-funktion col() : col("sequenceNum") Argument till col() funktioner kan inte innehålla kvalificerare. Du kan till exempel använda col(userId) , men du kan inte använda col(source.userId) .Den angivna kolumnen måste vara en sorterbar datatyp. Den här parametern krävs. |
ignore_null_updates Typ: bool Tillåt inmatning av uppdateringar som innehåller en delmängd av målkolumnerna. När en CDC-händelse matchar en befintlig rad och ignore_null_updates är True , behåller kolumnerna null sina befintliga värden i målet. Detta gäller även kapslade kolumner med värdet null . När ignore_null_updates är False skrivs befintliga värden över med null värden.Den här parametern är valfri. Standardvärdet är False . |
apply_as_deletes Typ: str eller expr() Anger när en CDC-händelse ska behandlas som en DELETE i stället för en upsert. För att hantera oordnade data behålls den borttagna raden tillfälligt som en gravsten i den underliggande Delta-tabellen och en vy skapas i metaarkivet som filtrerar bort dessa gravstenar. Kvarhållningsintervallet kan konfigureras medpipelines.cdc.tombstoneGCThresholdInSeconds tabellegenskap.Du kan ange något av följande: – En sträng: "Operation = 'DELETE'" – En Spark SQL-funktion expr() : expr("Operation = 'DELETE'") Den här parametern är valfri. |
apply_as_truncates Typ: str eller expr() Anger när en CDC-händelse ska behandlas som en fullständig tabell TRUNCATE . Eftersom den här satsen utlöser en fullständig trunkering av måltabellen bör den endast användas för specifika användningsfall som kräver den här funktionen.Parametern apply_as_truncates stöds endast för SCD-typ 1. SCD-typ 2 stöder inte trunkeringsåtgärder.Du kan ange något av följande: – En sträng: "Operation = 'TRUNCATE'" – En Spark SQL-funktion expr() : expr("Operation = 'TRUNCATE'") Den här parametern är valfri. |
column_list except_column_list Typ: list En delmängd av kolumner som ska inkluderas i måltabellen. Använd column_list för att ange den fullständiga listan över kolumner som ska inkluderas. Använd except_column_list för att ange vilka kolumner som ska undantas. Du kan deklarera antingen värdet som en lista med strängar eller som Spark SQL-funktioner col() :- column_list = ["userId", "name", "city"] .- column_list = [col("userId"), col("name"), col("city")] - except_column_list = ["operation", "sequenceNum"] - except_column_list = [col("operation"), col("sequenceNum") Argument till col() funktioner kan inte innehålla kvalificerare. Du kan till exempel använda col(userId) , men du kan inte använda col(source.userId) .Den här parametern är valfri. Standardvärdet är att inkludera alla kolumner i måltabellen när inget column_list eller except_column_list argument skickas till funktionen. |
stored_as_scd_type Typ: str eller int Om poster ska lagras som SCD-typ 1 eller SCD typ 2. Ange till 1 för SCD typ 1 eller 2 för SCD typ 2.Den här satsen är valfri. Standardvärdet är SCD typ 1. |
track_history_column_list track_history_except_column_list Typ: list En delmängd av utdatakolumner som ska spåras för historik i måltabellen. Använd track_history_column_list för att ange den fullständiga listan över kolumner som ska spåras. Användtrack_history_except_column_list för att ange vilka kolumner som ska undantas från spårning. Du kan deklarera antingen värdet som en lista med strängar eller som Spark SQL-funktioner col() :- track_history_column_list = ["userId", "name", "city"] .- track_history_column_list = [col("userId"), col("name"), col("city")] - track_history_except_column_list = ["operation", "sequenceNum"] - track_history_except_column_list = [col("operation"), col("sequenceNum") Argument till col() funktioner kan inte innehålla kvalificerare. Du kan till exempel använda col(userId) , men du kan inte använda col(source.userId) .Den här parametern är valfri. Standardvärdet är att inkludera alla kolumner i måltabellen när nej track_history_column_list ellertrack_history_except_column_list skickas argumentet till funktionen. |
Ändra datainsamling från databasögonblicksbilder med Python i Delta Live Tables
Viktigt!
API:et APPLY CHANGES FROM SNAPSHOT
finns i offentlig förhandsversion.
apply_changes_from_snapshot()
Använd funktionen i Python-API:et för att använda funktionen Delta Live Tables change data capture (CDC) för att bearbeta källdata från databasögonblicksbilder.
Viktigt!
Du måste deklarera en måluppspelningstabell för att tillämpa ändringar i. Du kan också ange schemat för måltabellen. När du anger schemat för måltabellen apply_changes_from_snapshot()
måste du även inkludera kolumnerna __START_AT
och __END_AT
med samma datatyp som fältet sequence_by
.
Om du vill skapa den obligatoriska måltabellen kan du använda funktionen create_streaming_table() i Python-gränssnittet Delta Live Tables.
apply_changes_from_snapshot(
target = "<target-table>",
source = Any,
keys = ["key1", "key2", "keyN"],
stored_as_scd_type = <type>,
track_history_column_list = None,
track_history_except_column_list = None
) -> None
Kommentar
För APPLY CHANGES FROM SNAPSHOT
bearbetning är standardbeteendet att infoga en ny rad när en matchande post med samma nyckel(er) inte finns i målet. Om det finns en matchande post uppdateras den endast om något av värdena på raden har ändrats. Rader med nycklar som finns i målet men som inte längre finns i källan tas bort.
Mer information om CDC-bearbetning med ögonblicksbilder finns i API:er för TILLÄMPA ÄNDRINGAR: Förenkla insamling av ändringsdata med Delta Live Tables. Exempel på hur du använder apply_changes_from_snapshot()
funktionen finns i exemplen med periodisk inmatning av ögonblicksbilder och historisk inmatning av ögonblicksbilder.
Argument |
---|
target Typ: str Namnet på tabellen som ska uppdateras. Du kan använda funktionen create_streaming_table() för att skapa måltabellen innan du apply_changes() kör funktionen.Den här parametern krävs. |
source Typ: str eller lambda function Antingen namnet på en tabell eller vy som ska ögonblicksbildas regelbundet eller en Python lambda-funktion som returnerar den ögonblicksbild av DataFrame som ska bearbetas och ögonblicksbildversionen. Se Implementera källargumentet. Den här parametern krävs. |
keys Typ: list Kolumnen eller kombinationen av kolumner som unikt identifierar en rad i källdata. Detta används för att identifiera vilka CDC-händelser som gäller för specifika poster i måltabellen. Du kan ange något av följande: – En lista med strängar: ["userId", "orderId"] – En lista över Spark SQL-funktioner col() : [col("userId"), col("orderId"] Argument till col() funktioner kan inte innehålla kvalificerare. Du kan till exempel använda col(userId) , men du kan inte använda col(source.userId) .Den här parametern krävs. |
stored_as_scd_type Typ: str eller int Om poster ska lagras som SCD-typ 1 eller SCD typ 2. Ange till 1 för SCD typ 1 eller 2 för SCD typ 2.Den här satsen är valfri. Standardvärdet är SCD typ 1. |
track_history_column_list track_history_except_column_list Typ: list En delmängd av utdatakolumner som ska spåras för historik i måltabellen. Använd track_history_column_list för att ange den fullständiga listan över kolumner som ska spåras. Användtrack_history_except_column_list för att ange vilka kolumner som ska undantas från spårning. Du kan deklarera antingen värdet som en lista med strängar eller som Spark SQL-funktioner col() :- track_history_column_list = ["userId", "name", "city"] .- track_history_column_list = [col("userId"), col("name"), col("city")] - track_history_except_column_list = ["operation", "sequenceNum"] - track_history_except_column_list = [col("operation"), col("sequenceNum") Argument till col() funktioner kan inte innehålla kvalificerare. Du kan till exempel använda col(userId) , men du kan inte använda col(source.userId) .Den här parametern är valfri. Standardvärdet är att inkludera alla kolumner i måltabellen när nej track_history_column_list ellertrack_history_except_column_list skickas argumentet till funktionen. |
source
Implementera argumentet
Funktionen apply_changes_from_snapshot()
innehåller source
argumentet . För bearbetning av historiska ögonblicksbilder source
förväntas argumentet vara en Python lambda-funktion som returnerar två värden till apply_changes_from_snapshot()
funktionen: en Python DataFrame som innehåller de ögonblicksbildsdata som ska bearbetas och en ögonblicksbildversion.
Följande är signaturen för lambda-funktionen:
lambda Any => Optional[(DataFrame, Any)]
- Argumentet till lambda-funktionen är den senast bearbetade ögonblicksbildversionen.
- Returvärdet för lambda-funktionen är
None
eller en tuppeln med två värden: Det första värdet för tuppeln är en DataFrame som innehåller ögonblicksbilden som ska bearbetas. Det andra värdet för tuppeln är den ögonblicksbildsversion som representerar ögonblicksbildens logiska ordning.
Ett exempel som implementerar och anropar lambda-funktionen:
def next_snapshot_and_version(latest_snapshot_version):
if latest_snapshot_version is None:
return (spark.read.load("filename.csv"), 1)
else:
return None
apply_changes_from_snapshot(
# ...
source = next_snapshot_and_version,
# ...
)
Delta Live Tables-körningen utför följande steg varje gång pipelinen som innehåller apply_changes_from_snapshot()
funktionen utlöses:
next_snapshot_and_version
Kör funktionen för att läsa in nästa ögonblicksbild av DataFrame och motsvarande ögonblicksbildversion.- Om ingen DataFrame returnerar avslutas körningen och pipelineuppdateringen markeras som slutförd.
- Identifierar ändringarna i den nya ögonblicksbilden och tillämpar dem stegvis på måltabellen.
- Återgår till steg 1 för att läsa in nästa ögonblicksbild och dess version.
Begränsningar
Python-gränssnittet i Delta Live Tables har följande begränsning:
Funktionen pivot()
stöds inte. Åtgärden pivot
i Spark kräver ivrig inläsning av indata för att beräkna schemat för utdata. Den här funktionen stöds inte i Delta Live Tables.