Hantera datakvalitet med Delta Live Tables

Du använder förväntningar för att definiera datakvalitetsbegränsningar för innehållet i en datamängd. Med förväntningar kan du garantera att data som kommer in i tabeller uppfyller datakvalitetskraven och ger insikter om datakvaliteten för varje pipelineuppdatering. Du tillämpar förväntningar på frågor med hjälp av Python-dekoratörer eller SQL-villkorssatser.

Vad är Förväntningar på Delta Live Tables?

Förväntningar är valfria satser som du lägger till i Delta Live Tables-datauppsättningsdeklarationer som tillämpar datakvalitetskontroller på varje post som passerar genom en fråga.

En förväntan består av tre saker:

  • En beskrivning som fungerar som en unik identifierare och gör att du kan spåra mått för villkoret.
  • En boolesk instruktion som alltid returnerar sant eller falskt baserat på ett angivet villkor.
  • En åtgärd att vidta när en post misslyckas med förväntan, vilket innebär att det booleska returnerar false.

Följande matris visar de tre åtgärder som du kan använda för ogiltiga poster:

Åtgärd Resultat
warn (standard) Ogiltiga poster skrivs till målet. fel rapporteras som ett mått för datamängden.
droppe Ogiltiga poster tas bort innan data skrivs till målet. fel rapporteras som ett mått för datamängden.
misslyckas Ogiltiga poster förhindrar att uppdateringen lyckas. Manuella åtgärder krävs före ombearbetning.

Du kan visa datakvalitetsmått, till exempel antalet poster som bryter mot en förväntan genom att fråga händelseloggen Delta Live Tables. Se Övervaka Delta Live Tables-pipelines.

En fullständig referens till Delta Live Tables-datauppsättningsdeklarationssyntax finns i Referens för Python-språkreferens för Delta Live Tables eller SQL-språkreferens för Delta Live Tables.

Kommentar

  • Du kan inkludera flera satser i alla förväntningar, men endast Python har stöd för att definiera åtgärder baserat på flera förväntningar. Se Flera förväntningar.
  • Förväntningar måste definieras med SQL-uttryck. Du kan inte använda icke-SQL-syntax (till exempel Python-funktioner) när du definierar en förväntan.

Behålla ogiltiga poster

Använd operatorn expect när du vill behålla poster som strider mot förväntningarna. Poster som bryter mot förväntningarna läggs till i måldatauppsättningen tillsammans med giltiga poster:

Python

@dlt.expect("valid timestamp", "timestamp > '2012-01-01'")

SQL

CONSTRAINT valid_timestamp EXPECT (timestamp > '2012-01-01')

Ta bort ogiltiga poster

Använd operatorn expect or drop för att förhindra ytterligare bearbetning av ogiltiga poster. Poster som bryter mot förväntningarna tas bort från måldatauppsättningen:

Python

@dlt.expect_or_drop("valid_current_page", "current_page_id IS NOT NULL AND current_page_title IS NOT NULL")

SQL

CONSTRAINT valid_current_page EXPECT (current_page_id IS NOT NULL and current_page_title IS NOT NULL) ON VIOLATION DROP ROW

Fel vid ogiltiga poster

När ogiltiga poster är oacceptabla använder du operatorn expect or fail för att stoppa körningen omedelbart när en post misslyckas med valideringen. Om åtgärden är en tabelluppdatering återställer systemet atomiskt transaktionen:

Python

@dlt.expect_or_fail("valid_count", "count > 0")

SQL

CONSTRAINT valid_count EXPECT (count > 0) ON VIOLATION FAIL UPDATE

Viktigt!

Om du har flera parallella flöden som definierats i en pipeline leder fel i ett enda flöde inte till att andra flöden misslyckas.

När en pipeline misslyckas på grund av en förväntansöverträdelse måste du åtgärda pipelinekoden för att hantera ogiltiga data korrekt innan du kör pipelinen igen.

Förväntade fel ändrar Spark-frågeplanen för dina omvandlingar för att spåra information som krävs för att identifiera och rapportera om överträdelser. För många frågor kan du använda den här informationen för att identifiera vilken indatapost som ledde till överträdelsen. Följande är ett exempel på ett undantag:

Expectation Violated:
{
  "flowName": "a-b",
  "verboseInfo": {
    "expectationsViolated": [
      "x1 is negative"
    ],
    "inputData": {
      "a": {"x1": 1,"y1": "a },
      "b": {
        "x2": 1,
        "y2": "aa"
      }
    },
    "outputRecord": {
      "x1": 1,
      "y1": "a",
      "x2": 1,
      "y2": "aa"
    },
    "missingInputData": false
  }
}

Flera förväntningar

Du kan definiera förväntningar med en eller flera datakvalitetsbegränsningar i Python-pipelines. De här dekoratörerna accepterar en Python-ordlista som argument, där nyckeln är förväntansnamnet och värdet är förväntansbegränsningen.

Använd expect_all för att ange flera datakvalitetsbegränsningar när poster som misslyckas med valideringen ska ingå i måldatauppsättningen:

@dlt.expect_all({"valid_count": "count > 0", "valid_current_page": "current_page_id IS NOT NULL AND current_page_title IS NOT NULL"})

Använd expect_all_or_drop för att ange flera begränsningar för datakvalitet när poster som misslyckas med valideringen ska tas bort från måldatauppsättningen:

@dlt.expect_all_or_drop({"valid_count": "count > 0", "valid_current_page": "current_page_id IS NOT NULL AND current_page_title IS NOT NULL"})

Använd expect_all_or_fail för att ange flera begränsningar för datakvalitet när poster som misslyckas med valideringen ska stoppa pipelinekörningen:

@dlt.expect_all_or_fail({"valid_count": "count > 0", "valid_current_page": "current_page_id IS NOT NULL AND current_page_title IS NOT NULL"})

Du kan också definiera en samling förväntningar som en variabel och skicka den till en eller flera frågor i pipelinen:

valid_pages = {"valid_count": "count > 0", "valid_current_page": "current_page_id IS NOT NULL AND current_page_title IS NOT NULL"}

@dlt.table
@dlt.expect_all(valid_pages)
def raw_data():
  # Create raw dataset

@dlt.table
@dlt.expect_all_or_drop(valid_pages)
def prepared_data():
  # Create cleaned and prepared dataset

Ogiltiga data i karantän

I följande exempel används förväntningar i kombination med tillfälliga tabeller och vyer. Det här mönstret ger dig mått för poster som skickar förväntanskontroller under pipelineuppdateringar och ger ett sätt att bearbeta giltiga och ogiltiga poster via olika underordnade sökvägar.

Kommentar

I det här exemplet läss exempeldata som ingår i Databricks-datauppsättningarna. Eftersom Databricks-datauppsättningarna inte stöds med en pipeline som publicerar till Unity Catalog fungerar det här exemplet bara med en pipeline som är konfigurerad att publicera till Hive-metaarkivet. Det här mönstret fungerar dock också med Unity Catalog-aktiverade pipelines, men du måste läsa data från externa platser. Mer information om hur du använder Unity Catalog med Delta Live Tables finns i Använda Unity Catalog med dina Delta Live Tables-pipelines.

import dlt
from pyspark.sql.functions import expr

rules = {}
rules["valid_website"] = "(Website IS NOT NULL)"
rules["valid_location"] = "(Location IS NOT NULL)"
quarantine_rules = "NOT({0})".format(" AND ".join(rules.values()))

@dlt.table(
  name="raw_farmers_market"
)
def get_farmers_market_data():
  return (
    spark.read.format('csv').option("header", "true")
      .load('/databricks-datasets/data.gov/farmers_markets_geographic_data/data-001/')
  )

@dlt.table(
  name="farmers_market_quarantine",
  temporary=True,
  partition_cols=["is_quarantined"]
)
@dlt.expect_all(rules)
def farmers_market_quarantine():
  return (
    dlt.read("raw_farmers_market")
      .select("MarketName", "Website", "Location", "State",
              "Facebook", "Twitter", "Youtube", "Organic", "updateTime")
      .withColumn("is_quarantined", expr(quarantine_rules))
  )

@dlt.view(
  name="valid_farmers_market"
)
def get_valid_farmers_market():
  return (
    dlt.read("farmers_market_quarantine")
      .filter("is_quarantined=false")
  )

@dlt.view(
  name="invalid_farmers_market"
)
def get_invalid_farmers_market():
  return (
    dlt.read("farmers_market_quarantine")
      .filter("is_quarantined=true")
  )

Verifiera radantal mellan tabeller

Du kan lägga till ytterligare en tabell i pipelinen som definierar en förväntan att jämföra radantal mellan två materialiserade vyer eller strömmande tabeller. Resultatet av den här förväntan visas i händelseloggen och Delta Live Tables-användargränssnittet. I följande exempel verifieras lika antal rader mellan tabellerna tbla och tblb :

CREATE OR REFRESH MATERIALIZED VIEW count_verification(
  CONSTRAINT no_rows_dropped EXPECT (a_count == b_count)
) AS SELECT * FROM
  (SELECT COUNT(*) AS a_count FROM LIVE.tbla),
  (SELECT COUNT(*) AS b_count FROM LIVE.tblb)

Utföra avancerad validering med förväntningar för Delta Live Tables

Du kan definiera materialiserade vyer med hjälp av aggregerade och anslutna frågor och använda resultatet av dessa frågor som en del av din förväntanskontroll. Detta är användbart om du vill utföra komplexa datakvalitetskontroller, till exempel genom att se till att en härledd tabell innehåller alla poster från källtabellen eller garantera likheten mellan en numerisk kolumn mellan tabeller.

I följande exempel verifieras att alla förväntade poster finns i report tabellen:

CREATE MATERIALIZED VIEW report_compare_tests(
  CONSTRAINT no_missing_records EXPECT (r.key IS NOT NULL)
)
AS SELECT * FROM LIVE.validation_copy v
LEFT OUTER JOIN LIVE.report r ON v.key = r.key

I följande exempel används en aggregering för att säkerställa att en primärnyckel är unik:

CREATE MATERIALIZED VIEW report_pk_tests(
  CONSTRAINT unique_pk EXPECT (num_entries = 1)
)
AS SELECT pk, count(*) as num_entries
FROM LIVE.report
GROUP BY pk

Gör förväntningarna bärbara och återanvändbara

Du kan underhålla datakvalitetsregler separat från dina pipelineimplementeringar.

Databricks rekommenderar att du lagrar reglerna i en Delta-tabell med varje regel kategoriserad efter en tagg. Du använder den här taggen i datamängdsdefinitioner för att avgöra vilka regler som ska tillämpas.

I följande exempel skapas en tabell med namnet rules för att underhålla regler:

CREATE OR REPLACE TABLE
  rules
AS SELECT
  col1 AS name,
  col2 AS constraint,
  col3 AS tag
FROM (
  VALUES
  ("website_not_null","Website IS NOT NULL","validity"),
  ("location_not_null","Location IS NOT NULL","validity"),
  ("state_not_null","State IS NOT NULL","validity"),
  ("fresh_data","to_date(updateTime,'M/d/yyyy h:m:s a') > '2010-01-01'","maintained"),
  ("social_media_access","NOT(Facebook IS NULL AND Twitter IS NULL AND Youtube IS NULL)","maintained")
)

I följande Python-exempel definieras datakvalitetsförväntningar baserat på de regler som lagras i rules tabellen. Funktionen get_rules() läser reglerna från rules tabellen och returnerar en Python-ordlista som innehåller regler som matchar argumentet tag som skickas till funktionen. Ordlistan används i dekoratörerna @dlt.expect_all_*() för att framtvinga datakvalitetsbegränsningar. Till exempel kommer alla poster som misslyckas med reglerna som taggats med validity att tas bort från raw_farmers_market tabellen:

Kommentar

I det här exemplet läss exempeldata som ingår i Databricks-datauppsättningarna. Eftersom Databricks-datauppsättningarna inte stöds med en pipeline som publicerar till Unity Catalog fungerar det här exemplet bara med en pipeline som är konfigurerad att publicera till Hive-metaarkivet. Det här mönstret fungerar dock också med Unity Catalog-aktiverade pipelines, men du måste läsa data från externa platser. Mer information om hur du använder Unity Catalog med Delta Live Tables finns i Använda Unity Catalog med dina Delta Live Tables-pipelines.

import dlt
from pyspark.sql.functions import expr, col

def get_rules(tag):
  """
    loads data quality rules from a table
    :param tag: tag to match
    :return: dictionary of rules that matched the tag
  """
  rules = {}
  df = spark.read.table("rules")
  for row in df.filter(col("tag") == tag).collect():
    rules[row['name']] = row['constraint']
  return rules

@dlt.table(
  name="raw_farmers_market"
)
@dlt.expect_all_or_drop(get_rules('validity'))
def get_farmers_market_data():
  return (
    spark.read.format('csv').option("header", "true")
      .load('/databricks-datasets/data.gov/farmers_markets_geographic_data/data-001/')
  )

@dlt.table(
  name="organic_farmers_market"
)
@dlt.expect_all_or_drop(get_rules('maintained'))
def get_organic_farmers_market():
  return (
    dlt.read("raw_farmers_market")
      .filter(expr("Organic = 'Y'"))
      .select("MarketName", "Website", "State",
        "Facebook", "Twitter", "Youtube", "Organic",
        "updateTime"
      )
  )

I stället för att skapa en tabell med namnet rules för att underhålla regler kan du skapa en Python-modul till huvudregler, till exempel i en fil med namnet rules_module.py i samma mapp som notebook-filen:

def get_rules_as_list_of_dict():
  return [
    {
      "name": "website_not_null",
      "constraint": "Website IS NOT NULL",
      "tag": "validity"
    },
    {
      "name": "location_not_null",
      "constraint": "Location IS NOT NULL",
      "tag": "validity"
    },
    {
      "name": "state_not_null",
      "constraint": "State IS NOT NULL",
      "tag": "validity"
    },
    {
      "name": "fresh_data",
      "constraint": "to_date(updateTime,'M/d/yyyy h:m:s a') > '2010-01-01'",
      "tag": "maintained"
    },
    {
      "name": "social_media_access",
      "constraint": "NOT(Facebook IS NULL AND Twitter IS NULL AND Youtube IS NULL)",
      "tag": "maintained"
    }
  ]

Ändra sedan föregående notebook-fil genom att importera modulen och ändra get_rules() funktionen till att läsa från modulen i stället för från rules tabellen:

import dlt
from rules_module import *
from pyspark.sql.functions import expr, col

df = spark.createDataFrame(get_rules_as_list_of_dict())

def get_rules(tag):
  """
    loads data quality rules from a table
    :param tag: tag to match
    :return: dictionary of rules that matched the tag
  """
  rules = {}
  for row in df.filter(col("tag") == tag).collect():
    rules[row['name']] = row['constraint']
  return rules

@dlt.table(
  name="raw_farmers_market"
)
@dlt.expect_all_or_drop(get_rules('validity'))
def get_farmers_market_data():
  return (
    spark.read.format('csv').option("header", "true")
      .load('/databricks-datasets/data.gov/farmers_markets_geographic_data/data-001/')
  )

@dlt.table(
  name="organic_farmers_market"
)
@dlt.expect_all_or_drop(get_rules('maintained'))
def get_organic_farmers_market():
  return (
    dlt.read("raw_farmers_market")
      .filter(expr("Organic = 'Y'"))
      .select("MarketName", "Website", "State",
        "Facebook", "Twitter", "Youtube", "Organic",
        "updateTime"
      )
  )