Tutorial: Criar uma aplicação de machine learning com o Apache Spark MLlib e o Azure Synapse Analytics

Neste artigo, irá aprender a utilizar o MLlib do Apache Spark para criar uma aplicação de machine learning que faz uma análise preditiva simples num conjunto de dados aberto do Azure. O Spark fornece bibliotecas de machine learning incorporadas. Este exemplo utiliza a classificação através da regressão logística.

O SparkML e o MLlib são bibliotecas principais do Spark que fornecem muitos utilitários úteis para tarefas de machine learning, incluindo utilitários adequados para:

  • Classificação
  • Regressão
  • Clustering
  • Modelação de tópicos
  • Decomposição de valor singular (SVD) e análise de componentes principais (PCA)
  • Testes de hipóteses e cálculo de estatísticas de exemplo

Compreender a classificação e a regressão logística

A classificação, uma tarefa popular de machine learning, é o processo de ordenar dados de entrada em categorias. É tarefa de um algoritmo de classificação descobrir como atribuir etiquetas a dados de entrada fornecidos por si. Por exemplo, pode pensar num algoritmo de machine learning que aceita informações de ações como entrada e divide o stock em duas categorias: ações que deve vender e ações que deve manter.

A regressão logística é um algoritmo que pode utilizar para classificação. A API de regressão logística do Spark é útil para classificação binária ou classificação de dados de entrada num de dois grupos. Para obter mais informações sobre a regressão logística, consulte Wikipédia.

Em resumo, o processo de regressão logística produz uma função logística que pode utilizar para prever a probabilidade de um vetor de entrada pertencer a um grupo ou outro.

Exemplo de análise preditiva sobre dados de táxi de NYC

Neste exemplo, vai utilizar o Spark para efetuar algumas análises preditivas sobre dados de sugestões de viagens de táxi a partir de Nova Iorque. Os dados estão disponíveis através do Azure Open Datasets. Este subconjunto do conjunto de dados contém informações sobre viagens de táxi amarelas, incluindo informações sobre cada viagem, a hora de início e de fim e as localizações, o custo e outros atributos interessantes.

Importante

Poderá haver custos adicionais para extrair estes dados da localização de armazenamento.

Nos passos seguintes, vai desenvolver um modelo para prever se uma viagem específica inclui ou não uma sugestão.

Criar um modelo de machine learning do Apache Spark

  1. Crie um bloco de notas com o kernel do PySpark. Para obter instruções, consulte Criar um bloco de notas.

  2. Importe os tipos necessários para esta aplicação. Copie e cole o seguinte código numa célula vazia e, em seguida, prima Shift+Enter. Em alternativa, execute a célula com o ícone de reprodução azul à esquerda do código.

    import matplotlib.pyplot as plt
    from datetime import datetime
    from dateutil import parser
    from pyspark.sql.functions import unix_timestamp, date_format, col, when
    from pyspark.ml import Pipeline
    from pyspark.ml import PipelineModel
    from pyspark.ml.feature import RFormula
    from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorIndexer
    from pyspark.ml.classification import LogisticRegression
    from pyspark.mllib.evaluation import BinaryClassificationMetrics
    from pyspark.ml.evaluation import BinaryClassificationEvaluator
    

    Devido ao kernel do PySpark, não precisa de criar nenhum contexto explicitamente. O contexto do Spark é criado automaticamente quando executa a primeira célula de código.

Construir o DataFrame de entrada

Uma vez que os dados não processados estão num formato Parquet, pode utilizar o contexto do Spark para extrair o ficheiro para a memória como um DataFrame diretamente. Embora o código nos passos seguintes utilize as opções predefinidas, é possível forçar o mapeamento de tipos de dados e outros atributos de esquema, se necessário.

  1. Execute as seguintes linhas para criar um DataFrame do Spark ao colar o código numa nova célula. Este passo obtém os dados através da API Open Datasets. Extrair todos estes dados gera cerca de 1,5 mil milhões de linhas.

    Consoante o tamanho do conjunto do Apache Spark sem servidor, os dados não processados podem ser demasiado grandes ou demorar demasiado tempo a funcionar. Pode filtrar estes dados para algo mais pequeno. O seguinte exemplo de código utiliza start_date e end_date para aplicar um filtro que devolve um único mês de dados.

    from azureml.opendatasets import NycTlcYellow
    
    from datetime import datetime
    from dateutil import parser
    
    end_date = parser.parse('2018-05-08 00:00:00')
    start_date = parser.parse('2018-05-01 00:00:00')
    
    nyc_tlc = NycTlcYellow(start_date=start_date, end_date=end_date)
    filtered_df = spark.createDataFrame(nyc_tlc.to_pandas_dataframe())
    
    
  2. A desvantagem da filtragem simples é que, do ponto de vista estatístico, pode introduzir preconceitos nos dados. Outra abordagem é utilizar a amostragem incorporada no Spark.

    O código seguinte reduz o conjunto de dados para cerca de 2000 linhas, se for aplicado após o código anterior. Pode utilizar este passo de amostragem em vez do filtro simples ou em conjunto com o filtro simples.

    # To make development easier, faster, and less expensive, downsample for now
    sampled_taxi_df = filtered_df.sample(True, 0.001, seed=1234)
    
  3. Agora é possível analisar os dados para ver o que foi lido. Normalmente, é melhor rever os dados com um subconjunto em vez do conjunto completo, dependendo do tamanho do conjunto de dados.

    O código seguinte oferece duas formas de ver os dados. A primeira forma é básica. A segunda forma proporciona uma experiência de grelha muito mais rica, juntamente com a capacidade de visualizar os dados graficamente.

    #sampled_taxi_df.show(5)
    display(sampled_taxi_df)
    
  4. Dependendo do tamanho do conjunto de dados gerado e da sua necessidade de experimentar ou executar o bloco de notas muitas vezes, poderá querer colocar o conjunto de dados em cache localmente na área de trabalho. Existem três formas de efetuar a colocação em cache explícita:

    • Guarde o DataFrame localmente como um ficheiro.
    • Guarde o DataFrame como uma tabela ou vista temporária.
    • Guarde o DataFrame como uma tabela permanente.

As duas primeiras abordagens estão incluídas nos seguintes exemplos de código.

A criação de uma tabela ou vista temporária fornece caminhos de acesso diferentes para os dados, mas dura apenas durante a sessão da instância do Spark.

sampled_taxi_df.createOrReplaceTempView("nytaxi")

Preparar os dados

Os dados na sua forma não processada não são, muitas vezes, adequados para transmitir diretamente para um modelo. Tem de efetuar uma série de ações nos dados para os colocar num estado em que o modelo o possa consumir.

No código seguinte, executa quatro classes de operações:

  • A remoção de valores atípicos ou incorretos através da filtragem.
  • A remoção de colunas, que não são necessárias.
  • A criação de novas colunas derivadas dos dados não processados para que o modelo funcione de forma mais eficaz. Por vezes, esta operação é denominada caracterização.
  • Etiquetagem. Uma vez que está a realizar uma classificação binária (existirá uma sugestão ou não numa determinada viagem), é necessário converter o valor da sugestão num valor de 0 ou 1.
taxi_df = sampled_taxi_df.select('totalAmount', 'fareAmount', 'tipAmount', 'paymentType', 'rateCodeId', 'passengerCount'\
                                , 'tripDistance', 'tpepPickupDateTime', 'tpepDropoffDateTime'\
                                , date_format('tpepPickupDateTime', 'hh').alias('pickupHour')\
                                , date_format('tpepPickupDateTime', 'EEEE').alias('weekdayString')\
                                , (unix_timestamp(col('tpepDropoffDateTime')) - unix_timestamp(col('tpepPickupDateTime'))).alias('tripTimeSecs')\
                                , (when(col('tipAmount') > 0, 1).otherwise(0)).alias('tipped')
                                )\
                        .filter((sampled_taxi_df.passengerCount > 0) & (sampled_taxi_df.passengerCount < 8)\
                                & (sampled_taxi_df.tipAmount >= 0) & (sampled_taxi_df.tipAmount <= 25)\
                                & (sampled_taxi_df.fareAmount >= 1) & (sampled_taxi_df.fareAmount <= 250)\
                                & (sampled_taxi_df.tipAmount < sampled_taxi_df.fareAmount)\
                                & (sampled_taxi_df.tripDistance > 0) & (sampled_taxi_df.tripDistance <= 100)\
                                & (sampled_taxi_df.rateCodeId <= 5)
                                & (sampled_taxi_df.paymentType.isin({"1", "2"}))
                                )

Em seguida, efetue uma segunda passagem dos dados para adicionar as funcionalidades finais.

taxi_featurised_df = taxi_df.select('totalAmount', 'fareAmount', 'tipAmount', 'paymentType', 'passengerCount'\
                                                , 'tripDistance', 'weekdayString', 'pickupHour','tripTimeSecs','tipped'\
                                                , when((taxi_df.pickupHour <= 6) | (taxi_df.pickupHour >= 20),"Night")\
                                                .when((taxi_df.pickupHour >= 7) & (taxi_df.pickupHour <= 10), "AMRush")\
                                                .when((taxi_df.pickupHour >= 11) & (taxi_df.pickupHour <= 15), "Afternoon")\
                                                .when((taxi_df.pickupHour >= 16) & (taxi_df.pickupHour <= 19), "PMRush")\
                                                .otherwise(0).alias('trafficTimeBins')
                                              )\
                                       .filter((taxi_df.tripTimeSecs >= 30) & (taxi_df.tripTimeSecs <= 7200))

Criar um modelo de regressão logística

A tarefa final consiste em converter os dados etiquetados num formato que pode ser analisado através da regressão logística. A entrada para um algoritmo de regressão logística tem de ser um conjunto de pares de vetores de etiquetas/funcionalidades, em que o vetor de funcionalidades é um vetor de números que representam o ponto de entrada.

Por isso, tem de converter as colunas categóricas em números. Especificamente, tem de converter as trafficTimeBins colunas e weekdayString em representações de números inteiros. Existem várias abordagens para efetuar a conversão. O exemplo seguinte aceita a OneHotEncoder abordagem, que é comum.

# Because the sample uses an algorithm that works only with numeric features, convert them so they can be consumed
sI1 = StringIndexer(inputCol="trafficTimeBins", outputCol="trafficTimeBinsIndex")
en1 = OneHotEncoder(dropLast=False, inputCol="trafficTimeBinsIndex", outputCol="trafficTimeBinsVec")
sI2 = StringIndexer(inputCol="weekdayString", outputCol="weekdayIndex")
en2 = OneHotEncoder(dropLast=False, inputCol="weekdayIndex", outputCol="weekdayVec")

# Create a new DataFrame that has had the encodings applied
encoded_final_df = Pipeline(stages=[sI1, en1, sI2, en2]).fit(taxi_featurised_df).transform(taxi_featurised_df)

Esta ação resulta num novo DataFrame com todas as colunas no formato certo para preparar um modelo.

Preparar um modelo de regressão logística

A primeira tarefa consiste em dividir o conjunto de dados num conjunto de preparação e num conjunto de testes ou validação. A divisão aqui é arbitrária. Experimente diferentes definições de divisão para ver se afetam o modelo.

# Decide on the split between training and testing data from the DataFrame
trainingFraction = 0.7
testingFraction = (1-trainingFraction)
seed = 1234

# Split the DataFrame into test and training DataFrames
train_data_df, test_data_df = encoded_final_df.randomSplit([trainingFraction, testingFraction], seed=seed)

Agora que existem dois DataFrames, a próxima tarefa é criar a fórmula do modelo e executá-la no DataFrame de preparação. Em seguida, pode validar em relação ao DataFrame de teste. Experimente diferentes versões da fórmula de modelo para ver o impacto de diferentes combinações.

Nota

Para guardar o modelo, atribua a função Contribuidor de Dados do Blob de Armazenamento ao âmbito de recursos do servidor da Base de Dados do SQL do Azure. Para obter os passos detalhados, veja o artigo Atribuir funções do Azure com o portal do Azure. Apenas os membros com privilégios de proprietário podem efetuar este passo.

## Create a new logistic regression object for the model
logReg = LogisticRegression(maxIter=10, regParam=0.3, labelCol = 'tipped')

## The formula for the model
classFormula = RFormula(formula="tipped ~ pickupHour + weekdayVec + passengerCount + tripTimeSecs + tripDistance + fareAmount + paymentType+ trafficTimeBinsVec")

## Undertake training and create a logistic regression model
lrModel = Pipeline(stages=[classFormula, logReg]).fit(train_data_df)

## Saving the model is optional, but it's another form of inter-session cache
datestamp = datetime.now().strftime('%m-%d-%Y-%s')
fileName = "lrModel_" + datestamp
logRegDirfilename = fileName
lrModel.save(logRegDirfilename)

## Predict tip 1/0 (yes/no) on the test dataset; evaluation using area under ROC
predictions = lrModel.transform(test_data_df)
predictionAndLabels = predictions.select("label","prediction").rdd
metrics = BinaryClassificationMetrics(predictionAndLabels)
print("Area under ROC = %s" % metrics.areaUnderROC)

O resultado desta célula é:

Area under ROC = 0.9779470729751403

Criar uma representação visual da predição

Agora, pode construir uma visualização final para o ajudar a determinar os resultados deste teste. Uma curva ROC é uma forma de rever o resultado.

## Plot the ROC curve; no need for pandas, because this uses the modelSummary object
modelSummary = lrModel.stages[-1].summary

plt.plot([0, 1], [0, 1], 'r--')
plt.plot(modelSummary.roc.select('FPR').collect(),
         modelSummary.roc.select('TPR').collect())
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.show()

Gráfico que mostra a curva ROC para regressão logística no modelo de ponta.

Encerrar a instância do Spark

Depois de concluir a execução da aplicação, encerre o bloco de notas para libertar os recursos ao fechar o separador. Em alternativa, selecione Terminar Sessão no painel de estado na parte inferior do bloco de notas.

Ver também

Passos seguintes

Nota

Parte da documentação oficial do Apache Spark baseia-se na utilização da consola do Spark, que não está disponível no Apache Spark no Azure Synapse Analytics. Em alternativa, utilize o bloco de notas ou as experiências do IntelliJ .