Conector do Pool de SQL Dedicado do Azure Synapse para Apache Spark

Introdução

O conector do pool de SQL dedicado do Azure Synapse para Apache Spark no Azure Synapse Analytics permite a transferência eficiente de grandes conjuntos de dados entre o runtime do Apache Spark e o Pool de SQL dedicado. O conector é enviado como uma biblioteca padrão com o Workspace do Azure Synapse. O conector é implementado usando a linguagem Scala. O conector é compatível com Scala e Python. Para usar o Conector com outras opções de idioma do notebook, use o comando magic do Spark %%spark.

Em geral, o conector fornece as seguintes funcionalidades:

  • Ler no Pool de SQL Dedicado do Azure Synapse:
    • Leia grandes conjuntos de dados de Tabelas do Pool de SQL Dedicado do Synapse (internas e externas) e exibições.
    • Suporte abrangente ao empilhamento de predicado, em que os filtros no DataFrame são mapeados para o empilhamento de predicado do SQL correspondente.
    • Suporte para remoção de coluna.
    • Suporte para push de consulta.
  • Gravar no Pool de SQL Dedicado do Azure Synapse:
    • Ingerir dados com grande volume para tipos de tabela interno e externo.
    • Dá suporte às seguintes preferências de modo de salvamento do DataFrame:
      • Append
      • ErrorIfExists
      • Ignore
      • Overwrite
    • O tipo de gravação na tabela externa dá suporte ao formato de arquivo Parquet ou de texto delimitado (exemplo – CSV).
    • Para gravar dados em tabelas internas, o conector agora usa instrução COPY em vez da abordagem CETAS/CTAS.
    • Aprimoramentos para otimizar o desempenho da taxa de transferência de gravação de ponta a ponta.
    • Introduz um identificador de retorno de chamada opcional (um argumento da função do Scala) que os clientes podem usar para receber métricas pós-gravação.
      • Alguns exemplos incluem o número de registros, a duração para concluir uma determinada ação e o motivo da falha.

Abordagem de orquestração

Ler

A high-level data flow diagram to describe the connector's orchestration of a read request.

Gravar

A high-level data flow diagram to describe the connector's orchestration of a write request.

Pré-requisitos

Os pré-requisitos, como a configuração dos recursos necessários do Azure e as etapas para configurá-los, são discutidos nesta seção.

Recursos do Azure

Examine e configure os seguintes Recursos do Azure dependentes:

Preparar o banco de dados

Conecte o banco de dados do Pool de SQL Dedicado do Azure Synapse e execute as seguintes instruções de configuração:

  • Crie um usuário de banco de dados mapeado para a Identidade do usuário do Microsoft Entra usada para entrar no Workspace do Azure Synapse.

    CREATE USER [username@domain.com] FROM EXTERNAL PROVIDER;      
    
  • Crie o esquema no qual as tabelas serão definidas, de modo que o Conector possa gravar e ler com êxito as respectivas tabelas.

    CREATE SCHEMA [<schema_name>];
    

Autenticação

Autenticação baseada na ID do Microsoft Entra

A autenticação baseada na ID do Microsoft Entra é uma abordagem de autenticação integrada. O usuário precisa fazer se conectar no workspace do Azure Synapse Analytics.

Autenticação Básica

A abordagem de autenticação básica exige que o usuário configure as opções username e password. Confira a seção Opções de configuração para saber mais sobre os parâmetros de configuração relevantes para leitura e gravação em tabelas no Pool de SQL Dedicado do Azure Synapse.

Autorização

Azure Data Lake Storage Gen2

Há duas maneiras de conceder permissões de acesso ao Azure Data Lake Storage Gen2 – Conta de armazenamento:

  • Função de controle de acesso baseado em função – Função de colaborador de dados do blob de armazenamento
    • Atribuir Storage Blob Data Contributor Role concede às permissões de usuário de leitura, gravação e exclusão dos contêineres do Azure Storage Blob.
    • O RBAC oferece uma abordagem de controle de alta granularidade no nível do contêiner.
  • ACL (Listas de Controle de Acesso)
    • A abordagem de ACL permite controles com baixa granularidade sobre caminhos e/ou arquivos específicos em uma determinada pasta.
    • As verificações de ACL não são impostas se o usuário já tiver concedido permissões usando a abordagem RBAC.
    • Há dois tipos abrangentes de permissões de ACL:
      • Permissões de acesso (aplicadas em um nível ou objeto específico).
      • Permissões padrão (aplicadas automaticamente a todos os objetos filho no momento da criação).
    • Os tipos de permissões incluem:
      • Execute habilita a capacidade de percorrer ou navegar pelas hierarquias de pasta.
      • Read habilita a capacidade de leitura.
      • Write habilita a capacidade de gravação.
    • É importante configurar as ACLs para que o conector possa gravar e ler com êxito nos locais de armazenamento.

Observação

  • Se você quiser executar notebooks usando pipelines do Workspace do Synapse, também precisará conceder as permissões de acesso listadas acima para a identidade gerenciada padrão do Workspace do Synapse. O nome da identidade gerenciada padrão do workspace é o mesmo do workspace.

  • Para usar o workspace do Synapse com contas de armazenamento protegidas, um ponto de extremidade privado gerenciado precisará ser configurado no notebook. O ponto de extremidade privado gerenciado precisa ser aprovado na seção Private endpoint connections da conta de armazenamento do ADLS Gen2 no painel Networking.

Pool de SQL Dedicado do Azure Synapse

Para habilitar a interação com êxito com o pool de SQL dedicado do Azure Synapse, a seguinte autorização será necessária, exceto se você for um usuário também configurado como um Active Directory Admin no Ponto de Extremidade do SQL Dedicado:

  • Cenário de leitura

    • Conceda ao usuário db_exporter usando o procedimento armazenado do sistema sp_addrolemember.

      EXEC sp_addrolemember 'db_exporter', [<your_domain_user>@<your_domain_name>.com];
      
  • Cenário de gravação

    • O conector usa o comando COPY para gravar dados de preparação na localização gerenciada da tabela interna.
      • Configure as permissões necessárias descritas aqui.

      • Veja abaixo um snippet de acesso rápido do mesmo:

        --Make sure your user has the permissions to CREATE tables in the [dbo] schema
        GRANT CREATE TABLE TO [<your_domain_user>@<your_domain_name>.com];
        GRANT ALTER ON SCHEMA::<target_database_schema_name> TO [<your_domain_user>@<your_domain_name>.com];
        
        --Make sure your user has ADMINISTER DATABASE BULK OPERATIONS permissions
        GRANT ADMINISTER DATABASE BULK OPERATIONS TO [<your_domain_user>@<your_domain_name>.com];
        
        --Make sure your user has INSERT permissions on the target table
        GRANT INSERT ON <your_table> TO [<your_domain_user>@<your_domain_name>.com]
        

Documentação da API

Conector do Pool de SQL Dedicado do Azure Synapse para Apache Spark – Documentação da API.

Opções de configuração

Para inicializar e orquestrar com êxito a operação de leitura ou gravação, o Conector espera determinados parâmetros de configuração. A definição de objeto - com.microsoft.spark.sqlanalytics.utils.Constants fornece uma lista de constantes padronizadas para cada chave de parâmetro.

Veja abaixo a lista de opções de configuração com base no cenário de uso:

  • Leitura usando a autenticação baseada na ID do Microsoft Entra
    • As credenciais são mapeadas automaticamente e o usuário não precisará fornecer as opções de configuração específicas.
    • O argumento de nome de tabela de três partes no método synapsesql é necessário para ler a tabela respectiva no pool de SQL dedicado do Azure Synapse.
  • Ler usando a Autenticação Básica
    • Ponto de Extremidade do SQL dedicado do Azure Synapse
      • Constants.SERVER: ponto de extremidade do Pool do SQL Dedicado do Azure Synapse (FQDN do servidor)
      • Constants.USERSQL_USER_NAME.
      • Constants.PASSWORD: senha de usuário SQL.
    • Ponto de Extremidade Azure Data Lake Storage (Gen 2): pastas de preparo
      • Constants.DATA_SOURCE: o caminho do armazenamento definido no parâmetro de localização da fonte de dados é usado para o preparo de dados.
  • Gravar usando a autenticação baseada na ID do Microsoft Entra
    • Ponto de Extremidade do SQL dedicado do Azure Synapse
      • Por padrão, o Conector infere o ponto de extremidade SQL dedicado do Synapse usando o conjunto de nomes de banco de dados no parâmetro de nome da tabela de três partes do método synapsesql.
      • Como alternativa, os usuários podem usar a opção Constants.SERVER para especificar o ponto de extremidade do SQL. Verifique se o ponto de extremidade hospeda o banco de dados correspondente com o respectivo esquema.
    • Ponto de Extremidade Azure Data Lake Storage (Gen 2): pastas de preparo
      • Para o tipo de tabela interna:
        • Configure a opção Constants.TEMP_FOLDER ou Constants.DATA_SOURCE.
        • Se o usuário escolher fornecer a opção Constants.DATA_SOURCE, a pasta de preparo será derivada com o uso do valor location de DataSource.
        • Se ambas forem fornecidas, o valor da opção Constants.TEMP_FOLDER será usado.
        • Na ausência de uma opção de pasta de preparo, o Conector derivará uma com base na configuração de runtime spark.sqlanalyticsconnector.stagingdir.prefix.
      • Para o tipo de tabela externa:
        • Constants.DATA_SOURCE é uma opção de configuração necessária.
        • O conector usa o caminho de armazenamento definido no parâmetro de localização da fonte de dados em combinação com o argumento location para o método synapsesql e deriva o caminho absoluto para persistir dados de tabela externa.
        • Se o argumento location para o método synapsesql não for especificado, o conector derivará o valor da localização como <base_path>/dbName/schemaName/tableName.
  • Gravar usando a Autenticação Básica
    • Ponto de Extremidade do SQL dedicado do Azure Synapse
      • Constants.SERVER: ponto de extremidade do Pool do SQL Dedicado do Azure Synapse (FQDN do servidor).
      • Constants.USERSQL_USER_NAME.
      • Constants.PASSWORD: senha de usuário SQL.
      • Constants.STAGING_STORAGE_ACCOUNT_KEY associado à Conta de Armazenamento que hospeda Constants.TEMP_FOLDERS (somente tipos de tabela interna) ou Constants.DATA_SOURCE.
    • Ponto de Extremidade Azure Data Lake Storage (Gen 2): pastas de preparo
      • As credenciais de autenticação básica do SQL não se aplicam a pontos de extremidade de armazenamento de acesso.
      • Ou seja, atribua permissões de acesso de armazenamento pertinentes conforme descrito na seção Azure Data Lake Storage Gen2.

Modelos de código

Esta seção apresenta modelos de código de referência para descrever como usar e invocar o Conector do Pool de SQL Dedicado do Azure Synapse para Apache Spark.

Observação

Como usar o conector no Python

  • O conector tem suporte apenas no Python para Spark 3. No Spark 2.4 (sem suporte), podemos usar a API do conector Scala para interagir com o conteúdo de um DataFrame no PySpark usando DataFrame.createOrReplaceTempView ou DataFrame.createOrReplaceGlobalTempView. Veja a seção Usando dados materializados entre células.
  • O identificador de chamada de volta não está disponível no Python.

Ler do pool de SQL dedicado do Azure Synapse

Solicitação de gravação: assinatura de leitura synapsesql

synapsesql(tableName:String="") => org.apache.spark.sql.DataFrame

Ler de uma tabela usando a autenticação baseada na ID do Microsoft Entra

//Use case is to read data from an internal table in Synapse Dedicated SQL Pool DB
//Azure Active Directory based authentication approach is preferred here.
import org.apache.spark.sql.DataFrame
import com.microsoft.spark.sqlanalytics.utils.Constants
import org.apache.spark.sql.SqlAnalyticsConnector._

//Read from existing internal table
val dfToReadFromTable:DataFrame = spark.read.
    //If `Constants.SERVER` is not provided, the `<database_name>` from the three-part table name argument 
    //to `synapsesql` method is used to infer the Synapse Dedicated SQL End Point.
    option(Constants.SERVER, "<sql-server-name>.sql.azuresynapse.net").
    //Defaults to storage path defined in the runtime configurations
    option(Constants.TEMP_FOLDER, "abfss://<container_name>@<storage_account_name>.dfs.core.windows.net/<some_base_path_for_temporary_staging_folders>").
    //Three-part table name from where data will be read.
    synapsesql("<database_name>.<schema_name>.<table_name>").
    //Column-pruning i.e., query select column values.
    select("<some_column_1>", "<some_column_5>", "<some_column_n>"). 
    //Push-down filter criteria that gets translated to SQL Push-down Predicates.    
    filter(col("Title").startsWith("E")).
    //Fetch a sample of 10 records 
    limit(10)

//Show contents of the dataframe
dfToReadFromTable.show()

Ler de uma consulta usando a autenticação baseada na ID do Microsoft Entra

Observação

Restrições ao ler da consulta:

  • O nome da tabela e a consulta não podem ser especificados ao mesmo tempo.
  • Somente consultas selecionadas são permitidas. SQLs DDL e DML não são permitidos.
  • As opções de seleção e filtro no dataframe não são enviadas por push para o pool dedicado do SQL quando uma consulta é especificada.
  • A leitura de uma consulta só está disponível no Spark 3.1 e 3.2.
//Use case is to read data from an internal table in Synapse Dedicated SQL Pool DB
//Azure Active Directory based authentication approach is preferred here.
import org.apache.spark.sql.DataFrame
import com.microsoft.spark.sqlanalytics.utils.Constants
import org.apache.spark.sql.SqlAnalyticsConnector._


// Read from a query
// Query can be provided either as an argument to synapsesql or as a Constant - Constants.QUERY
val dfToReadFromQueryAsOption:DataFrame = spark.read.
    // Name of the SQL Dedicated Pool or database where to run the query
    // Database can be specified as a Spark Config - spark.sqlanalyticsconnector.dw.database or as a Constant - Constants.DATABASE
     option(Constants.DATABASE, "<database_name>").
    //If `Constants.SERVER` is not provided, the `<database_name>` from the three-part table name argument 
    //to `synapsesql` method is used to infer the Synapse Dedicated SQL End Point.
    option(Constants.SERVER, "<sql-server-name>.sql.azuresynapse.net").
    //Defaults to storage path defined in the runtime configurations
    option(Constants.TEMP_FOLDER, "abfss://<container_name>@<storage_account_name>.dfs.core.windows.net/<some_base_path_for_temporary_staging_folders>")
    //query from which data will be read
    .option(Constants.QUERY, "select <column_name>, count(*) as cnt from <schema_name>.<table_name> group by <column_name>")
    synapsesql()

val dfToReadFromQueryAsArgument:DataFrame = spark.read.
     // Name of the SQL Dedicated Pool or database where to run the query
     // Database can be specified as a Spark Config - spark.sqlanalyticsconnector.dw.database or as a Constant - Constants.DATABASE
     option(Constants.DATABASE, "<database_name>")
    //If `Constants.SERVER` is not provided, the `<database_name>` from the three-part table name argument 
    //to `synapsesql` method is used to infer the Synapse Dedicated SQL End Point.
    option(Constants.SERVER, "<sql-server-name>.sql.azuresynapse.net").
    //Defaults to storage path defined in the runtime configurations
    option(Constants.TEMP_FOLDER, "abfss://<container_name>@<storage_account_name>.dfs.core.windows.net/<some_base_path_for_temporary_staging_folders>")
    //query from which data will be read
    .synapsesql("select <column_name>, count(*) as counts from <schema_name>.<table_name> group by <column_name>")


//Show contents of the dataframe
dfToReadFromQueryAsOption.show()
dfToReadFromQueryAsArgument.show()

Ler de uma tabela usando a autenticação básica

//Use case is to read data from an internal table in Synapse Dedicated SQL Pool DB
//Azure Active Directory based authentication approach is preferred here.
import org.apache.spark.sql.DataFrame
import com.microsoft.spark.sqlanalytics.utils.Constants
import org.apache.spark.sql.SqlAnalyticsConnector._

//Read from existing internal table
val dfToReadFromTable:DataFrame = spark.read.
    //If `Constants.SERVER` is not provided, the `<database_name>` from the three-part table name argument 
    //to `synapsesql` method is used to infer the Synapse Dedicated SQL End Point.
    option(Constants.SERVER, "<sql-server-name>.sql.azuresynapse.net").
    //Set database user name
    option(Constants.USER, "<user_name>").
    //Set user's password to the database
    option(Constants.PASSWORD, "<user_password>").
    //Set name of the data source definition that is defined with database scoped credentials.
    //Data extracted from the table will be staged to the storage path defined on the data source's location setting.
    option(Constants.DATA_SOURCE, "<data_source_name>").
    //Three-part table name from where data will be read.
    synapsesql("<database_name>.<schema_name>.<table_name>").
    //Column-pruning i.e., query select column values.
    select("<some_column_1>", "<some_column_5>", "<some_column_n>"). 
    //Push-down filter criteria that gets translated to SQL Push-down Predicates.    
    filter(col("Title").startsWith("E")).
    //Fetch a sample of 10 records 
    limit(10)
    

//Show contents of the dataframe
dfToReadFromTable.show()

Ler de uma consulta usando a autenticação básica

//Use case is to read data from an internal table in Synapse Dedicated SQL Pool DB
//Azure Active Directory based authentication approach is preferred here.
import org.apache.spark.sql.DataFrame
import com.microsoft.spark.sqlanalytics.utils.Constants
import org.apache.spark.sql.SqlAnalyticsConnector._

// Name of the SQL Dedicated Pool or database where to run the query
// Database can be specified as a Spark Config or as a Constant - Constants.DATABASE
spark.conf.set("spark.sqlanalyticsconnector.dw.database", "<database_name>")

// Read from a query
// Query can be provided either as an argument to synapsesql or as a Constant - Constants.QUERY
val dfToReadFromQueryAsOption:DataFrame = spark.read.
     //Name of the SQL Dedicated Pool or database where to run the query
     //Database can be specified as a Spark Config - spark.sqlanalyticsconnector.dw.database or as a Constant - Constants.DATABASE
      option(Constants.DATABASE, "<database_name>").
    //If `Constants.SERVER` is not provided, the `<database_name>` from the three-part table name argument 
    //to `synapsesql` method is used to infer the Synapse Dedicated SQL End Point.
    option(Constants.SERVER, "<sql-server-name>.sql.azuresynapse.net").
    //Set database user name
    option(Constants.USER, "<user_name>").
    //Set user's password to the database
    option(Constants.PASSWORD, "<user_password>").
    //Set name of the data source definition that is defined with database scoped credentials.
    //Data extracted from the SQL query will be staged to the storage path defined on the data source's location setting.
    option(Constants.DATA_SOURCE, "<data_source_name>").
    //Query where data will be read.  
    option(Constants.QUERY, "select <column_name>, count(*) as counts from <schema_name>.<table_name> group by <column_name>" ).
    synapsesql()

val dfToReadFromQueryAsArgument:DataFrame = spark.read.
     //Name of the SQL Dedicated Pool or database where to run the query
     //Database can be specified as a Spark Config - spark.sqlanalyticsconnector.dw.database or as a Constant - Constants.DATABASE
      option(Constants.DATABASE, "<database_name>").
    //If `Constants.SERVER` is not provided, the `<database_name>` from the three-part table name argument 
    //to `synapsesql` method is used to infer the Synapse Dedicated SQL End Point.
    option(Constants.SERVER, "<sql-server-name>.sql.azuresynapse.net").
    //Set database user name
    option(Constants.USER, "<user_name>").
    //Set user's password to the database
    option(Constants.PASSWORD, "<user_password>").
    //Set name of the data source definition that is defined with database scoped credentials.
    //Data extracted from the SQL query will be staged to the storage path defined on the data source's location setting.
    option(Constants.DATA_SOURCE, "<data_source_name>").
    //Query where data will be read.  
    synapsesql("select <column_name>, count(*) as counts from <schema_name>.<table_name> group by <column_name>")
    

//Show contents of the dataframe
dfToReadFromQueryAsOption.show()
dfToReadFromQueryAsArgument.show()

Gravar no pool de SQL dedicado do Azure Synapse

Solicitação de gravação: assinatura de método synapsesql

A assinatura do método da versão do Conector criada para Spark 2.4.8 tem um argumento a menos do que o aplicado na versão do Spark 3.1.2. Veja abaixo as duas assinaturas de método:

  • Pool do Spark versão 2.4.8
synapsesql(tableName:String, 
           tableType:String = Constants.INTERNAL, 
           location:Option[String] = None):Unit
  • Pool do Spark versão 3.1.2
synapsesql(tableName:String, 
           tableType:String = Constants.INTERNAL, 
           location:Option[String] = None,
           callBackHandle=Option[(Map[String, Any], Option[Throwable])=>Unit]):Unit

Gravar usando a autenticação baseada na ID do Microsoft Entra

Abaixo está um modelo de código abrangente descreve como usar o Conector para cenários de gravação:

//Add required imports
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.SaveMode
import com.microsoft.spark.sqlanalytics.utils.Constants
import org.apache.spark.sql.SqlAnalyticsConnector._

//Define read options for example, if reading from CSV source, configure header and delimiter options.
val pathToInputSource="abfss://<storage_container_name>@<storage_account_name>.dfs.core.windows.net/<some_folder>/<some_dataset>.csv"

//Define read configuration for the input CSV
val dfReadOptions:Map[String, String] = Map("header" -> "true", "delimiter" -> ",")

//Initialize DataFrame that reads CSV data from a given source 
val readDF:DataFrame=spark.
            read.
            options(dfReadOptions).
            csv(pathToInputSource).
            limit(1000) //Reads first 1000 rows from the source CSV input.

//Setup and trigger the read DataFrame for write to Synapse Dedicated SQL Pool.
//Fully qualified SQL Server DNS name can be obtained using one of the following methods:
//    1. Synapse Workspace - Manage Pane - SQL Pools - <Properties view of the corresponding Dedicated SQL Pool>
//    2. From Azure Portal, follow the bread-crumbs for <Portal_Home> -> <Resource_Group> -> <Dedicated SQL Pool> and then go to Connection Strings/JDBC tab. 
//If `Constants.SERVER` is not provided, the value will be inferred by using the `database_name` in the three-part table name argument to the `synapsesql` method.
//Like-wise, if `Constants.TEMP_FOLDER` is not provided, the connector will use the runtime staging directory config (see section on Configuration Options for details).
val writeOptionsWithAADAuth:Map[String, String] = Map(Constants.SERVER -> "<dedicated-pool-sql-server-name>.sql.azuresynapse.net",
                                            Constants.TEMP_FOLDER -> "abfss://<storage_container_name>@<storage_account_name>.dfs.core.windows.net/<some_temp_folder>")

//Setup optional callback/feedback function that can receive post write metrics of the job performed.
var errorDuringWrite:Option[Throwable] = None
val callBackFunctionToReceivePostWriteMetrics: (Map[String, Any], Option[Throwable]) => Unit =
    (feedback: Map[String, Any], errorState: Option[Throwable]) => {
    println(s"Feedback map - ${feedback.map{case(key, value) => s"$key -> $value"}.mkString("{",",\n","}")}")
    errorDuringWrite = errorState
}

//Configure and submit the request to write to Synapse Dedicated SQL Pool (note - default SaveMode is set to ErrorIfExists)
//Sample below is using AAD-based authentication approach; See further examples to leverage SQL Basic auth.
readDF.
    write.
    //Configure required configurations.
    options(writeOptionsWithAADAuth).
    //Choose a save mode that is apt for your use case.
    mode(SaveMode.Overwrite).
    synapsesql(tableName = "<database_name>.<schema_name>.<table_name>", 
                //For external table type value is Constants.EXTERNAL
                tableType = Constants.INTERNAL, 
                //Optional parameter that is used to specify external table's base folder; defaults to `database_name/schema_name/table_name`
                location = None, 
                //Optional parameter to receive a callback.
                callBackHandle = Some(callBackFunctionToReceivePostWriteMetrics))

//If write request has failed, raise an error and fail the Cell's execution.
if(errorDuringWrite.isDefined) throw errorDuringWrite.get

Gravar usando a Autenticação Básica

O snippet de código a seguir substitui a definição de gravação descrita na seção Gravar usando a autenticação baseada na ID do Microsoft Entra, para enviar a solicitação de gravação usando a abordagem de autenticação básica do SQL:

//Define write options to use SQL basic authentication
val writeOptionsWithBasicAuth:Map[String, String] = Map(Constants.SERVER -> "<dedicated-pool-sql-server-name>.sql.azuresynapse.net",
                                           //Set database user name
                                           Constants.USER -> "<user_name>",
                                           //Set database user's password
                                           Constants.PASSWORD -> "<user_password>",
                                           //Required only when writing to an external table. For write to internal table, this can be used instead of TEMP_FOLDER option.
                                           Constants.DATA_SOURCE -> "<Name of the datasource as defined in the target database>"
                                           //To be used only when writing to internal tables. Storage path will be used for data staging.
                                           Constants.TEMP_FOLDER -> "abfss://<storage_container_name>@<storage_account_name>.dfs.core.windows.net/<some_temp_folder>")

//Configure and submit the request to write to Synapse Dedicated SQL Pool. 
readDF.
    write.
    options(writeOptionsWithBasicAuth).
    //Choose a save mode that is apt for your use case.
    mode(SaveMode.Overwrite). 
    synapsesql(tableName = "<database_name>.<schema_name>.<table_name>", 
                //For external table type value is Constants.EXTERNAL
                tableType = Constants.INTERNAL,
                //Not required for writing to an internal table 
                location = None,
                //Optional parameter.
                callBackHandle = Some(callBackFunctionToReceivePostWriteMetrics))

Em uma abordagem de autenticação básica, para ler dados de um caminho de armazenamento de origem, outras opções de configuração serão necessárias. O seguinte snippet de código fornece um exemplo para ler de uma fonte de dados do Azure Data Lake Storage Gen2 usando as credenciais da Entidade de Serviço:

//Specify options that Spark runtime must support when interfacing and consuming source data
val storageAccountName="<storageAccountName>"
val storageContainerName="<storageContainerName>"
val subscriptionId="<AzureSubscriptionID>"
val spnClientId="<ServicePrincipalClientID>"
val spnSecretKeyUsedAsAuthCred="<spn_secret_key_value>"
val dfReadOptions:Map[String, String]=Map("header"->"true",
                                "delimiter"->",", 
                                "fs.defaultFS" -> s"abfss://$storageContainerName@$storageAccountName.dfs.core.windows.net",
                                s"fs.azure.account.auth.type.$storageAccountName.dfs.core.windows.net" -> "OAuth",
                                s"fs.azure.account.oauth.provider.type.$storageAccountName.dfs.core.windows.net" -> 
                                    "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
                                "fs.azure.account.oauth2.client.id" -> s"$spnClientId",
                                "fs.azure.account.oauth2.client.secret" -> s"$spnSecretKeyUsedAsAuthCred",
                                "fs.azure.account.oauth2.client.endpoint" -> s"https://login.microsoftonline.com/$subscriptionId/oauth2/token",
                                "fs.AbstractFileSystem.abfss.impl" -> "org.apache.hadoop.fs.azurebfs.Abfs",
                                "fs.abfss.impl" -> "org.apache.hadoop.fs.azurebfs.SecureAzureBlobFileSystem")
//Initialize the Storage Path string, where source data is maintained/kept.
val pathToInputSource=s"abfss://$storageContainerName@$storageAccountName.dfs.core.windows.net/<base_path_for_source_data>/<specific_file (or) collection_of_files>"
//Define data frame to interface with the data source
val df:DataFrame = spark.
            read.
            options(dfReadOptions).
            csv(pathToInputSource).
            limit(100)

Modos de salvamento do DataFrame com suporte

Há suporte aos seguintes modos de salvamento ao gravar dados de origem em uma tabela de destino no pool de SQL dedicado do Azure Synapse:

  • ErrorIfExists (modo de salvamento padrão)
    • Se a tabela de destino existir, a gravação será anulada com uma exceção retornada ao receptor. Caso contrário, uma tabela será criada com os dados das pastas de preparo.
  • Ignorar
    • Se a tabela de destino existir, a gravação vai ignorar a solicitação de gravação sem retornar um erro. Caso contrário, uma tabela será criada com os dados das pastas de preparo.
  • Overwrite
    • Se a tabela de destino existir, os dados existentes no destino serão substituídos pelos dados das pastas de preparo. Caso contrário, uma tabela será criada com os dados das pastas de preparo.
  • Acrescentar
    • Se a tabela de destino existir, os novos dados serão acrescentados a ela. Caso contrário, uma tabela será criada com os dados das pastas de preparo.

Gravar identificador de retorno de chamada de solicitação

As novas alterações da API do caminho de gravação apresentaram um recurso experimental para fornecer ao cliente um mapa de chave/valor das métricas de pós-gravação. As chaves para as métricas são definidas na nova definição de objeto - Constants.FeedbackConstants. As métricas podem ser recuperadas como uma cadeia de caracteres JSON passando o identificador de retorno de chamada (uma Scala Function). Abaixo há uma assinatura de função:

//Function signature is expected to have two arguments - a `scala.collection.immutable.Map[String, Any]` and an Option[Throwable]
//Post-write if there's a reference of this handle passed to the `synapsesql` signature, it will be invoked by the closing process.
//These arguments will have valid objects in either Success or Failure case. In case of Failure the second argument will be a `Some(Throwable)`.
(Map[String, Any], Option[Throwable]) => Unit

A seguir estão algumas métricas notáveis (apresentadas em letras concatenadas):

  • WriteFailureCause
  • DataStagingSparkJobDurationInMilliseconds
  • NumberOfRecordsStagedForSQLCommit
  • SQLStatementExecutionDurationInMilliseconds
  • rows_processed

Veja abaixo uma cadeia de caracteres JSON de exemplo com métricas de pós-gravação:

{
 SparkApplicationId -> <spark_yarn_application_id>,
 SQLStatementExecutionDurationInMilliseconds -> 10113,
 WriteRequestReceivedAtEPOCH -> 1647523790633,
 WriteRequestProcessedAtEPOCH -> 1647523808379,
 StagingDataFileSystemCheckDurationInMilliseconds -> 60,
 command -> "COPY INTO [schema_name].[table_name] ...",
 NumberOfRecordsStagedForSQLCommit -> 100,
 DataStagingSparkJobEndedAtEPOCH -> 1647523797245,
 SchemaInferenceAssertionCompletedAtEPOCH -> 1647523790920,
 DataStagingSparkJobDurationInMilliseconds -> 5252,
 rows_processed -> 100,
 SaveModeApplied -> TRUNCATE_COPY,
 DurationInMillisecondsToValidateFileFormat -> 75,
 status -> Completed,
 SparkApplicationName -> <spark_application_name>,
 ThreePartFullyQualifiedTargetTableName -> <database_name>.<schema_name>.<table_name>,
 request_id -> <query_id_as_retrieved_from_synapse_dedicated_sql_db_query_reference>,
 StagingFolderConfigurationCheckDurationInMilliseconds -> 2,
 JDBCConfigurationsSetupAtEPOCH -> 193,
 StagingFolderConfigurationCheckCompletedAtEPOCH -> 1647523791012,
 FileFormatValidationsCompletedAtEPOCHTime -> 1647523790995,
 SchemaInferenceCheckDurationInMilliseconds -> 91,
 SaveModeRequested -> Overwrite,
 DataStagingSparkJobStartedAtEPOCH -> 1647523791993,
 DurationInMillisecondsTakenToGenerateWriteSQLStatements -> 4
}

Mais exemplos de código

Usando dados materializados entre células

O createOrReplaceTempView do Spark DataFrame poderá ser usado para acessar dados obtidos em outra célula registrando uma exibição temporária.

  • Célula onde os dados efetuam fetch (por exemplo, com preferência de linguagem do Notebook como Scala)
    //Necessary imports
    import org.apache.spark.sql.DataFrame
    import org.apache.spark.sql.SaveMode
    import com.microsoft.spark.sqlanalytics.utils.Constants
    import org.apache.spark.sql.SqlAnalyticsConnector._
    
    //Configure options and read from Synapse Dedicated SQL Pool.
    val readDF = spark.read.
        //Set Synapse Dedicated SQL End Point name.
        option(Constants.SERVER, "<synapse-dedicated-sql-end-point>.sql.azuresynapse.net").
        //Set database user name.
        option(Constants.USER, "<user_name>").
        //Set database user's password. 
        option(Constants.PASSWORD, "<user_password>").
        //Set name of the data source definition that is defined with database scoped credentials.
        option(Constants.DATA_SOURCE,"<data_source_name>").
        //Set the three-part table name from which the read must be performed.
        synapsesql("<database_name>.<schema_name>.<table_name>").
        //Optional - specify number of records the DataFrame would read.
        limit(10)
    //Register the temporary view (scope - current active Spark Session)
    readDF.createOrReplaceTempView("<temporary_view_name>")
  • Agora, altere a preferência de linguagem no Notebook para PySpark (Python) e efetue fetch de dados da exibição registrada <temporary_view_name>
        spark.sql("select * from <temporary_view_name>").show()

Tratamento de resposta

A invocação de synapsesql tem dois estados finais possíveis : êxito ou com falha. Esta seção descreve como tratar a resposta da solicitação de cada cenário.

Ler a resposta da solicitação

Após a conclusão, o snippet de resposta de leitura será exibido na saída da célula. A falha na célula atual também cancelará as execuções de células subsequentes. As informações detalhadas de erros estarão disponíveis nos Logs do Aplicativo Spark.

Gravar a resposta da solicitação

Por padrão, uma resposta de gravação é impressa na saída da célula. Em caso de falha, a célula atual será marcada como com falha e as execuções de células subsequentes serão anuladas. A outra abordagem é passar a opção identificador de retorno de chamada para o método synapsesql. O identificador de retorno de chamada fornecerá acesso programático à resposta de gravação.

Outras considerações

  • Ao ler as tabelas do Pool de SQL Dedicado do Azure Synapse:
    • Considere aplicar os filtros necessários no DataFrame para aproveitar o recurso de remoção de coluna do Conector.
    • O cenário de leitura não dá suporte à cláusula TOP(n-rows) ao enquadrar as instruções de consulta SELECT. A opção para limitar os dados é usar a cláusula limit(.) do DataFrame.
  • Ao gravar em tabelas do pool de SQL dedicado do Azure Synapse:
    • Para tipos de tabela interna:
      • As tabelas são criadas com distribuição de dados ROUND_ROBIN.
      • Os tipos de coluna são inferidos do DataFrame que lê os dados da origem. As colunas de cadeia de caracteres são mapeadas para NVARCHAR(4000).
    • Para os tipos de tabelas externas:
      • O paralelismo inicial do DataFrame orienta a organização dos dados para a tabela externa.
      • Os tipos de coluna são inferidos do DataFrame que lê os dados da origem.
    • Uma melhor distribuição de dados entre os executores poderá ser obtida ajustando spark.sql.files.maxPartitionBytes e o parâmetro repartition do DataFrame.
    • Ao gravar grandes conjuntos de dados, é importante considerar o impacto da configuração do Nível de desempenho de DWU que limita o tamanho da transação.
  • Monitore as tendências de utilização do Azure Data Lake Storage Gen2 para detectar os comportamentos de limitação que podem afetar o desempenho de leitura e gravação.

Referências