Skapa en Apache Spark-maskininlärningspipeline
Apache Sparks skalbara maskininlärningsbibliotek (MLlib) ger modelleringsfunktioner till en distribuerad miljö. Spark-paketet spark.ml
är en uppsättning högnivå-API:er som bygger på DataFrames. Dessa API:er hjälper dig att skapa och finjustera praktiska maskininlärningspipelines.
Spark-maskininlärning refererar till det här MLlib DataFrame-baserade API:et, inte det äldre RDD-baserade pipeline-API:et.
En pipeline för maskininlärning (ML) är ett komplett arbetsflöde som kombinerar flera maskininlärningsalgoritmer tillsammans. Det kan krävas många steg för att bearbeta och lära av data, vilket kräver en sekvens med algoritmer. Pipelines definierar faser och ordning för en maskininlärningsprocess. I MLlib representeras faserna i en pipeline av en specifik sekvens med PipelineStages, där en transformerare och en beräknare utför uppgifter.
En Transformerare är en algoritm som transformerar en DataFrame till en annan med hjälp transform()
av metoden . En funktionstransformator kan till exempel läsa en kolumn i en DataFrame, mappa den till en annan kolumn och mata ut en ny DataFrame med den mappade kolumnen tillagd.
En beräknare är en abstraktion av inlärningsalgoritmer och ansvarar för att anpassa eller träna på en datamängd för att skapa en transformerare. En beräknare implementerar en metod med namnet fit()
, som accepterar en DataFrame och skapar en DataFrame, som är en transformerare.
Varje tillståndslös instans av en transformerare eller en beräknare har sin egen unika identifierare, som används när parametrar anges. Båda använder ett enhetligt API för att ange dessa parametrar.
Pipelineexempel
För att demonstrera en praktisk användning av en ML-pipeline använder det här exemplet exempeldatafilen HVAC.csv
som är förinstallerad på standardlagringen för ditt HDInsight-kluster, antingen Azure Storage eller Data Lake Storage. Om du vill visa innehållet i filen går du till /HdiSamples/HdiSamples/SensorSampleData/hvac
katalogen .
HVAC.csv
innehåller en uppsättning tider med både mål- och faktiska temperaturer för HVAC-system (uppvärmning, ventilation och luftkonditionering) i olika byggnader. Målet är att träna modellen på data och skapa en prognostemperatur för en viss byggnad.
Följande kod:
- Definierar en
LabeledDocument
, som lagrarBuildingID
,SystemInfo
(ett systems identifierare och ålder) och enlabel
(1,0 om byggnaden är för varm, 0,0 annars). - Skapar en anpassad parserfunktion
parseDocument
som tar en rad med data och avgör om byggnaden är "varm" genom att jämföra måltemperaturen med den faktiska temperaturen. - Använder parsern när källdata extraheras.
- Skapar träningsdata.
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import HashingTF, Tokenizer
from pyspark.sql import Row
# The data structure (column meanings) of the data array:
# 0 Date
# 1 Time
# 2 TargetTemp
# 3 ActualTemp
# 4 System
# 5 SystemAge
# 6 BuildingID
LabeledDocument = Row("BuildingID", "SystemInfo", "label")
# Define a function that parses the raw CSV file and returns an object of type LabeledDocument
def parseDocument(line):
values = [str(x) for x in line.split(',')]
if (values[3] > values[2]):
hot = 1.0
else:
hot = 0.0
textValue = str(values[4]) + " " + str(values[5])
return LabeledDocument((values[6]), textValue, hot)
# Load the raw HVAC.csv file, parse it using the function
data = sc.textFile(
"wasbs:///HdiSamples/HdiSamples/SensorSampleData/hvac/HVAC.csv")
documents = data.filter(lambda s: "Date" not in s).map(parseDocument)
training = documents.toDF()
Den här exempelpipelinen har tre steg: Tokenizer
och HashingTF
(båda Transformatorer) och Logistic Regression
(en beräknare). Extraherade och parsade data i training
DataFrame flödar genom pipelinen när pipeline.fit(training)
anropas.
- Den första fasen,
Tokenizer
, delar upp indatakolumnenSystemInfo
(bestående av systemidentifieraren och åldersvärdena) i enwords
utdatakolumn. Den här nyawords
kolumnen läggs till i DataFrame. - Det andra steget,
HashingTF
, konverterar den nyawords
kolumnen till funktionsvektorer. Den här nyafeatures
kolumnen läggs till i DataFrame. Dessa två första steg är Transformatorer. - Den tredje fasen,
LogisticRegression
, är en beräknare och därför anropar pipelinenLogisticRegression.fit()
metoden för att skapa enLogisticRegressionModel
.
tokenizer = Tokenizer(inputCol="SystemInfo", outputCol="words")
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
lr = LogisticRegression(maxIter=10, regParam=0.01)
# Build the pipeline with our tokenizer, hashingTF, and logistic regression stages
pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])
model = pipeline.fit(training)
Om du vill se de nya words
kolumnerna Tokenizer
och features
som lagts till av transformatorerna och HashingTF
och ett exempel på LogisticRegression
beräknaren kör du en PipelineModel.transform()
metod på den ursprungliga DataFrame. I produktionskoden är nästa steg att skicka in en testdataram för att verifiera träningen.
peek = model.transform(training)
peek.show()
# Outputs the following:
+----------+----------+-----+--------+--------------------+--------------------+--------------------+----------+
|BuildingID|SystemInfo|label| words| features| rawPrediction| probability|prediction|
+----------+----------+-----+--------+--------------------+--------------------+--------------------+----------+
| 4| 13 20| 0.0|[13, 20]|(262144,[250802,2...|[0.11943986671420...|[0.52982451901740...| 0.0|
| 17| 3 20| 0.0| [3, 20]|(262144,[89074,25...|[0.17511205617446...|[0.54366648775222...| 0.0|
| 18| 17 20| 1.0|[17, 20]|(262144,[64358,25...|[0.14620993833623...|[0.53648750722548...| 0.0|
| 15| 2 23| 0.0| [2, 23]|(262144,[31351,21...|[-0.0361327091023...|[0.49096780538523...| 1.0|
| 3| 16 9| 1.0| [16, 9]|(262144,[153779,1...|[-0.0853679939336...|[0.47867095324139...| 1.0|
| 4| 13 28| 0.0|[13, 28]|(262144,[69821,25...|[0.14630166986618...|[0.53651031790592...| 0.0|
| 2| 12 24| 0.0|[12, 24]|(262144,[187043,2...|[-0.0509556393066...|[0.48726384581522...| 1.0|
| 16| 20 26| 1.0|[20, 26]|(262144,[128319,2...|[0.33829638728900...|[0.58377663577684...| 0.0|
| 9| 16 9| 1.0| [16, 9]|(262144,[153779,1...|[-0.0853679939336...|[0.47867095324139...| 1.0|
| 12| 6 5| 0.0| [6, 5]|(262144,[18659,89...|[0.07513008136562...|[0.51877369045183...| 0.0|
| 15| 10 17| 1.0|[10, 17]|(262144,[64358,25...|[-0.0291988646553...|[0.49270080242078...| 1.0|
| 7| 2 11| 0.0| [2, 11]|(262144,[212053,2...|[0.03678030020834...|[0.50919403860812...| 0.0|
| 15| 14 2| 1.0| [14, 2]|(262144,[109681,2...|[0.06216423725633...|[0.51553605651806...| 0.0|
| 6| 3 2| 0.0| [3, 2]|(262144,[89074,21...|[0.00565582077537...|[0.50141395142468...| 0.0|
| 20| 19 22| 0.0|[19, 22]|(262144,[139093,2...|[-0.0769288695989...|[0.48077726176073...| 1.0|
| 8| 19 11| 0.0|[19, 11]|(262144,[139093,2...|[0.04988910033929...|[0.51246968885151...| 0.0|
| 6| 15 7| 0.0| [15, 7]|(262144,[77099,20...|[0.14854929135994...|[0.53706918109610...| 0.0|
| 13| 12 5| 0.0| [12, 5]|(262144,[89689,25...|[-0.0519932532562...|[0.48700461408785...| 1.0|
| 4| 8 22| 0.0| [8, 22]|(262144,[98962,21...|[-0.0120753606650...|[0.49698119651572...| 1.0|
| 7| 17 5| 0.0| [17, 5]|(262144,[64358,89...|[-0.0721054054871...|[0.48198145477106...| 1.0|
+----------+----------+-----+--------+--------------------+--------------------+--------------------+----------+
only showing top 20 rows
Objektet model
kan nu användas för att göra förutsägelser. Det fullständiga exemplet på det här maskininlärningsprogrammet och stegvisa instruktioner för att köra det finns i Skapa Apache Spark-maskininlärningsprogram i Azure HDInsight.