チュートリアル 3: 現行の再実体化を有効にしてバッチ推論を実行する

このチュートリアル シリーズでは、プロトタイプ作成、トレーニング、運用という機械学習ライフサイクルのすべてのフェーズを、特徴量によってシームレスに統合する方法について説明します。

最初のチュートリアルでは、カスタム変換を使用して特徴セットの仕様を作成し、その特徴セットを使用してトレーニング データを生成し、実体化を有効にし、バックフィルを実行する方法を示しました。 2 番目のチュートリアルでは、実体化を有効にし、バックフィルを実行する方法を示しました。 また、モデルのパフォーマンスを向上させる方法として、特徴を試す方法についても説明しました。

このチュートリアルでは、次の方法について説明します。

  • transactions 特徴量セットの反復する具体化を有効にします。
  • 登録済みモデルでバッチ推論パイプラインを実行します。

前提条件

このチュートリアルに進む前に、シリーズの最初と 2 番目のチュートリアルを完了してください。

設定

  1. Azure Machine Learning Spark ノートブックを構成します。

    このチュートリアルを実行するために、新しいノートブックを作成し、順を追って手順を実行できます。 次のような名前の既存のノートブックを開いて実行することもできます。3.Enable recurrent materialization and run batch inference。 そのノートブックと、このシリーズのすべてのノートブックは、"featurestore_sample/notebooks" ディレクトリにあります。 "sdk_only" または "sdk_and_cli" を選択できます。 このチュートリアルは開いたままにしておき、ドキュメントのリンクや追加の説明を参照してください。

    1. 上部のナビゲーションの コンピューティング ドロップダウン リストで、Azure Machine Learning サーバーレス Spark サーバーレス Spark コンピューティング を選択します。

    2. セッションを構成するには、以下を行います。

      1. 上部のステータス バーで [セッションの構成] を選びます。
      2. Python パッケージ タブを選択します。
      3. [Conda ファイルのアップロード] を選びます
      4. ローカル コンピューターから azureml-examples/sdk/python/featurestore-sample/project/env/online.yml ファイルを選択します。
      5. オプションで、前提条件が頻繁に再実行されないように、セッション タイムアウト (アイドル時間) を増やします。
  2. Spark セッションを開始します。

    # run this cell to start the spark session (any code block will start the session ). This can take around 10 mins.
    print("start spark session")
  3. サンプルのルート ディレクトリを設定します。

    import os
    
    # please update the dir to ./Users/<your_user_alias> (or any custom directory you uploaded the samples to).
    # You can find the name from the directory structure in the left nav
    root_dir = "./Users/<your_user_alias>/featurestore_sample"
    
    if os.path.isdir(root_dir):
        print("The folder exists.")
    else:
        print("The folder does not exist. Please create or fix the path")
  4. CLI を設定します。

    該当なし。


  1. プロジェクト ワークスペースの CRUD (作成、読み取り、更新、および削除) クライアントを初期化します。

    チュートリアル ノートブックは、この現在のワークスペースから実行されます。

    ### Initialize the MLClient of this project workspace
    import os
    from azure.ai.ml import MLClient
    from azure.ai.ml.identity import AzureMLOnBehalfOfCredential
    
    project_ws_sub_id = os.environ["AZUREML_ARM_SUBSCRIPTION"]
    project_ws_rg = os.environ["AZUREML_ARM_RESOURCEGROUP"]
    project_ws_name = os.environ["AZUREML_ARM_WORKSPACE_NAME"]
    
    # connect to the project workspace
    ws_client = MLClient(
        AzureMLOnBehalfOfCredential(), project_ws_sub_id, project_ws_rg, project_ws_name
    )
  2. 特徴量ストア変数を初期化します。

    1 つめのチュートリアルで作成した内容を反映するように、featurestore_name の値を必ず更新してください。

    from azure.ai.ml import MLClient
    from azure.ai.ml.identity import AzureMLOnBehalfOfCredential
    
    # feature store
    featurestore_name = (
        "<FEATURESTORE_NAME>"  # use the same name from part #1 of the tutorial
    )
    featurestore_subscription_id = os.environ["AZUREML_ARM_SUBSCRIPTION"]
    featurestore_resource_group_name = os.environ["AZUREML_ARM_RESOURCEGROUP"]
    
    # feature store ml client
    fs_client = MLClient(
        AzureMLOnBehalfOfCredential(),
        featurestore_subscription_id,
        featurestore_resource_group_name,
        featurestore_name,
    )
  3. Feature Store SDK クライアントを初期化します。

    # feature store client
    from azureml.featurestore import FeatureStoreClient
    from azure.ai.ml.identity import AzureMLOnBehalfOfCredential
    
    featurestore = FeatureStoreClient(
        credential=AzureMLOnBehalfOfCredential(),
        subscription_id=featurestore_subscription_id,
        resource_group_name=featurestore_resource_group_name,
        name=featurestore_name,
    )

トランザクション特徴量セットで反復する具体化を有効にする

2 つめのチュートリアルでは、具体化を有効にし、transactions 特徴量セットに対してバックフィルを実行しました。 バックフィルは、特徴量値を計算して具体化ストアに配置するオンデマンドの 1 回限りの操作です。

運用環境でモデルの推論を処理するために、反復する具体化ジョブを設定して、具体化ストアを最新の状態に保つことができます。 これらのジョブは、ユーザー定義のスケジュールで実行されます。 繰り返しジョブのスケジュールは次のように機能します。

  • 間隔と頻度の値を指定して、時間枠を定義します。 たとえば、次の値は 3 時間の時間枠を定義します。

    • interval = 3
    • frequency = Hour
  • 最初の時間枠は、RecurrenceTrigger に定義されている start_time 値で始まり、以降同様です。

  • 最初の繰り返しジョブは、更新時刻の後、次の時間枠の開始時に送信されます。

  • 以降の反復するジョブは、最初のジョブの後、時間枠ごとに送信されます。

これまでのチュートリアルで説明したように、データが具体化されると (バックフィルまたは反復する具体化)、特徴量取得は、具体化されたデータを既定で使用します。

from datetime import datetime
from azure.ai.ml.entities import RecurrenceTrigger

transactions_fset_config = fs_client.feature_sets.get(name="transactions", version="1")

# create a schedule that runs the materialization job every 3 hours
transactions_fset_config.materialization_settings.schedule = RecurrenceTrigger(
    interval=3, frequency="Hour", start_time=datetime(2023, 4, 15, 0, 4, 10, 0)
)

fs_poller = fs_client.feature_sets.begin_create_or_update(transactions_fset_config)

print(fs_poller.result())

(省略可能) 特徴量セット資産の YAML ファイルを保存する

更新された設定を使用して YAML ファイルを保存します。

## uncomment and run
# transactions_fset_config.dump(root_dir + "/featurestore/featuresets/transactions/featureset_asset_offline_enabled_with_schedule.yaml")

バッチ推論パイプラインを実行する

バッチ推論には、次の手順があります。

  1. トレーニング パイプラインで使用したのと同じ組み込みの特徴量取得コンポーネント (3 つめのチュートリアルで説明済み) を使用します。 パイプライン トレーニングでは、コンポーネント入力として特徴量取得仕様を指定しました。 バッチ推論では、登録済みモデルを入力として渡します。 コンポーネントは、モデル アーティファクトで特徴量取得仕様を検索します。

    さらに、トレーニングでは、観測データにターゲット変数が含まれました。 一方、バッチ推論の観測データにターゲット変数は含まれません。 特徴量取得手順では、観測データを特徴量と結合し、バッチ推論用のデータを出力します。

  2. パイプラインは、前の手順のバッチ推論入力データを使用して、モデルで推論を実行し、予測値を出力として追加します。

    Note

    この例では、バッチ推論にジョブを使用しています。 Azure Machine Learning でバッチ エンドポイントを使用することもできます。

    from azure.ai.ml import load_job  # will be used later
    
    # set the batch inference  pipeline path
    batch_inference_pipeline_path = (
        root_dir + "/project/fraud_model/pipelines/batch_inference_pipeline.yaml"
    )
    batch_inference_pipeline_definition = load_job(source=batch_inference_pipeline_path)
    
    # run the training pipeline
    batch_inference_pipeline_job = ws_client.jobs.create_or_update(
        batch_inference_pipeline_definition
    )
    
    # stream the run logs
    ws_client.jobs.stream(batch_inference_pipeline_job.name)

バッチ推論用の出力データを検査する

パイプライン ビューで次のようにします。

  1. outputs カードで inference_step を選択します。

  2. Data フィールド値をコピーします。 これは、azureml_995abbc2-3171-461e-8214-c3c5d17ede83_output_data_data_with_prediction:1 のように表示されます。

  3. Data フィールド値を、名前とバージョン値を分けて、次のセルに貼り付けます。 前にコロン (:) が付いた最後の文字がバージョンです。

  4. バッチ推論パイプラインによって生成された predict_is_fraud 列を書き留めます。

    inference_stepoutputsname または version の値を指定しなかったため、バッチ推論パイプライン ("/project/fraud_mode/pipelines/batch_inference_pipeline.yaml") 出力に、GUID を名前の値、バージョンを 1 にして、追跡されないデータ資産をシステムが作成しました。 このセルに、資産からデータ パスを派生させて表示します。

    inf_data_output = ws_client.data.get(
        name="azureml_1c106662-aa5e-4354-b5f9-57c1b0fdb3a7_output_data_data_with_prediction",
        version="1",
    )
    inf_output_df = spark.read.parquet(inf_data_output.path + "data/*.parquet")
    display(inf_output_df.head(5))

クリーンアップ

このシリーズの 5 番目のチュートリアルでは、リソースを削除する方法について説明します。

次のステップ