Distribuire ed eseguire modelli MLflow in processi Spark
Questo articolo illustra come distribuire ed eseguire un modello MLflow in processi Spark per eseguire l'inferenza per grandi quantità di dati o nell’ambito di processi di data wrangling.
Informazioni su questo esempio
Questo esempio illustra come distribuire un modello MLflow registrato in Azure Machine Learning nei processi Spark in esecuzione in cluster Spark gestiti (anteprima), Azure Databricks o Azure Synapse Analytics per eseguire l'inferenza su grandi quantità di dati.
Il modello si basa sul set di dati UCI Heart Disease. Il database contiene 76 attributi ma viene usato un subset di 14 attributi. Il modello cerca di prevedere la presenza di una malattia cardiaca in un paziente. È un valore intero compreso tra 0 (assenza) e 1 (presenza). È stato eseguito il training usando un classificatore XGBBoost
e tutta la pre-elaborazione necessaria è stata inserita in un pacchetto come pipeline scikit-learn
, rendendo questo modello una pipeline end-to-end che va dai dati non elaborati alle previsioni.
Le informazioni contenute in questo articolo si basano sugli esempi di codice contenuti nel repository azureml-examples. Per eseguire i comandi in locale senza dover copiare/incollare file, è possibile clonare il repository e modificare le directory in sdk/using-mlflow/deploy
.
git clone https://github.com/Azure/azureml-examples --depth 1
cd sdk/python/using-mlflow/deploy
Prerequisiti
Prima di seguire la procedura descritta in questo articolo, assicurarsi di disporre dei prerequisiti seguenti:
Installare il pacchetto MLflow SDK
mlflow
e il plug-in Azure Machine Learningazureml-mlflow
per MLflow come indicato di seguito:pip install mlflow azureml-mlflow
Suggerimento
È possibile usare il pacchetto
mlflow-skinny
, che è un pacchetto di MLflow leggero senza risorse di archiviazione SQL, server, interfaccia utente o dipendenze di data science. Questo pacchetto è consigliato per gli utenti che necessitano principalmente delle funzionalità di rilevamento e registrazione di MLflow senza importare la suite completa di funzionalità, incluse le distribuzioni.Creare un'area di lavoro di Azure Machine Learning. Per creare un'area di lavoro, vedere Creare risorse necessarie per iniziare. Vedere quali sono le autorizzazioni di accesso necessarie per eseguire le operazioni MLflow nell'area di lavoro.
Per eseguire il rilevamento remoto o tenere traccia degli esperimenti in esecuzione all'esterno di Azure Machine Learning, configurare MLflow in modo che punti all'URI di rilevamento dell'area di lavoro di Azure Machine Learning. Per altre informazioni su come connettere MLflow all'area di lavoro, vedere Configurare MLflow per Azure Machine Learning.
- È necessario avere un modello MLflow registrato nella propria area di lavoro. In particolare, questo esempio registrerà un modello sottoposto a training per il set di dati Diabetes.
Connettersi all'area di lavoro
Connettersi innanzitutto all'area di lavoro di Azure Machine Learning in cui è registrato il modello.
Il rilevamento è già configurato automaticamente. Le credenziali predefinite verranno usate anche quando si usa MLflow.
Registrazione del modello
Per eseguire l'inferenza è necessario un modello registrato nel registro di Azure Machine Learning. In questo caso è già presente una copia locale del modello nel repository, per cui basta pubblicare il modello nel registro dell'area di lavoro. È possibile ignorare questo passaggio se il modello di cui si tenta la distribuzione è già registrato.
model_name = 'heart-classifier'
model_local_path = "model"
registered_model = mlflow_client.create_model_version(
name=model_name, source=f"file://{model_local_path}"
)
version = registered_model.version
Se il modello è stato registrato all'interno di un'esecuzione, invece, è possibile registrarlo direttamente.
Suggerimento
Per registrare il modello è necessario conoscere la posizione in cui è stato archiviato. Se si usa la funzionalità autolog
di MLflow, il percorso dipende dal tipo e dal framework del modello in uso. È consigliabile controllare l'output dei processi per identificare il nome di questa cartella. È possibile cercare la cartella contenente un file denominato MLModel
. Se i modelli vengono registrati manualmente usando log_model
, il percorso è l'argomento passato a tale metodo. Ad esempio, se si registra il modello usando mlflow.sklearn.log_model(my_model, "classifier")
, il percorso in cui viene archiviato il modello è classifier
.
model_name = 'heart-classifier'
registered_model = mlflow_client.create_model_version(
name=model_name, source=f"runs://{RUN_ID}/{MODEL_PATH}"
)
version = registered_model.version
Nota
Il percorso MODEL_PATH
è la posizione in cui il modello è stato archiviato nell'esecuzione.
Ottenere i dati di input a cui assegnare il punteggio
Sono necessari alcuni dati di input per l'esecuzione o i processi. In questo esempio verranno scaricati dati di esempio da Internet e verranno inseriti in una risorsa di archiviazione condivisa usata dal cluster Spark.
import urllib
urllib.request.urlretrieve("https://azuremlexampledata.blob.core.windows.net/data/heart-disease-uci/data/heart.csv", "/tmp/data")
Spostare i dati in un account di archiviazione montato disponibile per l'intero cluster.
dbutils.fs.mv("file:/tmp/data", "dbfs:/")
Importante
Il codice precedente usa dbutils
, uno strumento disponibile nel cluster Azure Databricks. Usare lo strumento appropriato a seconda della piattaforma in uso.
I dati di input, quindi, vengono collocati nella cartella seguente:
input_data_path = "dbfs:/data"
Eseguire il modello in cluster Spark
La sezione seguente illustra come eseguire modelli MLflow registrati in Azure Machine Learning in processi Spark.
Accertarsi che nel cluster siano installate le librerie seguenti:
- mlflow<3,>=2.1 - cloudpickle==2.2.0 - scikit-learn==1.2.0 - xgboost==1.7.2
Verrà usato un notebook per illustrare come creare una routine di assegnazione dei punteggi con un modello MLflow registrato in Azure Machine Learning. Creare un notebook e usare PySpark come linguaggio predefinito.
Importare gli spazi dei nomi necessari:
import mlflow import pyspark.sql.functions as f
Configurare l’URI del modello. L'URI seguente conduce a un modello denominato
heart-classifier
nella versione più recente.model_uri = "models:/heart-classifier/latest"
Caricare il modello come funzione UDF. Una funzione UDF (User-Defined Function) è una funzione definita da un utente che consente di riutilizzare la logica personalizzata nell'ambiente dell’utente.
predict_function = mlflow.pyfunc.spark_udf(spark, model_uri, result_type='double')
Suggerimento
Usare l'argomento
result_type
per controllare il tipo restituito dalla funzionepredict()
.Leggere i dati a cui assegnare il punteggio:
df = spark.read.option("header", "true").option("inferSchema", "true").csv(input_data_path).drop("target")
In questo caso, i dati di input sono in formato
CSV
e sono collocati nella cartelladbfs:/data/
. Si elimina anche la colonnatarget
perché questo set di dati contiene la variabile di destinazione da stimare. Negli scenari di produzione i dati non avranno questa colonna.Eseguire la funzione
predict_function
e collocare le stime in una nuova colonna. In questo caso, le stime vengono collocate nella colonnapredictions
.df.withColumn("predictions", score_function(*df.columns))
Suggerimento
predict_function
riceve come argomenti le colonne necessarie. In questo caso, tutte le colonne del frame di dati sono previste dal modello, per cui si usadf.columns
. Se il modello richiede un subset delle colonne, è possibile introdurle manualmente. Se il modello ha una firma, i tipi devono essere compatibili tra gli input e i tipi previsti.È possibile riscrivere le stime nella risorsa di archiviazione:
scored_data_path = "dbfs:/scored-data" scored_data.to_csv(scored_data_path)
Eseguire il modello in un processo Spark autonomo in Azure Machine Learning
Azure Machine Learning supporta la creazione di un processo Spark autonomo e la creazione di un componente Spark riutilizzabile che può essere usato in pipeline di Azure Machine Learning. In questo esempio verrà distribuito un processo di assegnazione dei punteggi eseguito nel processo Spark autonomo di Azure Machine Learning che esegue un modello MLflow per eseguire l'inferenza.
Nota
Per altre informazioni su processi Spark in Azure Machine Learning, vedere Inviare processi Spark in Azure Machine Learning (anteprima).
Un processo Spark richiede uno script Python che acquisisce argomenti. Creare uno script di assegnazione dei punteggi:
score.py
import argparse parser = argparse.ArgumentParser() parser.add_argument("--model") parser.add_argument("--input_data") parser.add_argument("--scored_data") args = parser.parse_args() print(args.model) print(args.input_data) # Load the model as an UDF function predict_function = mlflow.pyfunc.spark_udf(spark, args.model, env_manager="conda") # Read the data you want to score df = spark.read.option("header", "true").option("inferSchema", "true").csv(input_data).drop("target") # Run the function `predict_function` and place the predictions on a new column scored_data = df.withColumn("predictions", score_function(*df.columns)) # Save the predictions scored_data.to_csv(args.scored_data)
Lo script precedente acquisisce tre argomenti:
--model
,--input_data
e--scored_data
. I primi due sono input e rappresentano il modello da eseguire e i dati di input, l'ultimo è un output ed è la cartella di output in cui verranno collocate le stime.Suggerimento
Installazione di pacchetti Python: lo script di assegnazione dei punteggi precedente carica il modello MLflow in una funzione UDF, ma indica il parametro
env_manager="conda"
. Quando questo parametro è impostato, MLflow ripristinerà i pacchetti necessari come specificato nella definizione del modello in un ambiente isolato in cui viene eseguita solo la funzione UDF. Per altri dettagli, consultare la documentazione dimlflow.pyfunc.spark_udf
.Creare una definizione di processo:
mlflow-score-spark-job.yml
$schema: http://azureml/sdk-2-0/SparkJob.json type: spark code: ./src entry: file: score.py conf: spark.driver.cores: 1 spark.driver.memory: 2g spark.executor.cores: 2 spark.executor.memory: 2g spark.executor.instances: 2 inputs: model: type: mlflow_model path: azureml:heart-classifier@latest input_data: type: uri_file path: https://azuremlexampledata.blob.core.windows.net/data/heart-disease-uci/data/heart.csv mode: direct outputs: scored_data: type: uri_folder args: >- --model ${{inputs.model}} --input_data ${{inputs.input_data}} --scored_data ${{outputs.scored_data}} identity: type: user_identity resources: instance_type: standard_e4s_v3 runtime_version: "3.2"
Suggerimento
Per usare un pool di Spark Synapse collegato, definire la proprietà
compute
nel file di specifica YAML di esempio mostrato sopra anziché la proprietàresources
.Il file YAML mostrato sopra può essere usato nel comando
az ml job create
con il parametro--file
per creare un processo Spark autonomo, come illustrato di seguito:az ml job create -f mlflow-score-spark-job.yml