Dataomvandling med Apache Spark-pooler (inaktuell)

GÄLLER FÖR: Python SDK azureml v1

Varning

Azure Synapse Analytics-integreringen med Azure Mašinsko učenje, som är tillgänglig i Python SDK v1, är inaktuell. Användare kan fortfarande använda Synapse-arbetsytan, registrerad med Azure Mašinsko učenje, som en länkad tjänst. Men en ny Synapse-arbetsyta kan inte längre registreras med Azure Machine Learning som en länkad tjänst. Vi rekommenderar att du använder serverlös Spark-beräkning och anslutna Synapse Spark-pooler, som är tillgängliga i CLI v2 och Python SDK v2. Mer information finns på https://aka.ms/aml-spark.

I den här artikeln får du lära dig hur du interaktivt utför dataomvandlingsuppgifter i en dedikerad Synapse-session som drivs av Azure Synapse Analytics i en Jupyter-anteckningsbok. Dessa uppgifter förlitar sig på Azure Mašinsko učenje Python SDK. Mer information om Azure Mašinsko učenje pipelines finns i Använda Apache Spark (drivs av Azure Synapse Analytics) i din maskininlärningspipeline (förhandsversion). Mer information om hur du använder Azure Synapse Analytics med en Synapse-arbetsyta finns i kom igång-serien för Azure Synapse Analytics.

Azure Mašinsko učenje- och Azure Synapse Analytics-integrering

Med Azure Synapse Analytics-integreringen med Azure Mašinsko učenje (förhandsversion) kan du bifoga en Apache Spark-pool som backas upp av Azure Synapse för interaktiv datautforskning och förberedelse. Med den här integreringen kan du ha en dedikerad beräkningsresurs för dataomvandling i stor skala, allt i samma Python-notebook-fil som du använder för att träna dina maskininlärningsmodeller.

Förutsättningar

Starta Synapse Spark-poolen för dataomvandlingsuppgifter

Om du vill starta dataförberedelsen med Apache Spark-poolen anger du det anslutna Spark Synapse-beräkningsnamnet. Du hittar det här namnet med Azure Mašinsko učenje Studio under fliken Anslutna beräkningar.

hämta bifogat beräkningsnamn

Viktigt!

Om du vill fortsätta använda Apache Spark-poolen måste du ange vilken beräkningsresurs som ska användas i dina dataomvandlingsuppgifter. Används %synapse för enstaka kodrader och %%synapse för flera rader:

%synapse start -c SynapseSparkPoolAlias

När sessionen har startat kan du kontrollera sessionens metadata:

%synapse meta

Du kan ange en Azure-Mašinsko učenje miljö som ska användas under Apache Spark-sessionen. Endast Conda-beroenden som anges i miljön börjar gälla. Docker-avbildningar stöds inte.

Varning

Python-beroenden som anges i miljön Conda-beroenden stöds inte i Apache Spark-pooler. För närvarande stöds endast fasta Python-versioner Inkludera sys.version_info i skriptet för att kontrollera din Python-version

Den här koden skaparmyenv miljövariabeln för att installera azureml-core version 1.20.0 och numpy version 1.17.0 innan sessionen startas. Du kan sedan inkludera den här miljön i apache Spark-sessionssatsen start .


from azureml.core import Workspace, Environment

# creates environment with numpy and azureml-core dependencies
ws = Workspace.from_config()
env = Environment(name="myenv")
env.python.conda_dependencies.add_pip_package("azureml-core==1.20.0")
env.python.conda_dependencies.add_conda_package("numpy==1.17.0")
env.register(workspace=ws)

Om du vill starta dataförberedelser med Apache Spark-poolen i din anpassade miljö anger du både namnet på Apache Spark-poolen och miljön som ska användas under Apache Spark-sessionen. Du kan ange ditt prenumerations-ID, resursgruppen för maskininlärningsarbetsytan och namnet på arbetsytan för maskininlärning.

Viktigt!

Se till att aktivera Tillåt paket på sessionsnivå i den länkade Synapse-arbetsytan.

aktivera paket på sessionsnivå

%synapse start -c SynapseSparkPoolAlias -e myenv -s AzureMLworkspaceSubscriptionID -r AzureMLworkspaceResourceGroupName -w AzureMLworkspaceName

Läsa in data från lagring

När Apache Spark-sessionen har startat läser du in de data som du vill förbereda. Datainläsning stöds för Azure Blob Storage och Azure Data Lake Storage Generation 1 och 2.

Du har två alternativ för att läsa in data från dessa lagringstjänster:

För att få åtkomst till dessa lagringstjänster behöver du behörigheter för Storage Blob Data Reader . Om du vill skriva tillbaka data till dessa lagringstjänster behöver du behörigheter för Storage Blob Data Contributor . Läs mer om lagringsbehörigheter och roller.

Läsa in data med sökvägen Hadoop Distributed Files System (HDFS)

Om du vill läsa in och läsa data från lagring med motsvarande HDFS-sökväg behöver du dina autentiseringsuppgifter för dataåtkomst tillgängliga. Dessa autentiseringsuppgifter varierar beroende på din lagringstyp. Det här kodexemplet visar hur du läser data från en Azure Blob Storage till en Spark-dataram med antingen din SAS-token (signatur för delad åtkomst) eller åtkomstnyckel:

%%synapse

# setup access key or SAS token
sc._jsc.hadoopConfiguration().set("fs.azure.account.key.<storage account name>.blob.core.windows.net", "<access key>")
sc._jsc.hadoopConfiguration().set("fs.azure.sas.<container name>.<storage account name>.blob.core.windows.net", "<sas token>")

# read from blob 
df = spark.read.option("header", "true").csv("wasbs://demo@dprepdata.blob.core.windows.net/Titanic.csv")

Det här kodexemplet visar hur du läser data från Azure Data Lake Storage Generation 1 (ADLS Gen 1) med autentiseringsuppgifterna för tjänstens huvudnamn:

%%synapse

# setup service principal which has access of the data
sc._jsc.hadoopConfiguration().set("fs.adl.account.<storage account name>.oauth2.access.token.provider.type","ClientCredential")

sc._jsc.hadoopConfiguration().set("fs.adl.account.<storage account name>.oauth2.client.id", "<client id>")

sc._jsc.hadoopConfiguration().set("fs.adl.account.<storage account name>.oauth2.credential", "<client secret>")

sc._jsc.hadoopConfiguration().set("fs.adl.account.<storage account name>.oauth2.refresh.url",
"https://login.microsoftonline.com/<tenant id>/oauth2/token")

df = spark.read.csv("adl://<storage account name>.azuredatalakestore.net/<path>")

Det här kodexemplet visar hur du läser in data från Azure Data Lake Storage Generation 2 (ADLS Gen 2) med autentiseringsuppgifterna för tjänstens huvudnamn:

%%synapse

# setup service principal which has access of the data
sc._jsc.hadoopConfiguration().set("fs.azure.account.auth.type.<storage account name>.dfs.core.windows.net","OAuth")
sc._jsc.hadoopConfiguration().set("fs.azure.account.oauth.provider.type.<storage account name>.dfs.core.windows.net", "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider")
sc._jsc.hadoopConfiguration().set("fs.azure.account.oauth2.client.id.<storage account name>.dfs.core.windows.net", "<client id>")
sc._jsc.hadoopConfiguration().set("fs.azure.account.oauth2.client.secret.<storage account name>.dfs.core.windows.net", "<client secret>")
sc._jsc.hadoopConfiguration().set("fs.azure.account.oauth2.client.endpoint.<storage account name>.dfs.core.windows.net",
"https://login.microsoftonline.com/<tenant id>/oauth2/token")

df = spark.read.csv("abfss://<container name>@<storage account>.dfs.core.windows.net/<path>")

Läsa in data från registrerade datamängder

Du kan också placera en befintlig registrerad datamängd på din arbetsyta och utföra dataförberedelser på den om du konverterar den till en spark-dataram. Det här exemplet autentiserar till arbetsytan, hämtar en registrerad TabularDataset –blob_dset – som refererar till filer i bloblagring och konverterar den TabularDataset till en Spark-dataram. När du konverterar dina datauppsättningar till Spark-dataramar kan du använda pyspark bibliotek för datautforskning och förberedelse.

%%synapse

from azureml.core import Workspace, Dataset

subscription_id = "<enter your subscription ID>"
resource_group = "<enter your resource group>"
workspace_name = "<enter your workspace name>"

ws = Workspace(workspace_name = workspace_name,
               subscription_id = subscription_id,
               resource_group = resource_group)

dset = Dataset.get_by_name(ws, "blob_dset")
spark_df = dset.to_spark_dataframe()

Utföra dataomvandlingsuppgifter

När du har hämtat och utforskat dina data kan du utföra dataomvandlingsuppgifter. Det här kodexemplet expanderar HDFS-exemplet i föregående avsnitt. Baserat på kolumnen Survivor filtrerar den data i spark-dataramen df och grupper som listas efter Ålder:

%%synapse

from pyspark.sql.functions import col, desc

df.filter(col('Survived') == 1).groupBy('Age').count().orderBy(desc('count')).show(10)

df.show()

Spara data till lagring och stoppa Spark-sessionen

När datautforskningen och förberedelsen är klar lagrar du dina förberedda data för senare användning i ditt lagringskonto i Azure. I det här kodexemplet skrivs förberedda data tillbaka till Azure Blob Storage och skriver över den ursprungliga Titanic.csv filen i training_data katalogen. Om du vill skriva tillbaka till lagringen behöver du behörigheter för Storage Blob Data-deltagare . Mer information finns i Tilldela en Azure-roll för åtkomst till blobdata.

%% synapse

df.write.format("csv").mode("overwrite").save("wasbs://demo@dprepdata.blob.core.windows.net/training_data/Titanic.csv")

När du har slutfört dataförberedelsen och sparat dina förberedda data till lagring avslutar du användningen av Apache Spark-poolen med det här kommandot:

%synapse stop

Skapa en datauppsättning för att representera förberedda data

När du är redo att använda dina förberedda data för modellträning ansluter du till lagringen med ett Azure-Mašinsko učenje datalager och anger den fil eller fil som du vill använda med en Azure-Mašinsko učenje datauppsättning.

Det här kodexemplet

  • Förutsätter att du redan har skapat ett datalager som ansluter till lagringstjänsten där du sparade dina förberedda data
  • Hämtar det befintliga dataarkivet – mydatastore – från arbetsytan ws med metoden get().
  • Skapar en FileDataset, train_ds, för att referera till de förberedda datafilerna mydatastore training_data som finns i katalogen
  • Skapar variabeln input1. Vid ett senare tillfälle kan den här variabeln göra datafilerna för datamängden train_ds tillgängliga för ett beräkningsmål för dina träningsaktiviteter.
from azureml.core import Datastore, Dataset

datastore = Datastore.get(ws, datastore_name='mydatastore')

datastore_paths = [(datastore, '/training_data/')]
train_ds = Dataset.File.from_files(path=datastore_paths, validate=True)
input1 = train_ds.as_mount()

Använda en ScriptRunConfig för att skicka en experimentkörning till en Synapse Spark-pool

Om du är redo att automatisera och produktionsanpassa dina dataomvandlingsuppgifter kan du skicka en experimentkörning till en bifogad Synapse Spark-pool med ScriptRunConfig-objektet . Om du har en Azure Mašinsko učenje pipeline på liknande sätt kan du använda SynapseSparkStep för att ange din Synapse Spark-pool som beräkningsmål för steget för förberedelse av data i pipelinen. Tillgängligheten för dina data till Synapse Spark-poolen beror på din datamängdstyp.

  • För en FileDataset kan du använda as_hdfs() -metoden. När körningen skickas görs datauppsättningen tillgänglig för Synapse Spark-poolen som ett Hadoop-distribuerat filsystem (HFDS)
  • För en TabularDataset kan du använda as_named_input() metoden

Följande kodexempel

  • Skapar variabeln input2 från FileDataset train_ds, som skapades i föregående kodexempel
  • Skapar variabel output med HDFSOutputDatasetConfiguration klassen . När körningen är klar kan vi med den här klassen spara utdata från körningen som datauppsättning test i datalagringen mydatastore . I Azure Mašinsko učenje-arbetsytan registreras datauppsättningen test under namnetregistered_dataset
  • Konfigurerar inställningar som körningen ska använda för att utföra på Synapse Spark-poolen
  • Definierar ScriptRunConfig-parametrarna till
    • Använda skriptet dataprep.py för körningen
    • Ange vilka data som ska användas som indata och hur du gör dessa data tillgängliga för Synapse Spark-poolen
    • Ange var utdata ska output lagras
from azureml.core import Dataset, HDFSOutputDatasetConfig
from azureml.core.environment import CondaDependencies
from azureml.core import RunConfiguration
from azureml.core import ScriptRunConfig 
from azureml.core import Experiment

input2 = train_ds.as_hdfs()
output = HDFSOutputDatasetConfig(destination=(datastore, "test").register_on_complete(name="registered_dataset")

run_config = RunConfiguration(framework="pyspark")
run_config.target = synapse_compute_name

run_config.spark.configuration["spark.driver.memory"] = "1g" 
run_config.spark.configuration["spark.driver.cores"] = 2 
run_config.spark.configuration["spark.executor.memory"] = "1g" 
run_config.spark.configuration["spark.executor.cores"] = 1 
run_config.spark.configuration["spark.executor.instances"] = 1 

conda_dep = CondaDependencies()
conda_dep.add_pip_package("azureml-core==1.20.0")

run_config.environment.python.conda_dependencies = conda_dep

script_run_config = ScriptRunConfig(source_directory = './code',
                                    script= 'dataprep.py',
                                    arguments = ["--file_input", input2,
                                                 "--output_dir", output],
                                    run_config = run_config)

Mer information om run_config.spark.configuration och allmän Spark-konfiguration finns i SparkConfiguration Class och Apache Sparks konfigurationsdokumentation.

När du har konfigurerat objektet ScriptRunConfig kan du skicka körningen.

from azureml.core import Experiment 

exp = Experiment(workspace=ws, name="synapse-spark") 
run = exp.submit(config=script_run_config) 
run

Mer information, inklusive information om skriptet dataprep.py som används i det här exemplet, finns i exempelanteckningsboken.

När du har förberett dina data kan du använda dem som indata för dina träningsjobb. I kodexemplet ovan anger registered_dataset du som indata för träningsjobb.

Exempelnotebook-filer

I de här exempelanteckningsböckerna finns fler begrepp och demonstrationer av Azure Synapse Analytics och Azure Mašinsko učenje integreringsfunktioner:

Nästa steg