Puntos de control de Structured Streaming

Los puntos de control y los registros de escritura anticipada funcionan juntos para proporcionar garantías de procesamiento para cargas de trabajo de Structured Streaming. El punto de control realiza un seguimiento de la información que identifica la consulta, incluida la información de estado y los registros procesados. Al eliminar los archivos de un directorio de punto de control o cambiar a una nueva ubicación de punto de control, la siguiente ejecución de la consulta comienza como nueva.

Cada consulta debe tener una ubicación de punto de control diferente. Nunca use la misma ubicación para varias consultas.

Habilitación de puntos de control mejorados para consultas de Structured Streaming

Debe especificar la opción checkpointLocation antes de ejecutar una consulta de streaming, como en el ejemplo siguiente:

Python

(df.writeStream
  .option("checkpointLocation", "/Volumes/catalog/schema/volume/path")
  .toTable("catalog.schema.table")
)

Scala

df.writeStream
  .option("checkpointLocation", "/Volumes/catalog/schema/volume/path")
  .toTable("catalog.schema.table")

Nota:

Algunos receptores, como la salida de display() en cuadernos y el receptor memory, generan automáticamente una ubicación de punto de control temporal si se omite esta opción. El uso de ubicaciones de los puntos de control temporales no garantiza la tolerancia a errores ni la coherencia de los datos y es posible que no se limpien correctamente. Databricks recomienda especificar siempre una ubicación de punto de control para estos receptores.

Recuperación después de realizar cambios en una consulta de Structured Streaming

Hay limitaciones en cuanto a los cambios de una consulta de streaming que se permiten entre reinicios desde la misma ubicación del punto de control. Estos son algunos cambios que no están permitidos o el efecto del cambio no está bien definido. Para todos ellos:

  • El término permitido significa que puede realizar el cambio especificado, pero si la semántica de su efecto está bien definida depende de la consulta y del cambio.
  • El término no permitido significa que no debe realizar el cambio especificado, ya que es probable que la consulta reiniciada tenga errores impredecibles.
  • sdf representa un dataframe/conjunto de datos de streaming generado con sparkSession.readStream.

Tipos de cambios en las consultas de Structured Streaming

  • Cambios en el número o tipo (es decir, origen diferente) de los orígenes de entrada: no se permite.
  • Cambios en los parámetros de los orígenes de entrada: si se permite y si la semántica del cambio está bien definida depende del origen y de la consulta. Estos son algunos ejemplos.
    • Se permiten la adición, eliminación y modificación de los límites de velocidad:

      spark.readStream.format("kafka").option("subscribe", "article")
      

      to

      spark.readStream.format("kafka").option("subscribe", "article").option("maxOffsetsPerTrigger", ...)
      
    • Por lo general, no se permiten cambios en los artículos y archivos suscritos, ya que los resultados son impredecibles: spark.readStream.format("kafka").option("subscribe", "article")a spark.readStream.format("kafka").option("subscribe", "newarticle")

  • Cambios en el intervalo del desencadenador: puede cambiar los desencadenadores entre lotes incrementales e intervalos de tiempo. Consulte Cambio de intervalos del desencadenador entre ejecuciones.
  • Cambios en el tipo de receptor de salida: se permiten cambios entre algunas combinaciones específicas de receptores. Esto debe comprobarse caso a caso. Estos son algunos ejemplos.
    • Se permite el paso del receptor de archivos al receptor de Kafka. Kafka solo verá los datos nuevos.
    • No se permite el paso del receptor de Kafka al receptor de archivos.
    • Se ha cambiado el receptor de Kafka a foreach, o viceversa.
  • Cambios en los parámetros del receptor de salida: si se permite y si la semántica del cambio está bien definida dependen del receptor y de la consulta. Estos son algunos ejemplos.
    • No se permiten cambios en el directorio de salida de un receptor de archivos: sdf.writeStream.format("parquet").option("path", "/somePath") a sdf.writeStream.format("parquet").option("path", "/anotherPath")
    • Se permiten cambios en el tema de salida: sdf.writeStream.format("kafka").option("topic", "topic1") a sdf.writeStream.format("kafka").option("topic", "topic2")
    • Se permiten cambios en el receptor foreach definido por el usuario (es decir, el códigoForeachWriter), pero la semántica del cambio depende del código.
  • Cambios en las operaciones similares a proyección, filtro o mapa: se permiten algunos casos. Por ejemplo:
    • Se permite la adición o eliminación de filtros: sdf.selectExpr("a") a sdf.where(...).selectExpr("a").filter(...).
    • Se permiten cambios en las proyecciones con el mismo esquema de salida: sdf.selectExpr("stringColumn AS json").writeStream a sdf.select(to_json(...).as("json")).writeStream.
    • Los cambios en las proyecciones con un esquema de salida diferente se permiten condicionalmente: sdf.selectExpr("a").writeStream a sdf.selectExpr("b").writeStream solo se permite si el receptor de salida permite el cambio de esquema de "a" a "b".
  • Cambios en operaciones con estado: algunas operaciones de las consultas de streaming deben mantener los datos de estado en orden para actualizar continuamente el resultado. Structured Streaming crea automáticamente puntos de comprobación de los datos de estado en el almacenamiento tolerante a errores (por ejemplo, DBFS o Azure Blob Storage) y los restaura después del reinicio. Sin embargo, esto supone que el esquema de los datos de estado sigue siendo el mismo en los distintos reinicios, lo que significa que no se permiten cambios (es decir, adiciones, eliminaciones o modificaciones de esquema) en las operaciones con estado de una consulta de streaming entre reinicios. Esta es la lista de operaciones con estado cuyo esquema no se debe cambiar de un reinicio a otro para garantizar la recuperación del estado:
    • Agregación de streaming: por ejemplo, sdf.groupBy("a").agg(...). No se permite ningún cambio en el número o tipo de claves o agregados de agrupación.
    • Desduplicación de streaming: por ejemplo, sdf.dropDuplicates("a"). No se permite ningún cambio en el número o tipo de claves o agregados de agrupación.
    • Combinación flujo-flujo: por ejemplo, sdf1.join(sdf2, ...) (es decir, ambas entradas se generan con sparkSession.readStream). No se permiten cambios en el esquema ni en las columnas de combinación de igualdad. No se permiten cambios en el tipo de combinación (externo o interno). Otros cambios en la condición de combinación están mal definidos.
    • Operación arbitraria con estado: por ejemplo, sdf.groupByKey(...).mapGroupsWithState(...) o sdf.groupByKey(...).flatMapGroupsWithState(...). No se permite ningún cambio en el esquema del estado definido por el usuario y el tipo de tiempo de espera. Se permite cualquier cambio dentro de la función de asignación de estado definida por el usuario, pero el efecto semántico del cambio depende de la lógica definida por el usuario. Si realmente desea admitir cambios de esquema de estado, puede codificar o descodificar explícitamente las estructuras de datos de estado complejo en bytes mediante un esquema de codificación/descodificación que admita la migración de esquemas. Por ejemplo, si guarda el estado como bytes codificados en Avro, puede cambiar el esquema de estado de Avro durante el tiempo entre los reinicios de consulta, ya que esto restaurar el estado binario.