チュートリアル パート 4: バッチ スコアリングを実行し、予測をレイクハウスに保存する
このチュートリアルでは、Microsoft Fabric MLflow モデル レジストリを使用して、パート 3 でトレーニングした登録済みの LightGBMClassifier モデルをインポートし、レイクハウスから読み込まれたテスト データセットに対してバッチ予測を実行する方法について説明します。
Microsoft Fabric を使用すると、ユーザーは PREDICT と呼ばれるスケーラブルな関数を使用して機械学習モデルを運用化できます。これは、任意のコンピューティング エンジンでのバッチ スコアリングをサポートします。 ユーザーは、Microsoft Fabric ノートブックまたは特定のモデルの項目ページから直接バッチ予測を生成できます。 PREDICT について説明します。
テスト データセットでバッチ予測を生成するには、すべてのトレーニング済み機械学習モデルの中で最高のパフォーマンスを示したトレーニング済み LightGBM モデルのバージョン 1 を使用します。 テスト データセットを Spark DataFrame に読み込み、バッチ予測を生成するために MLFlowTransformer オブジェクトを作成します。 次に、次の 3 つの方法のいずれかを使用して、PREDICT 関数を呼び出します。
- SynapseML のトランスフォーマー API
- Spark SQL API
- PySpark ユーザー定義関数 (UDF)
前提条件
Microsoft Fabric サブスクリプションを取得します。 または、無料の Microsoft Fabric 試用版にサインアップします。
Microsoft Fabric にサインインします。
ホーム ページの左側にあるエクスペリエンス スイッチャー アイコンを使って、Synapse Data Science エクスペリエンスに切り替えます。
これは、5 部構成チュートリアル シリーズの第 4 部です。 このチュートリアルを完了するには、最初に次の手順を完了します。
- パート 1: Apache Spark を使用して Microsoft Fabric レイクハウスにデータを取り込む。
- パート 2: Microsoft Fabric ノートブックを使用してデータを探索して視覚化し、データの詳細を確認する。
- パート 3: 機械学習モデルをトレーニングして登録する。
ノートブックで作業を進める
4-predict.ipynb は、このチュートリアルに付属するノートブックです。
このチュートリアルに付随するノートブックを開くには、「データ サイエンス用にシステムを準備する」チュートリアル の手順に従い、ノートブックをお使いのワークスペースにインポートします。
このページからコードをコピーして貼り付ける場合は、新しいノートブックを作成することができます。
コードの実行を開始する前に、必ずレイクハウスをノートブックにアタッチしてください。
重要
このシリーズの他の部分で使用したのと同じレイクハウスを接続します。
テストデータの読み込み
パート 3 で保存したテスト データを読み込みます。
df_test = spark.read.format("delta").load("Tables/df_test")
display(df_test)
Transformer API を使用した PREDICT
SynapseML の トランスフォーマー API を使用するには、最初に MLFlowTransformer オブジェクトを作成する必要があります。
MLFlowTransformer オブジェクトのインスタンス化
MLFlowTransformer オブジェクトは、パート 3 で登録した MLFlow モデルのラッパーです。 これにより、指定された DataFrame でバッチ予測を生成できるようになります。 MLFlowTransformer オブジェクトをインスタンス化するには、次のパラメーターを指定する必要があります。
- モデルに対する入力として必要になるテスト DataFrame からの列 (この場合、すべての列が必要です)。
- 新しい出力列の名前 (この場合は予測値)。
- 予測を生成するための正しいモデル名とモデル バージョン (この場合は
lgbm_sm
とバージョン 1)。
from synapse.ml.predict import MLFlowTransformer
model = MLFlowTransformer(
inputCols=list(df_test.columns),
outputCol='predictions',
modelName='lgbm_sm',
modelVersion=1
)
これで MLFlowTransformer オブジェクトを入手したので、それを使用してバッチ予測を生成できます。
import pandas
predictions = model.transform(df_test)
display(predictions)
Spark SQL API を使用した PREDICT
次のコードでは、Spark SQL API を使用して PREDICT 関数を呼び出します。
from pyspark.ml.feature import SQLTransformer
# Substitute "model_name", "model_version", and "features" below with values for your own model name, model version, and feature columns
model_name = 'lgbm_sm'
model_version = 1
features = df_test.columns
sqlt = SQLTransformer().setStatement(
f"SELECT PREDICT('{model_name}/{model_version}', {','.join(features)}) as predictions FROM __THIS__")
# Substitute "X_test" below with your own test dataset
display(sqlt.transform(df_test))
ユーザー定義関数を使用した PREDICT (UDF)
次のコードでは、PySpark UDF を使用して PREDICT 関数を呼び出します。
from pyspark.sql.functions import col, pandas_udf, udf, lit
# Substitute "model" and "features" below with values for your own model name and feature columns
my_udf = model.to_udf()
features = df_test.columns
display(df_test.withColumn("predictions", my_udf(*[col(f) for f in features])))
モデルのアイテム ページから PREDICT コードも生成できることに注意してください。 PREDICT について説明します。
モデル予測結果をレイクハウスに書き込む
バッチ予測を生成したら、モデル予測結果をレイクハウスに書き戻します。
# Save predictions to lakehouse to be used for generating a Power BI report
table_name = "customer_churn_test_predictions"
predictions.write.format('delta').mode("overwrite").save(f"Tables/{table_name}")
print(f"Spark DataFrame saved to delta table: {table_name}")
次のステップ
次に進みます。