チュートリアル: SemPy と Great Expectations (GX) を使用してデータを検証する

このチュートリアルでは、SemPy を Great Expectations (GX) と共に使用して、Power BI セマンティック モデルでデータ検証を実行する方法について説明します。

このチュートリアルでは、次の操作方法について説明します。

  • Great Expectations の Fabric データ ソース (セマンティック リンク上に構築) を使用して、Fabric ワークスペース内のデータセットに対する制約を検証します。
    • GX データ コンテキスト、データ資産、期待を構成します。
    • GX チェックポイントを使用して検証結果を表示します。
  • セマンティック リンクを使用して生データを分析します。

前提条件

  • 左側のナビゲーション ペインから [ワークスペース] を選択して、お使いのワークスペースを見つけて選択します。 このワークスペースが現在のワークスペースになります。
  • Retail Analysis Sample PBIX.pbix ファイルをダウンロードします。
  • ワークスペースで、[アップロード] ボタンを使用して、Retail Analysis Sample PBIX.pbix ファイルをワークスペースにアップロードします。

ノートブックで作業を進める

great_expectations_tutorial.ipynb は、このチュートリアルに付属するノートブックです。

このチュートリアルに付随するノートブックを開くには、「データ サイエンス用にシステムを準備する」チュートリアル の手順に従い、ノートブックをお使いのワークスペースにインポートします。

このページからコードをコピーして貼り付ける場合は、[新しいノートブックを作成する] ことができます。

コードの実行を開始する前に、必ずレイクハウスをノートブックにアタッチしてください。

ノートブックを設定する

このセクションでは、必要なモジュールとデータを含むノートブック環境を設定します。

  1. ノートブック内の %pip インライン インストール機能を使用して、PyPI から SemPy と関連する Great Expectations ライブラリをインストールします。
# install libraries
%pip install semantic-link great-expectations great_expectations_experimental great_expectations_zipcode_expectations

# load %%dax cell magic
%load_ext sempy
  1. 後で必要になるモジュールの必要なインポートを実行します。
import great_expectations as gx
from great_expectations.expectations.expectation import ExpectationConfiguration
from great_expectations_zipcode_expectations.expectations import expect_column_values_to_be_valid_zip5

GX データ コンテキストとデータ ソースを設定する

Great Expectations を開始するには、まず GX データ コンテキストを設定する必要があります。 コンテキストは GX 操作のエントリ ポイントとして機能し、関連するすべての構成を保持します。

context = gx.get_context()

これで、このコンテキストに Fabric データセットを データ ソース として追加して、データの操作を開始できるようになりました。 このチュートリアルでは、標準的な Power BI サンプル セマンティック モデル Retail Analysis Sample .pbix ファイルを使用します。

ds = context.sources.add_fabric_powerbi("Retail Analysis Data Source", dataset="Retail Analysis Sample PBIX")

データ資産の指定

データ資産を定義して、操作するデータのサブセットを指定します。 資産は、完全なテーブルと同じくらい単純にすることも、カスタム Data Analysis Expressions (DAX) クエリと同じくらい複雑にすることもできます。

ここでは、複数の資産を追加します。

  • Power BI テーブル
  • Power BI メジャー
  • カスタム DAX クエリ
  • 動的管理ビュー (DMV) クエリ

Power BI テーブル

Power BI テーブルをデータ資産として追加します。

ds.add_powerbi_table_asset("Store Asset", table="Store")

Power BI メジャー

データセットに構成済みのメジャーが含まれている場合は、SemPy の evaluate_measure と同様の API に従って、メジャーを資産として追加します。

ds.add_powerbi_measure_asset(
    "Total Units Asset",
    measure="TotalUnits",
    groupby_columns=["Time[FiscalYear]", "Time[FiscalMonth]"]
)

DAX

独自のメジャーを定義する場合や、特定の行をより細かく制御したい場合は、カスタム DAX クエリを使用して DAX 資産を追加できます。 ここでは、2 つの既存のメジャーを分割して、Total Units Ratio メジャーを定義します。

ds.add_powerbi_dax_asset(
    "Total Units YoY Asset",
    dax_string=
    """
    EVALUATE SUMMARIZECOLUMNS(
        'Time'[FiscalYear],
        'Time'[FiscalMonth],
        "Total Units Ratio", DIVIDE([Total Units This Year], [Total Units Last Year])
    )    
    """
)

DMV クエリ

場合によっては、データ検証プロセスの一部として、動的管理ビュー (DMV) 計算を使用すると便利な場合があります。 たとえば、データセット内の参照整合性違反の数を追跡できます。 詳細については、「Clean data = faster reports」を参照してください。

ds.add_powerbi_dax_asset(
    "Referential Integrity Violation",
    dax_string=
    """
    SELECT
        [Database_name],
        [Dimension_Name],
        [RIVIOLATION_COUNT]
    FROM $SYSTEM.DISCOVER_STORAGE_TABLES
    """
)

期待される回答

アセットに特定の制約を追加するには、まず、Expectation Suite を構成する必要があります。 各スイートに個別の期待を追加した後、新しいスイートで最初に設定されたデータ コンテキストを更新できます。 利用可能な期待の完全な一覧については、GX 期待ギャラリーを参照してください。

まず、次の 2 つの期待を持つ "Retail Store Suite" を追加します。

  • 有効な郵便番号
  • 行数が 80 から 200 のテーブル
suite_store = context.add_expectation_suite("Retail Store Suite")

suite_store.add_expectation(ExpectationConfiguration("expect_column_values_to_be_valid_zip5", { "column": "PostalCode" }))
suite_store.add_expectation(ExpectationConfiguration("expect_table_row_count_to_be_between", { "min_value": 80, "max_value": 200 }))

context.add_or_update_expectation_suite(expectation_suite=suite_store)

TotalUnits メジャー

1 つの期待を持つ "Retail Measure Suite" を追加します。

  • 列の値は 50,000 をより大きくする必要があります
suite_measure = context.add_expectation_suite("Retail Measure Suite")
suite_measure.add_expectation(ExpectationConfiguration(
    "expect_column_values_to_be_between", 
    {
        "column": "TotalUnits",
        "min_value": 50000
    }
))

context.add_or_update_expectation_suite(expectation_suite=suite_measure)

Total Units Ratio DAX

1 つの期待を持つ "Retail DAX Suite" を追加します。

  • Total Units Ratio の列の値は 0.8 から 1.5 にする必要があります
suite_dax = context.add_expectation_suite("Retail DAX Suite")
suite_dax.add_expectation(ExpectationConfiguration(
    "expect_column_values_to_be_between", 
    {
        "column": "[Total Units Ratio]",
        "min_value": 0.8,
        "max_value": 1.5
    }
))

context.add_or_update_expectation_suite(expectation_suite=suite_dax)

参照整合性違反 (DMV)

1 つの期待を持つ "Retail DMV Suite" を追加します。

  • RIVIOLATION_COUNTは 0 にする必要があります
suite_dmv = context.add_expectation_suite("Retail DMV Suite")
# There should be no RI violations
suite_dmv.add_expectation(ExpectationConfiguration(
    "expect_column_values_to_be_in_set", 
    {
        "column": "RIVIOLATION_COUNT",
        "value_set": [0]
    }
))
context.add_or_update_expectation_suite(expectation_suite=suite_dmv)

検証

データに対して指定された期待を実際に実行するには、最初にチェックポイントを作成し、コンテキストに追加します。 チェックポイントの構成の詳細については、「データ検証ワークフロー」を参照してください。

checkpoint_config = {
    "name": f"Retail Analysis Checkpoint",
    "validations": [
        {
            "expectation_suite_name": "Retail Store Suite",
            "batch_request": {
                "datasource_name": "Retail Analysis Data Source",
                "data_asset_name": "Store Asset",
            },
        },
        {
            "expectation_suite_name": "Retail Measure Suite",
            "batch_request": {
                "datasource_name": "Retail Analysis Data Source",
                "data_asset_name": "Total Units Asset",
            },
        },
        {
            "expectation_suite_name": "Retail DAX Suite",
            "batch_request": {
                "datasource_name": "Retail Analysis Data Source",
                "data_asset_name": "Total Units YoY Asset",
            },
        },
        {
            "expectation_suite_name": "Retail DMV Suite",
            "batch_request": {
                "datasource_name": "Retail Analysis Data Source",
                "data_asset_name": "Referential Integrity Violation",
            },
        },
    ],
}
checkpoint = context.add_checkpoint(
    **checkpoint_config
)

次に、チェックポイントを実行し、単純な書式設定のために Pandas DataFrame として結果を抽出します。

result = checkpoint.run()

結果を処理して印刷します。

import pandas as pd

data = []

for run_result in result.run_results:
    for validation_result in result.run_results[run_result]["validation_result"]["results"]:
        row = {
            "Batch ID": run_result.batch_identifier,
            "type": validation_result.expectation_config.expectation_type,
            "success": validation_result.success
        }

        row.update(dict(validation_result.result))
        
        data.append(row)

result_df = pd.DataFrame.from_records(data)    

result_df[["Batch ID", "type", "success", "element_count", "unexpected_count", "partial_unexpected_list"]]

検証結果をテーブルに示します。

これらの結果から、カスタム DAX クエリで定義した "Total Units YoY Asset" を除き、すべての期待が検証に合格したことがわかります。

診断

セマンティック リンクを使用すると、ソース データをフェッチして、どの正確な年が範囲外であるかを把握できます。 セマンティック リンクは、DAX クエリを実行するためのインライン マジックを提供します。 セマンティック リンクを使用して、GX データ資産に渡したのと同じクエリを実行し、結果の値を視覚化します。

%%dax "Retail Analysis Sample PBIX"

EVALUATE SUMMARIZECOLUMNS(
    'Time'[FiscalYear],
    'Time'[FiscalMonth],
    "Total Units Ratio", DIVIDE([Total Units This Year], [Total Units Last Year])
)

DAX クエリの概要作成の結果をテーブルに示します。

これらの結果を DataFrame に保存します。

df = _

結果をプロットします。

import matplotlib.pyplot as plt

df["Total Units % Change YoY"] = (df["[Total Units Ratio]"] - 1)

df.set_index(["Time[FiscalYear]", "Time[FiscalMonth]"]).plot.bar(y="Total Units % Change YoY")

plt.axhline(0)

plt.axhline(-0.2, color="red", linestyle="dotted")
plt.axhline( 0.5, color="red", linestyle="dotted")

None

プロットには、DAX クエリの概要作成の結果が表示されます。

プロットから、4 月と 7 月が少し範囲外だったことがわかります。その後、さらに調査の手順を実行できます。

GX 構成の格納

データセット内のデータが時間の経過と同時に変化するにつれて、先ほど実行した GX 検証を再実行することが必要になる場合があります。 現在、データ コンテキスト (接続されたデータ資産、期待スイート、チェックポイントを含む) は一時的に存続しますが、将来使用するためにファイル コンテキストに変換できます。 または、ファイル コンテキストをインスタンス化することもできます (「データ コンテキストのインスタンス化」を参照)。

context = context.convert_to_file_context()

コンテキストを保存したので、gx ディレクトリをレイクハウスにコピーします。

重要

このセルは、ノートブックにレイクハウスを追加したことを前提としています。 レイクハウスがアタッチされていない場合、エラーは表示されませんが、後でコンテキストを取得することもできなくなります。 ここでレイクハウスを追加すると、カーネルが再起動するため、この時点に戻るにはノートブック全体を再実行する必要があります。

# copy GX directory to attached lakehouse
!cp -r gx/ /lakehouse/default/Files/gx

このチュートリアルのすべての構成を使用するために、context = gx.get_context(project_root_dir="<your path here>") を使用して将来のコンテキストを作成できるようになりました。

たとえば、新しいノートブックで、同じレイクハウスをアタッチし、context = gx.get_context(project_root_dir="/lakehouse/default/Files/gx") を使用してコンテキストを取得します。

セマンティック リンク や SemPy については、他のチュートリアルを確認してください。