特徴を使用してモデルをトレーニングする

この記事では、Unity Catalog またはローカルワークスペース特徴ストアで特徴エンジニアリングを使用してモデルをトレーニングする方法について説明します。 まず、使用する特徴と結合方法を定義するトレーニング データセットを作成する必要があります。 次に、モデルをトレーニングする場合、モデルは特徴への参照を保持します。

Unity Catalog で特徴エンジニアリングを使用してモデルをトレーニングすると、カタログ エクスプローラーでモデルの系列を表示できます。 モデルの作成に使用されたテーブルと関数は自動的に追跡され、表示されます。 「特徴量ストアの系列を表示する」を参照してください。

推論にモデルを使用する場合は、特徴ストアから特徴値を取得することができます。 モデル提供によってモデルを提供することもできます。その場合、オンライン ストアに公開された特徴が自動的に検索されます。 Feature Store モデルは MLflow pyfunc インターフェイスとも互換性があるため、MLflow を使用して、機能テーブルでバッチ推論を実行できます。

モデルで環境変数を使用する場合は、「モデル提供エンドポイントからリソースへのアクセスを構成する」で、モデルをオンラインで提供するときにそれらを使用する方法の詳細を参照してください。

モデルでは、トレーニングに最大 50 個のテーブルと 100 個の関数を使用できます。

トレーニング データセットの作成

モデル トレーニング用特徴テーブルから特定の特徴を選ぶには、FeatureEngineeringClient.create_training_set (Unity Catalog の Feature Engineering の場合) または FeatureStoreClient.create_training_set (ワークスペース Feature Store の場合) API と、FeatureLookup というオブジェクトを使ってトレーニング データセットを作成します。 FeatureLookup では、トレーニング セットで使用する各特徴 (特徴テーブルの名前、特徴の名前、特徴テーブルを create_training_set に渡された DataFrame と結合するときに使用するキーなど) を指定します。 詳細については、「FeatureLookup」を参照してください。

FeatureLookup を作成するときには、feature_names パラメーターを使用します。 feature_names は、トレーニング セットが作成された時点で特徴テーブル内のすべての特徴 (主キーを除く) を検索するために、1 つの特徴名、特徴名の一覧、または None を受け取ります。

Note

その DataFrame 内の lookup_key 列の型と順序は、参照機能テーブルの主キーの型 (timestamp キーを除く) と順序と一致する必要があります。

この記事には、両方のバージョンの構文のコード例が含まれています。

この例では、trainingSet.load_df によって返される DataFrame に、feature_lookups の各特徴の列が含まれています。 exclude_columns を使用して除外された列を除き、create_training_set に指定された DataFrame のすべての列が保持されます。

Unity Catalog の特徴エンジニアリング

from databricks.feature_engineering import FeatureEngineeringClient, FeatureLookup

# The model training uses two features from the 'customer_features' feature table and
# a single feature from 'product_features'
feature_lookups = [
    FeatureLookup(
      table_name='ml.recommender_system.customer_features',
      feature_names=['total_purchases_30d', 'total_purchases_7d'],
      lookup_key='customer_id'
    ),
    FeatureLookup(
      table_name='ml.recommender_system.product_features',
      feature_names=['category'],
      lookup_key='product_id'
    )
  ]

fe = FeatureEngineeringClient()

# Create a training set using training DataFrame and features from Feature Store
# The training DataFrame must contain all lookup keys from the set of feature lookups,
# in this case 'customer_id' and 'product_id'. It must also contain all labels used
# for training, in this case 'rating'.
training_set = fe.create_training_set(
  df=training_df,
  feature_lookups=feature_lookups,
  label='rating',
  exclude_columns=['customer_id', 'product_id']
)

training_df = training_set.load_df()

ワークスペース Feature Store

from databricks.feature_store import FeatureLookup, FeatureStoreClient

# The model training uses two features from the 'customer_features' feature table and
# a single feature from 'product_features'
feature_lookups = [
    FeatureLookup(
      table_name='recommender_system.customer_features',
      feature_names=['total_purchases_30d', 'total_purchases_7d'],
      lookup_key='customer_id'
    ),
    FeatureLookup(
      table_name='recommender_system.product_features',
      feature_names=['category'],
      lookup_key='product_id'
    )
  ]

fs = FeatureStoreClient()

# Create a training set using training DataFrame and features from Feature Store
# The training DataFrame must contain all lookup keys from the set of feature lookups,
# in this case 'customer_id' and 'product_id'. It must also contain all labels used
# for training, in this case 'rating'.
training_set = fs.create_training_set(
  df=training_df,
  feature_lookups=feature_lookups,
  label='rating',
  exclude_columns=['customer_id', 'product_id']
)

training_df = training_set.load_df()

参照キーが主キーと一致しない場合に TrainingSet を作成する

トレーニング セットの列名には、lookup_key の 引数 FeatureLookup を使用します。 create_training_set は、特徴テーブルの作成時に主キーが指定された順序を使用して、lookup_key 引数で指定されたトレーニング セットの列間で順序付けされた結合を実行します。

この例では、recommender_system.customer_features には次の主キーがあります。customer_iddt

recommender_system.product_features 特徴テーブルには主キー product_id があります。

training_df に次の列がある場合、

  • cid
  • transaction_dt
  • product_id
  • rating

次のコードでは TrainingSet の正しい特徴参照が作成されます。

Unity Catalog の特徴エンジニアリング

feature_lookups = [
    FeatureLookup(
      table_name='ml.recommender_system.customer_features',
      feature_names=['total_purchases_30d', 'total_purchases_7d'],
      lookup_key=['cid', 'transaction_dt']
    ),
    FeatureLookup(
      table_name='ml.recommender_system.product_features',
      feature_names=['category'],
      lookup_key='product_id'
    )
  ]

ワークスペース Feature Store

feature_lookups = [
    FeatureLookup(
      table_name='recommender_system.customer_features',
      feature_names=['total_purchases_30d', 'total_purchases_7d'],
      lookup_key=['cid', 'transaction_dt']
    ),
    FeatureLookup(
      table_name='recommender_system.product_features',
      feature_names=['category'],
      lookup_key='product_id'
    )
  ]

create_training_set が呼び出された場合は、次のコードに示すように、左結合を実行し、recommender_system.customer_featurestraining_df のテーブルを結合し、(cidtransaction_dt) に対応するキー (customer_iddt) を使用してトレーニング データセットを作成します。

Unity Catalog の特徴エンジニアリング

customer_features_df = spark.sql("SELECT * FROM ml.recommender_system.customer_features")
product_features_df = spark.sql("SELECT * FROM ml.recommender_system.product_features")

training_df.join(
  customer_features_df,
  on=[training_df.cid == customer_features_df.customer_id,
      training_df.transaction_dt == customer_features_df.dt],
  how="left"
).join(
  product_features_df,
  on="product_id",
  how="left"
)

ワークスペース Feature Store

customer_features_df = spark.sql("SELECT * FROM recommender_system.customer_features")
product_features_df = spark.sql("SELECT * FROM recommender_system.product_features")

training_df.join(
  customer_features_df,
  on=[training_df.cid == customer_features_df.customer_id,
      training_df.transaction_dt == customer_features_df.dt],
  how="left"
).join(
  product_features_df,
  on="product_id",
  how="left"
)

異なる特徴テーブルの同じ名前の 2 つの特徴を含む TrainingSet を作成する

FeatureLookup で省略可能な引数 output_name を使用します。 指定された名前は、TrainingSet.load_df によって返される DataFrame の特徴名の代わりに使用されます。 たとえば、次のコードでは、training_set.load_df によって返される DataFrame に列 customer_heightproduct_height が含まれます。

Unity Catalog の特徴エンジニアリング

feature_lookups = [
    FeatureLookup(
      table_name='ml.recommender_system.customer_features',
      feature_names=['height'],
      lookup_key='customer_id',
      output_name='customer_height',
    ),
    FeatureLookup(
      table_name='ml.recommender_system.product_features',
      feature_names=['height'],
      lookup_key='product_id',
      output_name='product_height'
    ),
  ]

fe = FeatureEngineeringClient()

with mlflow.start_run():
  training_set = fe.create_training_set(
    df=df,
    feature_lookups=feature_lookups,
    label='rating',
    exclude_columns=['customer_id']
  )
  training_df = training_set.load_df()

ワークスペース Feature Store

feature_lookups = [
    FeatureLookup(
      table_name='recommender_system.customer_features',
      feature_names=['height'],
      lookup_key='customer_id',
      output_name='customer_height',
    ),
    FeatureLookup(
      table_name='recommender_system.product_features',
      feature_names=['height'],
      lookup_key='product_id',
      output_name='product_height'
    ),
  ]

fs = FeatureStoreClient()

with mlflow.start_run():
  training_set = fs.create_training_set(
    df=df,
    feature_lookups=feature_lookups,
    label='rating',
    exclude_columns=['customer_id']
  )
  training_df = training_set.load_df()

同じ機能を複数回使用して TrainingSet を作成する

異なる検索回数キーで結合された同じ機能を使用して TrainingSet を作成するには、複数の FeatureLookups を使用します。 FeatureLookup 出力ごとに一意の output_name を使用します。

Unity Catalog の特徴エンジニアリング

feature_lookups = [
    FeatureLookup(
      table_name='ml.taxi_data.zip_features',
      feature_names=['temperature'],
      lookup_key=['pickup_zip'],
      output_name='pickup_temp'
    ),
    FeatureLookup(
      table_name='ml.taxi_data.zip_features',
      feature_names=['temperature'],
      lookup_key=['dropoff_zip'],
      output_name='dropoff_temp'
    )
  ]

ワークスペース Feature Store

feature_lookups = [
    FeatureLookup(
      table_name='taxi_data.zip_features',
      feature_names=['temperature'],
      lookup_key=['pickup_zip'],
      output_name='pickup_temp'
    ),
    FeatureLookup(
      table_name='taxi_data.zip_features',
      feature_names=['temperature'],
      lookup_key=['dropoff_zip'],
      output_name='dropoff_temp'
    )
  ]

管理されていない機械学習モデル用の TrainingSet を作成する

管理されていない学習モデル用の TrainingSet を作成するときは label=None を設定します。 たとえば、次の TrainingSet を使用して、関心に基づいてさまざまな顧客をグループにクラスター化できます。

Unity Catalog の特徴エンジニアリング

feature_lookups = [
    FeatureLookup(
      table_name='ml.recommender_system.customer_features',
      feature_names=['interests'],
      lookup_key='customer_id',
    ),
  ]

fe = FeatureEngineeringClient()
with mlflow.start_run():
  training_set = fe.create_training_set(
    df=df,
    feature_lookups=feature_lookups,
    label=None,
    exclude_columns=['customer_id']
  )

  training_df = training_set.load_df()

ワークスペース Feature Store

feature_lookups = [
    FeatureLookup(
      table_name='recommender_system.customer_features',
      feature_names=['interests'],
      lookup_key='customer_id',
    ),
  ]

fs = FeatureStoreClient()
with mlflow.start_run():
  training_set = fs.create_training_set(
    df=df,
    feature_lookups=feature_lookups,
    label=None,
    exclude_columns=['customer_id']
  )

  training_df = training_set.load_df()

モデルをトレーニングし、特徴テーブルを使用してバッチ推論を実行する

Feature Store の特徴を使用してモデルをトレーニングする場合、モデルは特徴への参照を保持します。 推論にモデルを使用する場合は、特徴ストアから特徴値を取得することができます。 モデルで使用される特徴の主キーを指定する必要があります。 モデルは、ワークスペースの Feature Store から必要な特徴を取得します。 その後、スコアリング中に必要に応じて特徴値を結合します。

推論時の特徴参照をサポートするには:

  • FeatureEngineeringClient (Unity Catalog の Feature Engineering の場合) または FeatureStoreClient (ワークスペース Feature Store の場合) の log_model メソッドを使ってモデルをログする必要があります。
  • モデルをトレーニングするには、TrainingSet.load_df によって返される DataFrame を使用する必要があります。 この DataFrame をモデルのトレーニングに使用する前に何か変更した場合、推論にモデルを使用しても変更は適用されません。 このため、モデルのパフォーマンスが低下します。
  • モデル型には、MLflow の対応する python_flavor が必要です。 MLflow では、次を含むほとんどの Python モデル トレーニング フレームワークがサポートされています。
    • scikit-learn
    • keras
    • PyTorch
    • SparkML
    • LightGBM
    • XGBoost
    • TensorFlow Keras (python_flavor mlflow.keras を使用)
  • カスタム MLflow pyfunc モデル

Unity Catalog の特徴エンジニアリング

# Train model
import mlflow
from sklearn import linear_model

feature_lookups = [
    FeatureLookup(
      table_name='ml.recommender_system.customer_features',
      feature_names=['total_purchases_30d'],
      lookup_key='customer_id',
    ),
    FeatureLookup(
      table_name='ml.recommender_system.product_features',
      feature_names=['category'],
      lookup_key='product_id'
    )
  ]

fe = FeatureEngineeringClient()

with mlflow.start_run():

  # df has columns ['customer_id', 'product_id', 'rating']
  training_set = fe.create_training_set(
    df=df,
    feature_lookups=feature_lookups,
    label='rating',
    exclude_columns=['customer_id', 'product_id']
  )

  training_df = training_set.load_df().toPandas()

  # "training_df" columns ['total_purchases_30d', 'category', 'rating']
  X_train = training_df.drop(['rating'], axis=1)
  y_train = training_df.rating

  model = linear_model.LinearRegression().fit(X_train, y_train)

  fe.log_model(
    model=model,
    artifact_path="recommendation_model",
    flavor=mlflow.sklearn,
    training_set=training_set,
    registered_model_name="recommendation_model"
  )

# Batch inference

# If the model at model_uri is packaged with the features, the FeatureStoreClient.score_batch()
# call automatically retrieves the required features from Feature Store before scoring the model.
# The DataFrame returned by score_batch() augments batch_df with
# columns containing the feature values and a column containing model predictions.

fe = FeatureEngineeringClient()

# batch_df has columns ‘customer_id’ and ‘product_id’
predictions = fe.score_batch(
    model_uri=model_uri,
    df=batch_df
)

# The ‘predictions’ DataFrame has these columns:
# ‘customer_id’, ‘product_id’, ‘total_purchases_30d’, ‘category’, ‘prediction’

ワークスペース Feature Store

# Train model
import mlflow
from sklearn import linear_model

feature_lookups = [
    FeatureLookup(
      table_name='recommender_system.customer_features',
      feature_names=['total_purchases_30d'],
      lookup_key='customer_id',
    ),
    FeatureLookup(
      table_name='recommender_system.product_features',
      feature_names=['category'],
      lookup_key='product_id'
    )
  ]

fs = FeatureStoreClient()

with mlflow.start_run():

  # df has columns ['customer_id', 'product_id', 'rating']
  training_set = fs.create_training_set(
    df=df,
    feature_lookups=feature_lookups,
    label='rating',
    exclude_columns=['customer_id', 'product_id']
  )

  training_df = training_set.load_df().toPandas()

  # "training_df" columns ['total_purchases_30d', 'category', 'rating']
  X_train = training_df.drop(['rating'], axis=1)
  y_train = training_df.rating

  model = linear_model.LinearRegression().fit(X_train, y_train)

  fs.log_model(
    model=model,
    artifact_path="recommendation_model",
    flavor=mlflow.sklearn,
    training_set=training_set,
    registered_model_name="recommendation_model"
  )

# Batch inference

# If the model at model_uri is packaged with the features, the FeatureStoreClient.score_batch()
# call automatically retrieves the required features from Feature Store before scoring the model.
# The DataFrame returned by score_batch() augments batch_df with
# columns containing the feature values and a column containing model predictions.

fs = FeatureStoreClient()

# batch_df has columns ‘customer_id’ and ‘product_id’
predictions = fs.score_batch(
    model_uri=model_uri,
    df=batch_df
)

# The ‘predictions’ DataFrame has these columns:
# ‘customer_id’, ‘product_id’, ‘total_purchases_30d’, ‘category’, ‘prediction’

特徴メタデータでパッケージ化されたモデルをスコア付けするときに、カスタム特徴値を使用する

既定で、特徴メタデータを使ってパッケージ化されたモデルは、推論時に特徴テーブルから特徴を検索します。 スコアリングにカスタムの特徴値を使うには、FeatureEngineeringClient.score_batch (Unity Catalog の Feature Engineering の場合) または FeatureStoreClient.score_batch (ワークスペース Feature Store の場合) に渡される DataFrame にそれらを含めます。

たとえば、次の 2 つの特徴を持つモデルをパッケージ化するとします。

Unity Catalog の特徴エンジニアリング

feature_lookups = [
    FeatureLookup(
      table_name='ml.recommender_system.customer_features',
      feature_names=['account_creation_date', 'num_lifetime_purchases'],
      lookup_key='customer_id',
    ),
  ]

ワークスペース Feature Store

feature_lookups = [
    FeatureLookup(
      table_name='recommender_system.customer_features',
      feature_names=['account_creation_date', 'num_lifetime_purchases'],
      lookup_key='customer_id',
    ),
  ]

推論では、account_creation_date という名前の列を含む DataFrame で score_batch を呼び出すことによって、特徴 account_creation_date のカスタム値を指定できます。 この場合、API は Feature Store の num_lifetime_purchases 特徴のみを検索し、指定されたカスタム account_creation_date 列の値をモデル スコアリングに使用します。

Unity Catalog の特徴エンジニアリング

# batch_df has columns ['customer_id', 'account_creation_date']
predictions = fe.score_batch(
  model_uri='models:/ban_prediction_model/1',
  df=batch_df
)

ワークスペース Feature Store

# batch_df has columns ['customer_id', 'account_creation_date']
predictions = fs.score_batch(
  model_uri='models:/ban_prediction_model/1',
  df=batch_df
)

Feature Store の特徴と、Feature Store の外部に存在するデータの組み合わせを使用してモデルをトレーニングおよびスコア付けする

Feature Store の特徴と外部のデータを組み合わせて使用して、モデルをトレーニングできます。 モデルを特徴メタデータでパッケージ化すると、モデルは推論のために特徴ストアから特徴値を取得します。

モデルをトレーニングするには、FeatureEngineeringClient.create_training_set (Unity Catalog の Feature Engineering の場合) または FeatureStoreClient.create_training_set (ワークスペース Feature Store の場合) に渡される DataFrame に列として追加のデータを含めます。 この例では、Feature Store の total_purchases_30d 特徴と外部列 browser を使用します。

Unity Catalog の特徴エンジニアリング

feature_lookups = [
    FeatureLookup(
      table_name='ml.recommender_system.customer_features',
      feature_names=['total_purchases_30d'],
      lookup_key='customer_id',
    ),
  ]

fe = FeatureEngineeringClient()

# df has columns ['customer_id', 'browser', 'rating']
training_set = fe.create_training_set(
  df=df,
  feature_lookups=feature_lookups,
  label='rating',
  exclude_columns=['customer_id']  # 'browser' is not excluded
)

ワークスペース Feature Store

feature_lookups = [
    FeatureLookup(
      table_name='recommender_system.customer_features',
      feature_names=['total_purchases_30d'],
      lookup_key='customer_id',
    ),
  ]

fs = FeatureStoreClient()

# df has columns ['customer_id', 'browser', 'rating']
training_set = fs.create_training_set(
  df=df,
  feature_lookups=feature_lookups,
  label='rating',
  exclude_columns=['customer_id']  # 'browser' is not excluded
)

推論では、FeatureStoreClient.score_batch で使用される DataFrame に browser 列を含める必要があります。

Unity Catalog の特徴エンジニアリング

# At inference, 'browser' must be provided
# batch_df has columns ['customer_id', 'browser']
predictions = fe.score_batch(
  model_uri=model_uri,
  df=batch_df
)

ワークスペース Feature Store

# At inference, 'browser' must be provided
# batch_df has columns ['customer_id', 'browser']
predictions = fs.score_batch(
  model_uri=model_uri,
  df=batch_df
)

MLflow を使用してモデルを読み込み、バッチ推論を実行する

モデルが FeatureEngineeringClient (Unity カタログの Feature Engineering の場合) または FeatureStoreClient (ワークスペース Feature Store の場合) のlog_model メソッドを使用してログに記録された後、MLflow を推論で使用できます。 MLflow.pyfunc.predict は、Feature Store から特徴値を取得し、推論時に指定されたすべての値も結合します。 モデルで使用される特徴の主キーを指定する必要があります。

Note

MLflow でのバッチ推論には、MLflow バージョン 2.11 以降が必要です。 時系列特徴テーブルを使用するモデルはサポートされていません。 時系列特徴テーブルを使用してバッチ推論を実行するには、 score_batch を使用します。 「モデルをトレーニングし、特徴テーブルを使用してバッチ推論を実行する」を参照してください。

# Train model
import mlflow
from sklearn import linear_model

feature_lookups = [
  FeatureLookup(
    table_name='ml.recommender_system.customer_features',
    feature_names=['total_purchases_30d'],
    lookup_key='customer_id',
  ),
  FeatureLookup(
    table_name='ml.recommender_system.product_features',
    feature_names=['category'],
    lookup_key='product_id'
  )
]

fe = FeatureEngineeringClient()

with mlflow.start_run():

  # df has columns ['customer_id', 'product_id', 'rating']
  training_set = fe.create_training_set(
    df=df,
    feature_lookups=feature_lookups,
    label='rating',
    exclude_columns=['customer_id', 'product_id']
  )

  training_df = training_set.load_df().toPandas()

  # "training_df" columns ['total_purchases_30d', 'category', 'rating']
  X_train = training_df.drop(['rating'], axis=1)
  y_train = training_df.rating

  model = linear_model.LinearRegression().fit(X_train, y_train)

  fe.log_model(
    model=model,
    artifact_path="recommendation_model",
    flavor=mlflow.sklearn,
    training_set=training_set,
    registered_model_name="recommendation_model",
    #refers to the default value of "result_type" if not provided at inference
    params={"result_type":"double"},
  )

# Batch inference with MLflow

# NOTE: the result_type parameter can only be used if a default value
# is provided in log_model. This is automatically done for all models
# logged using Databricks Runtime for ML 15.0 or above.
# For earlier Databricks Runtime versions, use set_result as shown below.

# batch_df has columns ‘customer_id’ and ‘product_id’
model = mlflow.pyfunc.load_model(model_version_uri)

# If result_type parameter is provided in log_model
predictions = model.predict(df, {"result_type":"double"})

# If result_type parameter is NOT provided in log_model
model._model_impl.set_result_type("double")
predictions = model.predict(df)

見つからない特徴量値を処理する

存在しない検索キーが予測用のモデルに渡された場合、FeatureLookup によってフェッチされる特徴量値は、環境に応じて None または NaN のどちらかになります。 モデルの実装では、両方の値を処理できる必要があります。

  • fe.score_batch を使用するオフライン アプリケーションの場合、見つからない特徴量に対する戻り値は NaN となります。
  • Model Serving を使用するオンライン アプリケーションの場合、戻り値は None または NaN のどちらかになります。
    • 指定された検索キーがどれも存在しない場合、値は None になります。
    • 検索キーのサブセットのみが存在しない場合、値は NaN になります。

オンデマンド特徴量を使用する際に見つからない特徴量値を処理するには、「見つからない特徴値を処理する方法」を参照してください。