Задачи классификации с помощью SynapseML
В этой статье вы выполняете одну и ту же задачу классификации двумя разными способами: один раз с использованием обычной pyspark
и однократной использования библиотеки synapseml
. Два метода дают одинаковую производительность, но выделяет простоту использования synapseml
по сравнению с pyspark
.
Задача заключается в том, чтобы предсказать, является ли отзыв клиента книги, проданной на Amazon, хорошо (рейтинг > 3) или плохо на основе текста обзора. Это можно сделать, обучая учащихся LogisticRegression с различными гиперпараметрами и выбирая лучшую модель.
Необходимые компоненты
Подключите записную книжку к lakehouse. В левой части нажмите кнопку "Добавить ", чтобы добавить существующее озеро или создать озеро.
Настройка
Импортируйте необходимые библиотеки Python и получите сеанс Spark.
from pyspark.sql import SparkSession
# Bootstrap Spark Session
spark = SparkSession.builder.getOrCreate()
Чтение данных
Скачайте и считываете данные.
rawData = spark.read.parquet(
"wasbs://publicwasb@mmlspark.blob.core.windows.net/BookReviewsFromAmazon10K.parquet"
)
rawData.show(5)
Извлечение функций и данных обработки
Реальные данные более сложны, чем указанный выше набор данных. Обычно набор данных имеет признаки нескольких типов, таких как текст, числовые и категориальные. Чтобы иллюстрировать, насколько сложно работать с этими наборами данных, добавьте в набор данных два числовых признака: количество слов проверки и средняя длина слова.
from pyspark.sql.functions import udf
from pyspark.sql.types import *
def wordCount(s):
return len(s.split())
def wordLength(s):
import numpy as np
ss = [len(w) for w in s.split()]
return round(float(np.mean(ss)), 2)
wordLengthUDF = udf(wordLength, DoubleType())
wordCountUDF = udf(wordCount, IntegerType())
from synapse.ml.stages import UDFTransformer
wordLength = "wordLength"
wordCount = "wordCount"
wordLengthTransformer = UDFTransformer(
inputCol="text", outputCol=wordLength, udf=wordLengthUDF
)
wordCountTransformer = UDFTransformer(
inputCol="text", outputCol=wordCount, udf=wordCountUDF
)
from pyspark.ml import Pipeline
data = (
Pipeline(stages=[wordLengthTransformer, wordCountTransformer])
.fit(rawData)
.transform(rawData)
.withColumn("label", rawData["rating"] > 3)
.drop("rating")
)
data.show(5)
Классификация с помощью pyspark
Чтобы выбрать лучший классификатор LogisticRegression с помощью pyspark
библиотеки, необходимо явно выполнить следующие действия:
- Обработайте функции:
- Токенизация текстового столбца
- Хэшируемый столбец в вектор с помощью хэширования
- Объединение числовых функций с вектором
- Обработайте столбец метки: приведите его к соответствующему типу.
- Обучение нескольких алгоритмов LogisticRegression в наборе
train
данных с различными гиперпараметрами - Вычислите область под кривой ROC для каждой обученной модели и выберите модель с самой высокой метрикой, вычисляемой в
test
наборе данных. - Оценка оптимальной модели в наборе
validation
from pyspark.ml.feature import Tokenizer, HashingTF
from pyspark.ml.feature import VectorAssembler
# Featurize text column
tokenizer = Tokenizer(inputCol="text", outputCol="tokenizedText")
numFeatures = 10000
hashingScheme = HashingTF(
inputCol="tokenizedText", outputCol="TextFeatures", numFeatures=numFeatures
)
tokenizedData = tokenizer.transform(data)
featurizedData = hashingScheme.transform(tokenizedData)
# Merge text and numeric features in one feature column
featureColumnsArray = ["TextFeatures", "wordCount", "wordLength"]
assembler = VectorAssembler(inputCols=featureColumnsArray, outputCol="features")
assembledData = assembler.transform(featurizedData)
# Select only columns of interest
# Convert rating column from boolean to int
processedData = assembledData.select("label", "features").withColumn(
"label", assembledData.label.cast(IntegerType())
)
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.classification import LogisticRegression
# Prepare data for learning
train, test, validation = processedData.randomSplit([0.60, 0.20, 0.20], seed=123)
# Train the models on the 'train' data
lrHyperParams = [0.05, 0.1, 0.2, 0.4]
logisticRegressions = [
LogisticRegression(regParam=hyperParam) for hyperParam in lrHyperParams
]
evaluator = BinaryClassificationEvaluator(
rawPredictionCol="rawPrediction", metricName="areaUnderROC"
)
metrics = []
models = []
# Select the best model
for learner in logisticRegressions:
model = learner.fit(train)
models.append(model)
scoredData = model.transform(test)
metrics.append(evaluator.evaluate(scoredData))
bestMetric = max(metrics)
bestModel = models[metrics.index(bestMetric)]
# Get AUC on the validation dataset
scoredVal = bestModel.transform(validation)
print(evaluator.evaluate(scoredVal))
Классификация с помощью SynapseML
Действия, необходимые для работы, synapseml
проще:
Оценщик
TrainClassifier
показывает данные внутренне, если столбцы, выбранные вtrain
наборе данных,test
validation
представляют функции.Средство
FindBestModel
оценки находит лучшую модель из пула обученных моделей, найдя модель, которая лучше всего работает вtest
наборе данных, учитывая указанную метрику.ComputeModelStatistics
Преобразователь вычисляет различные метрики в наборе данных с оценкой (в нашем случаеvalidation
набор данных) одновременно.
from synapse.ml.train import TrainClassifier, ComputeModelStatistics
from synapse.ml.automl import FindBestModel
# Prepare data for learning
train, test, validation = data.randomSplit([0.60, 0.20, 0.20], seed=123)
# Train the models on the 'train' data
lrHyperParams = [0.05, 0.1, 0.2, 0.4]
logisticRegressions = [
LogisticRegression(regParam=hyperParam) for hyperParam in lrHyperParams
]
lrmodels = [
TrainClassifier(model=lrm, labelCol="label", numFeatures=10000).fit(train)
for lrm in logisticRegressions
]
# Select the best model
bestModel = FindBestModel(evaluationMetric="AUC", models=lrmodels).fit(test)
# Get AUC on the validation dataset
predictions = bestModel.transform(validation)
metrics = ComputeModelStatistics().transform(predictions)
print(
"Best model's AUC on validation set = "
+ "{0:.2f}%".format(metrics.first()["AUC"] * 100)
)