チュートリアル: Horovod Runner と TensorFlow を使用した分散トレーニング (非推奨)

Horovod は、TensorFlow や PyTorch のようなライブラリ向けの分散トレーニング フレームワークです。 Horovod を使うと、数行のコードで、数百の GPU 上で実行できるように既存のトレーニング スクリプトをスケールアップできます。

Azure Synapse Analytics 内で、既定の Apache Spark 3 ランタイムを使って Horovod をすぐに開始できます。 TensorFlow を使用する Spark ML パイプライン アプリケーションの場合は、HorovodRunner を使用できます。 このノートブックでは、Apache Spark データフレームを使って、MNIST データセットに対して分散ニューラル ネットワーク (DNN) モデルの分散トレーニングを実行します。 このチュートリアルでは、TensorFlow と HorovodRunner を使ってトレーニング プロセスを実行します。

前提条件

  • Azure Data Lake Storage Gen2 ストレージ アカウントが既定のストレージとして構成されている Azure Synapse Analytics ワークスペース。 使用する Data Lake Storage Gen2 ファイル システムの "Storage Blob データ共同作成者" である必要があります。
  • GPU 対応の Apache Spark プールを Azure Synapse Analytics ワークスペースに作成します。 詳細については、Azure Synapse での GPU 対応 Apache Spark プールの作成に関する記事を参照してください。 このチュートリアルでは、3 つのノードを含む GPU の大きなクラスター サイズを使用することをお勧めします。

Note

Azure Synapse GPU 対応プールのプレビューは非推奨になりました。

注意事項

Azure Synapse Runtime for Apache Spark 3.1 および 3.2 上の GPU の非推奨化と無効化の通知

  • Apache Spark 3.2 (非推奨) ランタイムで GPU アクセラレーション プレビューが非推奨になりました。 非推奨のランタイムでは、バグおよび機能の修正は行われません。 このランタイムと Spark 3.2 の対応する GPU アクセラレーション プレビューは、2024 年 7 月 8 日の時点で廃止され、無効化されました。
  • Azure Synapse 3.1 (非推奨) ランタイムで GPU アクセラレーション プレビューが非推奨になりました。 Azure Synapse Runtime for Apache Spark 3.1 は、2023 年 1 月 26 日でサポート終了となりました。公式サポートの提供は 2024 年 1 月 26 日をもって終了し、この日付以降、サポート チケットの処理、バグ修正、セキュリティ更新は行われません。

Apache Spark セッションを構成する

セッションの最初に、いくつかの Apache Spark 設定を構成する必要があります。 ほとんどの場合、設定する必要があるのは numExecutorsspark.rapids.memory.gpu.reserve のみです。 特に大規模なモデルでは、spark.kryoserializer.buffer.max 設定も構成する必要がある場合があります。 TensorFlow モデルでは、spark.executorEnv.TF_FORCE_GPU_ALLOW_GROWTH を true に設定する必要があります。

この例では、%%configure コマンドを使って Spark 構成を渡す方法を紹介します。 各パラメーターの詳細な意味については、Apache Spark 構成ドキュメントを参照してください。 指定されている値は、Azure Synapse GPU 大規模プール向けに推奨されるベスト プラクティス値です。


%%configure -f
{
    "driverMemory": "30g",
    "driverCores": 4,
    "executorMemory": "60g",
    "executorCores": 12,
    "numExecutors": 3,
    "conf":{
        "spark.rapids.memory.gpu.reserve": "10g",
        "spark.executorEnv.TF_FORCE_GPU_ALLOW_GROWTH": "true",
        "spark.kryoserializer.buffer.max": "2000m"
   }
}

このチュートリアルでは、次の構成を使います。


%%configure -f
{
    "numExecutors": 3,
    "conf":{
        "spark.rapids.memory.gpu.reserve": "10g",
        "spark.executorEnv.TF_FORCE_GPU_ALLOW_GROWTH": "true"
   }
}

Note

Horovod を使用したトレーニングでは、numExecutors の Spark 構成をノード数以下に設定する必要があります。

プライマリ ストレージ アカウントを設定する

中間データとモデル データを格納するには、Azure Data Lake Storage (ADLS) アカウントが必要です。 別のストレージ アカウントを使用している場合は、リンク サービスを設定して、アカウントを自動的に認証し、読み取ります。

この例では、プライマリ Azure Synapse Analytics ストレージ アカウントからデータを読み取ります。 結果を読み取るには、remote_url プロパティを変更する必要があります。

# Specify training parameters
num_proc = 3  # equal to numExecutors
batch_size = 128
epochs = 3
lr_single_node = 0.1  # learning rate for single node code

# configure adls store remote url
remote_url = "<<abfss path to storage account>>

データセットを準備する

次に、トレーニング用のデータセットを準備します。 このチュートリアルでは、Azure Open Datasets の MNIST データセットを使います。

def get_dataset(rank=0, size=1):
    # import dependency libs
    from azureml.opendatasets import MNIST
    from sklearn.preprocessing import OneHotEncoder
    import numpy as np

    # Download MNIST dataset from Azure Open Datasets
    mnist = MNIST.get_tabular_dataset()
    mnist_df = mnist.to_pandas_dataframe()

    # Preprocess dataset
    mnist_df['features'] = mnist_df.iloc[:, :784].values.tolist()
    mnist_df.drop(mnist_df.iloc[:, :784], inplace=True, axis=1)

    x = np.array(mnist_df['features'].values.tolist())
    y = np.array(mnist_df['label'].values.tolist()).reshape(-1, 1)

    enc = OneHotEncoder()
    enc.fit(y)
    y = enc.transform(y).toarray()

    (x_train, y_train), (x_test, y_test) = (x[:60000], y[:60000]), (x[60000:],
                                                                    y[60000:])

    # Prepare dataset for distributed training
    x_train = x_train[rank::size]
    y_train = y_train[rank::size]
    x_test = x_test[rank::size]
    y_test = y_test[rank::size]

    # Reshape and Normalize data for model input
    x_train = x_train.reshape(x_train.shape[0], 28, 28, 1)
    x_test = x_test.reshape(x_test.shape[0], 28, 28, 1)
    x_train = x_train.astype('float32')
    x_test = x_test.astype('float32')
    x_train /= 255.0
    x_test /= 255.0

    return (x_train, y_train), (x_test, y_test)

DNN モデルを定義する

データセットを処理したら、TensorFlow モデルを定義できます。 同じコードを使って、単一ノードの TensorFlow モデルをトレーニングすることもできます。

# Define the TensorFlow model without any Horovod-specific parameters
def get_model():
    from tensorflow.keras import models
    from tensorflow.keras import layers

    model = models.Sequential()
    model.add(
        layers.Conv2D(32,
                      kernel_size=(3, 3),
                      activation='relu',
                      input_shape=(28, 28, 1)))
    model.add(layers.Conv2D(64, (3, 3), activation='relu'))
    model.add(layers.MaxPooling2D(pool_size=(2, 2)))
    model.add(layers.Dropout(0.25))
    model.add(layers.Flatten())
    model.add(layers.Dense(128, activation='relu'))
    model.add(layers.Dropout(0.5))
    model.add(layers.Dense(10, activation='softmax'))
    return model

1 つのノード用のトレーニング関数を定義する

まず、Apache Spark プールのドライバー ノード上で TensorFlow モデルをトレーニングします。 トレーニング プロセスが完了したら、モデルを評価し、損失と精度のスコアを出力します。


def train(learning_rate=0.1):
    import tensorflow as tf
    from tensorflow import keras

    gpus = tf.config.experimental.list_physical_devices('GPU')
    for gpu in gpus:
        tf.config.experimental.set_memory_growth(gpu, True)

    # Prepare dataset
    (x_train, y_train), (x_test, y_test) = get_dataset()

    # Initialize model
    model = get_model()

    # Specify the optimizer (Adadelta in this example)
    optimizer = keras.optimizers.Adadelta(learning_rate=learning_rate)

    model.compile(optimizer=optimizer,
                  loss='categorical_crossentropy',
                  metrics=['accuracy'])

    model.fit(x_train,
              y_train,
              batch_size=batch_size,
              epochs=epochs,
              verbose=2,
              validation_data=(x_test, y_test))
    return model

# Run the training process on the driver
model = train(learning_rate=lr_single_node)

# Evaluate the single node, trained model
_, (x_test, y_test) = get_dataset()
loss, accuracy = model.evaluate(x_test, y_test, batch_size=128)
print("loss:", loss)
print("accuracy:", accuracy)

分散トレーニングのために HorovodRunner に移行する

次に、分散トレーニングのために HorovodRunner を使って同じコードを再実行する方法について説明します。

トレーニング関数を定義する

モデルをトレーニングするには、まず HorovodRunner のトレーニング関数を定義します。

# Define training function for Horovod runner
def train_hvd(learning_rate=0.1):
    # Import base libs
    import tempfile
    import os
    import shutil
    import atexit

    # Import tensorflow modules to each worker
    import tensorflow as tf
    from tensorflow import keras
    import horovod.tensorflow.keras as hvd

    # Initialize Horovod
    hvd.init()

    # Pin GPU to be used to process local rank (one GPU per process)
    # These steps are skipped on a CPU cluster
    gpus = tf.config.experimental.list_physical_devices('GPU')
    for gpu in gpus:
        tf.config.experimental.set_memory_growth(gpu, True)
    if gpus:
        tf.config.experimental.set_visible_devices(gpus[hvd.local_rank()],
                                                   'GPU')

    # Call the get_dataset function you created, this time with the Horovod rank and size
    (x_train, y_train), (x_test, y_test) = get_dataset(hvd.rank(), hvd.size())

    # Initialize model with random weights
    model = get_model()

    # Adjust learning rate based on number of GPUs
    optimizer = keras.optimizers.Adadelta(learning_rate=learning_rate *
                                          hvd.size())

    # Use the Horovod Distributed Optimizer
    optimizer = hvd.DistributedOptimizer(optimizer)

    model.compile(optimizer=optimizer,
                  loss='categorical_crossentropy',
                  metrics=['accuracy'])

    # Create a callback to broadcast the initial variable states from rank 0 to all other processes.
    # This is required to ensure consistent initialization of all workers when training is started with random weights or restored from a checkpoint.
    callbacks = [
        hvd.callbacks.BroadcastGlobalVariablesCallback(0),
    ]

    # Model checkpoint location.
    ckpt_dir = tempfile.mkdtemp()
    ckpt_file = os.path.join(ckpt_dir, 'checkpoint.h5')
    atexit.register(lambda: shutil.rmtree(ckpt_dir))

    # Save checkpoints only on worker 0 to prevent conflicts between workers
    if hvd.rank() == 0:
        callbacks.append(
            keras.callbacks.ModelCheckpoint(ckpt_file,
                                            monitor='val_loss',
                                            mode='min',
                                            save_best_only=True))

    model.fit(x_train,
              y_train,
              batch_size=batch_size,
              callbacks=callbacks,
              epochs=epochs,
              verbose=2,
              validation_data=(x_test, y_test))

    # Return model bytes only on worker 0
    if hvd.rank() == 0:
        with open(ckpt_file, 'rb') as f:
            return f.read()

トレーニングを実行する

モデルを定義したら、トレーニング プロセスを実行できます。

# Run training
import os
import sys
import horovod.spark


best_model_bytes = \
    horovod.spark.run(train_hvd, args=(lr_single_node, ), num_proc=num_proc,
                    env=os.environ.copy(),
                    stdout=sys.stdout, stderr=sys.stderr, verbose=2,
                    prefix_output_with_timestamp=True)[0]

ADLS ストレージにチェックポイントを保存する

次のコードは、チェックポイントを Azure Data Lake Storage (ADLS) アカウントに保存する方法を示しています。

import tempfile
import fsspec
import os

local_ckpt_dir = tempfile.mkdtemp()
local_ckpt_file = os.path.join(local_ckpt_dir, 'mnist-ckpt.h5')
adls_ckpt_file = remote_url + local_ckpt_file

with open(local_ckpt_file, 'wb') as f:
    f.write(best_model_bytes)

## Upload local file to ADLS
fs = fsspec.filesystem('abfss')
fs.upload(local_ckpt_file, adls_ckpt_file)

print(adls_ckpt_file)

Horovod トレーニング済みモデルを評価する

モデルのトレーニングが完了したら、最終的なモデルの損失と精度を確認できます。

import tensorflow as tf

hvd_model = tf.keras.models.load_model(local_ckpt_file)

_, (x_test, y_test) = get_dataset()
loss, accuracy = hvd_model.evaluate(x_test, y_test, batch_size=128)
print("loaded model loss and accuracy:", loss, accuracy)

リソースをクリーンアップする

Spark インスタンスがシャットダウンされるようにするには、接続されているセッション (ノートブック) を終了します。 プールは、Apache Spark プールに指定されているアイドル時間に達したときにシャットダウンされます。 また、ノートブックの右上にあるステータス バーから [セッションの停止] を選択することもできます。

ステータス バーの [セッションの停止] ボタンを示すスクリーンショット。

次のステップ