ML パイプラインのステップ間でのデータの移動 (Python)

適用対象: Python SDK azureml v1

この記事では、Azure Machine Learning パイプラインのステップ間でデータをインポート、変換、および移動するためのコードを示します。 Azure Machine Learning でデータがどのように動作するかの概要については、Azure ストレージ サービスのデータへのアクセスに関する記事を参照してください。 Azure Machine Learning パイプラインの利点と構造については、「Azure Machine Learning パイプラインとは」を参照してください

この記事で取り上げるテクニック:

  • 既存のデータに Dataset オブジェクトを使用する
  • ステップ内でデータにアクセスする
  • トレーニングや検証のサブセットなど、Dataset データをサブセットに分割する
  • 次のパイプライン ステップにデータを転送する OutputFileDatasetConfig オブジェクトを作成する
  • パイプライン ステップへの入力として OutputFileDatasetConfig オブジェクトを使用する
  • 永続化する OutputFileDatasetConfig から新しい Dataset オブジェクトを作成する

前提条件

必要なもの:

既存のデータに Dataset オブジェクトを使用する

パイプラインにデータを取り込む方法としては、データセット オブジェクトを使用することをお勧めします。 Dataset オブジェクトは、ワークスペース全体で使用できる永続データを表します。

Dataset オブジェクトを作成および登録する方法は多数あります。 表形式データセットは、1つまたは複数のファイルで使用できる区切られたデータ用です。 ファイル データセットは、バイナリ データ (画像など) または解析するデータ用です。 Dataset オブジェクトを作成する最も簡単なプログラム的方法は、ワークスペース ストレージまたはパブリック URL で既存の Blob を使用することです。

datastore = Datastore.get(workspace, 'training_data')
iris_dataset = Dataset.Tabular.from_delimited_files(DataPath(datastore, 'iris.csv'))

datastore_path = [
    DataPath(datastore, 'animals/dog/1.jpg'),
    DataPath(datastore, 'animals/dog/2.jpg'),
    DataPath(datastore, 'animals/cat/*.jpg')
]
cats_dogs_dataset = Dataset.File.from_files(path=datastore_path)

各種のオプションとさまざまなソースからのデータセットの作成、それらの登録と Azure Machine Learning UI での確認、データ サイズがコンピューティング能力とどのように相互作用するかの理解、およびそれらのバージョン管理についての詳細は、「Azure Machine Learning データセットを作成する」を参照してください。

データセットをスクリプトに渡す

データセットのパスをスクリプトに渡すには、Dataset オブジェクトの as_named_input() メソッドを使用します。 生成された DatasetConsumptionConfig オブジェクトを引数としてスクリプトに渡すか、またはパイプライン スクリプトへの inputs 引数を使用して、Run.get_context().input_datasets[] を使ってデータセットを取得することができます。

名前付き入力を作成したら、そのアクセス モード (as_mount() または as_download()) を選択できます (FileDataset の場合のみ)。 スクリプトがデータセット内のすべてのファイルを処理し、コンピューティング リソースのディスクがデータセットに対して十分な大きさである場合は、ダウンロード アクセス モードを選択することをお勧めします。 ダウンロード アクセス モードを使用すると、実行時のデータ ストリーミングのオーバーヘッドを回避できます。 スクリプトがデータセットのサブセットにアクセスする場合、またはそれがコンピューティングに対して大きすぎる場合は、マウント アクセス モードを使用します。 詳細については、「マウントとダウンロード」をご覧ください

データセットをパイプライン ステップに渡すには、次の手順を実行します。

  1. TabularDataset.as_named_input() または FileDataset.as_named_input() (末尾に ’s’ はありません) を使用して DatasetConsumptionConfig オブジェクトを作成します
  2. FileDataset の場合のみ: as_mount() または as_download() を使用してアクセス モードを設定します。 TabularDataset では、アクセス モードの設定はサポートされていません。
  3. arguments または inputs 引数のいずれかを使用して、データセットをパイプライン ステップに渡します

次のスニペットは、iris_dataset を使用した、PythonScriptStep コンストラクター内でのこれらの手順の組み合わせの一般的なパターンを示しています (TabularDataset)。


train_step = PythonScriptStep(
    name="train_data",
    script_name="train.py",
    compute_target=cluster,
    inputs=[iris_dataset.as_named_input('iris')]
)

注意

これらのすべての引数の値 (つまり、"train_data""train.py"cluster、および iris_dataset) を独自のデータで置き換える必要があります。 上記のスニペットは、呼び出しの形式のみを示しており、Microsoft のサンプルには含まれていません。

また、random_split()take_sample() などのメソッドを使用して、複数の入力を作成したり、パイプライン ステップに渡されるデータの量を減らしたりすることもできます。

seed = 42 # PRNG seed
smaller_dataset = iris_dataset.take_sample(0.1, seed=seed) # 10%
train, test = smaller_dataset.random_split(percentage=0.8, seed=seed)

train_step = PythonScriptStep(
    name="train_data",
    script_name="train.py",
    compute_target=cluster,
    inputs=[train.as_named_input('train'), test.as_named_input('test')]
)

スクリプト内でデータセットにアクセスする

パイプライン ステップ スクリプトへの名前付き入力は、Run オブジェクト内でディクショナリとして使用できます。 Run.get_context() を使用してアクティブな Run オブジェクトを取得し、input_datasets を使用して名前付き入力のディクショナリを取得します。 inputs 引数ではなく arguments 引数を使用して DatasetConsumptionConfig オブジェクトを渡した場合は、ArgParser コードを使用してデータにアクセスします。 次のスニペットでは、両方の手法が示されています。

パイプライン定義スクリプト

# Code for demonstration only: It would be very confusing to split datasets between `arguments` and `inputs`
train_step = PythonScriptStep(
    name="train_data",
    script_name="train.py",
    compute_target=cluster,
    # datasets passed as arguments
    arguments=['--training-folder', train.as_named_input('train').as_download()],
    # datasets passed as inputs
    inputs=[test.as_named_input('test').as_download()]
)

train.pyPythonScriptStep から参照されるスクリプト

# In pipeline script
parser = argparse.ArgumentParser()
# Retreive the dataset passed as an argument
parser.add_argument('--training-folder', type=str, dest='train_folder', help='training data folder mounting point')
args = parser.parse_args()
training_data_folder = args.train_folder
# Retrieve the dataset passed as an input
testing_data_folder = Run.get_context().input_datasets['test']

渡される値は、データセット ファイルのパスです。

登録済みの Dataset に直接アクセスすることもできます。 登録済みのデータセットは永続的であり、ワークスペース全体で共有されるため、それらを直接取得できます。

run = Run.get_context()
ws = run.experiment.workspace
ds = Dataset.get_by_name(workspace=ws, name='mnist_opendataset')

注意

前のスニペットは、呼び出しの形式を示しており、Microsoft のサンプルには含まれていません。 さまざまな引数を、自分のプロジェクトの値に置き換える必要があります。

中間データに OutputFileDatasetConfig を使用する

Dataset オブジェクトは永続データを表しますが、OutputFileDatasetConfig オブジェクトはパイプライン ステップから出力される一時的なデータおよび永続出力データに使用できます。 OutputFileDatasetConfig は、BLOB ストレージ、ファイル共有、adlsgen1、または adlsgen2 へのデータの書き込みをサポートしています。 マウント モードとアップロード モードの両方をサポートしています。 マウント モードでは、マウントされたディレクトリに書き込まれたファイルは、ファイルを閉じたときに永続的に保存されます。 アップロード モードでは、出力ディレクトリに書き込まれたファイルがジョブの最後にアップロードされます。 ジョブが失敗した場合、または取り消された場合、出力ディレクトリはアップロードされません。

OutputFileDatasetConfig オブジェクトの既定の動作では、ワークスペースの既定のデータストアに書き込みます。 arguments パラメーターを使用して OutputFileDatasetConfig オブジェクトを PythonScriptStep に渡します。

from azureml.data import OutputFileDatasetConfig
dataprep_output = OutputFileDatasetConfig()
input_dataset = Dataset.get_by_name(workspace, 'raw_data')

dataprep_step = PythonScriptStep(
    name="prep_data",
    script_name="dataprep.py",
    compute_target=cluster,
    arguments=[input_dataset.as_named_input('raw_data').as_mount(), dataprep_output]
    )

注意

OutputFileDatasetConfig への同時書き込みは失敗します。 1 つの OutputFileDatasetConfig を同時に使用しないようにしてください。 分散トレーニングを使用する場合など、マルチプロセッシングの状況では、1 つの OutputFileDatasetConfig を共有しないでください。

トレーニング ステップの出力として OutputFileDatasetConfig を使用する

パイプラインの PythonScriptStep 内で、プログラムの引数を使用して、使用可能な出力パスを取得できます。 このステップが最初のものであり、出力データを初期化する場合は、指定されたパスにディレクトリを作成する必要があります。 その後、OutputFileDatasetConfig に含める任意のファイルを書き込むことができます。

parser = argparse.ArgumentParser()
parser.add_argument('--output_path', dest='output_path', required=True)
args = parser.parse_args()

# Make directory for file
os.makedirs(os.path.dirname(args.output_path), exist_ok=True)
with open(args.output_path, 'w') as f:
    f.write("Step 1's output")

最初以外のステップへの入力として OutputFileDatasetConfig を読み取る

最初のパイプライン ステップで OutputFileDatasetConfig パスにデータが書き込まれ、それがその最初のステップの出力になった場合、それを後のステップへの入力として使用できます。

次のコードの内容は以下のとおりです。

  • step1_output_data は、PythonScriptStep の出力である step1 が、アップロード アクセス モードで、ADLS Gen 2 データストア my_adlsgen2 に書き込まれることを示します。 ADLS Gen 2 データストアにデータを書き戻すには、ロールのアクセス許可を設定する方法を参照してください。

  • step1 が完了し、step1_output_data によって示される書き込み先に出力が書き込まれると、手順 2 で入力として step1_output_data を使用する準備ができます。

# get adls gen 2 datastore already registered with the workspace
datastore = workspace.datastores['my_adlsgen2']
step1_output_data = OutputFileDatasetConfig(name="processed_data", destination=(datastore, "mypath/{run-id}/{output-name}")).as_upload()

step1 = PythonScriptStep(
    name="generate_data",
    script_name="step1.py",
    runconfig = aml_run_config,
    arguments = ["--output_path", step1_output_data]
)

step2 = PythonScriptStep(
    name="read_pipeline_data",
    script_name="step2.py",
    compute_target=compute,
    runconfig = aml_run_config,
    arguments = ["--pd", step1_output_data.as_input()]

)

pipeline = Pipeline(workspace=ws, steps=[step1, step2])

ヒント

Python スクリプト step2.py 内のデータの読み取りは、「スクリプト内のデータセットにアクセスする」に記載されているものと同じです。スクリプトに --pd の引数を追加してデータにアクセスするには、ArgumentParser を使用します。

再利用のために OutputFileDatasetConfig オブジェクトを登録する

実験期間より長く OutputFileDatasetConfig を利用できるようにする場合、ワークスペースにそれを登録し、実験間で共有し、再利用します。

step1_output_ds = step1_output_data.register_on_complete(
    name='processed_data', 
    description = 'files from step1'
)

不要になったときに OutputFileDatasetConfig のコンテンツを削除する

OutputFileDatasetConfig で書き込まれた中間データは、Azure によって自動的に削除されません。 大量の不要なデータに対するストレージの課金を回避するには、次のいずれかを行う必要があります。

注意事項

中間データの削除は、必ずデータの最終変更日から 30 日が経過した後に行ってください。 それより前にデータを削除すると、パイプラインの実行に失敗する可能性があります。これは、30 日間は中間データを再利用できるとパイプラインで想定しているためです。

  • 不要になった場合は、パイプライン ジョブの終了時に中間データをプログラムで削除してください。
  • 中間データの短期的な記憶域ポリシーを設定して BLOB ストレージを使用します (「Azure Blob Storage アクセス層の自動化によるコストの最適化」を参照してください)。 このポリシーは、ワークスペースの既定ではないデータストアにのみ設定できます。 中間データを既定ではない別のデータ ストアにエクスポートするには、OutputFileDatasetConfig を使います。
    # Get adls gen 2 datastore already registered with the workspace
    datastore = workspace.datastores['my_adlsgen2']
    step1_output_data = OutputFileDatasetConfig(name="processed_data", destination=(datastore, "mypath/{run-id}/{output-name}")).as_upload()
    
  • 不要になったデータは、定期的に確認して削除してください。

詳細については、「Azure Machine Learning のコストを計画して管理する」を参照してください。

次のステップ