チュートリアル 5: カスタム ソースを使用して特徴量セットを開発する

Azure Machine Learning のマネージド Feature Store を使うと、特徴量の検出、作成、運用化を行うことができます。 特徴量は、さまざまな特徴量を実験するプロトタイプ作成フェーズから始まり、機械学習ライフサイクルの結合組織として機能します。 このライフサイクルはモデルをデプロイする運用化フェーズに進み、推論のステップで特徴量のデータについて調べます。 Feature Store の詳細については、「Feature Store の概念」を参照してください。

このチュートリアル シリーズのパート 1 では、カスタム変換を使用して特徴量セットの仕様を作成し、具体化を有効にし、バックフィルを実行する方法を示しました。 パート 2 では、実験とトレーニングのフローで特徴量を実験する方法を示しました。 パート 3 では、transactions 特徴量セットの定期的な具体化について説明し、登録されたモデルでバッチ推論パイプラインを実行する方法を示しました。 パート 4 では、バッチ推論を実行する方法について説明しました。

このチュートリアルでは、次のことについて説明します。

  • カスタム データ ソースからデータを読み込むロジックを定義します。
  • このカスタム データ ソースから使用する特徴量セットを構成して登録します。
  • 登録済みの特徴量セットをテストします。

前提条件

Note

このチュートリアルでは、サーバーレス Spark コンピューティングを搭載した Azure Machine Learning ノートブックを使用します。

  • このシリーズのこれまでのチュートリアルを完了していることを確認してください。 このチュートリアルでは、前のチュートリアルで作成した特徴量ストアと他のリソースを再利用します。

設定

このチュートリアルでは、Python Feature Store core SDK (azureml-featurestore) を使用します。 Python SDK は、Feature Store、特徴量セット、Feature Store エンティティの作成、読み取り、更新、削除 (CRUD) 操作に使用されます。

このチュートリアルでは、これらのリソースを明示的にインストールする必要はありません。ここに示すセットアップ手順では、conda.yml ファイルにこれらのリソースが含まれています。

Azure Machine Learning Spark ノートブックを構成する

新しいノートブックを作成し、順を追ってこのチュートリアルの手順を実行できます。 また既存のノートブック featurestore_sample/notebooks/sdk_only/5.Develop-feature-set-custom-source.ipynb を開いて実行することもできます。 このチュートリアルは開いたままにしておき、ドキュメントのリンクや追加の説明を参照してください。

  1. 上部のメニューの [Compute] (コンピューティング) ドロップダウン リストで、[Azure Machine Learning Serverless Spark] (Azure Machine Learning サーバーレス Spark) の下の [Serverless Spark Compute] (サーバーレス Spark コンピューティング) を選択します。

  2. セッションを構成するには、以下を行います。

    1. 上部のステータス バーで [セッションの構成] を選びます。
    2. [Python パッケージ] タブを選びます。
    3. [Conda ファイルのアップロード] を選びます。
    4. 1 つめのチュートリアルでアップロードした "conda.yml" ファイルをアップロードします。
    5. オプションで、前提条件が頻繁に再実行されないように、セッション タイムアウト (アイドル時間) を増やします。

サンプル用のルート ディレクトリを設定する

このコード セルによって、サンプル用のルート ディレクトリを設定します。 すべての依存関係をインストールして Spark セッションを開始するには、約 10 分かかります。

import os

# Please update the dir to ./Users/{your_user_alias} (or any custom directory you uploaded the samples to).
# You can find the name from the directory structure in the left navigation panel.
root_dir = "./Users/<your_user_alias>/featurestore_sample"

if os.path.isdir(root_dir):
    print("The folder exists.")
else:
    print("The folder does not exist. Please create or fix the path")

特徴量ストア ワークスペースの CRUD クライアントを初期化する

特徴量ストア ワークスペースの作成、読み取り、更新、削除 (CRUD) 操作に対応するために、特徴量ストア ワークスペースの MLClient を初期化します。

from azure.ai.ml import MLClient
from azure.ai.ml.identity import AzureMLOnBehalfOfCredential

# Feature store
featurestore_name = (
    "<FEATURESTORE_NAME>"  # use the same name that was used in the tutorial #1
)
featurestore_subscription_id = os.environ["AZUREML_ARM_SUBSCRIPTION"]
featurestore_resource_group_name = os.environ["AZUREML_ARM_RESOURCEGROUP"]

# Feature store ml client
fs_client = MLClient(
    AzureMLOnBehalfOfCredential(),
    featurestore_subscription_id,
    featurestore_resource_group_name,
    featurestore_name,
)

Feature Store コア SDK クライアントを初期化する

前述のように、このチュートリアルでは Python Feature Store core SDK (azureml-featurestore) を使用します。 この初期化された SDK クライアントは、特徴量ストア、特徴量セット、特徴量ストア エンティティの作成、読み取り、更新、削除 (CRUD) 操作に対応します。

from azureml.featurestore import FeatureStoreClient
from azure.ai.ml.identity import AzureMLOnBehalfOfCredential

featurestore = FeatureStoreClient(
    credential=AzureMLOnBehalfOfCredential(),
    subscription_id=featurestore_subscription_id,
    resource_group_name=featurestore_resource_group_name,
    name=featurestore_name,
)

カスタム ソース定義

カスタム ソース定義が含まれる任意のデータ ストレージに基づいて独自のソース読み込みロジックを定義できます。 この特徴量を使用するには、ソース プロセッサのユーザー定義関数 (UDF) クラス (このチュートリアルの CustomSourceTransformer) を実装します。 このクラスでは、__init__(self, **kwargs) 関数と process(self, start_time, end_time, **kwargs) 関数を定義する必要があります。 kwargs 辞書が、特徴量セット仕様定義の一部として提供されています。 その後、この定義が UDF に渡されます。 start_time および end_time パラメーターが計算され、UDF 関数に渡されます。

ソース プロセッサの UDF クラスのサンプル コードを次に示します。

from datetime import datetime

class CustomSourceTransformer:
    def __init__(self, **kwargs):
        self.path = kwargs.get("source_path")
        self.timestamp_column_name = kwargs.get("timestamp_column_name")
        if not self.path:
            raise Exception("`source_path` is not provided")
        if not self.timestamp_column_name:
            raise Exception("`timestamp_column_name` is not provided")

    def process(
        self, start_time: datetime, end_time: datetime, **kwargs
    ) -> "pyspark.sql.DataFrame":
        from pyspark.sql import SparkSession
        from pyspark.sql.functions import col, lit, to_timestamp

        spark = SparkSession.builder.getOrCreate()
        df = spark.read.json(self.path)

        if start_time:
            df = df.filter(col(self.timestamp_column_name) >= to_timestamp(lit(start_time)))

        if end_time:
            df = df.filter(col(self.timestamp_column_name) < to_timestamp(lit(end_time)))

        return df

カスタム ソースを使用して特徴量セット仕様を作成し、ローカルで実験する

次に、カスタム ソース定義を使用して特徴量セット仕様を作成し、それを開発環境で使用して、特徴量セットを実験します。 サーバーレス Spark コンピューティングにアタッチされたチュートリアル ノートブックは、開発環境として機能します。

from azureml.featurestore import create_feature_set_spec
from azureml.featurestore.feature_source import CustomFeatureSource
from azureml.featurestore.contracts import (
    SourceProcessCode,
    TransformationCode,
    Column,
    ColumnType,
    DateTimeOffset,
    TimestampColumn,
)

transactions_source_process_code_path = (
    root_dir
    + "/featurestore/featuresets/transactions_custom_source/source_process_code"
)
transactions_feature_transform_code_path = (
    root_dir
    + "/featurestore/featuresets/transactions_custom_source/feature_process_code"
)

udf_featureset_spec = create_feature_set_spec(
    source=CustomFeatureSource(
        kwargs={
            "source_path": "wasbs://data@azuremlexampledata.blob.core.windows.net/feature-store-prp/datasources/transactions-source-json/*.json",
            "timestamp_column_name": "timestamp",
        },
        timestamp_column=TimestampColumn(name="timestamp"),
        source_delay=DateTimeOffset(days=0, hours=0, minutes=20),
        source_process_code=SourceProcessCode(
            path=transactions_source_process_code_path,
            process_class="source_process.CustomSourceTransformer",
        ),
    ),
    feature_transformation=TransformationCode(
        path=transactions_feature_transform_code_path,
        transformer_class="transaction_transform.TransactionFeatureTransformer",
    ),
    index_columns=[Column(name="accountID", type=ColumnType.string)],
    source_lookback=DateTimeOffset(days=7, hours=0, minutes=0),
    temporal_join_lookback=DateTimeOffset(days=1, hours=0, minutes=0),
    infer_schema=True,
)

udf_featureset_spec

その後、特徴ウィンドウを定義し、この特徴ウィンドウの特徴量値を表示します。

from datetime import datetime

st = datetime(2023, 1, 1)
et = datetime(2023, 6, 1)

display(
    udf_featureset_spec.to_spark_dataframe(
        feature_window_start_date_time=st, feature_window_end_date_time=et
    )
)

特徴量セット仕様としてエクスポートする

特徴量セット仕様を特徴量ストアに登録するには、まずその仕様を特定の形式で保存します。 生成された transactions_custom_source 特徴量セット仕様を確認します。 次のようにファイル ツリーからこのファイルを開き、仕様を確認します: featurestore/featuresets/transactions_custom_source/spec/FeaturesetSpec.yaml

仕様には、次の要素が含まれています。

  • features: 特徴量とそのデータ型の一覧。
  • index_columns: 特徴量セットから値にアクセスするために必要な結合キー。

仕様の詳細については、「マネージド Feature Store の最上位エンティティについて」および「CLI (v2) 特徴量セット YAML スキーマ」を参照してください。

特徴量セット仕様の永続化には、特徴量セット仕様をソース管理できるというもう 1 つの利点があります。

feature_spec_folder = (
    root_dir + "/featurestore/featuresets/transactions_custom_source/spec"
)

udf_featureset_spec.dump(feature_spec_folder)

トランザクション特徴量セットを Feature Store に登録する

このコードを使用して、カスタム ソースから読み込んだ特徴量セット資産を特徴量ストアに登録します。 その後、その資産を再利用し、簡単に共有できます。 特徴量セット資産の登録では、バージョン管理や具体化などの管理機能が提供されます。

from azure.ai.ml.entities import FeatureSet, FeatureSetSpecification

transaction_fset_config = FeatureSet(
    name="transactions_custom_source",
    version="1",
    description="transactions feature set loaded from custom source",
    entities=["azureml:account:1"],
    stage="Development",
    specification=FeatureSetSpecification(path=feature_spec_folder),
    tags={"data_type": "nonPII"},
)

poller = fs_client.feature_sets.begin_create_or_update(transaction_fset_config)
print(poller.result())

登録済みの特徴量セットを取得し、関連情報を出力します。

# Look up the feature set by providing name and version
transactions_fset_config = featurestore.feature_sets.get(
    name="transactions_custom_source", version="1"
)
# Print feature set information
print(transactions_fset_config)

登録済みの特徴量セットからの特徴量生成をテストする

特徴量セットの to_spark_dataframe() 関数を使用して、登録済みの特徴量セットからの特徴量生成をテストし、特徴量を表示します。 print-txn-fset-sample-values

df = transactions_fset_config.to_spark_dataframe()
display(df)

登録済みの特徴量セットを Spark データフレームとして正常に取り込み、表示できる必要があります。 これらの特徴量を、観測データとのポイントインタイム結合と、機械学習パイプラインの後続のステップに使用できるようになりました。

クリーンアップ

チュートリアル用にリソース グループを作成した場合は、そのリソース グループを削除して、このチュートリアルに関連付けられているすべてのリソースを削除できます。 そうでない場合は、リソースを個別に削除できます。

  • 特徴量ストアを削除するには、Azure portal でリソース グループを開き、その特徴量ストアを選択して削除します。
  • 特徴量ストア ワークスペースに割り当てられているユーザー割り当てマネージド ID (UAI) は、特徴量ストアを削除しても削除されません。 UAI を削除するには、こちらの手順に従います。
  • ストレージ アカウント タイプのオフライン ストアを削除するには、Azure portal でリソース グループを開き、作成したストレージを選択して削除します。
  • Azure Cache for Redis インスタンスを削除するには、Azure portal でリソース グループを開き、作成したインスタンスを選択して削除します。

次のステップ