Implantar e executar modelos MLflow em trabalhos do Spark

Neste artigo, saiba como implantar e executar seu modelo MLflow em trabalhos do Spark para executar inferência sobre grandes quantidades de dados ou como parte de trabalhos de disputa de dados.

Sobre este exemplo

Este exemplo mostra como você pode implantar um modelo MLflow registrado no Azure Machine Learning para trabalhos do Spark em execução em clusters gerenciados do Spark (visualização), Azure Databricks ou Azure Synapse Analytics, para executar inferência sobre grandes quantidades de dados.

O modelo é baseado no UCI Heart Disease Data set. O banco de dados contém 76 atributos, mas estamos usando um subconjunto de 14 deles. O modelo tenta prever a presença de doença cardíaca em um paciente. É inteiro valorizado de 0 (sem presença) a 1 (presença). Ele foi treinado usando um XGBBoost classificador e todo o pré-processamento necessário foi empacotado como um scikit-learn pipeline, tornando este modelo um pipeline de ponta a ponta que vai de dados brutos a previsões.

As informações neste artigo são baseadas em exemplos de código contidos no repositório azureml-examples . Para executar os comandos localmente sem ter que copiar/colar arquivos, clone o repositório e, em seguida, altere os diretórios para sdk/using-mlflow/deploy.

git clone https://github.com/Azure/azureml-examples --depth 1
cd sdk/python/using-mlflow/deploy

Pré-requisitos

Antes de seguir as etapas neste artigo, verifique se você tem os seguintes pré-requisitos:

  • Instale o pacote MLflow SDK mlflow e o plug-in do Azure Machine Learning azureml-mlflow para MLflow da seguinte maneira:

    pip install mlflow azureml-mlflow
    

    Gorjeta

    Você pode usar o mlflow-skinny pacote, que é um pacote MLflow leve sem dependências de armazenamento SQL, servidor, interface do usuário ou ciência de dados. Este pacote é recomendado para usuários que precisam principalmente dos recursos de rastreamento e registro em log do MLflow sem importar o conjunto completo de recursos, incluindo implantações.

  • Crie um espaço de trabalho do Azure Machine Learning. Para criar um espaço de trabalho, consulte Criar recursos necessários para começar. Revise as permissões de acesso necessárias para executar suas operações MLflow em seu espaço de trabalho.

  • Para fazer o acompanhamento remoto ou acompanhar experiências em execução fora do Azure Machine Learning, configure o MLflow para apontar para o URI de acompanhamento da sua área de trabalho do Azure Machine Learning. Para obter mais informações sobre como conectar o MLflow ao seu espaço de trabalho, consulte Configurar o MLflow para o Azure Machine Learning.

  • Você deve ter um modelo MLflow registrado em seu espaço de trabalho. Particularmente, este exemplo registrará um modelo treinado para o conjunto de dados Diabetes.

Ligar à sua área de trabalho

Primeiro, vamos nos conectar ao espaço de trabalho do Azure Machine Learning onde seu modelo está registrado.

O rastreamento já está configurado para você. Suas credenciais padrão também serão usadas ao trabalhar com MLflow.

Registo do modelo

Precisamos de um modelo registrado no registro do Azure Machine Learning para executar a inferência. Nesse caso, já temos uma cópia local do modelo no repositório, então só precisamos publicar o modelo no registro no espaço de trabalho. Você pode pular esta etapa se o modelo que você está tentando implantar já estiver registrado.

model_name = 'heart-classifier'
model_local_path = "model"

registered_model = mlflow_client.create_model_version(
    name=model_name, source=f"file://{model_local_path}"
)
version = registered_model.version

Como alternativa, se o seu modelo foi registrado dentro de uma corrida, você pode registrá-lo diretamente.

Gorjeta

Para registrar o modelo, você precisará saber o local onde o modelo foi armazenado. Se você estiver usando autolog o recurso MLflow, o caminho dependerá do tipo e da estrutura do modelo que está sendo usado. Recomendamos verificar a saída de trabalhos para identificar qual é o nome dessa pasta. Você pode procurar a pasta que contém um arquivo chamado MLModel. Se você estiver registrando seus modelos manualmente usando log_model, então o caminho é o argumento que você passa para esse método. Por exemplo, se você registrar o modelo usando mlflow.sklearn.log_model(my_model, "classifier"), o caminho onde o modelo está armazenado será classifier.

model_name = 'heart-classifier'

registered_model = mlflow_client.create_model_version(
    name=model_name, source=f"runs://{RUN_ID}/{MODEL_PATH}"
)
version = registered_model.version

Nota

O caminho MODEL_PATH é o local onde o modelo foi armazenado na execução.


Obter dados de entrada para pontuar

Precisaremos de alguns dados de entrada para executar ou trabalhar. Neste exemplo, baixaremos dados de exemplo da Internet e os colocaremos em um armazenamento compartilhado usado pelo cluster do Spark.

import urllib

urllib.request.urlretrieve("https://azuremlexampledata.blob.core.windows.net/data/heart-disease-uci/data/heart.csv", "/tmp/data")

Mova os dados para uma conta de armazenamento montada disponível para todo o cluster.

dbutils.fs.mv("file:/tmp/data", "dbfs:/")

Importante

O código anterior usa dbutils, que é uma ferramenta disponível no cluster do Azure Databricks. Use a ferramenta apropriada dependendo da plataforma que você está usando.

Os dados de entrada são então colocados na seguinte pasta:

input_data_path = "dbfs:/data"

Executar o modelo em clusters do Spark

A seção a seguir explica como executar modelos MLflow registrados no Azure Machine Learning em trabalhos do Spark.

  1. Verifique se as seguintes bibliotecas estão instaladas no cluster:

    - mlflow<3,>=2.1
    - cloudpickle==2.2.0
    - scikit-learn==1.2.0
    - xgboost==1.7.2
    
  2. Usaremos um bloco de anotações para demonstrar como criar uma rotina de pontuação com um modelo MLflow registrado no Azure Machine Learning. Crie um bloco de anotações e use o PySpark como idioma padrão.

  3. Importe os namespaces necessários:

    import mlflow
    import pyspark.sql.functions as f
    
  4. Configure o URI do modelo. O URI a seguir traz um modelo nomeado heart-classifier em sua versão mais recente.

    model_uri = "models:/heart-classifier/latest"
    
  5. Carregue o modelo como uma função UDF. Uma função definida pelo usuário (UDF) é uma função definida por um usuário, permitindo que a lógica personalizada seja reutilizada no ambiente do usuário.

    predict_function = mlflow.pyfunc.spark_udf(spark, model_uri, result_type='double') 
    

    Gorjeta

    Use o argumento result_type para controlar o tipo retornado pela predict() função.

  6. Leia os dados que deseja pontuar:

    df = spark.read.option("header", "true").option("inferSchema", "true").csv(input_data_path).drop("target")
    

    No nosso caso, os dados de entrada estão no CSV formato e colocados na pasta dbfs:/data/. Também estamos descartando a coluna target , pois esse conjunto de dados contém a variável de destino a ser prevista. Em cenários de produção, seus dados não terão essa coluna.

  7. Execute a função predict_function e coloque as previsões em uma nova coluna. Neste caso, estamos colocando as previsões na coluna predictions.

    df.withColumn("predictions", score_function(*df.columns))
    

    Gorjeta

    O predict_function recebe como argumentos as colunas necessárias. No nosso caso, todas as colunas do quadro de dados são esperadas pelo modelo e, portanto, df.columns são usadas. Se o seu modelo requer um subconjunto das colunas, você pode introduzi-las manualmente. Se o modelo tiver uma assinatura, os tipos precisarão ser compatíveis entre entradas e tipos esperados.

  8. Você pode escrever suas previsões de volta ao armazenamento:

    scored_data_path = "dbfs:/scored-data"
    scored_data.to_csv(scored_data_path)
    

Executar o modelo em um trabalho autônomo do Spark no Azure Machine Learning

O Azure Machine Learning dá suporte à criação de um trabalho autônomo do Spark e à criação de um componente do Spark reutilizável que pode ser usado nos pipelines do Azure Machine Learning. Neste exemplo, implantaremos um trabalho de pontuação que é executado no trabalho autônomo do Spark do Azure Machine Learning e executa um modelo MLflow para executar inferência.

Nota

Para saber mais sobre trabalhos do Spark no Azure Machine Learning, consulte Enviar trabalhos do Spark no Azure Machine Learning (visualização).

  1. Um trabalho do Spark requer um script Python que usa argumentos. Crie um script de pontuação:

    score.py

    import argparse
    
    parser = argparse.ArgumentParser()
    parser.add_argument("--model")
    parser.add_argument("--input_data")
    parser.add_argument("--scored_data")
    
    args = parser.parse_args()
    print(args.model)
    print(args.input_data)
    
    # Load the model as an UDF function
    predict_function = mlflow.pyfunc.spark_udf(spark, args.model, env_manager="conda")
    
    # Read the data you want to score
    df = spark.read.option("header", "true").option("inferSchema", "true").csv(input_data).drop("target")
    
    # Run the function `predict_function` and place the predictions on a new column
    scored_data = df.withColumn("predictions", score_function(*df.columns))
    
    # Save the predictions
    scored_data.to_csv(args.scored_data)
    

    O script acima usa três argumentos --model--input_data e --scored_data. Os dois primeiros são entradas e representam o modelo que queremos executar e os dados de entrada, o último é uma saída e é a pasta de saída onde as previsões serão colocadas.

    Gorjeta

    Instalação de pacotes Python: O script de pontuação anterior carrega o modelo MLflow em uma função UDF, mas indica o parâmetro env_manager="conda". Quando esse parâmetro é definido, o MLflow restaurará os pacotes necessários conforme especificado na definição do modelo em um ambiente isolado onde apenas a função UDF é executada. Para obter mais detalhes, consulte a mlflow.pyfunc.spark_udf documentação.

  2. Crie uma definição de trabalho:

    mlflow-score-spark-job.yml

    $schema: http://azureml/sdk-2-0/SparkJob.json
    type: spark
    
    code: ./src
    entry:
      file: score.py
    
    conf:
      spark.driver.cores: 1
      spark.driver.memory: 2g
      spark.executor.cores: 2
      spark.executor.memory: 2g
      spark.executor.instances: 2
    
    inputs:
      model:
        type: mlflow_model
        path: azureml:heart-classifier@latest
      input_data:
        type: uri_file
        path: https://azuremlexampledata.blob.core.windows.net/data/heart-disease-uci/data/heart.csv
        mode: direct
    
    outputs:
      scored_data:
        type: uri_folder
    
    args: >-
      --model ${{inputs.model}}
      --input_data ${{inputs.input_data}}
      --scored_data ${{outputs.scored_data}}
    
    identity:
      type: user_identity
    
    resources:
      instance_type: standard_e4s_v3
      runtime_version: "3.2"
    

    Gorjeta

    Para usar um pool Synapse Spark anexado, defina compute a propriedade no arquivo de especificação YAML de exemplo mostrado acima em vez da resources propriedade.

  3. Os arquivos YAML mostrados az ml job create acima podem ser usados no comando, com o --file parâmetro, para criar um trabalho Spark autônomo, conforme mostrado:

    az ml job create -f mlflow-score-spark-job.yml
    

Próximos passos