Como solucionar problemas do ParallelRunStep

APLICA-SE A: SDK do Python azureml v1

Neste artigo, você aprenderá a solucionar problemas ao obter erros, usando a classe ParallelRunStep do SDK do Azure Machine Learning.

Para obter dicas gerais sobre como solucionar problemas de um pipeline, confira Como solucionar problemas de pipelines do machine learning.

Testar scripts localmente

O ParallelRunStep é executado como uma etapa em pipelines de ML. Talvez você queira testar os scripts localmente como uma primeira etapa.

Requisitos de script de entrada

O script de entrada para um ParallelRunStepdeve conterrun() uma função e, opcionalmente, contém uma init() função:

  • init(): use essa função para qualquer preparação dispendiosa ou comum para processamento posterior. Por exemplo, use-a para carregar o modelo em um objeto global. Essa função será chamada apenas uma vez no início do processo.

    Observação

    Se o método init criar um diretório de saída, especifique que parents=True e exist_ok=True. O método init é chamado de cada processo de trabalho em cada nó no qual o trabalho está em execução.

  • run(mini_batch): A função será executada para cada instância de mini_batch.
    • mini_batch: ParallelRunStep invocará o método run e transmitirá uma lista ou o DataFrame Pandas como um argumento para o método. Cada entrada em min_batch será um caminho de arquivo se a entrada for um FileDataset ou um DataFrame Pandas se a entrada for um TabularDataset.
    • response: o método run() deve retornar um DataFrame Pandas ou uma matriz. Para append_row output_action, esses elementos retornados são acrescentados ao arquivo de saída comum. Para summary_only, o conteúdo dos elementos é ignorado. Para todas as ações de saída, cada elemento de saída retornado indica uma execução bem-sucedida do elemento de entrada no minilote de entrada. Verifique se dados suficientes foram incluídos no resultado da execução para mapear a entrada para o resultado da saída da execução. A saída de execução será gravada no arquivo de saída e não haverá garantia de que esteja em ordem; você deverá usar uma chave na saída para mapeá-la para a entrada.

      Observação

      Um elemento de saída é esperado para um elemento de entrada.

%%writefile digit_identification.py
# Snippets from a sample script.
# Refer to the accompanying digit_identification.py
# (https://github.com/Azure/MachineLearningNotebooks/tree/master/how-to-use-azureml/machine-learning-pipelines/parallel-run)
# for the implementation script.

import os
import numpy as np
import tensorflow as tf
from PIL import Image
from azureml.core import Model


def init():
    global g_tf_sess

    # Pull down the model from the workspace
    model_path = Model.get_model_path("mnist")

    # Construct a graph to execute
    tf.reset_default_graph()
    saver = tf.train.import_meta_graph(os.path.join(model_path, 'mnist-tf.model.meta'))
    g_tf_sess = tf.Session()
    saver.restore(g_tf_sess, os.path.join(model_path, 'mnist-tf.model'))


def run(mini_batch):
    print(f'run method start: {__file__}, run({mini_batch})')
    resultList = []
    in_tensor = g_tf_sess.graph.get_tensor_by_name("network/X:0")
    output = g_tf_sess.graph.get_tensor_by_name("network/output/MatMul:0")

    for image in mini_batch:
        # Prepare each image
        data = Image.open(image)
        np_im = np.array(data).reshape((1, 784))
        # Perform inference
        inference_result = output.eval(feed_dict={in_tensor: np_im}, session=g_tf_sess)
        # Find the best probability, and add it to the result list
        best_result = np.argmax(inference_result)
        resultList.append("{}: {}".format(os.path.basename(image), best_result))

    return resultList

Se você tiver outro arquivo ou pasta no mesmo diretório que o script de inferência, poderá consultá-lo localizando o diretório de trabalho atual. Caso deseje importar os pacotes, acrescente a pasta do pacote ao sys.path.

script_dir = os.path.realpath(os.path.join(__file__, '..',))
file_path = os.path.join(script_dir, "<file_name>")

packages_dir = os.path.join(file_path, '<your_package_folder>')
if packages_dir not in sys.path:
    sys.path.append(packages_dir)
from <your_package> import <your_class>

Parâmetros para ParallelRunConfig

ParallelRunConfig é a principal configuração para a instância ParallelRunStep no pipeline do Azure Machine Learning. Use-a para encapsular o script e configurar os parâmetros necessários, incluindo todas as seguintes entradas:

  • entry_script: Um script de usuário como um caminho de arquivo local que será executado em paralelo em vários nós. Se source_directory estiver presente, use um caminho relativo. Caso contrário, use qualquer caminho que seja acessível no computador.

  • mini_batch_size: O tamanho do minilote passado para uma única chamada de run(). (opcional; o valor padrão são arquivos 10 para FileDataset e 1MB para TabularDataset.)

    • Para FileDataset, é o número de arquivos com um valor mínimo de 1. Você pode combinar vários arquivos em um minilote.
    • Para TabularDataset, é o tamanho dos dados. Os valores de exemplo são 1024, 1024KB, 10MB e 1GB. O valor recomendado é 1MB. O minilote de TabularDataset nunca ultrapassará os limites do arquivo. Por exemplo, se você tiver arquivos .csv com vários tamanhos, o menor arquivo será de 100 KB, e o maior será de 10 MB. Se você definir mini_batch_size = 1MB, os arquivos com um tamanho menor que 1 MB serão tratados como um minilote. Arquivos com um tamanho maior que 1 MB serão divididos em vários minilotes.

      Observação

      TabularDatasets com o respaldo de SQL não pode ser particionado. TabularDatasets de um arquivo parquet e um grupo de linhas único não podem ser particionados.

  • error_threshold: O número de falhas de registro para TabularDataset e falhas de arquivo para FileDataset que devem ser ignorados durante o processamento. Se a contagem de erros de toda a entrada ficar acima desse valor, o trabalho será anulado. O limite de erro é para toda a entrada, não para um minilote individual enviado ao método run(). O intervalo é [-1, int.max]. A parte -1 indica que é para ignorar todas as falhas durante o processamento.

  • output_action: Um dos seguintes valores indica como a saída será organizada:

    • summary_only: O script de usuário armazenará a saída. ParallelRunStep usará a saída somente para o cálculo do limite de erro.
    • append_row: para todas as entradas, somente um arquivo será criado na pasta de saída para acrescentar todas as saídas separadas por linha.
  • append_row_file_name: para personalizar o nome do arquivo de saída para append_row output_action (opcional; o valor padrão é parallel_run_step.txt).

  • source_directory: Caminhos para pastas que contêm todos os arquivos a serem executados no destino de computação (opcional).

  • compute_target: Apenas AmlCompute tem suporte.

  • node_count: O número de nós de computação a serem usados para executar o script do usuário.

  • process_count_per_node: O número de processos de trabalho por nó para executar o script de entrada em paralelo. Para um computador de GPU, o valor padrão é 1. Para um computador de CPU, o valor padrão é o número de núcleos por nó. Um processo de trabalho chamará run() repetidamente passando o mini lote que ele obtém. O número total de processos de trabalho em seu trabalho é process_count_per_node * node_count, o que decide o número máximo de run() a ser executado em paralelo.

  • environment: A definição de ambiente Python. Você pode configurá-lo para usar um ambiente Python existente ou para configurar um ambiente temporário. A definição também é responsável por configurar as dependências de aplicativo necessárias (opcional).

  • logging_level: Detalhamento do log. Os valores no detalhamento crescente são: WARNING, INFO e DEBUG. (opcional; o valor padrão é INFO)

  • run_invocation_timeout: O tempo limite de invocação do método run() em segundos. (opcional; o valor padrão é 60)

  • run_max_try: contagem máxima de tentativas de run() para um minilote. Um run() falhará se uma exceção for gerada ou nada será retornado quando run_invocation_timeout for atingido (opcional; o valor padrão é 3).

Especifique mini_batch_size, node_count, process_count_per_node, logging_level, run_invocation_timeout e run_max_try como PipelineParameter, de modo que, ao reenviar uma execução de pipeline, você possa ajustar os valores de parâmetro. Neste exemplo, você usa PipelineParameter para mini_batch_size e Process_count_per_node e alterará esses valores ao reenviar outra execução.

Visibilidade de dispositivos CUDA

Para destinos de computação equipados com GPUs, a variável de ambiente CUDA_VISIBLE_DEVICES será definida em processos de trabalho. No AmlCompute, você pode encontrar o número total de dispositivos GPU na variável de ambiente AZ_BATCHAI_GPU_COUNT_FOUND, que é definido automaticamente. Se você quiser que cada processo de trabalho tenha uma GPU dedicada, defina process_count_per_node igual ao número de dispositivos GPU em um computador. Cada processo de trabalho atribuirá um índice exclusivo a CUDA_VISIBLE_DEVICES. Se um processo de trabalho parar por algum motivo, o próximo processo de trabalho iniciado usará o índice de GPU liberado.

Se o número total de dispositivos GPU for menor que process_count_per_node, os processos de trabalho serão atribuídos ao índice de GPU até que todos tenham sido usados.

Considerando que o total de dispositivos GPU é 2 e process_count_per_node = 4, por exemplo, o processo 0 e o processo 1 terão índices 0 e 1. Os processos 2 e 3 não terão uma variável de ambiente. Para uma biblioteca que usa essa variável de ambiente para atribuição de GPU, os processos 2 e 3 não terão GPUs e não tentarão adquirir dispositivos GPU. Se o processo 0 parar, ele lançará o índice de GPU 0. O próximo processo, que é o processo 4, terá o índice de GPU 0 atribuído.

Para obter mais informações, confira Dica de profissional do CUDA: controlar a visibilidade da GPU com CUDA_VISIBLE_DEVICES.

Parâmetros para a criação do ParallelRunStep

Crie o ParallelRunStep usando o script, a configuração do ambiente e os parâmetros. Especifique o destino de computação que você já anexou ao seu workspace como o destino de execução do seu script de inferência. Use ParallelRunStep para criar a etapa do pipeline de inferência de lote, que usa todos os seguintes parâmetros:

  • name: O nome da etapa, com as seguintes restrições de nomenclatura: unique, 3-32 characters e regex ^[a-z]([-a-z0-9]*[a-z0-9])?$.
  • parallel_run_config: Um objeto ParallelRunConfig, conforme definido anteriormente.
  • inputs: um ou mais conjuntos de dados do Azure Machine Learning de tipo único a serem particionados para processamento paralelo.
  • side_inputs: um ou mais dados de referência ou conjuntos de dados usados como entradas laterais sem a necessidade de partição.
  • output: um objeto OutputFileDatasetConfig que representa o caminho do diretório no qual os dados de saída serão armazenados.
  • arguments: uma lista de argumentos passados para o script do usuário. Use unknown_args para recuperá-los em seu script de entrada (opcional).
  • allow_reuse: Se a etapa deve reutilizar os resultados anteriores quando executada com as mesmas configurações/entradas. Se esse parâmetro for False, uma nova execução sempre será gerada para essa etapa durante a execução do pipeline. (opcional; o valor padrão é True.)
from azureml.pipeline.steps import ParallelRunStep

parallelrun_step = ParallelRunStep(
    name="predict-digits-mnist",
    parallel_run_config=parallel_run_config,
    inputs=[input_mnist_ds_consumption],
    output=output_dir,
    allow_reuse=True
)

Depurar scripts do contexto remoto

A transição da depuração de um script de pontuação localmente para depurar um script de pontuação em um pipeline real pode ser um passo difícil. Para obter informações sobre como localizar os logs no portal, confira a seção pipelines do machine learning em scripts de depuração com base em um contexto remoto. As informações contidas nessa seção também se aplicam a um ParallelRunStep.

Por exemplo, o arquivo de log 70_driver_log.txt contém informações do controlador que inicia o código ParallelRunStep.

Devido à natureza distribuída dos trabalhos do ParallelRunStep, há logs de várias fontes diferentes. No entanto, são criados dois arquivos consolidados que fornecem informações de alto nível:

  • ~/logs/job_progress_overview.txt: Esse arquivo fornece informações de alto nível sobre o número de mini-lotes (também conhecidos como tarefas) criados até o momento e o número de mini-lotes processados até o momento. Nesse final, ele mostra o resultado do trabalho. Se o trabalho tiver falhado, ele mostrará a mensagem de erro e onde iniciar a solução de problemas.

  • ~/logs/sys/master_role.txt: esse arquivo fornece a exibição do nó principal (também conhecido como orquestrador) do trabalho em execução. Inclui a criação de tarefas, o monitoramento de progresso, o resultado da execução.

Os logs gerados do script de entrada usando o auxiliar EntryScript e as instruções PRINT serão encontrados nos seguintes arquivos:

  • ~/logs/user/entry_script_log/<node_id>/<process_name>.log.txt: esses arquivos são os logs gravados de entry_script usando o auxiliar EntryScript.

  • ~/logs/user/stdout/<node_id>/<process_name>.stdout.txt: esses arquivos são os logs de stdout (por exemplo, a instrução PRINT) de entry_script.

  • ~/logs/user/stderr/<node_id>/<process_name>.stderr.txt: esses arquivos são os logs de stderr de entry_script.

Para uma compreensão concisa dos erros em seu script, há:

  • ~/logs/user/error.txt: Este arquivo tentará resumir os erros em seu script.

Para obter mais informações sobre erros no seu script, há:

  • ~/logs/user/error/: contém rastreamentos de pilha completos de exceções geradas durante o carregamento e a execução do script de entrada.

Quando você precisar de um entendimento completo de como cada nó executou o script de pontuação, examine os logs de processo individuais para cada nó. Os logs de processo podem ser encontrados na pasta sys/node, agrupados por nós de trabalho:

  • ~/logs/sys/node/<node_id>/<process_name>.txt: esse arquivo fornece informações detalhadas sobre cada minilote à medida que ele é selecionado ou concluído por um trabalho. Para cada mini-lote, esse arquivo inclui:

    • O endereço IP e o PID do processo de trabalho.
    • O número total de itens, contagem de itens processados com êxito e contagem de itens com falha.
    • A hora de início, a duração, o tempo de processamento e o tempo do método de execução.

Você também pode ver os resultados de verificações periódicas do uso de recursos para cada nó. Os arquivos de log e de instalação estão nesta pasta:

  • ~/logs/perf: defina --resource_monitor_interval para alterar o intervalo de verificação em segundos. O intervalo padrão é 600, que é de aproximadamente 10 minutos. Para interromper o monitoramento, defina o valor como 0. Cada pasta <node_id> inclui:

    • os/: informações sobre todos os processos em execução no nó. Uma verificação executa um comando do sistema operacional e salva o resultado em um arquivo. No Linux, o comando é ps. No Windows, use tasklist.
      • %Y%m%d%H: o nome da subpasta é o horário.
        • processes_%M: o arquivo termina com o minuto do horário de verificação.
    • node_disk_usage.csv: uso detalhado do disco do nó.
    • node_resource_usage.csv: visão geral do uso de recursos do nó.
    • processes_resource_usage.csv: visão geral do uso de recursos de cada processo.

Como fazer log do meu script de usuário a partir de um contexto remoto?

O ParallelRunStep pode executar vários processos em um nó com base em process_count_per_node. Para organizar os logs de cada processo no nó e combinar a instrução PRINT e LOG, recomendamos o uso do agente do ParallelRunStep, conforme mostrado abaixo. Você obtém um agente do EntryScript e faz com que os logs apareçam na pasta logs/usuário no portal.

Um script de entrada de exemplo usando o agente:

from azureml_user.parallel_run import EntryScript

def init():
    """Init once in a worker process."""
    entry_script = EntryScript()
    logger = entry_script.logger
    logger.info("This will show up in files under logs/user on the Azure portal.")


def run(mini_batch):
    """Call once for a mini batch. Accept and return the list back."""
    # This class is in singleton pattern and will return same instance as the one in init()
    entry_script = EntryScript()
    logger = entry_script.logger
    logger.info(f"{__file__}: {mini_batch}.")
    ...

    return mini_batch

Para onde a mensagem do python logging é enviada?

O ParallelRunStep define um manipulador no agente raiz, que recebe a mensagem como logs/user/stdout/<node_id>/processNNN.stdout.txt .

logging padrão para o nível INFO. Por padrão, os níveis abaixo de INFO não aparecerão, como DEBUG.

Como posso gravar em um arquivo para aparecer no portal?

Os arquivos logs na pasta serão carregados e aparecerão no portal. Você pode obter a pasta logs/user/entry_script_log/<node_id> como abaixo e compor o caminho do arquivo para gravar:

from pathlib import Path
from azureml_user.parallel_run import EntryScript

def init():
    """Init once in a worker process."""
    entry_script = EntryScript()
    log_dir = entry_script.log_dir
    log_dir = Path(entry_script.log_dir)  # logs/user/entry_script_log/<node_id>/.
    log_dir.mkdir(parents=True, exist_ok=True) # Create the folder if not existing.

    proc_name = entry_script.agent_name  # The process name in pattern "processNNN".
    fil_path = log_dir / f"{proc_name}_<file_name>" # Avoid conflicting among worker processes with proc_name.

Como lidar com o log em novos processos?

Você pode gerar novos processos no script de entrada com o módulo, conectar-se aos pipes de subprocess entrada/saída/erro e obter seus códigos de retorno.

A abordagem recomendada é usar a função run() com capture_output=True . Os erros aparecerão em logs/user/error/<node_id>/<process_name>.txt.

Se você quiser usar Popen(), deverá redirecionar stdout/stderr para arquivos, como:

from pathlib import Path
from subprocess import Popen

from azureml_user.parallel_run import EntryScript


def init():
    """Show how to redirect stdout/stderr to files in logs/user/entry_script_log/<node_id>/."""
    entry_script = EntryScript()
    proc_name = entry_script.agent_name  # The process name in pattern "processNNN".
    log_dir = Path(entry_script.log_dir)  # logs/user/entry_script_log/<node_id>/.
    log_dir.mkdir(parents=True, exist_ok=True) # Create the folder if not existing.
    stdout_file = str(log_dir / f"{proc_name}_demo_stdout.txt")
    stderr_file = str(log_dir / f"{proc_name}_demo_stderr.txt")
    proc = Popen(
        ["...")],
        stdout=open(stdout_file, "w"),
        stderr=open(stderr_file, "w"),
        # ...
    )

Observação

Um processo de trabalho executa o código "system" e o código de script de entrada no mesmo processo.

Se nenhum stdout ou stderr for especificado, um subprocesso criado com Popen() no script de entrada herdará a configuração do processo de trabalho.

stdout gravará logs/sys/node/<node_id>/processNNN.stdout.txt em stderr e em logs/sys/node/<node_id>/processNNN.stderr.txt .

Como fazer gravar um arquivo no diretório de saída e exibi-lo no portal?

Você pode obter o diretório de saída da classe EntryScript e fazer a gravação nele. Para ver os arquivos gravados, na etapa Executar exibição no portal do Azure Machine Learning, selecione a guia Saídas + logs. Selecione o link Saídas de dados e conclua as etapas descritas na caixa de diálogo.

Use EntryScript em seu script de entrada como neste exemplo:

from pathlib import Path
from azureml_user.parallel_run import EntryScript

def run(mini_batch):
    output_dir = Path(entry_script.output_dir)
    (Path(output_dir) / res1).write...
    (Path(output_dir) / res2).write...

Como fazer para passar uma entrada lateral, como um arquivo ou arquivos que contêm uma tabela de pesquisa, para todos os meus trabalhos?

O usuário pode passar dados de referência para o script usando o parâmetro side_inputs do ParalleRunStep. Todos os conjuntos de dados fornecidos como side_inputs serão montados em cada nó de trabalho. O usuário pode obter a localização da montagem passando um argumento.

Construa um Conjunto de dados contendo os dados de referência, especifique um caminho de montagem local e registre-o no espaço de trabalho. Passe-o para o parâmetro side_inputs do seu ParallelRunStep. Além disso, você pode adicionar o caminho na seção arguments para acessar facilmente o caminho montado.

Observação

Use FileDatasets somente para side_inputs.

local_path = "/tmp/{}".format(str(uuid.uuid4()))
label_config = label_ds.as_named_input("labels_input").as_mount(local_path)
batch_score_step = ParallelRunStep(
    name=parallel_step_name,
    inputs=[input_images.as_named_input("input_images")],
    output=output_dir,
    arguments=["--labels_dir", label_config],
    side_inputs=[label_config],
    parallel_run_config=parallel_run_config,
)

Depois disso, você pode acessá-lo em seu script de inferência (por exemplo, em seu método init()) da seguinte maneira:

parser = argparse.ArgumentParser()
parser.add_argument('--labels_dir', dest="labels_dir", required=True)
args, _ = parser.parse_known_args()

labels_path = args.labels_dir

Como usar conjuntos de dados de entrada com autenticação de entidade de serviço?

O usuário pode passar conjuntos de dados de entrada com a autenticação de entidade de serviço usada no espaço de trabalho. O uso desse conjunto de dados do ParallelRunStep requer que esse conjunto de dados seja registrado para construir a configuração do ParallelRunStep.

service_principal = ServicePrincipalAuthentication(
    tenant_id="***",
    service_principal_id="***",
    service_principal_password="***")

ws = Workspace(
    subscription_id="***",
    resource_group="***",
    workspace_name="***",
    auth=service_principal
    )

default_blob_store = ws.get_default_datastore() # or Datastore(ws, '***datastore-name***')
ds = Dataset.File.from_files(default_blob_store, '**path***')
registered_ds = ds.register(ws, '***dataset-name***', create_new_version=True)

Como verificar o progresso e analisá-lo

Esta seção trata de como verificar o progresso de um trabalho ParallelRunStep e verificar a causa do comportamento inesperado.

Como verificar o andamento do trabalho?

Além de examinar o status geral do StepRun, a contagem de minilotes agendados/processados e o progresso da geração de saídas podem ser exibidos no ~/logs/job_progress_overview.<timestamp>.txt. O arquivo faz a rotação diariamente, você pode verificar o que tem o maior carimbo de data/hora para obter as informações mais recentes.

O que deverei verificar se não houver progresso por um tempo?

Você pode entrar em ~/logs/sys/errror para ver se há alguma exceção. Se não houver nenhuma, é provável que o script de entrada esteja demorando muito. Você pode imprimir as informações de progresso em seu código para localizar a parte demorada ou adicionar "--profiling_module", "cProfile" ao arguments de ParallelRunStep para gerar um arquivo de perfil denominado <process_name>.profile na pasta ~/logs/sys/node/<node_id>.

Quando um trabalho será interrompido?

Se não for cancelado, o trabalho será interrompido com o status:

  • Concluído. Se todos os minilotes tiverem sido processados e a saída tiver sido gerada para o modo append_row.
  • Falhou. Se error_threshold no Parameters for ParallelRunConfig for excedido ou um erro no sistema tiver ocorrido durante o trabalho.

Onde encontrar a causa raiz da falha?

Você pode seguir o cliente potencial no ~logs/job_result.txt para encontrar a causa e o log de erros detalhado.

A falha do nó afetará o resultado do trabalho?

Não se houver outros nós disponíveis no cluster de computação designado. O orquestrador iniciará um novo nó como substituição, e ParallelRunStep será resiliente a tal operação.

O que acontecerá se a função init no script de entrada falhar?

O ParallelRunStep tem um mecanismo para tentar novamente por alguns momentos para dar chance de recuperação de problemas transitórios sem atrasar a falha do trabalho por muito tempo. O mecanismo é o seguinte:

  1. Se, após a inicialização de um nó, init em todos os agentes continuarem falhando, deixaremos de tentar após 3 * process_count_per_node falhas.
  2. Se, depois que o trabalho for iniciado, init em todos os agentes de todos os nós continuarem falhando, deixaremos de tentar se o trabalho for executado mais de 2 minutos e houver 2 * node_count * process_count_per_node falhas.
  3. Se todos os agentes estiverem presos init por mais de 3 * run_invocation_timeout + 30 segundos, o trabalho falhará devido à ausência de progresso por muito tempo.

O que acontecerá em OutOfMemory? Como posso verificar a causa?

O ParallelRunStep definirá a tentativa atual de processar o minilote para o status com falha e tentar reiniciar o processo com falha. Você pode verificar ~logs/perf/<node_id> para localizar o processo que consome memória.

Por que tenho muitos arquivos processNNN?

O ParallelRunStep iniciará novos processos de trabalho para substituir aqueles encerrados de maneira anormal, e cada processo gerará um arquivo processNNN como log. No entanto, se o processo tiver falhado devido à exceção durante a função init do script do usuário, e se o erro tiver sido repetido continuamente por 3 * process_count_per_node vezes, nenhum novo processo de trabalho será iniciado.

Próximas etapas