Introducción a Delta Lake de Linux Foundation

Con el fin de proporcionar más claridad, este artículo se ha adaptado de su homólogo original, que se encuentra aquí. Este artículo le ayuda a explorar rápidamente las principales características de Delta Lake. En el artículo se proporcionan fragmentos de código que muestran cómo leer tablas de Delta Lake y escribir en ellas desde consultas interactivas, por lotes y de streaming. Los fragmentos de código también están disponibles en un conjunto de cuadernos PySpark aquí, Scala aquí y C# aquí

Esto es lo que trataremos:

  • Creación de una tabla
  • Lectura de datos
  • Actualización de los datos de una tabla
  • Sobrescritura de los datos de una tabla
  • Actualización condicional sin sobrescribir
  • Lectura de las versiones anteriores de datos mediante Viaje en el tiempo
  • Escritura de un flujo de datos en una tabla
  • Lectura de un flujo de cambios de una tabla
  • Soporte técnico de SQL

Configuración

Asegúrese de modificar el siguiente código según corresponda a su entorno.

import random

session_id = random.randint(0,1000000)
delta_table_path = "/delta/delta-table-{0}".format(session_id)

delta_table_path
var sessionId = (new Random()).Next(10000000);
var deltaTablePath = $"/delta/delta-table-{sessionId}";

deltaTablePath
val sessionId = scala.util.Random.nextInt(1000000)
val deltaTablePath = s"/delta/delta-table-$sessionId";

El resultado es:

"/delta/delta-table-335323"

Creación de una tabla

Para crear una tabla de Delta Lake, escriba un DataFrame en el formato Delta. Puede cambiar el formato Parquet, CSV, JSON, etc., a Delta.

El siguiente código muestra cómo crear una tabla de Delta Lake mediante el esquema deducido de su DataFrame.

data = spark.range(0,5)
data.show()
data.write.format("delta").save(delta_table_path)
var data = spark.Range(0,5);
data.Show();
data.Write().Format("delta").Save(deltaTablePath);
val data = spark.range(0, 5)
data.show
data.write.format("delta").save(deltaTablePath)

El resultado es:

ID
0
1
2
3
4

Lectura de datos

Para leer los datos de su tabla de Delta Lake, especifique la ruta de acceso a los archivos y el formato Delta.

df = spark.read.format("delta").load(delta_table_path)
df.show()
var df = spark.Read().Format("delta").Load(deltaTablePath);
df.Show()
val df = spark.read.format("delta").load(deltaTablePath)
df.show()

El resultado es:

ID
1
3
4
0
2

El orden de los resultados es distinto del anterior, ya que antes de generar los resultados no había ningún orden especificado explícitamente.

Actualización de los datos de una tabla

Delta Lake admite varias operaciones para modificar tablas mediante las API DataFrame estándar. Estas operaciones son una de las mejoras que agrega el formato delta. En el ejemplo siguiente se ejecuta un trabajo por lotes para sobrescribir los datos de la tabla.

data = spark.range(5,10)
data.write.format("delta").mode("overwrite").save(delta_table_path)
df.show()
var data = spark.Range(5,10);
data.Write().Format("delta").Mode("overwrite").Save(deltaTablePath);
df.Show();
val data = spark.range(5, 10)
data.write.format("delta").mode("overwrite").save(deltaTablePath)
df.show()

El resultado es:

ID
7
8
5
9
6

Aquí se puede ver que los cinco registros se han actualizado para contener nuevos valores.

Guardar como tablas de catálogo

Delta Lake puede escribir en tablas de catálogo administradas o externas.

data.write.format("delta").saveAsTable("ManagedDeltaTable")
spark.sql("CREATE TABLE ExternalDeltaTable USING DELTA LOCATION '{0}'".format(delta_table_path))
spark.sql("SHOW TABLES").show()
data.Write().Format("delta").SaveAsTable("ManagedDeltaTable");
spark.Sql($"CREATE TABLE ExternalDeltaTable USING DELTA LOCATION '{deltaTablePath}'");
spark.Sql("SHOW TABLES").Show();
data.write.format("delta").saveAsTable("ManagedDeltaTable")
spark.sql(s"CREATE TABLE ExternalDeltaTable USING DELTA LOCATION '$deltaTablePath'")
spark.sql("SHOW TABLES").show

El resultado es:

database tableName isTemporary
default externaldeltatable false
default manageddeltatable false

Con este código, ha creado una tabla en el catálogo a partir de una dataframe existente, que se denomina tabla administrada. Luego, ha definido una nueva tabla externa en el catálogo que utiliza una ubicación existente, que se denomina tabla externa. En la salida puede ver que ambas tablas, con independencia de cómo se hayan creado, se muestran en el catálogo.

Ahora puede ver las propiedades extendidas de las dos tablas

spark.sql("DESCRIBE EXTENDED ManagedDeltaTable").show(truncate=False)
spark.Sql("DESCRIBE EXTENDED ManagedDeltaTable").Show(truncate: 0);
spark.sql("DESCRIBE EXTENDED ManagedDeltaTable").show(truncate=false)

El resultado es:

col_name data_type comment
id bigint null
Información detallada de la tabla
Base de datos default
Tabla manageddeltatable
Propietario trusted-service-user
Hora de creación Sábado, 25 de abril 00:35:34 UTC 2020
Último acceso Jueves, 01 de enero 00:00:00 UTC 1970
Creado por Spark 2.4.4.2.6.99.201-11401300
Tipo ADMINISTRADO
Proveedor delta
Propiedades de tabla [transient_lastDdlTime=1587774934]
Estadísticas 2407 bytes
Location abfss://data@<data lake>.dfs.core.windows.net/synapse/workspaces/<workspace name>/warehouse/manageddeltatable
Biblioteca Serde org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
InputFormat org.apache.hadoop.mapred.SequenceFileInputFormat
OutputFormat org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
Propiedades de almacenamiento [serialization.format=1]
spark.sql("DESCRIBE EXTENDED ExternalDeltaTable").show(truncate=False)
spark.Sql("DESCRIBE EXTENDED ExternalDeltaTable").Show(truncate: 0);
spark.sql("DESCRIBE EXTENDED ExternalDeltaTable").show(truncate=false)

El resultado es:

col_name data_type comment
id bigint null
Información detallada de la tabla
Base de datos default
Tabla externaldeltatable
Propietario trusted-service-user
Hora de creación Sábado, 25 de abril 00:35:38 UTC 2020
Último acceso Jueves, 01 de enero 00:00:00 UTC 1970
Creado por Spark 2.4.4.2.6.99.201-11401300
Tipo EXTERNAL
Proveedor DELTA
Propiedades de tabla [transient_lastDdlTime=1587774938]
Location abfss://data@<data lake>.dfs.core.windows.net/delta/delta-table-587152
Biblioteca Serde org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
InputFormat org.apache.hadoop.mapred.SequenceFileInputFormat
OutputFormat org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
Propiedades de almacenamiento [serialization.format=1]

Actualización condicional sin sobrescribir

Delta Lake proporciona API de programación para la actualización condicional, la eliminación y la combinación (este comando se conoce normalmente como upsert) de datos en las tablas.

from delta.tables import *
from pyspark.sql.functions import *

delta_table = DeltaTable.forPath(spark, delta_table_path)

delta_table.update(
  condition = expr("id % 2 == 0"),
  set = { "id": expr("id + 100") })
delta_table.toDF().show()
using Microsoft.Spark.Extensions.Delta;
using Microsoft.Spark.Extensions.Delta.Tables;
using Microsoft.Spark.Sql;
using static Microsoft.Spark.Sql.Functions;

var deltaTable = DeltaTable.ForPath(deltaTablePath);

deltaTable.Update(
  condition: Expr("id % 2 == 0"),
  set: new Dictionary<string, Column>(){{ "id", Expr("id + 100") }});
deltaTable.ToDF().Show();
import io.delta.tables._
import org.apache.spark.sql.functions._

val deltaTable = DeltaTable.forPath(deltaTablePath)

// Update every even value by adding 100 to it
deltaTable.update(
  condition = expr("id % 2 == 0"),
  set = Map("id" -> expr("id + 100")))
deltaTable.toDF.show

El resultado es:

ID
106
108
5
7
9

Acaba de agregar 100 a cada identificador par.

delta_table.delete("id % 2 == 0")
delta_table.toDF().show()
deltaTable.Delete(condition: Expr("id % 2 == 0"));
deltaTable.ToDF().Show();
deltaTable.delete(condition = expr("id % 2 == 0"))
deltaTable.toDF.show

El resultado es:

ID
5
7
9

Observe que se han eliminado todas las filas pares.

new_data = spark.range(0,20).alias("newData")

delta_table.alias("oldData")\
    .merge(new_data.alias("newData"), "oldData.id = newData.id")\
    .whenMatchedUpdate(set = { "id": lit("-1")})\
    .whenNotMatchedInsert(values = { "id": col("newData.id") })\
    .execute()

delta_table.toDF().show(100)
var newData = spark.Range(20).As("newData");

deltaTable
    .As("oldData")
    .Merge(newData, "oldData.id = newData.id")
    .WhenMatched()
        .Update(new Dictionary<string, Column>() {{"id", Lit("-1")}})
    .WhenNotMatched()
        .Insert(new Dictionary<string, Column>() {{"id", Col("newData.id")}})
    .Execute();

deltaTable.ToDF().Show(100);
val newData = spark.range(0, 20).toDF

deltaTable.as("oldData").
  merge(
    newData.as("newData"),
    "oldData.id = newData.id").
  whenMatched.
  update(Map("id" -> lit(-1))).
  whenNotMatched.
  insert(Map("id" -> col("newData.id"))).
  execute()

deltaTable.toDF.show()

El resultado es:

ID
18
15
19
2
1
6
8
3
-1
10
13
0
16
4
-1
12
11
14
-1
17

Aquí tiene una combinación de los datos existentes. A los datos existentes se les ha asignado el valor-1 en la ruta de acceso del código de actualización (WhenMatched). También se agregaron los nuevos datos que se crearon en al principio del fragmento a través de la ruta de acceso al código de inserción (WhenNotMatched).

Historial

Delta Lake tiene la capacidad de permitir examinar el historial de una tabla. Es decir, los cambios que se realizaron en la tabla Delta subyacente. En la celda siguiente se muestra lo sencillo que es inspeccionar el historial.

delta_table.history().show(20, 1000, False)
deltaTable.History().Show(20, 1000, false);
deltaTable.history.show(false)

El resultado es:

version timestamp userId userName operation operationParameters trabajo notebook clusterId readVersion isolationLevel isBlindAppend
4 25-04-2020 00:36:27 null null MERGE [predicate -> (oldData.ID = newData.ID)] null null null 3 null false
3 25-04-2020 00:36:08 null null Delete [predicate -> ["((ID % CAST(2 AS BIGINT)) = CAST(0 AS BIGINT))"]] null null null 2 null false
2 25-04-2020 00:35:51 null null UPDATE [predicate -> ((ID#744L % cast(2 as bigint)) = cast(0 as bigint))] null null null 1 null false
1 25-04-2020 00:35:05 null null WRITE [mode -> Overwrite, partitionBy -> []] null null null 0 null false
0 25-04-2020 00:34:34 null null WRITE [mode -> ErrorIfExists, partitionBy -> []] null null null null null true

Aquí puede ver todas las modificaciones realizadas en los fragmentos de código anteriores.

Lectura de las versiones anteriores de datos mediante Viaje en el tiempo

La característica Viaje en el tiempo permite consultar instantáneas anteriores de la tabla de Delta Lake. Si desea acceder a los datos que ha sobrescrito, puede consultar una instantánea de la tabla antes de sobrescribir el primer conjunto de datos mediante la opción versionAsOf.

Una vez que ejecute la celda siguiente, debería ver el primer conjunto de datos de antes de que se sobrescribiera. Time Travel es una característica eficaz que aprovecha la eficacia del registro de transacciones de Delta Lake para acceder a los datos que ya no están en la tabla. La eliminación de la opción de la versión 0 (o la especificación de la versión 1) le permitiría volver a ver los datos más recientes. Para más información, consulte Consulta de una instantánea anterior de una tabla.

df = spark.read.format("delta").option("versionAsOf", 0).load(delta_table_path)
df.show()
var df = spark.Read().Format("delta").Option("versionAsOf", 0).Load(deltaTablePath);
df.Show();
val df = spark.read.format("delta").option("versionAsOf", 0).load(deltaTablePath)
df.show()

El resultado es:

ID
0
1
4
3
2

Aquí puede ver que ha vuelto a la versión más antigua de los datos.

Escritura de un flujo de datos en una tabla

También puede escribir en una tabla de Delta Lake mediante Structured Streaming de Spark. El registro de transacciones de Delta Lake garantiza un procesamiento exactamente una vez, incluso cuando haya otras secuencias o consultas por lotes que se ejecutan simultáneamente en la tabla. De forma predeterminada, las secuencias se ejecutan en modo Anexar, que agrega nuevos registros a la tabla.

Para más información sobre la integración de Delta Lake en Structured Streaming, consulte Lecturas y escrituras de streaming de tablas.

En las celdas siguientes, esto es lo que hacemos:

  • En la celda 30, mostrar los datos recién anexados
  • En la celda 31, comprobar el historial
  • En la celda 32, detener el trabajo de Structured Streaming
  • En la celda 33, inspeccionar el historial <--Observará que los anexos se han detenido

En primer lugar, va a configurar un sencillo trabajo de Spark Streaming para generar una secuencia y hacer que el trabajo escriba en una tabla Delta.

streaming_df = spark.readStream.format("rate").load()
stream = streaming_df\
    .selectExpr("value as id")\
    .writeStream\
    .format("delta")\
    .option("checkpointLocation", "/tmp/checkpoint-{0}".format(session_id))\
    .start(delta_table_path)
var streamingDf = spark.ReadStream().Format("rate").Load();
var stream = streamingDf.SelectExpr("value as id").WriteStream().Format("delta").Option("checkpointLocation", $"/tmp/checkpoint-{sessionId}").Start(deltaTablePath);
val streamingDf = spark.readStream.format("rate").load()
val stream = streamingDf.select($"value" as "id").writeStream.format("delta").option("checkpointLocation", s"/tmp/checkpoint-$sessionId").start(deltaTablePath)

Lectura de un flujo de cambios de una tabla

Mientras el flujo se escribe en la tabla de Delta Lake, también puede leer dicha tabla como un origen de streaming. Por ejemplo, puede iniciar otra consulta de streaming que imprima todos los cambios realizados en la tabla de Delta Lake.

delta_table.toDF().sort(col("id").desc()).show(100)
deltaTable.ToDF().Sort(Col("id").Desc()).Show(100);
deltaTable.toDF.sort($"id".desc).show

El resultado es:

ID
19
18
17
16
15
14
13
12
11
10
8
6
4
3
2
1
0
-1
-1
-1
delta_table.history().drop("userId", "userName", "job", "notebook", "clusterId", "isolationLevel", "isBlindAppend").show(20, 1000, False)
deltaTable.History().Drop("userId", "userName", "job", "notebook", "clusterId", "isolationLevel", "isBlindAppend").Show(20, 1000, false);
deltaTable.history.show

El resultado es:

version timestamp operation operationParameters readVersion
5 25-04-2020 00:37:09 STREAMING UPDATE [outputMode -> Append, queryId -> d26b4f8a-7e5a-44f2-a5fb-23a7bd02aef7, epochId -> 0] 4
4 25-04-2020 00:36:27 MERGE [predicate -> (oldData.id = newData.id)] 3
3 25-04-2020 00:36:08 Delete [predicate -> ["((id % CAST(2 AS BIGINT)) = CAST(0 AS BIGINT))"]] 2
2 25-04-2020 00:35:51 UPDATE [predicate -> ((id#744L % cast(2 as bigint)) = cast(0 as bigint))] 1
1 25-04-2020 00:35:05 WRITE [mode -> Overwrite, partitionBy -> []] 0
0 25-04-2020 00:34:34 WRITE [mode -> ErrorIfExists, partitionBy -> []] null

Aquí va a quitar algunas de las columnas menos interesantes para simplificar la experiencia de visualización de la vista del historial.

stream.stop()
delta_table.history().drop("userId", "userName", "job", "notebook", "clusterId", "isolationLevel", "isBlindAppend").show(100, 1000, False)
stream.Stop();
deltaTable.History().Drop("userId", "userName", "job", "notebook", "clusterId", "isolationLevel", "isBlindAppend").Show(100, 1000, false);
stream.stop
deltaTable.history.show

El resultado es:

version timestamp operation operationParameters readVersion
5 25-04-2020 00:37:09 STREAMING UPDATE [outputMode -> Append, queryId -> d26b4f8a-7e5a-44f2-a5fb-23a7bd02aef7, epochId -> 0] 4
4 25-04-2020 00:36:27 MERGE [predicate -> (oldData.id = newData.id)] 3
3 25-04-2020 00:36:08 Delete [predicate -> ["((id % CAST(2 AS BIGINT)) = CAST(0 AS BIGINT))"]] 2
2 25-04-2020 00:35:51 UPDATE [predicate -> ((id#744L % cast(2 as bigint)) = cast(0 as bigint))] 1
1 25-04-2020 00:35:05 WRITE [mode -> Overwrite, partitionBy -> []] 0
0 25-04-2020 00:34:34 WRITE [mode -> ErrorIfExists, partitionBy -> []] null

Conversión de Parquet en Delta

Puede realizar una conversión en contexto desde el formato Parquet a Delta.

Aquí va a probar si la tabla existente está en formato Delta o no.

parquet_path = "/parquet/parquet-table-{0}".format(session_id)
data = spark.range(0,5)
data.write.parquet(parquet_path)
DeltaTable.isDeltaTable(spark, parquet_path)
var parquetPath = $"/parquet/parquet-table-{sessionId}";
var data = spark.Range(0,5);
data.Write().Parquet(parquetPath);
DeltaTable.IsDeltaTable(parquetPath)
val parquetPath = s"/parquet/parquet-table-$sessionId"
val data = spark.range(0,5)
data.write.parquet(parquetPath)
DeltaTable.isDeltaTable(parquetPath)

El resultado es:

Falso

Ahora va a convertir los datos al formato Delta y a comprobar que ha funcionado.

DeltaTable.convertToDelta(spark, "parquet.`{0}`".format(parquet_path))
DeltaTable.isDeltaTable(spark, parquet_path)
DeltaTable.ConvertToDelta(spark, $"parquet.`{parquetPath}`");
DeltaTable.IsDeltaTable(parquetPath)
DeltaTable.convertToDelta(spark, s"parquet.`$parquetPath`")
DeltaTable.isDeltaTable(parquetPath)

El resultado es:

Verdadero

Soporte técnico de SQL

Delta admite comandos de la utilidad de tablas a través de SQL. SQL se puede usar para:

  • Obtener el historial de una instancia de DeltaTable.
  • Crear el vacío en una instancia de DeltaTable.
  • Convertir un archivo Parquet en Delta.
spark.sql("DESCRIBE HISTORY delta.`{0}`".format(delta_table_path)).show()
spark.Sql($"DESCRIBE HISTORY delta.`{deltaTablePath}`").Show();
spark.sql(s"DESCRIBE HISTORY delta.`$deltaTablePath`").show()

El resultado es:

version timestamp userId userName operation operationParameters trabajo notebook clusterId readVersion isolationLevel isBlindAppend
5 25-04-2020 00:37:09 null null STREAMING UPDATE [outputMode -> Ap... null null null 4 null true
4 25-04-2020 00:36:27 null null MERGE [predicate -> (ol... null null null 3 null false
3 25-04-2020 00:36:08 null null Delete [predicate -> ["(... null null null 2 null false
2 25-04-2020 00:35:51 null null UPDATE [predicate -> ((i... null null null 1 null false
1 25-04-2020 00:35:05 null null WRITE [mode -> Overwrit... null null null 0 null false
0 25-04-2020 00:34:34 null null WRITE [mode -> ErrorIfE... null null null null null true
spark.sql("VACUUM delta.`{0}`".format(delta_table_path)).show()
spark.Sql($"VACUUM delta.`{deltaTablePath}`").Show();
spark.sql(s"VACUUM delta.`$deltaTablePath`").show()

El resultado es:

path
abfss://data@arca...

Ahora, comprobará que una tabla no es una tabla de formato Delta. A continuación, convertirá la tabla al formato Delta mediante Spark SQL y confirmará que se ha convertido correctamente.

parquet_id = random.randint(0,1000)
parquet_path = "/parquet/parquet-table-{0}-{1}".format(session_id, parquet_id)
data = spark.range(0,5)
data.write.parquet(parquet_path)
DeltaTable.isDeltaTable(spark, parquet_path)
spark.sql("CONVERT TO DELTA parquet.`{0}`".format(parquet_path))
DeltaTable.isDeltaTable(spark, parquet_path)
var parquetId =  (new Random()).Next(10000000);
var parquetPath = $"/parquet/parquet-table-{sessionId}-{parquetId}";
var data = spark.Range(0,5);
data.Write().Parquet(parquetPath);
DeltaTable.IsDeltaTable(parquetPath);
spark.Sql($"CONVERT TO DELTA parquet.`{parquetPath}`");
DeltaTable.IsDeltaTable(parquetPath);
val parquetId = scala.util.Random.nextInt(1000)
val parquetPath = s"/parquet/parquet-table-$sessionId-$parquetId"
val data = spark.range(0,5)
data.write.parquet(parquetPath)
DeltaTable.isDeltaTable(parquetPath)
spark.sql(s"CONVERT TO DELTA parquet.`$parquetPath`")
DeltaTable.isDeltaTable(parquetPath)

El resultado es:

Verdadero

Para ver la documentación completa, consulte la página de la documentación de Delta Lake.

Para obtener más información, consulte Proyecto Delta Lake.

Pasos siguientes