Tutorial: Validación de datos mediante SemPy y Great Expectations (GX)

En este tutorial, aprenderá a usar SemPy junto con Great Expectations (GX) para realizar la validación de datos en modelos semánticos de Power BI.

En este tutorial se muestra cómo realizar las siguientes acciones:

  • Valide las restricciones en un conjunto de datos del área de trabajo de Fabric con el origen de datos Fabric de Great Expectations (basado en el vínculo semántico).
    • Configure un contexto de datos de GX, recursos de datos y expectativas.
    • Visualización de los resultados de validación con un punto de control de GX.
  • Use el vínculo semántico para analizar datos sin procesar.

Requisitos previos

  • Seleccione Áreas de trabajo en el panel de navegación izquierdo para buscar y seleccionar el área de trabajo. Esta área de trabajo se convertirá en el área de trabajo actual.
  • Descargue el archivo de ejemplo de análisis de venta minorista PBIX.pbix.
  • En el área de trabajo, use el botón Cargar para cargar el archivo de ejemplo de análisis de venta minorista PBIX.pbix en el área de trabajo.

Seguir en el cuaderno

great_expectations_tutorial.ipynb es el cuaderno que acompaña a este tutorial.

Para abrir el cuaderno complementario para este tutorial, siga las instrucciones en Preparación del sistema para los tutoriales de ciencia de datos para importar el cuaderno en el área de trabajo.

Si prefiere copiar y pegar el código de esta página, puede crear un cuaderno nuevo.

Asegúrese de adjuntar una instancia de LakeHouse al cuaderno antes de empezar a ejecutar código.

Configuración del cuaderno

En esta sección, configurará un entorno de cuaderno con los módulos y los datos necesarios.

  1. Instale SemPy y las bibliotecas Great Expectations pertinentes desde PyPI mediante la funcionalidad de instalación en línea de %pip en el cuaderno.
# install libraries
%pip install semantic-link great-expectations great_expectations_experimental great_expectations_zipcode_expectations

# load %%dax cell magic
%load_ext sempy
  1. Realice las importaciones necesarias de módulos que necesitará más adelante:
import great_expectations as gx
from great_expectations.expectations.expectation import ExpectationConfiguration
from great_expectations_zipcode_expectations.expectations import expect_column_values_to_be_valid_zip5

Configuración del contexto de datos de GX y el origen de datos

Para empezar a trabajar con Great Expectations, primero debe configurar un contexto de datos de GX. El contexto actúa como punto de entrada para las operaciones de GX y contiene todas las configuraciones pertinentes.

context = gx.get_context()

Ahora puede agregar el conjunto de datos de Fabric a este contexto como origen de datos para empezar a interactuar con los datos. En este tutorial se usa un modelo semántico de ejemplo estándar de Power BI archivo de ejemplo de análisis de venta minorista .pbix.

ds = context.sources.add_fabric_powerbi("Retail Analysis Data Source", dataset="Retail Analysis Sample PBIX")

Especificación de recursos de datos

Defina recursos de datos para especificar el subconjunto de datos con los que desea trabajar. El recurso puede ser tan sencillo como tablas completas o ser tan complejo como una consulta personalizada de expresiones de análisis de datos (DAX).

Aquí, agregará varios recursos:

Tabla de Power BI

Agregue una tabla de Power BI como un recurso de datos.

ds.add_powerbi_table_asset("Store Asset", table="Store")

Medida de Power BI

Si el conjunto de datos contiene medidas preconfiguradas, agregue las medidas como recursos siguiendo una API similar a la de SemPy evaluate_measure.

ds.add_powerbi_measure_asset(
    "Total Units Asset",
    measure="TotalUnits",
    groupby_columns=["Time[FiscalYear]", "Time[FiscalMonth]"]
)

DAX

Si desea definir sus propias medidas o tener más control sobre filas específicas, puede agregar un recurso DAX con una consulta DAX personalizada. En este caso, definimos una medida Total Units Ratio dividiendo dos medidas existentes.

ds.add_powerbi_dax_asset(
    "Total Units YoY Asset",
    dax_string=
    """
    EVALUATE SUMMARIZECOLUMNS(
        'Time'[FiscalYear],
        'Time'[FiscalMonth],
        "Total Units Ratio", DIVIDE([Total Units This Year], [Total Units Last Year])
    )    
    """
)

Consulta de DMV

En algunos casos, puede resultar útil usar la vista de administración dinámica (DMV) como parte del proceso de validación de datos. Por ejemplo, puede realizar un seguimiento del número de infracciones de integridad referencial dentro del conjunto de datos. Para obtener más información, consulte Limpiar datos = informes más rápidos.

ds.add_powerbi_dax_asset(
    "Referential Integrity Violation",
    dax_string=
    """
    SELECT
        [Database_name],
        [Dimension_Name],
        [RIVIOLATION_COUNT]
    FROM $SYSTEM.DISCOVER_STORAGE_TABLES
    """
)

Expectativas

Para agregar restricciones específicas a los recursos, primero debe configurar Expectation Suites. Después de agregar expectativas individuales a cada conjunto, puede actualizar el contexto de datos configurado al principio con el nuevo conjunto de aplicaciones. Para obtener una lista completa de las expectativas disponibles, consulte la galería de expectativas de GX.

Comience agregando "Conjunto de ventas directas" con dos expectativas:

  • un código postal válido
  • una tabla con recuento de filas entre 80 y 200
suite_store = context.add_expectation_suite("Retail Store Suite")

suite_store.add_expectation(ExpectationConfiguration("expect_column_values_to_be_valid_zip5", { "column": "PostalCode" }))
suite_store.add_expectation(ExpectationConfiguration("expect_table_row_count_to_be_between", { "min_value": 80, "max_value": 200 }))

context.add_or_update_expectation_suite(expectation_suite=suite_store)

TotalUnits Medida

Agregue "Conjunto de medidas de ventas directas" con una expectativa:

  • Los valores de columna deben ser mayores que 50 000
suite_measure = context.add_expectation_suite("Retail Measure Suite")
suite_measure.add_expectation(ExpectationConfiguration(
    "expect_column_values_to_be_between", 
    {
        "column": "TotalUnits",
        "min_value": 50000
    }
))

context.add_or_update_expectation_suite(expectation_suite=suite_measure)

Total Units Ratio DAX

Agregue "Conjunto de DAX de ventas directas" con una expectativa:

  • Los valores de columna para la relación total de unidades deben estar comprendidos entre 0,8 y 1,5
suite_dax = context.add_expectation_suite("Retail DAX Suite")
suite_dax.add_expectation(ExpectationConfiguration(
    "expect_column_values_to_be_between", 
    {
        "column": "[Total Units Ratio]",
        "min_value": 0.8,
        "max_value": 1.5
    }
))

context.add_or_update_expectation_suite(expectation_suite=suite_dax)

Infracciones de integridad referencial (DMV)

Agregue "Conjunto de DMV de ventas directas" con una expectativa:

  • el RIVIOLATION_COUNT debe ser 0
suite_dmv = context.add_expectation_suite("Retail DMV Suite")
# There should be no RI violations
suite_dmv.add_expectation(ExpectationConfiguration(
    "expect_column_values_to_be_in_set", 
    {
        "column": "RIVIOLATION_COUNT",
        "value_set": [0]
    }
))
context.add_or_update_expectation_suite(expectation_suite=suite_dmv)

Validation

Para ejecutar realmente las expectativas especificadas con los datos, cree primero un punto de control y agréguelo al contexto. Para obtener más información sobre la configuración del punto de control, consulte Flujo de trabajo de validación de datos.

checkpoint_config = {
    "name": f"Retail Analysis Checkpoint",
    "validations": [
        {
            "expectation_suite_name": "Retail Store Suite",
            "batch_request": {
                "datasource_name": "Retail Analysis Data Source",
                "data_asset_name": "Store Asset",
            },
        },
        {
            "expectation_suite_name": "Retail Measure Suite",
            "batch_request": {
                "datasource_name": "Retail Analysis Data Source",
                "data_asset_name": "Total Units Asset",
            },
        },
        {
            "expectation_suite_name": "Retail DAX Suite",
            "batch_request": {
                "datasource_name": "Retail Analysis Data Source",
                "data_asset_name": "Total Units YoY Asset",
            },
        },
        {
            "expectation_suite_name": "Retail DMV Suite",
            "batch_request": {
                "datasource_name": "Retail Analysis Data Source",
                "data_asset_name": "Referential Integrity Violation",
            },
        },
    ],
}
checkpoint = context.add_checkpoint(
    **checkpoint_config
)

Ahora ejecute el punto de control y extraiga los resultados como un DataFrame de Pandas para un formato sencillo.

result = checkpoint.run()

Procese e imprima los resultados.

import pandas as pd

data = []

for run_result in result.run_results:
    for validation_result in result.run_results[run_result]["validation_result"]["results"]:
        row = {
            "Batch ID": run_result.batch_identifier,
            "type": validation_result.expectation_config.expectation_type,
            "success": validation_result.success
        }

        row.update(dict(validation_result.result))
        
        data.append(row)

result_df = pd.DataFrame.from_records(data)    

result_df[["Batch ID", "type", "success", "element_count", "unexpected_count", "partial_unexpected_list"]]

En la tabla se muestran los resultados de validación.

A partir de estos resultados, puede ver que todas las expectativas pasaron la validación, excepto el "Recurso de variación anual del total de unidades" que definió a través de una consulta DAX personalizada.

Diagnóstico

Mediante el vínculo semántico, puede capturar los datos de origen para comprender qué años exactos están fuera del intervalo. El vínculo semántico proporciona una magia insertada para ejecutar consultas DAX. Use el vínculo semántico para ejecutar la misma consulta que ha pasado al recurso de datos de GX y visualizar los valores resultantes.

%%dax "Retail Analysis Sample PBIX"

EVALUATE SUMMARIZECOLUMNS(
    'Time'[FiscalYear],
    'Time'[FiscalMonth],
    "Total Units Ratio", DIVIDE([Total Units This Year], [Total Units Last Year])
)

En la tabla se muestran los resultados del resumen de consultas de DAX.

Guarde estos resultados en un DataFrame.

df = _

Trace los resultados.

import matplotlib.pyplot as plt

df["Total Units % Change YoY"] = (df["[Total Units Ratio]"] - 1)

df.set_index(["Time[FiscalYear]", "Time[FiscalMonth]"]).plot.bar(y="Total Units % Change YoY")

plt.axhline(0)

plt.axhline(-0.2, color="red", linestyle="dotted")
plt.axhline( 0.5, color="red", linestyle="dotted")

None

En el trazado se muestran los resultados del resumen de consultas de DAX.

Desde el trazado, puede ver que abril y julio estaban ligeramente fuera del intervalo y, a continuación, puede realizar más pasos para investigar.

Almacenamiento de la configuración de GX

A medida que los datos del conjunto de datos cambian con el tiempo, es posible que quiera volver a ejecutar las validaciones de GX que acaba de realizar. Actualmente, el contexto de datos (que contiene los recursos de datos conectados, los conjuntos de expectativas y el punto de control) reside efímeramente, pero se puede convertir en un contexto de archivo para su uso futuro. Como alternativa, puede crear una instancia de un contexto de archivo (consulte Creación de instancias de un contexto de datos).

context = context.convert_to_file_context()

Ahora que ha guardado el contexto, copie el directorio gx en el almacén de lago.

Importante

Esta celda supone que ha agregado un almacén de lago al cuaderno. Si no hay ningún almacén de lago adjunto, no verá un error, pero tampoco podrá obtener el contexto más adelante. Si agrega un almacén de lago ahora, el kernel se reiniciará, por lo que tendrá que volver a ejecutar todo el cuaderno para volver a este punto.

# copy GX directory to attached lakehouse
!cp -r gx/ /lakehouse/default/Files/gx

Ahora, se pueden crear contextos futuros con context = gx.get_context(project_root_dir="<your path here>") para usar todas las configuraciones de este tutorial.

Por ejemplo, en un nuevo cuaderno, adjunte el mismo almacén de lago y use context = gx.get_context(project_root_dir="/lakehouse/default/Files/gx") para recuperar el contexto.

Consulte otros tutoriales para el vínculo semántico/SemPy: