Usare processi in parallelo nelle pipeline

SI APPLICA A:Estensione ml dell'interfaccia della riga di comando di Azure v2 (corrente)Python SDK azure-ai-ml v2 (corrente)

Questo articolo illustra come usare l'interfaccia della riga di comando v2 e Python SDK v2 per eseguire processi in parallelo nelle pipeline di Azure Machine Learning. I processi in parallelo accelerano l'esecuzione dei processi distribuendo attività ripetute in cluster di calcolo multinodo efficienti.

I tecnici che si occupano di apprendimento automatico hanno sempre requisiti di scalabilità per le attività di training o inferenza. Ad esempio, quando un data scientist fornisce un singolo script per eseguire il training di un modello di stima delle vendite, i tecnici che si occupano di apprendimento automatico devono applicare questa attività di training a ogni singolo archivio dati. Le problematiche poste da questo processo di scale-out includono tempi di esecuzione lunghi che causano ritardi e problemi imprevisti che richiedono l'intervento manuale per mantenere l'attività in esecuzione.

Il processo principale della parallelizzazione di Azure Machine Learning consiste nel suddividere una singola attività seriale in mini-batch e inviare tali mini batch a più ambienti di calcolo per l'esecuzione in parallelo. I processi in parallelo riducono significativamente il tempo di esecuzione end-to-end e gestiscono automaticamente gli errori. È consigliabile usare il processo in parallelo di Azure Machine Learning per eseguire il training di vari modelli sui dati partizionati o accelerare le attività di inferenza batch su larga scala.

Ad esempio, in uno scenario in cui si esegue un modello di rilevamento di oggetti in un set di immagini di grandi dimensioni, i processi in parallelo di Azure Machine Learning consentono di distribuire facilmente le immagini per eseguire codice personalizzato in parallelo in un cluster di calcolo specifico. La parallelizzazione può ridurre significativamente i tempi. L'uso del processo in parallelo di Azure Machine Learning consente inoltre di semplificare e automatizzare il processo per renderlo più efficiente.

Prerequisiti

Creare ed eseguire una pipeline con un passaggio di processo in parallelo

Un processo in parallelo di Azure Machine Learning può essere usato solo come passaggio in un processo della pipeline.

Prepararsi per la parallelizzazione

Questo passaggio di processo in parallelo richiede preparazione. È necessario uno script di immissione che implementa le funzioni predefinite. È anche necessario impostare gli attributi nella definizione del processo in parallelo che:

  • Definire e associare i dati di input.
  • Impostare il metodo di divisione dati.
  • Configurare le risorse di calcolo.
  • Chiamare lo script di immissione.

Le sezioni seguenti descrivono come preparare il processo in parallelo.

Dichiarare gli input e l'impostazione di divisione dati

Un processo in parallelo richiede la suddivisione e l'elaborazione di un input principale in parallelo. Il formato dei dati di input principale può essere costituito da dati tabulari o da un elenco di file.

Formati di dati diversi hanno tipi di input, modalità di input e metodi di divisione dati differenti. Nella tabella seguente vengono descritte le opzioni disponibili:

Formato dati Tipo di input Modalità di input Metodo di divisione dati
Elenco di file mltable oppure uri_folder ro_mount oppure download In base a dimensione (numero di file) o partizione
Dati tabulari mltable direct In base a dimensione (dimensione fisica stimata) o partizione

Nota

Se si usa mltable tabulare come dati di input principali, è necessario:

  • Installare la libreria mltable nell'ambiente, come nella riga 9 di questo file conda.
  • Disporre di un file di specifica MLTable nel percorso specificato con la sezione transformations: - read_delimited: compilata. Per esempi, vedere Creare e gestire asset di dati.

È possibile dichiarare i dati di input principali con l'attributo input_data nel processo in parallelo YAML o Python e associare i dati con il input definito del processo parallelo usando ${{inputs.<input name>}}. Definire quindi l'attributo di divisione dati per l'input principale a seconda del metodo di divisione dati.

Metodo di divisione dati Attribute name Tipo di attributo Esempio di processo
Per dimensioni mini_batch_size string Stima batch Iris
In base a partizione partition_keys Elenco di stringhe Stima delle vendite di succo d'arancia

Configurare le risorse di calcolo per la parallelizzazione

Dopo aver definito l'attributo di divisione dati, configurare le risorse di calcolo per la parallelizzazione impostando gli attributi instance_count e max_concurrency_per_instance.

Attribute name Type Descrizione Default value
instance_count integer Numero di nodi da usare per il processo. 1
max_concurrency_per_instance integer Numero di processori in ogni nodo. Per un calcolo GPU: 1. Per un calcolo CPU: numero di core.

Questi attributi interagiscono con il cluster di calcolo specificato, come illustrato nel diagramma seguente:

Diagramma che mostra il funzionamento dei dati distribuiti nel processo in parallelo.

Chiamare lo script di immissione

Lo script di immissione è un singolo file Python che implementa le tre funzioni predefinite seguenti con codice personalizzato.

Nome della funzione Richiesto Descrzione Input Restituzione
Init() Y Preparazione comune prima di iniziare a eseguire mini-batch. Ad esempio, usare questa funzione per caricare il modello in un oggetto globale. -- --
Run(mini_batch) Y Implementa la logica di esecuzione principale per mini-batch. mini_batch è il dataframe Pandas se i dati di input sono tabulari o un elenco di percorsi di file se i dati di input sono una directory. Dataframe, elenco o tupla.
Shutdown() N Funzione facoltativa per eseguire operazioni di pulizia personalizzate prima di calcolare il pool. -- --

Importante

Per evitare eccezioni durante l'analisi di argomenti in funzioni Init() o Run(mini_batch), usare parse_known_args anziché parse_args. Vedere l'esempio iris_score per uno script di immissione con parser di argomenti.

Importante

La funzione Run(mini_batch) richiede la restituzione di un dataframe, un elenco o una tupla. Il processo in parallelo usa il conteggio di tali elementi restituiti per misurare gli elementi con esito positivo in tale mini-batch. Il numero di mini-batch deve essere uguale al conteggio dell'elenco restituito se tutti gli elementi sono stati elaborati.

Il processo in parallelo esegue le funzioni in ogni processore, come illustrato nel diagramma seguente.

Diagramma che mostra il funzionamento dello script di immissione nel processo in parallelo.

Vedere gli esempi di script di immissione seguenti:

Per chiamare lo script di immissione, impostare i due attributi seguenti nella definizione del processo in parallelo:

Attribute name Type Descrzione
code stringa Percorso locale della directory del codice sorgente da caricare e usare per il processo.
entry_script string File Python che contiene l'implementazione di funzioni parallele predefinite.

Esempio di passaggio del processo in parallelo

Il passaggio di processo in parallelo seguente dichiara il tipo di input, la modalità e il metodo di divisione dati, associa l'input, configura il calcolo e chiama lo script di immissione.

batch_prediction:
  type: parallel
  compute: azureml:cpu-cluster
  inputs:
    input_data: 
      type: mltable
      path: ./neural-iris-mltable
      mode: direct
    score_model: 
      type: uri_folder
      path: ./iris-model
      mode: download
  outputs:
    job_output_file:
      type: uri_file
      mode: rw_mount

  input_data: ${{inputs.input_data}}
  mini_batch_size: "10kb"
  resources:
      instance_count: 2
  max_concurrency_per_instance: 2

  logging_level: "DEBUG"
  mini_batch_error_threshold: 5
  retry_settings:
    max_retries: 2
    timeout: 60

  task:
    type: run_function
    code: "./script"
    entry_script: iris_prediction.py
    environment:
      name: "prs-env"
      version: 1
      image: mcr.microsoft.com/azureml/openmpi4.1.0-ubuntu20.04
      conda_file: ./environment/environment_parallel.yml

Prendere in considerazione le impostazioni di automazione

Il processo in parallelo di Azure Machine Learning espone numerose impostazioni opzionali in grado di controllare automaticamente il processo senza intervento manuale. La tabella seguente descrive le singole impostazioni.

Chiave Type Descrizione Valori consentiti Default value Impostare nell'attributo o nell'argomento programma
mini_batch_error_threshold integer Numero di mini-batch non riusciti da ignorare in questo processo in parallelo. Se il numero di mini-batch non riusciti è superiore a questa soglia, il processo in parallelo è contrassegnato come non riuscito.

Il mini batch viene contrassegnato come non riuscito se:
- Il numero di elementi restituiti da run() è inferiore al numero di input di mini-batch.
- Le eccezioni vengono rilevate nel codice personalizzato run().
[-1, int.max] -1, ovvero ignorare tutti i mini-batch non riusciti Attributo mini_batch_error_threshold
mini_batch_max_retries integer Numero di tentativi quando il mini-batch ha esito negativo o si verifica il timeout. Se tutti i tentativi hanno esito negativo, il mini-batch viene contrassegnato come non riuscito in base al calcolo mini_batch_error_threshold. [0, int.max] 2 Attributo retry_settings.max_retries
mini_batch_timeout integer Timeout in secondi per l'esecuzione della funzione personalizzata run(). Se il tempo di esecuzione è superiore a questa soglia, il mini-batch verrà interrotto e contrassegnato come non riuscito per attivare nuovi tentativi. (0, 259200] 60 Attributo retry_settings.timeout
item_error_threshold integer Soglia di elementi non riusciti. Gli elementi non riusciti vengono conteggiati in base al divario numerico tra gli input e gli elementi restituiti da ogni mini batch. Se la somma degli elementi non riusciti è superiore a questa soglia, il processo in parallelo è contrassegnato come non riuscito. [-1, int.max] -1, ovvero ignorare tutti gli errori durante il processo in parallelo Argomento programma
--error_threshold
allowed_failed_percent integer Simile a mini_batch_error_threshold, ma usa la percentuale di mini batch non riusciti anziché il conteggio. [0, 100] 100 Argomento programma
--allowed_failed_percent
overhead_timeout integer Timeout in secondi per l'inizializzazione di ogni mini-batch. Ad esempio, caricare i dati del mini batch e passarli alla funzione run(). (0, 259200] 600 Argomento programma
--task_overhead_timeout
progress_update_timeout integer Timeout in secondi per il monitoraggio dello stato di avanzamento dell'esecuzione di mini-batch. Se non vengono ricevuti aggiornamenti dello stato entro questa impostazione di timeout, il processo in parallelo viene contrassegnato come non riuscito. (0, 259200] Calcolato dinamicamente da altre impostazioni Argomento programma
--progress_update_timeout
first_task_creation_timeout integer Timeout in secondi per il monitoraggio del tempo tra l'avvio del processo e l'esecuzione del primo mini-batch. (0, 259200] 600 Argomento programma
--first_task_creation_timeout
logging_level string Livello di log di cui eseguire il dump nei file di log utente. INFO, WARNING o DEBUG INFO Attributo logging_level
append_row_to string Aggregare tutti i valori restituiti da ogni esecuzione di mini-batch ed emetterli come output in questo file. Può fare riferimento a uno degli output del processo in parallelo usando l'espressione ${{outputs.<output_name>}} Attributo task.append_row_to
copy_logs_to_parent string Opzione booleana per specificare se copiare lo stato del processo, la panoramica e i log nel processo della pipeline padre. True oppure False False Argomento programma
--copy_logs_to_parent
resource_monitor_interval integer Intervallo di tempo in secondi per eseguire il dump dell'utilizzo delle risorse del nodo (ad esempio CPU o memoria) nella cartella di log nel percorso logs/sys/perf.

Nota: i log delle risorse di dump frequenti rallentano leggermente la velocità di esecuzione. Impostare questo valore su 0 per interrompere il dump dell'utilizzo delle risorse.
[0, int.max] 600 Argomento programma
--resource_monitor_interval

Il codice di esempio seguente aggiorna queste impostazioni:

batch_prediction:
  type: parallel
  compute: azureml:cpu-cluster
  inputs:
    input_data: 
      type: mltable
      path: ./neural-iris-mltable
      mode: direct
    score_model: 
      type: uri_folder
      path: ./iris-model
      mode: download
  outputs:
    job_output_file:
      type: uri_file
      mode: rw_mount

  input_data: ${{inputs.input_data}}
  mini_batch_size: "10kb"
  resources:
      instance_count: 2
  max_concurrency_per_instance: 2

  logging_level: "DEBUG"
  mini_batch_error_threshold: 5
  retry_settings:
    max_retries: 2
    timeout: 60

  task:
    type: run_function
    code: "./script"
    entry_script: iris_prediction.py
    environment:
      name: "prs-env"
      version: 1
      image: mcr.microsoft.com/azureml/openmpi4.1.0-ubuntu20.04
      conda_file: ./environment/environment_parallel.yml
    program_arguments: >-
      --model ${{inputs.score_model}}
      --error_threshold 5
      --allowed_failed_percent 30
      --task_overhead_timeout 1200
      --progress_update_timeout 600
      --first_task_creation_timeout 600
      --copy_logs_to_parent True
      --resource_monitor_interva 20
    append_row_to: ${{outputs.job_output_file}}

Creare la pipeline con il passaggio del processo in parallelo

L'esempio seguente mostra il processo della pipeline completo con il passaggio di processo in parallelo inline:

$schema: https://azuremlschemas.azureedge.net/latest/pipelineJob.schema.json
type: pipeline

display_name: iris-batch-prediction-using-parallel
description: The hello world pipeline job with inline parallel job
tags:
  tag: tagvalue
  owner: sdkteam

settings:
  default_compute: azureml:cpu-cluster

jobs:
  batch_prediction:
    type: parallel
    compute: azureml:cpu-cluster
    inputs:
      input_data: 
        type: mltable
        path: ./neural-iris-mltable
        mode: direct
      score_model: 
        type: uri_folder
        path: ./iris-model
        mode: download
    outputs:
      job_output_file:
        type: uri_file
        mode: rw_mount

    input_data: ${{inputs.input_data}}
    mini_batch_size: "10kb"
    resources:
        instance_count: 2
    max_concurrency_per_instance: 2

    logging_level: "DEBUG"
    mini_batch_error_threshold: 5
    retry_settings:
      max_retries: 2
      timeout: 60

    task:
      type: run_function
      code: "./script"
      entry_script: iris_prediction.py
      environment:
        name: "prs-env"
        version: 1
        image: mcr.microsoft.com/azureml/openmpi4.1.0-ubuntu20.04
        conda_file: ./environment/environment_parallel.yml
      program_arguments: >-
        --model ${{inputs.score_model}}
        --error_threshold 5
        --allowed_failed_percent 30
        --task_overhead_timeout 1200
        --progress_update_timeout 600
        --first_task_creation_timeout 600
        --copy_logs_to_parent True
        --resource_monitor_interva 20
      append_row_to: ${{outputs.job_output_file}}

Inviare il processo della pipeline

Inviare il processo della pipeline con un passaggio parallelo usando il comando dell'interfaccia della riga di comandoaz ml job create:

az ml job create --file pipeline.yml

Controllare il passaggio parallelo nell'interfaccia utente di Studio

Dopo aver inviato un processo della pipeline, il widget dell'SDK o dell'interfaccia della riga di comando fornisce un collegamento URL Web al grafico della pipeline nell'interfaccia utente di Azure Machine Learning Studio.

Per visualizzare i risultati dei processi in parallelo, fare doppio clic sul passaggio parallelo nel grafico della pipeline, selezionare la scheda Impostazioni nel pannello dei dettagli, espandere Impostazioni esecuzione e successivamente la sezione Parallelo.

Screenshot di studio di Azure Machine Learning che mostra le impostazioni del processo in parallelo.

Per eseguire il debug di un errore di processo in parallelo, selezionare la scheda Output + log, espandere la cartella logs e controllare job_result.txt per stabilire il motivo per cui il processo in parallelo non è riuscito. Per informazioni sulla struttura di registrazione dei processi in parallelo, vedere readme.txt nella stessa cartella.

Screenshot di studio di Azure Machine Learning nella scheda dei processi che mostra i risultati del processo in parallelo.