Delta Live Tables Python-språkreferens

Den här artikeln innehåller information om Programmeringsgränssnittet för Delta Live Tables Python.

Information om SQL-API:et finns i sql-språkreferensen Delta Live Tables.

Mer information om hur du konfigurerar automatisk inläsning finns i Vad är automatisk inläsning?.

Innan du börjar

Följande är viktiga överväganden när du implementerar pipelines med Delta Live Tables Python-gränssnittet:

  • Eftersom Python table() och view() funktionerna anropas flera gånger under planeringen och körningen av en pipelineuppdatering ska du inte inkludera kod i någon av dessa funktioner som kan ha biverkningar (till exempel kod som ändrar data eller skickar ett e-postmeddelande). För att undvika oväntat beteende bör dina Python-funktioner som definierar datauppsättningar endast innehålla den kod som krävs för att definiera tabellen eller vyn.
  • Om du vill utföra åtgärder som att skicka e-post eller integrera med en extern övervakningstjänst, särskilt i funktioner som definierar datauppsättningar, använder du händelsekrokar. Om du implementerar dessa åtgärder i de funktioner som definierar dina datauppsättningar kan det orsaka oväntat beteende.
  • Python table och view funktionerna måste returnera en DataFrame. Vissa funktioner som körs på DataFrames returnerar inte DataFrames och bör inte användas. Dessa åtgärder omfattar funktioner som collect(), count(), toPandas(), save()och saveAsTable(). Eftersom DataFrame-transformeringar körs efter att det fullständiga dataflödesdiagrammet har lösts kan användning av sådana åtgärder ha oavsiktliga biverkningar.

dlt Importera Python-modulen

Python-funktioner för Delta Live Tables definieras i modulen dlt . Dina pipelines som implementeras med Python-API:et måste importera den här modulen:

import dlt

Skapa en materialiserad vy eller en strömmande tabell för Delta Live Tables

I Python avgör Delta Live Tables om en datauppsättning ska uppdateras som en materialiserad vy eller en strömmande tabell baserat på den definierande frågan. Dekoratören @table kan användas för att definiera både materialiserade vyer och strömmande tabeller.

Om du vill definiera en materialiserad vy i Python gäller du @table för en fråga som utför en statisk läsning mot en datakälla. Om du vill definiera en strömmande tabell gäller du @table för en fråga som utför en direktuppspelningsläsning mot en datakälla eller använder funktionen create_streaming_table(). Båda datauppsättningstyperna har samma syntaxspecifikation enligt följande:

Kommentar

Om du vill använda cluster_by argumentet för att aktivera flytande klustring måste din pipeline konfigureras för att använda förhandsgranskningskanalen.

import dlt

@dlt.table(
  name="<name>",
  comment="<comment>",
  spark_conf={"<key>" : "<value>", "<key>" : "<value>"},
  table_properties={"<key>" : "<value>", "<key>" : "<value>"},
  path="<storage-location-path>",
  partition_cols=["<partition-column>", "<partition-column>"],
  cluster_by = ["<clustering-column>", "<clustering-column>"],
  schema="schema-definition",
  row_filter = "row-filter-clause",
  temporary=False)
@dlt.expect
@dlt.expect_or_fail
@dlt.expect_or_drop
@dlt.expect_all
@dlt.expect_all_or_drop
@dlt.expect_all_or_fail
def <function-name>():
    return (<query>)

Skapa en Delta Live Tables-vy

Om du vill definiera en vy i Python använder du dekoratören @view . Precis som dekoratören @table kan du använda vyer i Delta Live Tables för antingen statiska eller strömmande datauppsättningar. Följande är syntaxen för att definiera vyer med Python:

import dlt

@dlt.view(
  name="<name>",
  comment="<comment>")
@dlt.expect
@dlt.expect_or_fail
@dlt.expect_or_drop
@dlt.expect_all
@dlt.expect_all_or_drop
@dlt.expect_all_or_fail
def <function-name>():
    return (<query>)

Exempel: Definiera tabeller och vyer

Om du vill definiera en tabell eller vy i Python använder du dekoratören @dlt.view eller @dlt.table på en funktion. Du kan använda funktionsnamnet eller parametern name för att tilldela tabellen eller visningsnamnet. I följande exempel definieras två olika datauppsättningar: en vy med namnet taxi_raw som tar en JSON-fil som indatakälla och en tabell med namnet filtered_data som tar taxi_raw vyn som indata:

import dlt

@dlt.view
def taxi_raw():
  return spark.read.format("json").load("/databricks-datasets/nyctaxi/sample/json/")

# Use the function name as the table name
@dlt.table
def filtered_data():
  return dlt.read("taxi_raw").where(...)

# Use the name parameter as the table name
@dlt.table(
  name="filtered_data")
def create_filtered_data():
  return dlt.read("taxi_raw").where(...)

Exempel: Få åtkomst till en datauppsättning som definierats i samma pipeline

Förutom att läsa från externa datakällor kan du komma åt datauppsättningar som definierats i samma pipeline med funktionen Delta Live Tables read() . I följande exempel visas hur du skapar en customers_filtered datauppsättning med hjälp av read() funktionen:

@dlt.table
def customers_raw():
  return spark.read.format("csv").load("/data/customers.csv")

@dlt.table
def customers_filteredA():
  return dlt.read("customers_raw").where(...)

Du kan också använda spark.table() funktionen för att komma åt en datauppsättning som definierats i samma pipeline. När du använder spark.table() funktionen för att komma åt en datauppsättning som definierats i pipelinen, förbereder nyckelordet i funktionsargumentet LIVE till datamängdens namn:

@dlt.table
def customers_raw():
  return spark.read.format("csv").load("/data/customers.csv")

@dlt.table
def customers_filteredB():
  return spark.table("LIVE.customers_raw").where(...)

Exempel: Läsa från en tabell som är registrerad i ett metaarkiv

Om du vill läsa data från en tabell som är registrerad i Hive-metaarkivet utelämnar du nyckelordet i funktionsargumentet LIVE och kvalificerar eventuellt tabellnamnet med databasnamnet:

@dlt.table
def customers():
  return spark.table("sales.customers").where(...)

Ett exempel på läsning från en Unity Catalog-tabell finns i Mata in data i en Unity Catalog-pipeline.

Exempel: Få åtkomst till en datauppsättning med hjälp av spark.sql

Du kan också returnera en datauppsättning med ett spark.sql uttryck i en frågefunktion. Om du vill läsa från en intern datauppsättning förbereder du LIVE. till datamängdens namn:

@dlt.table
def chicago_customers():
  return spark.sql("SELECT * FROM LIVE.customers_cleaned WHERE city = 'Chicago'")

Skapa en tabell som ska användas som mål för strömningsåtgärder

create_streaming_table() Använd funktionen för att skapa en måltabell för poster som matas ut av strömningsåtgärder, inklusive apply_changes(), apply_changes_from_snapshot()och @append_flow utdataposter.

Kommentar

Funktionerna create_target_table() och create_streaming_live_table() är inaktuella. Databricks rekommenderar att du uppdaterar befintlig kod för att använda create_streaming_table() funktionen.

Kommentar

Om du vill använda cluster_by argumentet för att aktivera flytande klustring måste din pipeline konfigureras för att använda förhandsgranskningskanalen.

create_streaming_table(
  name = "<table-name>",
  comment = "<comment>"
  spark_conf={"<key>" : "<value", "<key" : "<value>"},
  table_properties={"<key>" : "<value>", "<key>" : "<value>"},
  partition_cols=["<partition-column>", "<partition-column>"],
  cluster_by = ["<clustering-column>", "<clustering-column>"],
  path="<storage-location-path>",
  schema="schema-definition",
  expect_all = {"<key>" : "<value", "<key" : "<value>"},
  expect_all_or_drop = {"<key>" : "<value", "<key" : "<value>"},
  expect_all_or_fail = {"<key>" : "<value", "<key" : "<value>"},
  row_filter = "row-filter-clause"
)
Argument
name

Typ: str

Tabellnamnet.

Den här parametern krävs.
comment

Typ: str

En valfri beskrivning för tabellen.
spark_conf

Typ: dict

En valfri lista över Spark-konfigurationer för körning av den här frågan.
table_properties

Typ: dict

En valfri lista över tabellegenskaper för tabellen.
partition_cols

Typ: array

En valfri lista över en eller flera kolumner som ska användas för partitionering av tabellen.
cluster_by

Typ: array

Du kan också aktivera flytande klustring i tabellen och definiera de kolumner som ska användas som klustringsnycklar.

Se Använda flytande klustring för Delta-tabeller.
path

Typ: str

En valfri lagringsplats för tabelldata. Om det inte anges är systemet standard för lagringsplatsen för pipelinen.
schema

Typ: str eller StructType

En valfri schemadefinition för tabellen. Scheman kan definieras som en SQL DDL-sträng eller med en Python
StructType.
expect_all
expect_all_or_drop
expect_all_or_fail

Typ: dict

Valfria datakvalitetsbegränsningar för tabellen. Se flera förväntningar.
row_filter (Offentlig förhandsversion)

Typ: str

En valfri radfiltersats för tabellen. Se Publicera tabeller med radfilter och kolumnmasker.

Kontrollera hur tabeller materialiseras

Tabeller ger också ytterligare kontroll över materialiseringen:

  • Ange hur tabeller partitioneras med .partition_cols Du kan använda partitionering för att påskynda frågor.
  • Du kan ange tabellegenskaper när du definierar en vy eller tabell. Se Tabellegenskaper för Delta Live Tables.
  • Ange en lagringsplats för tabelldata med hjälp av inställningen path . Som standard lagras tabelldata på lagringsplatsen för pipelinen om path de inte har angetts.
  • Du kan använda genererade kolumner i schemadefinitionen. Se Exempel: Ange ett schema och partitionskolumner.

Kommentar

För tabeller som är mindre än 1 TB i storlek rekommenderar Databricks att Delta Live Tables kan styra dataorganisationen. Du bör inte ange partitionskolumner om du inte förväntar dig att tabellen ska växa mer än en terabyte.

Exempel: Ange ett schema och partitionskolumner

Du kan också ange ett tabellschema med hjälp av en Python StructType - eller SQL DDL-sträng. När den anges med en DDL-sträng kan definitionen innehålla genererade kolumner.

I följande exempel skapas en tabell med namnet sales med ett schema som angetts med hjälp av en Python StructType:

sales_schema = StructType([
  StructField("customer_id", StringType(), True),
  StructField("customer_name", StringType(), True),
  StructField("number_of_line_items", StringType(), True),
  StructField("order_datetime", StringType(), True),
  StructField("order_number", LongType(), True)]
)

@dlt.table(
  comment="Raw data on sales",
  schema=sales_schema)
def sales():
  return ("...")

I följande exempel anges schemat för en tabell med hjälp av en DDL-sträng, definierar en genererad kolumn och definierar en partitionskolumn:

@dlt.table(
  comment="Raw data on sales",
  schema="""
    customer_id STRING,
    customer_name STRING,
    number_of_line_items STRING,
    order_datetime STRING,
    order_number LONG,
    order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
    """,
  partition_cols = ["order_day_of_week"])
def sales():
  return ("...")

Som standard härleder Delta Live Tables schemat från table definitionen om du inte anger något schema.

Konfigurera en strömmande tabell för att ignorera ändringar i en källströmningstabell

Kommentar

  • Flaggan skipChangeCommits fungerar bara med spark.readStream funktionen option() . Du kan inte använda den här flaggan i en dlt.read_stream() funktion.
  • Du kan inte använda skipChangeCommits flaggan när källuppspelningstabellen definieras som mål för en apply_changes() -funktion.

Som standard kräver strömmande tabeller tilläggskällor. När en strömmande tabell använder en annan strömmande tabell som källa, och källströmningstabellen kräver uppdateringar eller borttagningar, till exempel GDPR-bearbetningen skipChangeCommits "rätt att bli bortglömd", kan flaggan anges när du läser källströmningstabellen för att ignorera dessa ändringar. Mer information om den här flaggan finns i Ignorera uppdateringar och borttagningar.

@table
def b():
   return spark.readStream.option("skipChangeCommits", "true").table("LIVE.A")

Exempel: Definiera tabellbegränsningar

Viktigt!

Tabellbegränsningar finns i offentlig förhandsversion.

När du anger ett schema kan du definiera primära och externa nycklar. Begränsningarna är informationsmässiga och tillämpas inte. Se BEGRÄNSNINGssatsen i SQL-språkreferensen.

I följande exempel definieras en tabell med en primär och sekundär nyckelbegränsning:

@dlt.table(
   schema="""
    customer_id STRING NOT NULL PRIMARY KEY,
    customer_name STRING,
    number_of_line_items STRING,
    order_datetime STRING,
    order_number LONG,
    order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime)),
    CONSTRAINT fk_customer_id FOREIGN KEY (customer_id) REFERENCES main.default.customers(customer_id)
    """
def sales():
   return ("...")

Exempel: Definiera ett radfilter och en kolumnmask

Viktigt!

Radfilter och kolumnmasker finns i offentlig förhandsversion.

Om du vill skapa en materialiserad vy eller en strömmande tabell med ett radfilter och en kolumnmask använder du ROW FILTER-satsen och MASK-satsen. I följande exempel visas hur du definierar en materialiserad vy och en strömmande tabell med både ett radfilter och en kolumnmask:

@dlt.table(
   schema="""
    id int COMMENT 'This is the customer ID',
    name string COMMENT 'This is the customer full name',
    region string,
    ssn string MASK catalog.schema.ssn_mask_fn USING COLUMNS (region)
    """,
  row_filter = "ROW FILTER catalog.schema.us_filter_fn ON (region, name)"
def sales():
   return ("...")

Mer information om radfilter och kolumnmasker finns i Publicera tabeller med radfilter och kolumnmasker.

Egenskaper för Python Delta Live Tables

Följande tabeller beskriver de alternativ och egenskaper som du kan ange när du definierar tabeller och vyer med Delta Live Tables:

Kommentar

Om du vill använda cluster_by argumentet för att aktivera flytande klustring måste din pipeline konfigureras för att använda förhandsgranskningskanalen.

@table eller @view
name

Typ: str

Ett valfritt namn för tabellen eller vyn. Om det inte har definierats används funktionsnamnet som tabell- eller vynamn.
comment

Typ: str

En valfri beskrivning för tabellen.
spark_conf

Typ: dict

En valfri lista över Spark-konfigurationer för körning av den här frågan.
table_properties

Typ: dict

En valfri lista över tabellegenskaper för tabellen.
path

Typ: str

En valfri lagringsplats för tabelldata. Om det inte anges är systemet standard för lagringsplatsen för pipelinen.
partition_cols

Typ: a collection of str

En valfri samling, till exempel en list av en eller flera kolumner som ska användas för partitionering av tabellen.
cluster_by

Typ: array

Du kan också aktivera flytande klustring i tabellen och definiera de kolumner som ska användas som klustringsnycklar.

Se Använda flytande klustring för Delta-tabeller.
schema

Typ: str eller StructType

En valfri schemadefinition för tabellen. Scheman kan definieras som en SQL DDL-sträng eller med en Python StructType.
temporary

Typ: bool

Skapa en tabell, men publicera inte metadata för tabellen. Nyckelordet temporary instruerar Delta Live Tables att skapa en tabell som är tillgänglig för pipelinen men som inte ska nås utanför pipelinen. För att minska bearbetningstiden bevaras en tillfällig tabell under pipelinens livslängd som skapar den, och inte bara en enda uppdatering.

Standardvärdet är "False".
row_filter (Offentlig förhandsversion)

Typ: str

En valfri radfiltersats för tabellen. Se Publicera tabeller med radfilter och kolumnmasker.
Tabell- eller vydefinition
def <function-name>()

En Python-funktion som definierar datauppsättningen. Om parametern name inte har angetts <function-name> används den som måldatauppsättningens namn.
query

En Spark SQL-instruktion som returnerar en Spark Dataset eller Koalas DataFrame.

Använd dlt.read() eller spark.table() för att utföra en fullständig läsning från en datauppsättning som definierats i samma pipeline. När du använder spark.table() funktionen för att läsa från en datauppsättning som definierats i samma pipeline, förbereder du nyckelordet LIVE till datamängdens namn i funktionsargumentet. Om du till exempel vill läsa från en datauppsättning med namnet customers:

spark.table("LIVE.customers")

Du kan också använda spark.table() funktionen för att läsa från en tabell som är registrerad i metaarkivet genom att utelämna nyckelordet LIVE och eventuellt kvalificera tabellnamnet med databasnamnet:

spark.table("sales.customers")

Använd dlt.read_stream() för att utföra en direktuppspelningsläsning från en datauppsättning som definierats i samma pipeline.

spark.sql Använd funktionen för att definiera en SQL-fråga för att skapa returdatauppsättningen.

Använd PySpark-syntax för att definiera Delta Live Tables-frågor med Python.
Förväntningar
@expect("description", "constraint")

Deklarera en datakvalitetsbegränsning som identifieras av
description. Om en rad bryter mot förväntningarna inkluderar du raden i måldatauppsättningen.
@expect_or_drop("description", "constraint")

Deklarera en datakvalitetsbegränsning som identifieras av
description. Om en rad bryter mot förväntningarna släpper du raden från måldatauppsättningen.
@expect_or_fail("description", "constraint")

Deklarera en datakvalitetsbegränsning som identifieras av
description. Om en rad bryter mot förväntningarna stoppar du omedelbart körningen.
@expect_all(expectations)

Deklarera en eller flera datakvalitetsbegränsningar.
expectations är en Python-ordlista, där nyckeln är förväntansbeskrivningen och värdet är förväntansbegränsningen. Om en rad bryter mot någon av förväntningarna ska du inkludera raden i måldatauppsättningen.
@expect_all_or_drop(expectations)

Deklarera en eller flera datakvalitetsbegränsningar.
expectations är en Python-ordlista, där nyckeln är förväntansbeskrivningen och värdet är förväntansbegränsningen. Om en rad bryter mot någon av förväntningarna släpper du raden från måldatauppsättningen.
@expect_all_or_fail(expectations)

Deklarera en eller flera datakvalitetsbegränsningar.
expectations är en Python-ordlista, där nyckeln är förväntansbeskrivningen och värdet är förväntansbegränsningen. Om en rad bryter mot någon av förväntningarna stoppar du omedelbart körningen.

Ändra datainsamling från en ändringsfeed med Python i Delta Live Tables

apply_changes() Använd funktionen i Python-API:et för att använda funktionen Delta Live Tables change data capture (CDC) för att bearbeta källdata från ett ändringsdataflöde (CDF).

Viktigt!

Du måste deklarera en måluppspelningstabell för att tillämpa ändringar i. Du kan också ange schemat för måltabellen. När du anger schemat för måltabellen apply_changes() måste du inkludera kolumnerna __START_AT och __END_AT med samma datatyp som fälten sequence_by .

Om du vill skapa den obligatoriska måltabellen kan du använda funktionen create_streaming_table() i Python-gränssnittet Delta Live Tables.

apply_changes(
  target = "<target-table>",
  source = "<data-source>",
  keys = ["key1", "key2", "keyN"],
  sequence_by = "<sequence-column>",
  ignore_null_updates = False,
  apply_as_deletes = None,
  apply_as_truncates = None,
  column_list = None,
  except_column_list = None,
  stored_as_scd_type = <type>,
  track_history_column_list = None,
  track_history_except_column_list = None
)

Kommentar

För APPLY CHANGES bearbetning är standardbeteendet för INSERT och UPDATE händelser till upsert CDC-händelser från källan: uppdatera alla rader i måltabellen som matchar de angivna nycklarna eller infoga en ny rad när en matchande post inte finns i måltabellen. Hantering av DELETE händelser kan anges med villkoret APPLY AS DELETE WHEN .

Mer information om CDC-bearbetning med ett ändringsflöde finns i API:er för TILLÄMPA ÄNDRINGAR: Förenkla insamling av ändringsdata med Delta Live Tables. Ett exempel på hur du använder apply_changes() funktionen finns i Exempel: SCD-typ 1 och SCD typ 2-bearbetning med CDF-källdata.

Viktigt!

Du måste deklarera en måluppspelningstabell för att tillämpa ändringar i. Du kan också ange schemat för måltabellen. När du anger apply_changes måltabellschemat måste du inkludera kolumnerna __START_AT och __END_AT med samma datatyp som fältet sequence_by .

Se API:er för TILLÄMPA ÄNDRINGAR: Förenkla insamling av ändringsdata med Delta Live Tables.

Argument
target

Typ: str

Namnet på tabellen som ska uppdateras. Du kan använda funktionen create_streaming_table() för att skapa måltabellen innan du apply_changes() kör funktionen.

Den här parametern krävs.
source

Typ: str

Datakällan som innehåller CDC-poster.

Den här parametern krävs.
keys

Typ: list

Kolumnen eller kombinationen av kolumner som unikt identifierar en rad i källdata. Detta används för att identifiera vilka CDC-händelser som gäller för specifika poster i måltabellen.

Du kan ange något av följande:

– En lista med strängar: ["userId", "orderId"]
– En lista över Spark SQL-funktioner col() : [col("userId"), col("orderId"]

Argument till col() funktioner kan inte innehålla kvalificerare. Du kan till exempel använda col(userId), men du kan inte använda col(source.userId).

Den här parametern krävs.
sequence_by

Typ: str eller col()

Kolumnnamnet som anger den logiska ordningen för CDC-händelser i källdata. Delta Live Tables använder den här sekvenseringen för att hantera ändringshändelser som kommer i fel ordning.

Du kan ange något av följande:

– En sträng: "sequenceNum"
– En Spark SQL-funktion col() : col("sequenceNum")

Argument till col() funktioner kan inte innehålla kvalificerare. Du kan till exempel använda col(userId), men du kan inte använda col(source.userId).

Den angivna kolumnen måste vara en sorterbar datatyp.

Den här parametern krävs.
ignore_null_updates

Typ: bool

Tillåt inmatning av uppdateringar som innehåller en delmängd av målkolumnerna. När en CDC-händelse matchar en befintlig rad och ignore_null_updates är True, behåller kolumnerna null sina befintliga värden i målet. Detta gäller även kapslade kolumner med värdet null. När ignore_null_updates är Falseskrivs befintliga värden över med null värden.

Den här parametern är valfri.

Standardvärdet är False.
apply_as_deletes

Typ: str eller expr()

Anger när en CDC-händelse ska behandlas som en DELETE i stället för en upsert. För att hantera oordnade data behålls den borttagna raden tillfälligt som en gravsten i den underliggande Delta-tabellen och en vy skapas i metaarkivet som filtrerar bort dessa gravstenar. Kvarhållningsintervallet kan konfigureras med
pipelines.cdc.tombstoneGCThresholdInSeconds tabellegenskap.

Du kan ange något av följande:

– En sträng: "Operation = 'DELETE'"
– En Spark SQL-funktion expr() : expr("Operation = 'DELETE'")

Den här parametern är valfri.
apply_as_truncates

Typ: str eller expr()

Anger när en CDC-händelse ska behandlas som en fullständig tabell TRUNCATE. Eftersom den här satsen utlöser en fullständig trunkering av måltabellen bör den endast användas för specifika användningsfall som kräver den här funktionen.

Parametern apply_as_truncates stöds endast för SCD-typ 1. SCD-typ 2 stöder inte trunkeringsåtgärder.

Du kan ange något av följande:

– En sträng: "Operation = 'TRUNCATE'"
– En Spark SQL-funktion expr() : expr("Operation = 'TRUNCATE'")

Den här parametern är valfri.
column_list

except_column_list

Typ: list

En delmängd av kolumner som ska inkluderas i måltabellen. Använd column_list för att ange den fullständiga listan över kolumner som ska inkluderas. Använd except_column_list för att ange vilka kolumner som ska undantas. Du kan deklarera antingen värdet som en lista med strängar eller som Spark SQL-funktioner col() :

- column_list = ["userId", "name", "city"].
- column_list = [col("userId"), col("name"), col("city")]
- except_column_list = ["operation", "sequenceNum"]
- except_column_list = [col("operation"), col("sequenceNum")

Argument till col() funktioner kan inte innehålla kvalificerare. Du kan till exempel använda col(userId), men du kan inte använda col(source.userId).

Den här parametern är valfri.

Standardvärdet är att inkludera alla kolumner i måltabellen när inget column_list eller except_column_list argument skickas till funktionen.
stored_as_scd_type

Typ: str eller int

Om poster ska lagras som SCD-typ 1 eller SCD typ 2.

Ange till 1 för SCD typ 1 eller 2 för SCD typ 2.

Den här satsen är valfri.

Standardvärdet är SCD typ 1.
track_history_column_list

track_history_except_column_list

Typ: list

En delmängd av utdatakolumner som ska spåras för historik i måltabellen. Använd track_history_column_list för att ange den fullständiga listan över kolumner som ska spåras. Använd
track_history_except_column_list för att ange vilka kolumner som ska undantas från spårning. Du kan deklarera antingen värdet som en lista med strängar eller som Spark SQL-funktioner col() :
- track_history_column_list = ["userId", "name", "city"].
- track_history_column_list = [col("userId"), col("name"), col("city")]
- track_history_except_column_list = ["operation", "sequenceNum"]
- track_history_except_column_list = [col("operation"), col("sequenceNum")

Argument till col() funktioner kan inte innehålla kvalificerare. Du kan till exempel använda col(userId), men du kan inte använda col(source.userId).

Den här parametern är valfri.

Standardvärdet är att inkludera alla kolumner i måltabellen när nej track_history_column_list eller
track_history_except_column_list skickas argumentet till funktionen.

Ändra datainsamling från databasögonblicksbilder med Python i Delta Live Tables

Viktigt!

API:et APPLY CHANGES FROM SNAPSHOT finns i offentlig förhandsversion.

apply_changes_from_snapshot() Använd funktionen i Python-API:et för att använda funktionen Delta Live Tables change data capture (CDC) för att bearbeta källdata från databasögonblicksbilder.

Viktigt!

Du måste deklarera en måluppspelningstabell för att tillämpa ändringar i. Du kan också ange schemat för måltabellen. När du anger schemat för måltabellen apply_changes_from_snapshot() måste du även inkludera kolumnerna __START_AT och __END_AT med samma datatyp som fältet sequence_by .

Om du vill skapa den obligatoriska måltabellen kan du använda funktionen create_streaming_table() i Python-gränssnittet Delta Live Tables.

apply_changes_from_snapshot(
  target = "<target-table>",
  source = Any,
  keys = ["key1", "key2", "keyN"],
  stored_as_scd_type = <type>,
  track_history_column_list = None,
  track_history_except_column_list = None
) -> None

Kommentar

För APPLY CHANGES FROM SNAPSHOT bearbetning är standardbeteendet att infoga en ny rad när en matchande post med samma nyckel(er) inte finns i målet. Om det finns en matchande post uppdateras den endast om något av värdena på raden har ändrats. Rader med nycklar som finns i målet men som inte längre finns i källan tas bort.

Mer information om CDC-bearbetning med ögonblicksbilder finns i API:er för TILLÄMPA ÄNDRINGAR: Förenkla insamling av ändringsdata med Delta Live Tables. Exempel på hur du använder apply_changes_from_snapshot() funktionen finns i exemplen med periodisk inmatning av ögonblicksbilder och historisk inmatning av ögonblicksbilder.

Argument
target

Typ: str

Namnet på tabellen som ska uppdateras. Du kan använda funktionen create_streaming_table() för att skapa måltabellen innan du apply_changes() kör funktionen.

Den här parametern krävs.
source

Typ: str eller lambda function

Antingen namnet på en tabell eller vy som ska ögonblicksbildas regelbundet eller en Python lambda-funktion som returnerar den ögonblicksbild av DataFrame som ska bearbetas och ögonblicksbildversionen. Se Implementera källargumentet.

Den här parametern krävs.
keys

Typ: list

Kolumnen eller kombinationen av kolumner som unikt identifierar en rad i källdata. Detta används för att identifiera vilka CDC-händelser som gäller för specifika poster i måltabellen.

Du kan ange något av följande:

– En lista med strängar: ["userId", "orderId"]
– En lista över Spark SQL-funktioner col() : [col("userId"), col("orderId"]

Argument till col() funktioner kan inte innehålla kvalificerare. Du kan till exempel använda col(userId), men du kan inte använda col(source.userId).

Den här parametern krävs.
stored_as_scd_type

Typ: str eller int

Om poster ska lagras som SCD-typ 1 eller SCD typ 2.

Ange till 1 för SCD typ 1 eller 2 för SCD typ 2.

Den här satsen är valfri.

Standardvärdet är SCD typ 1.
track_history_column_list

track_history_except_column_list

Typ: list

En delmängd av utdatakolumner som ska spåras för historik i måltabellen. Använd track_history_column_list för att ange den fullständiga listan över kolumner som ska spåras. Använd
track_history_except_column_list för att ange vilka kolumner som ska undantas från spårning. Du kan deklarera antingen värdet som en lista med strängar eller som Spark SQL-funktioner col() :
- track_history_column_list = ["userId", "name", "city"].
- track_history_column_list = [col("userId"), col("name"), col("city")]
- track_history_except_column_list = ["operation", "sequenceNum"]
- track_history_except_column_list = [col("operation"), col("sequenceNum")

Argument till col() funktioner kan inte innehålla kvalificerare. Du kan till exempel använda col(userId), men du kan inte använda col(source.userId).

Den här parametern är valfri.

Standardvärdet är att inkludera alla kolumner i måltabellen när nej track_history_column_list eller
track_history_except_column_list skickas argumentet till funktionen.

source Implementera argumentet

Funktionen apply_changes_from_snapshot() innehåller source argumentet . För bearbetning av historiska ögonblicksbilder source förväntas argumentet vara en Python lambda-funktion som returnerar två värden till apply_changes_from_snapshot() funktionen: en Python DataFrame som innehåller de ögonblicksbildsdata som ska bearbetas och en ögonblicksbildversion.

Följande är signaturen för lambda-funktionen:

lambda Any => Optional[(DataFrame, Any)]
  • Argumentet till lambda-funktionen är den senast bearbetade ögonblicksbildversionen.
  • Returvärdet för lambda-funktionen är None eller en tuppeln med två värden: Det första värdet för tuppeln är en DataFrame som innehåller ögonblicksbilden som ska bearbetas. Det andra värdet för tuppeln är den ögonblicksbildsversion som representerar ögonblicksbildens logiska ordning.

Ett exempel som implementerar och anropar lambda-funktionen:

def next_snapshot_and_version(latest_snapshot_version):
 if latest_snapshot_version is None:
   return (spark.read.load("filename.csv"), 1)
 else:
   return None

apply_changes_from_snapshot(
  # ...
  source = next_snapshot_and_version,
  # ...
)

Delta Live Tables-körningen utför följande steg varje gång pipelinen som innehåller apply_changes_from_snapshot() funktionen utlöses:

  1. next_snapshot_and_version Kör funktionen för att läsa in nästa ögonblicksbild av DataFrame och motsvarande ögonblicksbildversion.
  2. Om ingen DataFrame returnerar avslutas körningen och pipelineuppdateringen markeras som slutförd.
  3. Identifierar ändringarna i den nya ögonblicksbilden och tillämpar dem stegvis på måltabellen.
  4. Återgår till steg 1 för att läsa in nästa ögonblicksbild och dess version.

Begränsningar

Python-gränssnittet i Delta Live Tables har följande begränsning:

Funktionen pivot() stöds inte. Åtgärden pivot i Spark kräver ivrig inläsning av indata för att beräkna schemat för utdata. Den här funktionen stöds inte i Delta Live Tables.