Transformace dat pomocí dynamických tabulek Delta
Tento článek popisuje, jak můžete pomocí rozdílových živých tabulek deklarovat transformace datových sad a určit způsob zpracování záznamů pomocí logiky dotazu. Obsahuje také příklady běžných transformačních vzorů pro vytváření kanálů Delta Live Tables.
Datovou sadu můžete definovat pro jakýkoli dotaz, který vrací datový rámec. Jako transformace v kanálu Delta Live Tables můžete použít integrované operace Apache Sparku, funkce definované uživatelem, vlastní logiku a modely MLflow. Po ingestování dat do kanálu Delta Live Tables můžete definovat nové datové sady pro nadřazené zdroje a vytvářet nové streamované tabulky, materializovaná zobrazení a zobrazení.
Informace o efektivním provádění stavového zpracování pomocí rozdílových živých tabulek najdete v tématu Optimalizace stavového zpracování v rozdílových živých tabulkách s vodoznaky.
Kdy použít zobrazení, materializovaná zobrazení a streamované tabulky
Při implementaci dotazů kanálu zvolte nejlepší typ datové sady, abyste zajistili, že jsou efektivní a udržovatelné.
Zvažte použití zobrazení k provedení následujících kroků:
- Rozdělte velký nebo složitý dotaz, který chcete použít k jednodušší správě dotazů.
- Ověřte průběžné výsledky pomocí očekávání.
- Snižte náklady na úložiště a výpočetní prostředky pro výsledky, které nemusíte uchovávat. Vzhledem k tomu, že tabulky jsou materializované, vyžadují další výpočetní prostředky a prostředky úložiště.
Zvažte použití materializovaného zobrazení v následujících případech:
- Tabulku spotřebovávají více podřízených dotazů. Vzhledem k tomu, že zobrazení se počítají na vyžádání, je zobrazení znovu vypočteno při každém dotazování zobrazení.
- Tabulku spotřebovávají jiné kanály, úlohy nebo dotazy. Vzhledem k tomu, že zobrazení nejsou materializovaná, můžete je použít pouze ve stejném kanálu.
- Chcete zobrazit výsledky dotazu během vývoje. Vzhledem k tomu, že tabulky jsou materializované a dají se zobrazit a dotazovat mimo kanál, může použití tabulek během vývoje pomoct ověřit správnost výpočtů. Po ověření převeďte dotazy, které nevyžadují materializaci do zobrazení.
Zvažte použití streamované tabulky v následujících případech:
- Dotaz je definován proti zdroji dat, který se nepřetržitě nebo přírůstkově zvětšuje.
- Výsledky dotazu by se měly vypočítat přírůstkově.
- Kanál potřebuje vysokou propustnost a nízkou latenci.
Poznámka:
Streamované tabulky jsou vždy definovány proti zdrojům streamování. K instalaci aktualizací z informačních kanálů CDC můžete použít také zdroje APPLY CHANGES INTO
streamování. Viz rozhraní API APPLY CHANGES: Zjednodušení zachytávání dat změn pomocí rozdílových živých tabulek.
Vyloučení tabulek z cílového schématu
Pokud je nutné vypočítat zprostředkující tabulky, které nejsou určené pro externí spotřebu, můžete zabránit jejich publikování do schématu pomocí klíčového TEMPORARY
slova. Dočasné tabulky stále ukládají a zpracovávají data podle sémantiky Delta Live Tables, ale neměly by být přístupné mimo aktuální kanál. Dočasná tabulka zůstane po celou dobu životnosti kanálu, který ji vytvoří. K deklaraci dočasných tabulek použijte následující syntaxi:
SQL
CREATE TEMPORARY STREAMING TABLE temp_table
AS SELECT ... ;
Python
@dlt.table(
temporary=True)
def temp_table():
return ("...")
Kombinování streamovaných tabulek a materializovaných zobrazení v jednom kanálu
Streamované tabulky dědí záruky zpracování strukturovaného streamování Apache Sparku a jsou nakonfigurované tak, aby zpracovávaly dotazy ze zdrojů dat jen pro připojení, kde se nové řádky vždy vkládají do zdrojové tabulky, a ne upravovat.
Poznámka:
I když streamované tabulky ve výchozím nastavení vyžadují zdroje dat jen pro připojení, pokud je zdrojem streamování jiná streamovací tabulka, která vyžaduje aktualizace nebo odstranění, můžete toto chování přepsat příznakem skipChangeCommits.
Běžný model streamování zahrnuje ingestování zdrojových dat k vytvoření počátečních datových sad v kanálu. Tyto počáteční datové sady se běžně označují jako bronzové tabulky a často provádějí jednoduché transformace.
Naproti tomu konečné tabulky v kanálu, běžně označované jako zlaté tabulky, často vyžadují složité agregace nebo čtení z cílů APPLY CHANGES INTO
operace. Vzhledem k tomu, že tyto operace vytvářejí aktualizace spíše než připojení, nejsou podporovány jako vstupy do streamovaných tabulek. Tyto transformace jsou vhodnější pro materializovaná zobrazení.
Kombinováním streamovaných tabulek a materializovaných zobrazení do jednoho kanálu můžete kanál zjednodušit, vyhnout se nákladnému opakovanému příjmu dat nebo opětovnému zpracování nezpracovaných dat a mít plný výkon SQL k výpočtu složitých agregací přes efektivně zakódovanou a filtrovanou datovou sadu. Následující příklad ukazuje tento typ smíšeného zpracování:
Poznámka:
Tyto příklady používají k načtení souborů z cloudového úložiště automatický zavaděč. Pokud chcete načíst soubory s automatickým zavaděčem v kanálu s povoleným katalogem Unity, musíte použít externí umístění. Další informace o používání katalogu Unity s dynamickými tabulkami Delta najdete v tématu Použití katalogu Unity s kanály Delta Live Tables.
Python
@dlt.table
def streaming_bronze():
return (
# Since this is a streaming source, this table is incremental.
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.load("abfss://path/to/raw/data")
)
@dlt.table
def streaming_silver():
# Since we read the bronze table as a stream, this silver table is also
# updated incrementally.
return dlt.read_stream("streaming_bronze").where(...)
@dlt.table
def live_gold():
# This table will be recomputed completely by reading the whole silver table
# when it is updated.
return dlt.read("streaming_silver").groupBy("user_id").count()
SQL
CREATE OR REFRESH STREAMING TABLE streaming_bronze
AS SELECT * FROM read_files(
"abfss://path/to/raw/data", "json"
)
CREATE OR REFRESH STREAMING TABLE streaming_silver
AS SELECT * FROM STREAM(LIVE.streaming_bronze) WHERE...
CREATE OR REFRESH MATERIALIZED VIEW live_gold
AS SELECT count(*) FROM LIVE.streaming_silver GROUP BY user_id
Přečtěte si další informace o použití automatického zavaděče k přírůstkové ingestování souborů JSON z úložiště Azure.
Statické spojení streamu
Statické spojení datových proudů jsou dobrou volbou při denormalizaci souvislého proudu dat jen pro připojení s primárně tabulkou statických dimenzí.
Při každé aktualizaci kanálu se nové záznamy ze streamu připojí k nejaktuálnějšímu snímku statické tabulky. Pokud jsou záznamy přidány nebo aktualizovány ve statické tabulce po zpracování odpovídajících dat z tabulky streamování, výsledné záznamy se nepřepočítávají, pokud se neprovede úplná aktualizace.
V kanálech nakonfigurovaných pro aktivované spuštění vrátí statická tabulka výsledky po spuštění aktualizace. V kanálech nakonfigurovaných pro průběžné spouštění se při každém zpracování aktualizace dotazuje nejnovější verze statické tabulky.
Následuje příklad statického spojení streamu:
Python
@dlt.table
def customer_sales():
return dlt.read_stream("sales").join(dlt.read("customers"), ["customer_id"], "left")
SQL
CREATE OR REFRESH STREAMING TABLE customer_sales
AS SELECT * FROM STREAM(LIVE.sales)
INNER JOIN LEFT LIVE.customers USING (customer_id)
Efektivní výpočet agregací
Streamované tabulky můžete použít k přírůstkové výpočtu jednoduchých distribuačních agregací, jako je počet, minimum, maximum nebo součet a algebraické agregace, jako jsou průměr nebo směrodatná odchylka. Databricks doporučuje přírůstkovou agregaci pro dotazy s omezeným počtem skupin, jako je dotaz s klauzulí GROUP BY country
. Při každé aktualizaci se čtou jenom nová vstupní data.
Další informace o psaní dotazů Delta Live Tables, které provádějí přírůstkové agregace, najdete v tématu Provádění agregací s vodoznaky v otevírně.
Použití modelů MLflow v kanálu Delta Live Tables
Poznámka:
Pokud chcete používat modely MLflow v kanálu s podporou katalogu Unity, musí být váš kanál nakonfigurovaný tak, aby používal kanál preview
. Pokud chcete kanál použít current
, musíte nakonfigurovat kanál tak, aby se publikoval do metastoru Hive.
V kanálech Delta Live Tables můžete používat modely natrénované pomocí MLflow. Modely MLflow se v Azure Databricks považují za transformace, což znamená, že pracují se vstupem datového rámce Sparku a vrací výsledky jako datový rámec Sparku. Vzhledem k tomu, že rozdílové živé tabulky definují datové sady proti datovým rámcům, můžete převést úlohy Apache Sparku, které používají MLflow, na rozdílové živé tabulky s pouhými několika řádky kódu. Další informace o MLflow najdete v tématu Správa životního cyklu ML pomocí MLflow.
Pokud už máte poznámkový blok Pythonu, který volá model MLflow, můžete tento kód přizpůsobit rozdílovým živým tabulkám pomocí dekorátoru @dlt.table
a zajistit, aby funkce byly definovány tak, aby vracely výsledky transformace. Delta Live Tables ve výchozím nastavení neinstaluje MLflow, proto zkontrolujte, jestli jste vy %pip install mlflow
a import mlflow
a dlt
v horní části poznámkového bloku. Úvod do syntaxe rozdílových živých tabulek najdete v tématu Implementace kanálu Delta Live Tables s Pythonem.
Pokud chcete používat modely MLflow v Delta Live Tables, proveďte následující kroky:
- Získejte ID spuštění a název modelu MLflow. ID spuštění a název modelu se používají k vytvoření identifikátoru URI modelu MLflow.
- Pomocí identifikátoru URI definujte UDF Sparku pro načtení modelu MLflow.
- Zavolejte UDF v definicích tabulky, aby se použil model MLflow.
Následující příklad ukazuje základní syntaxi pro tento vzor:
%pip install mlflow
import dlt
import mlflow
run_id= "<mlflow-run-id>"
model_name = "<the-model-name-in-run>"
model_uri = f"runs:/{run_id}/{model_name}"
loaded_model_udf = mlflow.pyfunc.spark_udf(spark, model_uri=model_uri)
@dlt.table
def model_predictions():
return dlt.read(<input-data>)
.withColumn("prediction", loaded_model_udf(<model-features>))
Jako úplný příklad následující kód definuje UDF Sparku s názvem loaded_model_udf
, který načte model MLflow natrénovaný na data úvěrového rizika. Datové sloupce použité k předpovědím se předávají jako argument funkce definované uživatelem. Tabulka vypočítá předpovědi pro každý řádek v loan_risk_input_data
tabulce loan_risk_predictions
.
%pip install mlflow
import dlt
import mlflow
from pyspark.sql.functions import struct
run_id = "mlflow_run_id"
model_name = "the_model_name_in_run"
model_uri = f"runs:/{run_id}/{model_name}"
loaded_model_udf = mlflow.pyfunc.spark_udf(spark, model_uri=model_uri)
categoricals = ["term", "home_ownership", "purpose",
"addr_state","verification_status","application_type"]
numerics = ["loan_amnt", "emp_length", "annual_inc", "dti", "delinq_2yrs",
"revol_util", "total_acc", "credit_length_in_years"]
features = categoricals + numerics
@dlt.table(
comment="GBT ML predictions of loan risk",
table_properties={
"quality": "gold"
}
)
def loan_risk_predictions():
return dlt.read("loan_risk_input_data")
.withColumn('predictions', loaded_model_udf(struct(features)))
Zachování ručních odstranění nebo aktualizací
Delta Live Tables umožňuje ručně odstranit nebo aktualizovat záznamy z tabulky a provést operaci aktualizace pro přepočítání podřízených tabulek.
Delta Live Tables ve výchozím nastavení přepočítá výsledky tabulky na základě vstupních dat při každé aktualizaci kanálu, takže je nutné zajistit, aby odstraněný záznam nebyl znovu načten ze zdrojových dat. pipelines.reset.allowed
Nastavením vlastnosti tabulky zabráníte false
aktualizacím tabulky, ale nezabráníte přírůstkovým zápisům do tabulek nebo novým datům v toku do tabulky.
Následující diagram znázorňuje příklad pomocí dvou streamovaných tabulek:
raw_user_table
ingestuje nezpracovaná uživatelská data ze zdroje.bmi_table
přírůstkově vypočítá skóre BMI pomocí váhy a výšky zraw_user_table
.
Chcete ručně odstranit nebo aktualizovat záznamy uživatele z raw_user_table
a znovu zkompilovat bmi_table
.
Následující kód ukazuje nastavení pipelines.reset.allowed
vlastnosti tabulky tak, aby false
byla zakázána úplná aktualizace raw_user_table
tak, aby se požadované změny uchovály v průběhu času, ale podřízené tabulky se přepočítají při spuštění aktualizace kanálu:
CREATE OR REFRESH STREAMING TABLE raw_user_table
TBLPROPERTIES(pipelines.reset.allowed = false)
AS SELECT * FROM read_files("/databricks-datasets/iot-stream/data-user", "csv");
CREATE OR REFRESH STREAMING TABLE bmi_table
AS SELECT userid, (weight/2.2) / pow(height*0.0254,2) AS bmi FROM STREAM(LIVE.raw_user_table);