NLP に Hugging Face Transformers を使用するモデル推論

この記事では、自然言語処理 (NLP) モデル推論に Hugging Face Transformers を使用する方法について説明します。

Hugging Face Transformersは、推論に事前トレーニング済みのモデルを使用するためのパイプライン クラスを提供します。 🤗 Transformers パイプラインは、Azure Databricks で簡単に使用できる幅広い NLP タスクをサポートします。

要件

  • MLflow 2.3
  • Hugging Face transformers ライブラリがインストールされているクラスターは、バッチ推論に使用できます。 transformers ライブラリは、Databricks Runtime 10.4 LTS ML 以降にプレインストールされています。 一般的な NLP モデルの多くは GPU ハードウェアで最適に動作するため、CPU で使用するために特に最適化されたモデルを使用しない限り、最新の GPU ハードウェアを使用して最高のパフォーマンスを得ることができます。

Pandas UDF を使用して Spark クラスターでモデル計算を分散する

事前トレーニング済みのモデルを実験する場合は、Pandas UDF を使用してモデルをラップし、ワーカー CPU または GPU で計算を実行できます。 Pandas UDF は、各ワーカーにモデルを分散します。

また、機械翻訳用の Hugging Face Transformers パイプラインを作成し、Pandas UDF を使用して Spark クラスターのワーカーでパイプラインを実行することもできます。

import pandas as pd
from transformers import pipeline
import torch
from pyspark.sql.functions import pandas_udf

device = 0 if torch.cuda.is_available() else -1
translation_pipeline = pipeline(task="translation_en_to_fr", model="t5-base", device=device)

@pandas_udf('string')
def translation_udf(texts: pd.Series) -> pd.Series:
  translations = [result['translation_text'] for result in translation_pipeline(texts.to_list(), batch_size=1)]
  return pd.Series(translations)

この方法で device を設定すると、クラスターで使用可能な GPU が確実に使用されます。

翻訳用の Hugging Face パイプラインは、Python dict オブジェクトのリストを返します。各オブジェクトには、1 つのキー translation_text と、翻訳されたテキストを含む値があります。 この UDF は、結果から翻訳を抽出して、翻訳されたテキストのみを含む Pandas 系列を返します。 device=0 を設定して GPU を使用するようにパイプラインが構築されている場合、クラスターに複数の GPU を持つインスタンスがある場合、Spark はワーカー ノード上の GPU を自動的に再割り当てします。

UDF を使用してテキスト列を翻訳するには、select ステートメントで UDF を呼び出します。

texts = ["Hugging Face is a French company based in New York City.", "Databricks is based in San Francisco."]
df = spark.createDataFrame(pd.DataFrame(texts, columns=["texts"]))
display(df.select(df.texts, translation_udf(df.texts).alias('translation')))

複雑な結果の型を返す

Pandas UDF を使用すると、より構造化された出力を返すこともできます。 たとえば、名前付きエンティティ認識では、パイプラインは、エンティティ、そのスパン、型、および関連付けられたスコアを含む dict オブジェクトのリストを返します。 翻訳の例と同様ですが、名前付きエンティティ認識の場合、@pandas_udf 注釈の戻り値の型はより複雑になります。

たとえば、ドライバーでパイプラインを実行するなどして、パイプラインの結果の検査を通じて使用する戻り値の型を理解できます。

このサンプルでは、次のコードを使用します。

from transformers import pipeline
import torch
device = 0 if torch.cuda.is_available() else -1
ner_pipeline = pipeline(task="ner", model="Davlan/bert-base-multilingual-cased-ner-hrl", aggregation_strategy="simple", device=device)
ner_pipeline(texts)

注釈を生成するには:

[[{'entity_group': 'ORG',
   'score': 0.99933606,
   'word': 'Hugging Face',
   'start': 0,
   'end': 12},
  {'entity_group': 'LOC',
   'score': 0.99967843,
   'word': 'New York City',
   'start': 42,
   'end': 55}],
 [{'entity_group': 'ORG',
   'score': 0.9996372,
   'word': 'Databricks',
   'start': 0,
   'end': 10},
  {'entity_group': 'LOC',
   'score': 0.999588,
   'word': 'San Francisco',
   'start': 23,
   'end': 36}]]

これを戻り値の型として表すには、struct フィールドの array を使用して、dict エントリを struct のフィールドとして一覧表示します。

import pandas as pd
from pyspark.sql.functions import pandas_udf

@pandas_udf('array<struct<word string, entity_group string, score float, start integer, end integer>>')
def ner_udf(texts: pd.Series) -> pd.Series:
  return pd.Series(ner_pipeline(texts.to_list(), batch_size=1))

display(df.select(df.texts, ner_udf(df.texts).alias('entities')))

パフォーマンスの調整

UDF のパフォーマンスをチューニングするには、いくつかの重要な側面があります。 1 つ目は、各 GPU を効果的に使用することです。これを調整するには、Transformers パイプラインによって GPU に送信されるバッチのサイズを変更します。 2 つ目は、クラスター全体を利用するために DataFrame が適切にパーティション分割されていることを確認することです。

最後に、Hugging Face モデルをキャッシュして、モデルの読み込み時間またはイングレス コストを節約できます。

バッチ サイズを選択する

上記の UDF は、batch_size が 1 の状態ですぐに動作するはずですが、ワーカーが使用できるリソースを効率的に使用できない可能性があります。 パフォーマンスを向上させるには、クラスター内のモデルとハードウェアに合わせてバッチ サイズを調整します。 Databricks では、最適なパフォーマンスを見つけるために、クラスター上のパイプラインのさまざまなバッチ サイズを試すことをお勧めします。 パイプラインのバッチ処理とその他のパフォーマンス オプションの詳細については、Hugging Face のドキュメントを参照してください。

完全な GPU の使用が促進されるが CUDA out of memory のエラーにはつながらないような十分な大きさのバッチ サイズを見つけてみてください。 チューニング中に CUDA out of memory エラーが発生した場合は、GPU 内のモデルとデータによって使用されるメモリを解放するために、ノートブックをデタッチして再アタッチする必要があります。

クラスターのライブ クラスター メトリックを表示し、GPU プロセッサ使用率の gpu0-util や GPU メモリ使用率などの gpu0_mem_util などのメトリックを選択して、GPU のパフォーマンスを監視してください。

ステージ レベルのスケジュール設定を使用して並列処理を調整する

既定では、Spark は各マシンの GPU ごとに 1 つのタスクをスケジュールします。 並列処理を向上させるために、ステージ レベルのスケジュールを使用して、GPU ごとに実行するタスクの数を Spark に指示できます。 たとえば、Spark で GPU ごとに 2 つのタスクを実行する場合は、次のように指定できます。

from pyspark.resource import TaskResourceRequests, ResourceProfileBuilder

task_requests = TaskResourceRequests().resource("gpu", 0.5)

builder = ResourceProfileBuilder()
resource_profile = builder.require(task_requests).build

rdd = df.withColumn('predictions', loaded_model(struct(*map(col, df.columns)))).rdd.withResources(resource_profile)

使用可能なすべてのハードウェアを使用するようにデータを再パーティション分割する

パフォーマンスの 2 番目の考慮事項は、クラスター内のハードウェアを最大限に活用することです。 一般に、ワーカー上の GPU の数 (GPU クラスターの場合) またはクラスター内のワーカー全体のコア数 (CPU クラスターの場合) の倍数が小さい場合は適切に機能します。 入力 DataFrame には、クラスターの並列処理を利用するのに十分なパーティションが既にある場合があります。 DataFrame に含まれるパーティションの数を確認するには、df.rdd.getNumPartitions() を使用します。 repartitioned_df = df.repartition(desired_partition_count) を使用して DataFrame を再パーティション分割できます。

DBFS またはマウント ポイントでモデルをキャッシュする

異なるクラスターまたは再起動されたクラスターからモデルを頻繁に読み込む場合は、DBFS ルート ボリュームまたはマウント ポイントに Hugging Face モデルをキャッシュすることもできます。 これにより、イングレス コストを削減し、新しいクラスターまたは再起動されたクラスターにモデルを読み込む時間を短縮できます。 これを行うには、パイプラインを読み込む前に、コードで TRANSFORMERS_CACHE 環境変数を設定します。

次に例を示します。

import os
os.environ['TRANSFORMERS_CACHE'] = '/dbfs/hugging_face_transformers_cache/'

または、MLflow transformers フレーバーを使用してモデルを MLflow にログ記録することで、同様の結果を得ることもできます。

ノートブック: Hugging Face Transformers 推論と MLflow ログ

サンプル コードをすぐに使い始めるために、このノートブックは、Hugging Face Transformers パイプライン推論と MLflow ログを使用したテキスト要約のエンドツーエンドの例になっています。

Hugging Face Transformers パイプライン推論ノートブック

ノートブックを入手

その他のリソース

次のガイドを使用して Hugging Face モデルを微調整できます。

「Hugging Face Transformers とは」の詳細情報