Creare un modello di Machine Learning con MLlib di Apache Spark MLlib

Questo articolo illustra come usare MLlib di Apache Spark per creare un'applicazione di apprendimento automatico che gestisce analisi predittive semplici su un set di dati aperto. Spark offre librerie di apprendimento automatico predefinite. Questo esempio usa la classificazione tramite regressione logistica.

SparkML e MLlib, librerie Spark di base, offrono diverse utilità in grado di agevolare le attività di apprendimento automatico. Queste utilità sono adatte per:

  • Classificazione
  • Cluster
  • Testing e calcolo ipotetici di statistiche di esempio
  • Regressione
  • Scomposizione di valori singolari e analisi in componenti principali
  • Modellazione di argomenti

Informazioni sulla classificazione e la regressione logistica

Classificazione, un'attività comune di apprendimento automatico, prevede il processo di ordinamento dei dati in categorie. Un algoritmo di classificazione deve determinare come assegnare etichette ai dati di input forniti. Ad esempio, un algoritmo di apprendimento automatico che può accettare informazioni sulle azioni come input e divide le azioni in due categorie: azioni da vendere e azioni da conservare.

L’algoritmo regressione logistica è utile per la classificazione. L'API di regressione logistica di Spark è utile per la classificazione binaria, ovvero la classificazione di dati di input in uno di due gruppi. Per altre informazioni sulla regressione logistica, vedere Wikipedia.

La regressione logistica genera una funzione logistica che è possibile usare per stimare la probabilità che un vettore di input appartenga a un gruppo o all'altro.

Esempio di analisi predittiva di dati relativi alla rete taxi di New York

Installare prima di tutto azureml-opendatasets. I dati sono disponibili tramite risorse di set di dati aperti di Azure. Questo sottoinsieme di set di dati ospita informazioni sulle corse in taxi gialle, inclusi gli orari di inizio, le ore di fine, le posizioni di inizio, le posizioni di fine, i costi delle corse e altri attributi.

%pip install azureml-opendatasets

Il resto di questo articolo si basa su Apache Spark per eseguire prima un'analisi sui dati relativi alle mance dei taxi di New York e quindi sviluppare un modello per stimare se una determinata corsa include o meno una mancia.

Creare un modello di Machine Learning con Apache Spark

  1. Creare un notebook PySpark. Per altre informazioni, vedere Creare un notebook.

  2. Importare i tipi richiesti per questo notebook.

    import matplotlib.pyplot as plt
    from datetime import datetime
    from dateutil import parser
    from pyspark.sql.functions import unix_timestamp, date_format, col, when
    from pyspark.ml import Pipeline
    from pyspark.ml import PipelineModel
    from pyspark.ml.feature import RFormula
    from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorIndexer
    from pyspark.ml.classification import LogisticRegression
    from pyspark.mllib.evaluation import BinaryClassificationMetrics
    from pyspark.ml.evaluation import BinaryClassificationEvaluator
    
  3. Si userà MLflow per tenere traccia degli esperimenti di Machine Learning e delle esecuzioni corrispondenti. Se l'assegnazione automatica di Microsoft Fabric è abilitata, vengono acquisite automaticamente le metriche e i parametri corrispondenti.

    import mlflow
    

Creare il DataFrame di input

Questo esempio carica i dati in un dataframe Pandas e quindi lo converte in un dataframe Apache Spark. In questo formato, è possibile applicare altre operazioni Apache Spark per pulire e filtrare il set di dati.

  1. Incollare queste righe in una nuova cella ed eseguirle per creare un dataframe Spark. Questo passaggio consente di recuperare dati tramite l'API dei set di dati aperti di Azure. È possibile filtrare questi dati in modo da esaminare una finestra di dati specifica. L'esempio di codice usa start_date e end_date per applicare un filtro che restituisce un singolo mese di dati.

    from azureml.opendatasets import NycTlcYellow
    
    end_date = parser.parse('2018-06-06')
    start_date = parser.parse('2018-05-01')
    nyc_tlc = NycTlcYellow(start_date=start_date, end_date=end_date)
    nyc_tlc_pd = nyc_tlc.to_pandas_dataframe()
    
    nyc_tlc_df = spark.createDataFrame(nyc_tlc_pd).repartition(20)
    
    
  2. Questo codice riduce il set di dati a circa 10.000 righe. Per velocizzare lo sviluppo e il training, per il momento il set di dati viene ridotto.

    # To make development easier, faster, and less expensive, sample down for now
    sampled_taxi_df = nyc_tlc_df.sample(True, 0.001, seed=1234)
    
  3. Si vogliono esaminare i dati usando il comando predefinito display() . Con questo comando è possibile visualizzare facilmente un esempio di dati o esplorare graficamente le tendenze nei dati.

    #sampled_taxi_df.show(10)
    display(sampled_taxi_df.limit(10))    
    

Preparare i dati

La preparazione dei dati è un passaggio fondamentale nel processo di Machine Learning. Implica la pulizia, la trasformazione e l'organizzazione dei dati non elaborati, per renderli adatti per l'analisi e la modellazione. In questo esempio di codice vengono eseguiti diversi passaggi di preparazione dei dati:

  • Filtrare il set di dati per rimuovere outlier e valori non corretti
  • Rimuovere colonne non necessarie per il training del modello
  • Creare nuove colonne dai dati non elaborati
  • Generare un'etichetta per determinare se una determinata corsa taxi comporta una mancia
taxi_df = sampled_taxi_df.select('totalAmount', 'fareAmount', 'tipAmount', 'paymentType', 'rateCodeId', 'passengerCount'\
                        , 'tripDistance', 'tpepPickupDateTime', 'tpepDropoffDateTime'\
                        , date_format('tpepPickupDateTime', 'hh').alias('pickupHour')\
                        , date_format('tpepPickupDateTime', 'EEEE').alias('weekdayString')\
                        , (unix_timestamp(col('tpepDropoffDateTime')) - unix_timestamp(col('tpepPickupDateTime'))).alias('tripTimeSecs')\
                        , (when(col('tipAmount') > 0, 1).otherwise(0)).alias('tipped')
                        )\
                .filter((sampled_taxi_df.passengerCount > 0) & (sampled_taxi_df.passengerCount < 8)\
                        & (sampled_taxi_df.tipAmount >= 0) & (sampled_taxi_df.tipAmount <= 25)\
                        & (sampled_taxi_df.fareAmount >= 1) & (sampled_taxi_df.fareAmount <= 250)\
                        & (sampled_taxi_df.tipAmount < sampled_taxi_df.fareAmount)\
                        & (sampled_taxi_df.tripDistance > 0) & (sampled_taxi_df.tripDistance <= 100)\
                        & (sampled_taxi_df.rateCodeId <= 5)
                        & (sampled_taxi_df.paymentType.isin({"1", "2"}))
                        )

Successivamente, viene eseguito un secondo passaggio sui dati per aggiungere le funzionalità finali.

taxi_featurised_df = taxi_df.select('totalAmount', 'fareAmount', 'tipAmount', 'paymentType', 'passengerCount'\
                                                , 'tripDistance', 'weekdayString', 'pickupHour','tripTimeSecs','tipped'\
                                                , when((taxi_df.pickupHour <= 6) | (taxi_df.pickupHour >= 20),"Night")\
                                                .when((taxi_df.pickupHour >= 7) & (taxi_df.pickupHour <= 10), "AMRush")\
                                                .when((taxi_df.pickupHour >= 11) & (taxi_df.pickupHour <= 15), "Afternoon")\
                                                .when((taxi_df.pickupHour >= 16) & (taxi_df.pickupHour <= 19), "PMRush")\
                                                .otherwise(0).alias('trafficTimeBins')
                                              )\
                                       .filter((taxi_df.tripTimeSecs >= 30) & (taxi_df.tripTimeSecs <= 7200))

Creare un modello di regressione logistica

L'ultima attività converte i dati con etichetta in un formato che possa essere gestito dalla regressione logistica. L'input per un algoritmo di regressione logistica deve avere una struttura di coppie etichetta-vettore di funzionalità, dove il vettore di funzionalità è un vettore di numeri che rappresenta il punto di ingresso.

In base ai requisiti finali dell'attività, è necessario convertire le colonne categorica in numeri. In particolare, è necessario convertire le colonne trafficTimeBins e weekdayString in rappresentazioni integer. Sono disponibili molte opzioni per gestire questo requisito. Questo esempio prevede l'approccio OneHotEncoder:

# Because the sample uses an algorithm that works only with numeric features, convert them so they can be consumed
sI1 = StringIndexer(inputCol="trafficTimeBins", outputCol="trafficTimeBinsIndex")
en1 = OneHotEncoder(dropLast=False, inputCol="trafficTimeBinsIndex", outputCol="trafficTimeBinsVec")
sI2 = StringIndexer(inputCol="weekdayString", outputCol="weekdayIndex")
en2 = OneHotEncoder(dropLast=False, inputCol="weekdayIndex", outputCol="weekdayVec")

# Create a new DataFrame that has had the encodings applied
encoded_final_df = Pipeline(stages=[sI1, en1, sI2, en2]).fit(taxi_featurised_df).transform(taxi_featurised_df)

Questa azione genera un nuovo DataFrame con tutte le colonne nel formato appropriato per eseguire il training di un modello.

Eseguire il training di un modello di regressione logistica

La prima attività suddivide il set di dati in un set di training e in un set di test o di convalida.

# Decide on the split between training and test data from the DataFrame
trainingFraction = 0.7
testingFraction = (1-trainingFraction)
seed = 1234

# Split the DataFrame into test and training DataFrames
train_data_df, test_data_df = encoded_final_df.randomSplit([trainingFraction, testingFraction], seed=seed)

Dopo aver creato due dataframe, è necessario creare la formula del modello ed eseguirla sul dataframe di training. Quindi, eseguire la convalida rispetto al DataFrame di test. Provare versioni diverse della formula del modello per vedere l'effetto di combinazioni diverse.

## Create a new logistic regression object for the model
logReg = LogisticRegression(maxIter=10, regParam=0.3, labelCol = 'tipped')

## The formula for the model
classFormula = RFormula(formula="tipped ~ pickupHour + weekdayVec + passengerCount + tripTimeSecs + tripDistance + fareAmount + paymentType+ trafficTimeBinsVec")

## Undertake training and create a logistic regression model
lrModel = Pipeline(stages=[classFormula, logReg]).fit(train_data_df)

## Predict tip 1/0 (yes/no) on the test dataset; evaluation using area under ROC
predictions = lrModel.transform(test_data_df)
predictionAndLabels = predictions.select("label","prediction").rdd
metrics = BinaryClassificationMetrics(predictionAndLabels)
print("Area under ROC = %s" % metrics.areaUnderROC)

Output della cella:

Area under ROC = 0.9749430523917996

Creare una rappresentazione visiva della stima

È ora possibile creare una visualizzazione finale per interpretare i risultati del modello. Una curva ROC può certamente presentare il risultato.

## Plot the ROC curve; no need for pandas, because this uses the modelSummary object
modelSummary = lrModel.stages[-1].summary

plt.plot([0, 1], [0, 1], 'r--')
plt.plot(modelSummary.roc.select('FPR').collect(),
         modelSummary.roc.select('TPR').collect())
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.show()

Grafico che mostra la curva ROC per la regressione logistica nel modello di mancia.