Kontrollpunkter för strukturerad direktuppspelning

Kontrollpunkter och loggar för framåtskrivning fungerar tillsammans för att ge bearbetningsgarantier för strukturerade strömningsarbetsbelastningar. Kontrollpunkten spårar den information som identifierar frågan, inklusive tillståndsinformation och bearbetade poster. När du tar bort filerna i en kontrollpunktskatalog eller ändrar till en ny kontrollpunktsplats börjar nästa körning av frågan på nytt.

Varje fråga måste ha en annan kontrollpunktsplats. Flera frågor bör aldrig dela samma plats.

Aktivera kontrollpunkter för frågor om strukturerad direktuppspelning

Du måste ange alternativet checkpointLocation innan du kör en direktuppspelningsfråga, som i följande exempel:

Python

(df.writeStream
  .option("checkpointLocation", "/Volumes/catalog/schema/volume/path")
  .toTable("catalog.schema.table")
)

Scala

df.writeStream
  .option("checkpointLocation", "/Volumes/catalog/schema/volume/path")
  .toTable("catalog.schema.table")

Kommentar

Vissa mottagare, till exempel utdata för display() i notebook-filer och memory mottagare, genererar automatiskt en tillfällig kontrollpunktsplats om du utelämnar det här alternativet. Dessa tillfälliga kontrollpunktsplatser garanterar inte någon feltolerans eller datakonsekvens och kanske inte rensas korrekt. Databricks rekommenderar att du alltid anger en kontrollpunktsplats för dessa mottagare.

Återställa efter ändringar i en fråga för strukturerad direktuppspelning

Det finns begränsningar för vilka ändringar i en strömmande fråga som tillåts mellan omstarter från samma kontrollpunktsplats. Här följer några ändringar som antingen inte är tillåtna eller effekten av ändringen är inte väldefinierad. För alla:

  • Termen tillåten innebär att du kan göra den angivna ändringen, men om semantiken för dess effekt är väldefinierad beror på frågan och ändringen.
  • Termen tillåts inte innebär att du inte bör göra den angivna ändringen eftersom den omstartade frågan sannolikt misslyckas med oförutsägbara fel.
  • sdf representerar en strömmande DataFrame/datauppsättning som genererats med sparkSession.readStream.

Typer av ändringar i frågor om strukturerad direktuppspelning

  • Ändringar i antalet eller typen (dvs. en annan källa) för indatakällor: Detta är inte tillåtet.
  • Ändringar i parametrarna för indatakällor: Om detta är tillåtet och om semantiken för ändringen är väldefinierade beror på källan och frågan. Här är några exempel.
    • Tillägg, borttagning och ändring av hastighetsgränser tillåts:

      spark.readStream.format("kafka").option("subscribe", "article")
      

      to

      spark.readStream.format("kafka").option("subscribe", "article").option("maxOffsetsPerTrigger", ...)
      
    • Ändringar i artiklar och filer som prenumereras är vanligtvis inte tillåtna eftersom resultatet är oförutsägbart: spark.readStream.format("kafka").option("subscribe", "article")spark.readStream.format("kafka").option("subscribe", "newarticle")

  • Ändringar i utlösarintervallet: Du kan ändra utlösare mellan inkrementella batchar och tidsintervall. Se Ändra utlösarintervall mellan körningar.
  • Ändringar i typen av utdatamottagare: Ändringar mellan några specifika kombinationer av mottagare tillåts. Detta måste verifieras från fall till fall. Här är några exempel.
    • Filmottagare till Kafka-mottagare tillåts. Kafka ser endast nya data.
    • Kafka-mottagare till filmottagaren tillåts inte.
    • Kafka-mottagare har ändrats till foreach, eller vice versa tillåts.
  • Ändringar i parametrarna för utdatamottagaren: Om detta tillåts och om semantiken för ändringen är väldefinierade beror på mottagaren och frågan. Här är några exempel.
    • Ändringar i utdatakatalogen för en filmottagare tillåts inte: sdf.writeStream.format("parquet").option("path", "/somePath")sdf.writeStream.format("parquet").option("path", "/anotherPath")
    • Ändringar i utdataavsnittet tillåts: sdf.writeStream.format("kafka").option("topic", "topic1") till sdf.writeStream.format("kafka").option("topic", "topic2")
    • Ändringar i den användardefinierade foreach-mottagaren (dvs ForeachWriter . koden) tillåts, men semantiken för ändringen beror på koden.
  • Ändringar i projektion/filter/kartliknande åtgärder: Vissa fall tillåts. Till exempel:
    • Tillägg/borttagning av filter tillåts: sdf.selectExpr("a") till sdf.where(...).selectExpr("a").filter(...).
    • Ändringar i projektioner med samma utdataschema tillåts: sdf.selectExpr("stringColumn AS json").writeStream till sdf.select(to_json(...).as("json")).writeStream.
    • Ändringar i projektioner med olika utdataschema är villkorligt tillåtna: sdf.selectExpr("a").writeStream till sdf.selectExpr("b").writeStream tillåts endast om utdatamottagaren tillåter schemaändring från "a" till "b".
  • Ändringar i tillståndskänsliga åtgärder: Vissa åtgärder i strömmande frågor måste underhålla tillståndsdata för att kontinuerligt uppdatera resultatet. Strukturerad direktuppspelning vägkontrollerar automatiskt tillståndsdata till feltolerant lagring (till exempel DBFS, Azure Blob Storage) och återställer dem efter omstart. Detta förutsätter dock att schemat för tillståndsdata förblir detsamma för omstarter. Det innebär att alla ändringar (det vill säga tillägg, borttagningar eller schemaändringar) i tillståndskänsliga åtgärder för en strömmande fråga inte tillåts mellan omstarter. Här är listan över tillståndskänsliga åtgärder vars schema inte bör ändras mellan omstarter för att säkerställa tillståndsåterställning:
    • Strömningsaggregering: Till exempel sdf.groupBy("a").agg(...). Ändringar i antal eller typ av grupperingsnycklar eller aggregat tillåts inte.
    • Strömmande deduplicering: Till exempel sdf.dropDuplicates("a"). Ändringar i antal eller typ av grupperingsnycklar eller aggregat tillåts inte.
    • Stream-stream-koppling: Till exempel sdf1.join(sdf2, ...) (dvs. båda indata genereras med sparkSession.readStream). Ändringar i schemat eller motsvarande kolumner tillåts inte. Ändringar i kopplingstyp (yttre eller inre) tillåts inte. Andra ändringar i kopplingsvillkoret är dåligt definierade.
    • Godtycklig tillståndskänslig åtgärd: Till exempel sdf.groupByKey(...).mapGroupsWithState(...) eller sdf.groupByKey(...).flatMapGroupsWithState(...). Ändringar i schemat för det användardefinierade tillståndet och typen av tidsgräns tillåts inte. Alla ändringar i den användardefinierade tillståndsmappningsfunktionen tillåts, men den semantiska effekten av ändringen beror på den användardefinierade logiken. Om du verkligen vill stödja tillståndsschemaändringar kan du uttryckligen koda/avkoda dina komplexa tillståndsdatastrukturer till byte med hjälp av ett kodnings-/avkodningsschema som stöder schemamigrering. Om du till exempel sparar ditt tillstånd som Avro-kodade byte kan du ändra Avro-state-schema mellan frågeomstarter när det återställer binärt tillstånd.