Inviare processi Spark usando strumenti da riga di comando

Si applica a: SQL Server 2019 (15.x)

Questo articolo fornisce indicazioni su come usare gli strumenti da riga di comando per eseguire processi Spark in cluster Big Data di SQL Server.

Importante

Il componente aggiuntivo per i cluster Big Data di Microsoft SQL Server 2019 verrà ritirato. Il supporto per i cluster Big Data di SQL Server 2019 terminerà il 28 febbraio 2025. Tutti gli utenti esistenti di SQL Server 2019 con Software Assurance saranno completamente supportati nella piattaforma e fino a quel momento il software continuerà a ricevere aggiornamenti cumulativi di SQL Server. Per altre informazioni, vedere il post di blog relativo all'annuncio e Opzioni per i Big Data nella piattaforma Microsoft SQL Server.

Prerequisiti

  • Strumenti Big Data di SQL Server 2019 configurati e connessi al cluster:
    • azdata
    • Un'applicazione curl per eseguire chiamate API REST a Livy

Processi Spark che usano azdata o Livy

Questo articolo fornisce esempi relativi all'uso di criteri da riga di comando per inviare applicazioni Spark a cluster Big Data di SQL Server.

I comandi azdata bdc spark dell'interfaccia della riga di comando di Azure Data consentono di visualizzare sulla riga di comando tutte le funzionalità di Spark per cluster Big Data di SQL Server. Questo articolo è incentrato sull'invio di processi. Ma azdata bdc spark supporta anche le modalità interattive per Python, Scala, SQL e R tramite il comando azdata bdc spark session.

Se è necessaria l'integrazione diretta con un'API REST, usare le chiamate Livy standard per inviare processi. Questo articolo usa lo strumento da riga di comando curl negli esempi di Livy per eseguire la chiamata API REST. Per un esempio dettagliato che illustra come interagire con l'endpoint Livy di Spark usando il codice Python, vedere Usare Spark dall'endpoint Livy in GitHub.

Applicazione di estrazione, trasformazione e caricamento (ETL) semplice che usa Spark per cluster Big Data

Questa applicazione di estrazione, trasformazione e caricamento (ETL) segue un modello comune di ingegneria dei dati. Carica i dati tabulari da un percorso della zona di destinazione di Apache Hadoop Distributed File System (HDFS). Usa quindi un formato tabella per scrivere in un percorso della zona elaborato da HDFS.

Scaricare il set di dati dell'applicazione di esempio. Creare quindi applicazioni PySpark usando PySpark, Spark Scala o Spark SQL.

Nelle sezioni seguenti sono disponibili esercizi di esempio per ogni soluzione. Selezionare la scheda relativa alla piattaforma in uso. L'applicazione verrà eseguita usando azdata o curl.

In questo esempio viene usata l'applicazione PySpark seguente. Viene salvato come file Python denominato parquet_etl_sample.py nel computer locale.

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Read clickstream_data from storage pool HDFS into a Spark data frame. Applies column renames.
df = spark.read.option("inferSchema", "true").csv('/securelake/landing/criteo/test.txt', sep='\t', 
    header=False).toDF("feat1","feat2","feat3","feat4","feat5","feat6","feat7","feat8",
    "feat9","feat10","feat11","feat12","feat13","catfeat1","catfeat2","catfeat3","catfeat4",
    "catfeat5","catfeat6","catfeat7","catfeat8","catfeat9","catfeat10","catfeat11","catfeat12",
    "catfeat13","catfeat14","catfeat15","catfeat16","catfeat17","catfeat18","catfeat19",
    "catfeat20","catfeat21","catfeat22","catfeat23","catfeat24","catfeat25","catfeat26")

# Print the data frame inferred schema
df.printSchema()

tot_rows = df.count()
print("Number of rows:", tot_rows)

# Drop the managed table
spark.sql("DROP TABLE dl_clickstream")

# Write data frame to HDFS managed table by using optimized Delta Lake table format
df.write.format("parquet").mode("overwrite").saveAsTable("dl_clickstream")

print("Sample ETL pipeline completed")

Copiare l'applicazione PySpark in HDFS

Archiviare l'applicazione in HDFS in modo che il cluster possa accedervi per l'esecuzione. Come procedura consigliata, standardizzare e gestire i percorsi delle applicazioni all'interno del cluster per semplificare l'amministrazione.

In questo caso d'uso di esempio tutte le applicazioni della pipeline ETL vengono archiviate nel percorso hdfs:/apps/ETL-Pipelines. L'applicazione di esempio viene archiviata in hdfs:/apps/ETL-Pipelines/parquet_etl_sample.py.

Eseguire il comando seguente per caricare parquet_etl_sample.py dal computer di sviluppo locale o di gestione temporanea nel cluster HDFS.

azdata bdc hdfs cp --from-path parquet_etl_sample.py  --to-path "hdfs:/apps/ETL-Pipelines/parquet_etl_sample.py"

Eseguire l'applicazione Spark

Usare il comando seguente per inviare l'applicazione a Spark per cluster Big Data di SQL Server per l'esecuzione.

Il comando azdata esegue l'applicazione usando parametri comunemente specificati. Per tutte le opzioni dei parametri relativi a azdata bdc spark batch create, vedere azdata bdc spark.

Questa applicazione richiede il parametro di configurazione spark.sql.legacy.allowCreatingManagedTableUsingNonemptyLocation. Il comando usa quindi l'opzione --config. Questa configurazione illustra come passare le configurazioni alla sessione Spark.

È possibile usare l'opzione --config per specificare più parametri di configurazione. È anche possibile specificarli all'interno della sessione dell'applicazione impostando la configurazione nell'oggetto SparkSession.

azdata bdc spark batch create -f hdfs:/apps/ETL-Pipelines/parquet_etl_sample.py \
--config '{"spark.sql.legacy.allowCreatingManagedTableUsingNonemptyLocation":"true"}' \
-n MyETLPipelinePySpark --executor-count 2 --executor-cores 2 --executor-memory 1664m

Avviso

Il parametro "name" o "n" per il nome del batch deve essere univoco ogni volta che si crea un nuovo batch.

Monitorare i processi Spark

I comandi azdata bdc spark batch forniscono azioni di gestione per i processi batch Spark.

Per elencare tutti i processi in esecuzione, eseguire il comando seguente.

  • Il comando azdata:

    azdata bdc spark batch list -o table
    
  • Comando curl, con Livy:

    curl -k -u <USER>:<PASSWORD> -X POST <LIVY_ENDPOINT>/batches
    

Per ottenere informazioni relative a un batch Spark con l'ID specificato, eseguire il comando seguente. Il valore di batch id viene restituito da spark batch create.

  • Il comando azdata:

    azdata bdc spark batch info --batch-id 0
    
  • Comando curl, con Livy:

    curl -k -u <USER>:<PASSWORD> -X POST <LIVY_ENDPOINT>/batches/<BATCH_ID>
    

Per ottenere informazioni sullo stato relative a un batch Spark con l'ID specificato, eseguire il comando seguente.

  • Il comando azdata:

    azdata bdc spark batch state --batch-id 0
    
  • Comando curl, con Livy:

    curl -k -u <USER>:<PASSWORD> -X POST <LIVY_ENDPOINT>/batches/<BATCH_ID>/state
    

Per ottenere i log relativi a un batch Spark con l'ID specificato, eseguire il comando seguente.

  • Il comando azdata:

    azdata bdc spark batch log --batch-id 0
    
  • Comando curl, con Livy:

    curl -k -u <USER>:<PASSWORD> -X POST <LIVY_ENDPOINT>/batches/<BATCH_ID>/log
    

Passaggi successivi

Per informazioni sulla risoluzione dei problemi relativi al codice Spark, vedere Risolvere i problemi di un notebook PySpark.

Il codice di esempio Spark completo è disponibile negli esempi di Spark per cluster Big Data di SQL Server in GitHub.

Per altre informazioni sui cluster Big Data di SQL Server e sugli scenari correlati, vedere Cluster Big Data di SQL Server.