チュートリアル: Horovod Estimator と PyTorch を使用した分散トレーニング (非推奨)
Horovod は、TensorFlow や PyTorch のようなライブラリ向けの分散トレーニング フレームワークです。 Horovod を使うと、数行のコードで、数百の GPU 上で実行できるように既存のトレーニング スクリプトをスケールアップできます。
Azure Synapse Analytics 内で、既定の Apache Spark 3 ランタイムを使って Horovod をすぐに開始できます。 PyTorch を使用する Spark ML パイプライン アプリケーションの場合は、ユーザーは horovod.spark estimator API を使用できます。 このノートブックでは、Apache Spark データフレームを使って、MNIST データセットに対して分散ニューラル ネットワーク (DNN) モデルの分散トレーニングを実行します。 このチュートリアルでは、PyTorch と Horovod Estimator を使用してトレーニング プロセスを実行します。
前提条件
- Azure Data Lake Storage Gen2 ストレージ アカウントが既定のストレージとして構成されている Azure Synapse Analytics ワークスペース。 使用する Data Lake Storage Gen2 ファイル システムの "Storage Blob データ共同作成者" である必要があります。
- GPU 対応の Apache Spark プールを Azure Synapse Analytics ワークスペースに作成します。 詳細については、Azure Synapse での GPU 対応 Apache Spark プールの作成に関する記事を参照してください。 このチュートリアルでは、3 つのノードを含む GPU の大きなクラスター サイズを使用することをお勧めします。
Note
Azure Synapse GPU 対応プールのプレビューは非推奨になりました。
注意事項
Azure Synapse Runtime for Apache Spark 3.1 および 3.2 での GPU の非推奨と無効化の通知
- Apache Spark 3.2 (非推奨) ランタイムで GPU 高速プレビューが非推奨になりました。 非推奨のランタイムでは、バグおよび機能の修正は行われません。 このランタイムと Spark 3.2 の対応する GPU 高速化プレビューは、2024 年 7 月 8 日の時点で廃止され、無効になっています。
- Azure Synapse 3.1 (非推奨) ランタイムで GPU 高速プレビューが非推奨になりました。 Azure Synapse Runtime for Apache Spark 3.1 は、2023 年 1 月 26 日でサポート終了となりました。公式サポートの提供は 2024 年 1 月 26 日をもって終了し、この日付以降、サポート チケットの処理、バグ修正、セキュリティ更新は行われません。
Apache Spark セッションを構成する
セッションの最初に、Apache Spark 設定の構成をいくつか行う必要があります。 ほとんどの場合、設定する必要があるのは numExecutors と spark.rapids.memory.gpu.reserve のみです。 大規模なモデルでは、spark.kryoserializer.buffer.max
設定も構成する必要がある場合があります。 Tensorflow モデルでは、spark.executorEnv.TF_FORCE_GPU_ALLOW_GROWTH
を true に設定する必要があります。
この例では、%%configure
コマンドを使って Spark 構成を渡す方法を紹介します。 各パラメーターの詳細な意味については、Apache Spark 構成ドキュメントを参照してください。 指定された値は、Azure Synapse GPU 大規模プールに推奨されるベスト プラクティス値を示します。
%%configure -f
{
"driverMemory": "30g",
"driverCores": 4,
"executorMemory": "60g",
"executorCores": 12,
"numExecutors": 3,
"conf":{
"spark.rapids.memory.gpu.reserve": "10g",
"spark.executorEnv.TF_FORCE_GPU_ALLOW_GROWTH": "true",
"spark.kryoserializer.buffer.max": "2000m"
}
}
このチュートリアルでは、次の構成を使います。
%%configure -f
{
"numExecutors": 3,
"conf":{
"spark.rapids.memory.gpu.reserve": "10g"
}
}
Note
Horovod を使用したトレーニングでは、ユーザーは numExecutors
の Spark 構成をノード数以下に設定する必要があります。
依存関係のインポート
このチュートリアルでは、PySpark を使用してデータセットの読み取りと処理を行います。 次に、PyTorch と Horovod を使用して分散ニューラル ネットワーク (DNN) モデルを構築し、トレーニング プロセスを実行します。 開始するには、次の依存関係をインポートする必要があります。
# base libs
import sys
import uuid
# numpy
import numpy as np
# pyspark related
import pyspark
import pyspark.sql.types as T
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
# pytorch related
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
# horovod related
import horovod.spark.torch as hvd
from horovod.spark.common.backend import SparkBackend
from horovod.spark.common.store import Store
# azure related
from azure.synapse.ml.horovodutils import AdlsStore
代替ストレージ アカウントに接続する
中間データとモデル データを格納するには、Azure Data Lake Storage (ADLS) アカウントが必要です。 別のストレージ アカウントを使用している場合は、リンク サービスを設定して、アカウントを自動的に認証し、読み取ります。 さらに、remote_url
、account_name
、linked_service_name
プロパティを変更する必要があります。
num_proc = 3 # equal to numExecutors
batch_size = 128
epochs = 3
lr_single_node = 0.01 # learning rate for single node code
uuid_str = str(uuid.uuid4()) # with uuid, each run will use a new directory
work_dir = '/tmp/' + uuid_str
# create adls store for model training, use your own adls account info
remote_url = "<<ABFS path to storage account>>"
account_name = "<<name of storage account>>"
linked_service_name = "<<name of linked service>>"
sas_token = TokenLibrary.getConnectionString(linked_service_name)
adls_store_path = remote_url + work_dir
store = AdlsStore.create(adls_store_path,
storage_options={
'account_name': account_name,
'sas_token': sas_token
},
save_runs=True)
print(adls_store_path)
データセットを準備する
次に、トレーニング用のデータセットを準備します。 このチュートリアルでは、Azure Open Datasets の MNIST データセットを使用します。
# Initialize SparkSession
spark = SparkSession.builder.getOrCreate()
# Download MNIST dataset from Azure Open Datasets
from azureml.opendatasets import MNIST
mnist = MNIST.get_tabular_dataset()
mnist_df = mnist.to_pandas_dataframe()
mnist_df.info()
# Preprocess dataset
mnist_df['features'] = mnist_df.iloc[:, :784].values.tolist()
mnist_df.drop(mnist_df.iloc[:, :784], inplace=True, axis=1)
mnist_df.head()
Apache Spark を使用してデータを処理する
ここで、Apache Spark データフレームを作成します。 このデータフレームは、トレーニングのために HorovodEstimator
で使用されます。
# Create Spark DataFrame for training
df = spark.createDataFrame(mnist_df)
# repartition DataFrame for training
train_df = df.repartition(num_proc)
# Train/test split
train_df, test_df = train_df.randomSplit([0.9, 0.1])
# show the dataset
train_df.show()
train_df.count()
DNN モデルを定義する
データセットの処理が完了したら、PyTorch モデルを定義できます。 同じコードを使用して、単一ノードの PyTorch モデルをトレーニングすることもできます。
# Define the PyTorch model without any Horovod-specific parameters
class Net(nn.Module):
def __init__(self):
super(Net, self).__init__()
self.conv1 = nn.Conv2d(1, 10, kernel_size=5)
self.conv2 = nn.Conv2d(10, 20, kernel_size=5)
self.conv2_drop = nn.Dropout2d()
self.fc1 = nn.Linear(320, 50)
self.fc2 = nn.Linear(50, 10)
def forward(self, x):
x = x.float()
x = F.relu(F.max_pool2d(self.conv1(x), 2))
x = F.relu(F.max_pool2d(self.conv2_drop(self.conv2(x)), 2))
x = x.view(-1, 320)
x = F.relu(self.fc1(x))
x = F.dropout(x, training=self.training)
x = self.fc2(x)
return F.log_softmax(x)
model = Net()
optimizer = optim.SGD(model.parameters(),
lr=lr_single_node * num_proc,
momentum=0.5) # notice the lr is scaled up
loss = nn.NLLLoss()
モデルのトレーニング
これで、Apache Spark データフレーム上で Horovod Spark 推定器をトレーニングできます。
# Train a Horovod Spark Estimator on the DataFrame
backend = SparkBackend(num_proc=num_proc,
stdout=sys.stdout,
stderr=sys.stderr,
prefix_output_with_timestamp=True)
torch_estimator = hvd.TorchEstimator(
backend=backend,
store=store,
partitions_per_process=1, # important for GPU training
model=model,
optimizer=optimizer,
loss=lambda input, target: loss(input, target.long()),
input_shapes=[[-1, 1, 28, 28]],
feature_cols=['features'],
label_cols=['label'],
batch_size=batch_size,
epochs=epochs,
validation=0.1,
verbose=2)
torch_model = torch_estimator.fit(train_df).setOutputCols(['label_prob'])
トレーニング済みモデルを評価する
トレーニング プロセスが完了したら、テスト データセットに対してモデルを評価できます。
# Evaluate the model on the held-out test DataFrame
pred_df = torch_model.transform(test_df)
argmax = udf(lambda v: float(np.argmax(v)), returnType=T.DoubleType())
pred_df = pred_df.withColumn('label_pred', argmax(pred_df.label_prob))
evaluator = MulticlassClassificationEvaluator(predictionCol='label_pred',
labelCol='label',
metricName='accuracy')
print('Test accuracy:', evaluator.evaluate(pred_df))
リソースをクリーンアップする
Spark インスタンスがシャットダウンされるようにするには、接続されているセッション (ノートブック) を終了します。 プールは、Apache Spark プールに指定されているアイドル時間に達したときにシャットダウンされます。 また、ノートブックの右上にあるステータス バーから [セッションの停止] を選択することもできます。