Python ユーザー定義関数を使ってオンデマンドで特徴量を計算する

この記事では、Azure Databricks でオンデマンド特徴量を作成して使用する方法について説明します。

オンデマンド特徴量を使うには、Unity Catalog に対してワークスペースが有効になっている必要があり、Databricks Runtime 13.3 LTS ML 以降を使う必要があります。

オンデマンド特徴量とは

"オンデマンド" とは、値が事前にはわからず、推論時に計算される特徴量のことを指します。 Azure Databricks では、Python ユーザー定義関数 (UDF) を使って、オンデマンド特徴量の計算方法を指定します。 これらの関数は Unity Catalog によって管理され、Catalog Explorer を使って確認できます。

要件

  • ユーザー定義関数 (UDF) を使用してトレーニング セットを作成したり、機能提供エンドポイントを作成したりするには、Unity カタログの system カタログに対する USE CATALOG 権限が必要です。

ワークフロー

オンデマンドで特徴量を計算するには、特徴量の値の計算方法を指示する Python ユーザー定義関数 (UDF) を指定します。

  • トレーニング中に、create_training_set API の feature_lookups パラメーターで、この関数とその入力バインドを指定します。
  • トレーニング済みのモデルは、Feature Store メソッド log_model を使ってログする必要があります。 これにより、推論に使われるときにモデルによってオンデマンド特徴量が自動的に評価されるようになります。
  • バッチ スコアリングの場合、score_batch API によって、オンデマンド特徴量を含むすべての特徴量の値が自動的に計算され、返されます。
  • Mosaic AI Model Serving を使ってモデルを提供する場合、モデルは自動的に Python UDF を使って、スコアリング要求ごとにオンデマンド特徴量を計算します。

Python UDF を作成する

Python UDF は、ノートブックまたは Databricks SQL で作成できます。

たとえば、ノートブック セルで次のコードを実行すると、カタログ main とスキーマ default に Python UDF example_feature が作成されます。

%sql
CREATE FUNCTION main.default.example_feature(x INT, y INT)
RETURNS INT
LANGUAGE PYTHON
COMMENT 'add two numbers'
AS $$
def add_numbers(n1: int, n2: int) -> int:
  return n1 + n2

return add_numbers(x, y)
$$

コードを実行した後は、Catalog Explorer で 3 レベルの名前空間内を移動して、関数定義を表示できます。

カタログ エクスプローラーの関数

Python UDF の作成について詳しくは、「Python UDF を Unity Catalog に登録する」と SQL 言語マニュアルを参照してください。

欠損特徴量値を処理する方法

Python UDF が FeatureLookup の結果に依存している場合、要求された参照キーが見つからないときに返される値は環境によって異なります。 score_batch を使用している場合、返される値は None です。 オンライン サービスを使用している場合、返される値は float("nan") です。

次のコードは、両方の事例を処理する方法の例です。

%sql
CREATE OR REPLACE FUNCTION square(x INT)
RETURNS INT
LANGUAGE PYTHON AS
$$
import numpy as np
if x is None or np.isnan(x):
  return 0
return x * x
$$

オンデマンド特徴量を使ってモデルをトレーニングする

モデルをトレーニングするには、FeatureFunction を使います。これは create_training_set API に feature_lookups パラメーターで渡されます。

次のコード例では、前のセクションで定義した Python UDF main.default.example_feature を使用しています。

# Install databricks-feature-engineering first with:
# %pip install databricks-feature-engineering
# dbutils.library.restartPython()

from databricks.feature_engineering import FeatureEngineeringClient
from databricks.feature_engineering import FeatureFunction, FeatureLookup
from sklearn import linear_model

fe = FeatureEngineeringClient()

features = [
  # The feature 'on_demand_feature' is computed as the sum of the the input value 'new_source_input'
  # and the pre-materialized feature 'materialized_feature_value'.
  # - 'new_source_input' must be included in base_df and also provided at inference time.
  #   - For batch inference, it must be included in the DataFrame passed to 'FeatureEngineeringClient.score_batch'.
  #   - For real-time inference, it must be included in the request.
  # - 'materialized_feature_value' is looked up from a feature table.

  FeatureFunction(
      udf_name="main.default.example_feature",    # UDF must be in Unity Catalog so uses a three-level namespace
      input_bindings={
        "x": "new_source_input",
        "y": "materialized_feature_value"
      },
      output_name="on_demand_feature",
  ),
  # retrieve the prematerialized feature
  FeatureLookup(
    table_name = 'main.default.table',
    feature_names = ['materialized_feature_value'],
    lookup_key = 'id'
  )
]

# base_df includes the columns 'id', 'new_source_input', and 'label'
training_set = fe.create_training_set(
  df=base_df,
  feature_lookups=features,
  label='label',
  exclude_columns=['id', 'new_source_input', 'materialized_feature_value']     # drop the columns not used for training
)

# The training set contains the columns 'on_demand_feature' and 'label'.
training_df = training_set.load_df().toPandas()

# training_df columns ['materialized_feature_value', 'label']
X_train = training_df.drop(['label'], axis=1)
y_train = training_df.label

model = linear_model.LinearRegression().fit(X_train, y_train)

モデルをログし、Unity Catalog に登録する

特徴量のメタデータと共にパッケージ化されたモデルを Unity Catalog に登録することができます。 モデルの作成に使われる特徴テーブルは、Unity Catalog に格納する必要があります。

推論に使われるときにモデルによってオンデマンド特徴量が自動的に評価されるようにするには、次のようにレジストリ URI を設定してから、モデルをログする必要があります。

import mlflow
mlflow.set_registry_uri("databricks-uc")

fe.log_model(
    model=model,
    artifact_path="main.default.model",
    flavor=mlflow.sklearn,
    training_set=training_set,
    registered_model_name="main.default.recommender_model"
)

オンデマンド特徴量を定義する Python UDF で Python パッケージがインポートされる場合は、引数 extra_pip_requirements を使ってそれらのパッケージを指定する必要があります。 次に例を示します。

import mlflow
mlflow.set_registry_uri("databricks-uc")

fe.log_model(
    model=model,
    artifact_path="model",
    flavor=mlflow.sklearn,
    training_set=training_set,
    registered_model_name="main.default.recommender_model",
    extra_pip_requirements=["scikit-learn==1.20.3"]
)

制限

オンデマンド特徴量は、MapType と ArrayType を除き、Feature Store でサポートされているすべてのデータ型を出力できます。

Notebook の例: オンデマンド機能

次のノートブックでは、オンデマンド機能を使用するモデルをトレーニングし、スコアを付ける方法の例を示しています。

基本的なオンデマンド機能のデモ ノートブック

ノートブックを入手

次のノートブックでは、レストラン推奨モデルの例を示しています。 レストランの場所は Databricks オンライン テーブルから検索されます。 ユーザーの現在の場所がスコア付け要求の一部として送信されます。 このモデルでは、オンデマンド機能を使用し、ユーザーからレストランまでのリアルタイム距離を計算します。 その距離がモデルへの入力として使用されます。

オンライン テーブルのデモ ノートブックを使用したレストランのレコメンデーション オンデマンド機能

ノートブックを入手