สร้างแบบจําลองด้วย ML อัตโนมัติ (ตัวอย่าง)

การเรียนรู้ของเครื่องอัตโนมัติ (AutoML) ครอบคลุมชุดของเทคนิคและเครื่องมือที่ออกแบบมาเพื่อปรับปรุงกระบวนการของการฝึกอบรมและปรับแบบจําลองการเรียนรู้ของเครื่องให้เหมาะสมด้วยการแทรกแซงของมนุษย์น้อยที่สุด วัตถุประสงค์หลักของ AutoML คือเพื่อลดความซับซ้อนและเร่งการเลือกแบบจําลองการเรียนรู้ของเครื่องที่เหมาะสมที่สุดและ hyperparameters สําหรับชุดข้อมูลที่กําหนดงานที่โดยทั่วไปต้องการความเชี่ยวชาญและทรัพยากรการคํานวณจํานวนมาก ภายในเฟรมเวิร์ก Fabric นักวิทยาศาสตร์ข้อมูลสามารถใช้ประโยชน์จาก flaml.AutoML โมดูลเพื่อปรับแง่มุมต่าง ๆ ของเวิร์กโฟลว์การเรียนรู้ของเครื่องให้เป็นอัตโนมัติ

ในบทความนี้ เราจะเจาะลึกกระบวนการสร้างการทดลองใช้ AutoML โดยตรงจากโค้ดโดยใช้ชุดข้อมูล Spark นอกจากนี้ เราจะสํารวจวิธีการสําหรับการแปลงข้อมูลนี้เป็นกรอบข้อมูล Pandas และพูดคุยเกี่ยวกับเทคนิคสําหรับการทดลองใช้งานการทดลองของคุณแบบขนานกัน

สำคัญ

คุณลักษณะนี้อยู่ในตัวอย่าง

ข้อกำหนดเบื้องต้น

  • สร้างสภาพแวดล้อม Fabric ใหม่ หรือตรวจสอบว่าคุณกําลังทํางานบน Fabric Runtime 1.2 (Spark 3.4 (หรือสูงกว่า) และ Delta 2.4)
  • สร้าง สมุดบันทึกใหม่
  • แนบสมุดบันทึกของคุณเข้ากับเลคเฮ้าส์ ทางด้านซ้ายของสมุดบันทึกของคุณ ให้เลือก เพิ่ม เพื่อเพิ่มเลคเฮ้าส์ที่มีอยู่แล้ว หรือสร้างขึ้นใหม่

โหลดและเตรียมข้อมูล

ในส่วนนี้ เราจะระบุการตั้งค่าการดาวน์โหลดสําหรับข้อมูล และจากนั้นบันทึกไปยังเลคเฮ้าส์

ดาวน์โหลดข้อมูล

บล็อกรหัสนี้จะดาวน์โหลดข้อมูลจากแหล่งข้อมูลระยะไกลและบันทึกไปยังเลคเฮ้าส์

import os
import requests

IS_CUSTOM_DATA = False  # if TRUE, dataset has to be uploaded manually

if not IS_CUSTOM_DATA:
    # Specify the remote URL where the data is hosted
    remote_url = "https://synapseaisolutionsa.blob.core.windows.net/public/bankcustomerchurn"
    
    # List of data files to download
    file_list = ["churn.csv"]
    
    # Define the download path within the lakehouse
    download_path = "/lakehouse/default/Files/churn/raw"
    
    # Check if the lakehouse directory exists; if not, raise an error
    if not os.path.exists("/lakehouse/default"):
        raise FileNotFoundError("Default lakehouse not found. Please add a lakehouse and restart the session.")
    
    # Create the download directory if it doesn't exist
    os.makedirs(download_path, exist_ok=True)
    
    # Download each data file if it doesn't already exist in the lakehouse
    for fname in file_list:
        if not os.path.exists(f"{download_path}/{fname}"):
            r = requests.get(f"{remote_url}/{fname}", timeout=30)
            with open(f"{download_path}/{fname}", "wb") as f:
                f.write(r.content)
    
    print("Downloaded demo data files into lakehouse.")

โหลดข้อมูลลงในกรอบข้อมูล Spark

บล็อกรหัสต่อไปนี้โหลดข้อมูลจากไฟล์ CSV ลงใน Spark DataFrame และแคชเพื่อการประมวลผลที่มีประสิทธิภาพ

df = (
    spark.read.option("header", True)
    .option("inferSchema", True)
    .csv("Files/churn/raw/churn.csv")
    .cache()
)

รหัสนี้ถือว่ามีการดาวน์โหลดไฟล์ข้อมูลแล้วและอยู่ในเส้นทางที่ระบุ ซึ่งจะอ่านไฟล์ CSV ลงใน Spark DataFrame อนุมาน schema และแคชสําหรับการเข้าถึงได้เร็วขึ้นในระหว่างการดําเนินการที่ตามมา

เตรียมข้อมูล

ในส่วนนี้ เราจะดําเนินการทําความสะอาดข้อมูลและวิศวกรรมคุณลักษณะบนชุดข้อมูล

ทำความสะอาดข้อมูล

ก่อนอื่น เรากําหนดฟังก์ชันเพื่อทําความสะอาดข้อมูล ซึ่งรวมถึงการทิ้งแถวที่มีข้อมูลที่ขาดหายไป ลบแถวที่ซ้ํากันโดยยึดตามคอลัมน์ที่ระบุ และลบคอลัมน์ที่ไม่จําเป็นออก

# Define a function to clean the data
def clean_data(df):
    # Drop rows with missing data across all columns
    df = df.dropna(how="all")
    # Drop duplicate rows based on 'RowNumber' and 'CustomerId'
    df = df.dropDuplicates(subset=['RowNumber', 'CustomerId'])
    # Drop columns: 'RowNumber', 'CustomerId', 'Surname'
    df = df.drop('RowNumber', 'CustomerId', 'Surname')
    return df

# Create a copy of the original dataframe by selecting all the columns
df_copy = df.select("*")

# Apply the clean_data function to the copy
df_clean = clean_data(df_copy)

ฟังก์ชัน clean_data นี้ช่วยให้มั่นใจว่าชุดข้อมูลไม่มีค่าและรายการซ้ําที่ขาดหายไปในขณะที่ลบคอลัมน์ที่ไม่จําเป็นออก

วิศวกรรมคุณลักษณะ

ถัดไป เราจะดําเนินการวิศวกรรมคุณลักษณะโดยการสร้างคอลัมน์ตัวอย่างสําหรับคอลัมน์ 'ภูมิศาสตร์' และ 'เพศ' โดยใช้การเข้ารหัสที่ร้อนเพียงตัวเดียว

# Import PySpark functions
from pyspark.sql import functions as F

# Create dummy columns for 'Geography' and 'Gender' using one-hot encoding
df_clean = df_clean.select(
    "*",
    F.when(F.col("Geography") == "France", 1).otherwise(0).alias("Geography_France"),
    F.when(F.col("Geography") == "Germany", 1).otherwise(0).alias("Geography_Germany"),
    F.when(F.col("Geography") == "Spain", 1).otherwise(0).alias("Geography_Spain"),
    F.when(F.col("Gender") == "Female", 1).otherwise(0).alias("Gender_Female"),
    F.when(F.col("Gender") == "Male", 1).otherwise(0).alias("Gender_Male")
)

# Drop the original 'Geography' and 'Gender' columns
df_clean = df_clean.drop("Geography", "Gender")

ที่นี่เราใช้การเข้ารหัสแบบหนึ่งร้อนเพื่อแปลงคอลัมน์ตามประเภทเป็นคอลัมน์ตัวอย่างไบนารีทําให้เหมาะสําหรับอัลกอริทึมการเรียนรู้ของเครื่อง

แสดงข้อมูลที่ได้รับการทําความสะอาดแล้ว

ในตอนท้าย เราแสดงชุดข้อมูลที่ได้รับการทําความสะอาดและออกแบบด้วยคุณลักษณะโดยใช้ฟังก์ชันแสดงผล


display(df_clean)

ขั้นตอนนี้ช่วยให้คุณสามารถตรวจสอบ DataFrame ที่เกิดขึ้นด้วยการแปลงที่ใช้

บันทึกไปยังเลคเฮาส์

ในตอนนี้ เราจะบันทึกชุดข้อมูลที่ได้รับการทําความสะอาดและออกแบบมาสําหรับวิศกรไปยังเลคเฮ้าส์

# Create PySpark DataFrame from Pandas
df_clean.write.mode("overwrite").format("delta").save(f"Tables/churn_data_clean")
print(f"Spark dataframe saved to delta table: churn_data_clean")

ที่นี่ เรานําการทําความสะอาดและการแปลง PySpark DataFrame และ df_cleanบันทึกเป็นตาราง Delta ที่ชื่อ "churn_data_clean" ในเลคเฮ้าส์ เราใช้รูปแบบ Delta สําหรับการกําหนดรุ่นและการจัดการที่มีประสิทธิภาพของชุดข้อมูล mode("overwrite")ทําให้แน่ใจว่าตารางใด ๆ ที่มีอยู่ที่มีชื่อเดียวกันถูกเขียนทับ และเวอร์ชันใหม่ของตารางจะถูกสร้างขึ้น

สร้างชุดข้อมูลการทดสอบและการฝึกอบรม

ถัดไป เราจะสร้างชุดข้อมูลการทดสอบและการฝึกอบรมจากข้อมูลที่ได้รับการทําความสะอาดและออกแบบโดยคุณลักษณะ

ในส่วนโค้ดที่ให้มา เราโหลดชุดข้อมูลที่ได้รับการทําความสะอาดและออกแบบคุณลักษณะจาก lakehouse โดยใช้รูปแบบ Delta แยกชุดข้อมูลลงในการฝึกอบรมและชุดการทดสอบด้วยอัตราส่วน 80-20 และเตรียมข้อมูลสําหรับการเรียนรู้ของเครื่อง การเตรียมการนี้เกี่ยวข้องกับการนําเข้า VectorAssembler จาก PySpark ML เพื่อรวมคอลัมน์คุณลักษณะลงในคอลัมน์ "คุณลักษณะ" เดียว ต่อมา เราใช้ VectorAssembler เพื่อแปลงชุดข้อมูลการฝึกอบรมและการทดสอบ ส่งผลให้ train_data test_data DataFrame ที่มีตัวแปรเป้าหมาย "Exited" และเวกเตอร์คุณลักษณะ ชุดข้อมูลเหล่านี้พร้อมสําหรับการใช้งานในการสร้างและประเมินแบบจําลองการเรียนรู้ของเครื่องแล้ว

# Import the necessary library for feature vectorization
from pyspark.ml.feature import VectorAssembler

# Load the cleaned and feature-engineered dataset from the lakehouse
df_final = spark.read.format("delta").load("Tables/churn_data_clean")

# Train-Test Separation
train_raw, test_raw = df_final.randomSplit([0.8, 0.2], seed=41)

# Define the feature columns (excluding the target variable 'Exited')
feature_cols = [col for col in df_final.columns if col != "Exited"]

# Create a VectorAssembler to combine feature columns into a single 'features' column
featurizer = VectorAssembler(inputCols=feature_cols, outputCol="features")

# Transform the training and testing datasets using the VectorAssembler
train_data = featurizer.transform(train_raw)["Exited", "features"]
test_data = featurizer.transform(test_raw)["Exited", "features"]

ฝึกแบบจําลองข้อมูลพื้นฐาน

ด้วยการใช้ข้อมูลที่ถูกแนะนํา เราจะฝึกแบบจําลองการเรียนรู้ของเครื่องพื้นฐาน กําหนดค่า MLflow สําหรับการติดตามการทดลอง กําหนดฟังก์ชันการคาดการณ์สําหรับการคํานวณเมตริก และสุดท้าย ดูและบันทึกคะแนน ROC AUC ที่เกิดขึ้น

ตั้งค่าระดับการบันทึก

ที่นี่ เรากําหนดค่าระดับการบันทึกเพื่อระงับเอาต์พุตที่ไม่จําเป็นจากไลบรารี Synapse.ml รักษาตัวล้างบันทึก

import logging
 
logging.getLogger('synapse.ml').setLevel(logging.ERROR)

กําหนดค่า MLflow

ในส่วนนี้ เรากําหนดค่า MLflow สําหรับการติดตามการทดลอง เราตั้งชื่อการทดลองเป็น "automl_sample" เพื่อจัดระเบียบการทํางาน นอกจากนี้ เรายังเปิดใช้งานการบันทึกอัตโนมัติ เพื่อให้แน่ใจว่าพารามิเตอร์แบบจําลอง เมตริก และวัตถุจะถูกบันทึกไปยัง MLflow โดยอัตโนมัติ

import mlflow

# Set the MLflow experiment to "automl_sample" and enable automatic logging
mlflow.set_experiment("automl_sample")
mlflow.autolog(exclusive=False)

ฝึกและประเมินแบบจําลอง

ในตอนท้าย เราฝึกแบบจําลอง LightGBMClassifier บนข้อมูลการฝึกที่ให้มา แบบจําลองได้รับการกําหนดค่าด้วยการตั้งค่าที่จําเป็นสําหรับการจัดประเภทไบนารีและการจัดการความไม่สมดุล จากนั้นเราใช้แบบจําลองที่ได้รับการฝึกนี้เพื่อทําการคาดการณ์ข้อมูลทดสอบ เราแยกความน่าจะเป็นที่คาดการณ์สําหรับคลาสบวกและป้ายชื่อจริงจากข้อมูลทดสอบ หลังจากนั้นเราคํานวณคะแนน ROC AUC โดยใช้ฟังก์ชันของ roc_auc_score sklearn

from synapse.ml.lightgbm import LightGBMClassifier
from sklearn.metrics import roc_auc_score

# Assuming you have already defined 'train_data' and 'test_data'

with mlflow.start_run(run_name="default") as run:
    # Create a LightGBMClassifier model with specified settings
    model = LightGBMClassifier(objective="binary", featuresCol="features", labelCol="Exited", dataTransferMode="bulk")
    
    # Fit the model to the training data
    model = model.fit(train_data)

    # Get the predictions
    predictions = model.transform(test_data)

    # Extract the predicted probabilities for the positive class
    y_pred = predictions.select("probability").rdd.map(lambda x: x[0][1]).collect()

    # Extract the true labels from the 'test_data' DataFrame
    y_true = test_data.select("Exited").rdd.map(lambda x: x[0]).collect()

    # Compute the ROC AUC score
    roc_auc = roc_auc_score(y_true, y_pred)

    # Log the ROC AUC score with MLflow
    mlflow.log_metric("ROC_AUC", roc_auc)

    # Print or log the ROC AUC score
    print("ROC AUC Score:", roc_auc)

จากที่นี่ เราจะเห็นว่าแบบจําลองผลลัพธ์ของเราให้คะแนน ROC AUC ที่ 84%

สร้างการทดลองใช้ AutoML ด้วย FLAML

ในส่วนนี้ เราจะสร้างการทดลองใช้ AutoML โดยใช้แพคเกจ FLAML กําหนดค่าการตั้งค่ารุ่นทดลองใช้ แปลงชุดข้อมูล Spark เป็น Pandas บนชุดข้อมูล Spark เรียกใช้การทดลองใช้ AutoML และดูเมตริกที่เป็นผลลัพธ์

กําหนดค่าการทดลองใช้ AutoML

ที่นี่ เรานําเข้าคลาสและโมดูลที่จําเป็นจากแพคเกจ FLAML และสร้างอินสแตนซ์ของ AutoML ซึ่งจะถูกใช้เพื่อทําให้ไปป์ไลน์การเรียนรู้ของเครื่องเป็นอัตโนมัติ

# Import the AutoML class from the FLAML package
from flaml import AutoML
from flaml.automl.spark.utils import to_pandas_on_spark

# Create an AutoML instance
automl = AutoML()

กําหนดค่าการตั้งค่า

ในส่วนนี้ เรากําหนดการตั้งค่าการกําหนดค่าสําหรับการทดลองใช้ AutoML

# Define AutoML settings
settings = {
    "time_budget": 250,         # Total running time in seconds
    "metric": 'roc_auc',       # Optimization metric (ROC AUC in this case)
    "task": 'classification',  # Task type (classification)
    "log_file_name": 'flaml_experiment.log',  # FLAML log file
    "seed": 41,                # Random seed
    "force_cancel": True,      # Force stop training once time_budget is used up
    "mlflow_exp_name": "automl_sample"      # MLflow experiment name
}

แปลงเป็น Pandas บน Spark

เมื่อต้องการเรียกใช้ AutoML ด้วยชุดข้อมูลที่ใช้ Spark เราจําเป็นต้องแปลงเป็น Pandas บนชุดข้อมูล Spark โดยใช้ to_pandas_on_spark ฟังก์ชัน ซึ่งทําให้ FLAML ทํางานกับข้อมูลได้อย่างมีประสิทธิภาพ

# Convert the Spark training dataset to a Pandas on Spark dataset
df_automl = to_pandas_on_spark(train_data)

เรียกใช้การทดลองใช้ AutoML

ในตอนนี้ เราดําเนินการทดลองใช้ AutoML เราใช้ MLflow ที่ซ้อนกันเพื่อติดตามการทดลองภายในบริบทการเรียกใช้ MLflow ที่มีอยู่ การทดลองใช้ AutoML จะดําเนินการบน Pandas บนชุดข้อมูล Spark (df_automl) ที่มีตัวแปรเป้าหมาย "Exited และการตั้งค่าที่กําหนดไว้จะถูกส่งผ่านไปยัง fit ฟังก์ชันสําหรับการกําหนดค่า

'''The main flaml automl API'''

with mlflow.start_run(nested=True):
    automl.fit(dataframe=df_automl, label='Exited', isUnbalance=True, **settings)

ดูเมตริกที่เป็นผลลัพธ์

ในส่วนสุดท้ายนี้ เราเรียกใช้และแสดงผลลัพธ์ของการทดลองใช้ AutoML เมตริกเหล่านี้ให้ข้อมูลเชิงลึกเกี่ยวกับประสิทธิภาพการทํางานและการกําหนดค่าของแบบจําลอง AutoML บนชุดข้อมูลที่ระบุ

# Retrieve and display the best hyperparameter configuration and metrics
print('Best hyperparameter config:', automl.best_config)
print('Best ROC AUC on validation data: {0:.4g}'.format(1 - automl.best_loss))
print('Training duration of the best run: {0:.4g} s'.format(automl.best_config_train_time))

รวมการทดลองใช้ AutoML ของคุณแบบคู่ขนานกับ Apache Spark

ในสถานการณ์ที่ชุดข้อมูลของคุณสามารถใส่ได้พอดีกับโหนดเดียวและคุณต้องการใช้ประโยชน์จากพลังของ Spark สําหรับการเรียกใช้การทดลองใช้ AutoML แบบขนานหลายรายการพร้อมกัน คุณสามารถทําตามขั้นตอนเหล่านี้ได้:

แปลงเป็นดาต้าเฟรมของ Pandas

เมื่อต้องการเปิดใช้งานแบบขนาน ข้อมูลของคุณต้องถูกแปลงเป็น DataFrame ของ Pandas ก่อน

pandas_df = train_raw.toPandas()

ที่นี่ เราแปลง train_raw Spark DataFrame เป็น Pandas DataFrame ที่มี pandas_df ชื่อว่าเพื่อให้เหมาะสําหรับการประมวลผลแบบขนาน

กําหนดค่าการตั้งค่าแบบขนาน

ตั้งค่า use_spark เป็น True เพื่อเปิดใช้งานการทํางานแบบขนานตาม Spark ตามค่าเริ่มต้น FLAML จะเปิดใช้หนึ่งรุ่นทดลองใช้ต่อเครื่องปฏิบัติการ คุณสามารถกําหนดจํานวนการทดลองใช้พร้อมกันโดยใช้ n_concurrent_trials อาร์กิวเมนต์ได้

settings = {
    "time_budget": 250,           # Total running time in seconds
    "metric": 'roc_auc',         # Optimization metric (ROC AUC in this case)
    "task": 'classification',    # Task type (classification)
    "seed": 41,                  # Random seed
    "use_spark": True,           # Enable Spark-based parallelism
    "n_concurrent_trials": 3,    # Number of concurrent trials to run
    "force_cancel": True,        # Force stop training once time_budget is used up
    "mlflow_exp_name": "automl_sample"  # MLflow experiment name

}

ในการตั้งค่าเหล่านี้ เราระบุว่าเราต้องการใช้ Spark สําหรับความขนานโดยการตั้งค่าuse_sparkเป็นTrue นอกจากนี้เรายังตั้งค่าจํานวนการทดลองใช้พร้อมกันเป็น 3 ซึ่งหมายความว่าการทดลองใช้สามตัวจะทํางานคู่ขนานบน Spark

เมื่อต้องการเรียนรู้เพิ่มเติมเกี่ยวกับวิธีการขนานกับเส้นทาง AutoML ของคุณ คุณสามารถเยี่ยมชม เอกสาร FLAML สําหรับงาน Spark ขนาน

เรียกใช้การทดลองใช้ AutoML แบบขนาน

ในตอนนี้ เราจะเรียกใช้การทดลองใช้ AutoML ควบคู่ไปกับการตั้งค่าที่ระบุ เราจะใช้การเรียกใช้ MLflow ที่ซ้อนกันเพื่อติดตามการทดลองภายในบริบทการเรียกใช้ MLflow ที่มีอยู่

'''The main FLAML AutoML API'''
with mlflow.start_run(nested=True, run_name="parallel_trial"):
    automl.fit(dataframe=pandas_df, label='Exited', **settings)

การดําเนินการนี้จะเป็นการดําเนินการทดลองใช้ AutoML ด้วยการเปิดใช้งานแบบขนาน อาร์กิวเมนต์ dataframe ถูกตั้งค่าเป็น Pandas DataFrame pandas_dfและการตั้งค่าอื่น ๆ จะถูกส่งผ่านไปยัง fit ฟังก์ชันสําหรับการดําเนินการแบบขนาน

ดูเมตริก

หลังจากเรียกใช้การทดลองใช้ AutoML แบบขนาน แล้วให้เรียกใช้และแสดงผลลัพธ์รวมถึงการกําหนดค่า hyperparameter ที่ดีที่สุด ROC AUC ในข้อมูลการตรวจสอบความถูกต้องและระยะเวลาการฝึกอบรมของการเรียกใช้ที่มีประสิทธิภาพที่ดีที่สุด

''' retrieve best config'''
print('Best hyperparmeter config:', automl.best_config)
print('Best roc_auc on validation data: {0:.4g}'.format(1-automl.best_loss))
print('Training duration of best run: {0:.4g} s'.format(automl.best_config_train_time))