Conector do Apache Spark: SQL Server e SQL do Azure

O Conector do Apache Spark para SQL Server e SQL do Azure é um conector de alto desempenho que permite usar dados transacionais na análise de Big Data e persiste resultados para consultas ou relatórios ad hoc. O conector permite que você use qualquer banco de dados SQL, local ou na nuvem, como uma fonte de dados de entrada ou coletor de dados de saída para trabalhos do Spark.

Esta biblioteca contém o código-fonte para o conector do Apache Spark para SQL Server e SQL do Azure.

O Apache Spark é um mecanismo de análise unificado para processamento de dados em grande escala.

Há duas versões do conector disponíveis por meio do Maven, uma compatível com a versão 2.4.x e uma compatível com a versão 3.0.x. Ambas as versões podem ser encontradas aqui e podem ser importadas usando as coordenadas abaixo:

Connector Coordenada do Maven
Conector compatível com o Spark 2.4.x com.microsoft.azure:spark-mssql-connector:1.0.2
Conector compatível com o Spark 3.0.x com.microsoft.azure:spark-mssql-connector_2.12:1.1.0
Conector compatível com o Spark 3.1.x com.microsoft.azure:spark-mssql-connector_2.12:1.2.0

Crie também o conector com base na origem ou baixe o JAR da seção Versão no GitHub. Para obter as informações mais recentes sobre o conector, confira Repositório GitHub do conector do SQL Spark.

Recursos com suporte

  • Suporte para todas as associações do Spark (Scala, Python, R)
  • Suporte à Guia Chave do AD (Active Directory) e autenticação básica
  • Suporte à gravação do dataframe reordenado
  • Suporte para gravação em instância única do SQL Server e Pool de Dados em Clusters de Big Data do SQL Server
  • Suporte a conector confiável para instância única do SQL Server
Componente Versões com suporte
Apache Spark 2.4.x, 3.0.x, 3.1.x
Scala 2.11, 2.12
Microsoft JDBC Driver para SQL Server 8.4
Microsoft SQL Server SQL Server 2008 ou posterior
Bancos de Dados SQL do Azure Com suporte

Opções Suportadas

O Conector do Apache Spark para SQL Server e SQL do Azure dá suporte às opções definidas aqui: JDBC do DataSource SQL

Além disso, há suporte para as opções a seguir

Opção Padrão Descrição
reliabilityLevel BEST_EFFORT BEST_EFFORT ou NO_DUPLICATES. NO_DUPLICATES implementa uma inserção confiável em cenários de reinicialização do executor
dataPoolDataSource none none implica que o valor não está definido e o conector deve fazer a gravação na instância única do SQL Server. Defina esse valor como o nome da fonte de dados para gravar uma tabela de pool de dados em Clusters de Big Data
isolationLevel READ_COMMITTED Especificar o nível de isolamento
tableLock false Implementa uma inserção com a opção TABLOCK para melhorar o desempenho de gravação
schemaCheckEnabled true Desabilita a verificação estrita de quadro de dados e esquema de tabela SQL quando definido como falso

Outras opções de cópia em massa podem ser definidas como opções no dataframe e serão passadas para APIs bulkcopy na gravação

Comparação de desempenho

O Conector do Apache Spark para SQL Server e Azure SQL é até 15x mais rápido do que o conector JDBC genérico para gravar no SQL Server. As características de desempenho variam em termos de tipo, volume de dados e opções usadas, podendo apresentar variações entre execuções. Os resultados de desempenho a seguir são o tempo necessário para substituir uma tabela SQL por 143,9 milhões de linhas em um dataframe do Spark. O dataframe do Spark é construído lendo a tabela HDFS store_sales gerada usando Benchmark TPCDS do Spark. O tempo para ler store_sales para dataframe foi excluído. A média dos resultados é calculada em três execuções.

Tipo de Conector Opções Descrição Tempo para gravar
JDBCConnector Padrão Conector JDBC genérico com opções padrão 1\.385 segundos
sql-spark-connector BEST_EFFORT Melhor esforço sql-spark-connector com opções padrão 580 segundos
sql-spark-connector NO_DUPLICATES sql-spark-connector confiável 709 segundos
sql-spark-connector BEST_EFFORT + tabLock=true Melhor esforço sql-spark-connector com o bloqueio de tabela habilitado 72 segundos
sql-spark-connector NO_DUPLICATES + tabLock=true sql-spark-connector confiável com o bloqueio de tabela habilitado 198 segundos

Config

  • Spark config: num_executors = 20, executor_memory = '1664 m', executor_cores = 2
  • Data Gen config: scale_factor=50, partitioned_tables=true
  • Arquivo de dados store_sales com número de linhas 143.997.590

Ambiente

Problemas geralmente enfrentados

java.lang.NoClassDefFoundError: com/microsoft/aad/adal4j/AuthenticationException

Esse problema surge com o uso de uma versão mais antiga do driver mssql (que agora está incluída nesse conector) em seu ambiente do hadoop. Se você usava o conector SQL do Azure anterior e está instalado manualmente os drivers nesse cluster para compatibilidade com a autenticação do Microsoft Entra, é necessário remover esses drivers.

Etapas para corrigir o problema:

  1. Se você estiver usando um ambiente Hadoop genérico, verifique e remova o mssql jar: rm $HADOOP_HOME/share/hadoop/yarn/lib/mssql-jdbc-6.2.1.jre7.jar. Se você está usando o Databricks, adicione um script de inicialização global ou de cluster para remover versões antigas do driver mssql da pasta /databricks/jars ou adicione essa linha a um script existente: rm /databricks/jars/*mssql*

  2. Adicione os pacotes de adal4j e mssql. Por exemplo, você pode usar o Maven, mas qualquer forma deve funcionar.

    Cuidado

    Não instale o conector do SQL Spark dessa maneira.

  3. Adicione a classe de driver à sua configuração de conexão. Por exemplo:

    connectionProperties = {
      `Driver`: `com.microsoft.sqlserver.jdbc.SQLServerDriver`
    }`
    

Para obter mais informações e explicações, confira a resolução para https://github.com/microsoft/sql-spark-connector/issues/26.

Introdução

O Conector do Apache Spark para SQL Server e Azure SQL é baseado na API do Spark DataSourceV1 e na API em Massa do SQL Server e usa a mesma interface do conector JDBC Spark-SQL interno. Essa integração permite que você integre facilmente o conector e migre seus trabalhos do Spark simplesmente atualizando o parâmetro format com com.microsoft.sqlserver.jdbc.spark.

Para incluir o conector em seus projetos, baixe esse repositório e compile o jar usando SBT.

Gravar em uma nova Tabela SQL

Aviso

O modo overwrite primeiro descartará a tabela se ela já existir no banco de dados por padrão. Use essa opção com cuidado para evitar perda de dados inesperada.

Ao usar o modo overwrite, os índices serão perdidos na recriação da tabela se você não usar a opção truncate. , uma tabela columnstore agora seria um heap. Se você quiser manter a indexação existente, especifique também a opção truncate com o valor true. Por exemplo, .option("truncate","true").

server_name = "jdbc:sqlserver://{SERVER_ADDR}"
database_name = "database_name"
url = server_name + ";" + "databaseName=" + database_name + ";"

table_name = "table_name"
username = "username"
password = "password123!#" # Please specify password here

try:
  df.write \
    .format("com.microsoft.sqlserver.jdbc.spark") \
    .mode("overwrite") \
    .option("url", url) \
    .option("dbtable", table_name) \
    .option("user", username) \
    .option("password", password) \
    .save()
except ValueError as error :
    print("Connector write failed", error)

Acrescentar à Tabela SQL

try:
  df.write \
    .format("com.microsoft.sqlserver.jdbc.spark") \
    .mode("append") \
    .option("url", url) \
    .option("dbtable", table_name) \
    .option("user", username) \
    .option("password", password) \
    .save()
except ValueError as error :
    print("Connector write failed", error)

Especificar o nível de isolamento

Por padrão, o conector usará o nível de isolamento READ_COMMITTED ao executar a inserção em massa no banco de dados. Se você quiser substituir o nível de isolamento, use a opção mssqlIsolationLevel, conforme mostrado abaixo.

    .option("mssqlIsolationLevel", "READ_UNCOMMITTED") \

Ler da Tabela SQL

jdbcDF = spark.read \
        .format("com.microsoft.sqlserver.jdbc.spark") \
        .option("url", url) \
        .option("dbtable", table_name) \
        .option("user", username) \
        .option("password", password).load()

autenticação do Microsoft Entra

Exemplo do Python com Entidade de Serviço

context = adal.AuthenticationContext(authority)
token = context.acquire_token_with_client_credentials(resource_app_id_url, service_principal_id, service_principal_secret)
access_token = token["accessToken"]

jdbc_db = spark.read \
        .format("com.microsoft.sqlserver.jdbc.spark") \
        .option("url", url) \
        .option("dbtable", table_name) \
        .option("accessToken", access_token) \
        .option("encrypt", "true") \
        .option("hostNameInCertificate", "*.database.windows.net") \
        .load()

Exemplo do Python com Senha do Active Directory

jdbc_df = spark.read \
        .format("com.microsoft.sqlserver.jdbc.spark") \
        .option("url", url) \
        .option("dbtable", table_name) \
        .option("authentication", "ActiveDirectoryPassword") \
        .option("user", user_name) \
        .option("password", password) \
        .option("encrypt", "true") \
        .option("hostNameInCertificate", "*.database.windows.net") \
        .load()

Uma dependência necessária deve ser instalada para autenticar usando o Active Directory.

O formato de user ao usar ActiveDirectoryPassword deve ser o formato UPN, por exemplo, username@domainname.com.

Para Scala, o artefato _com.microsoft.aad.adal4j_ precisará ser instalado.

Para Python, a biblioteca _adal_ precisará ser instalada. Isso está disponível por meio de pip.

Verifique os notebooks de exemplo para obter exemplos.

Suporte

O Conector do Apache Spark para SQL do Azure e SQL Server é um projeto de software livre. Esse conector não vem com nenhum suporte da Microsoft. Para problemas com o ou perguntas sobre o conector, crie um problema neste repositório de projeto. A comunidade do conector está ativa e monitorando envios.

Próximas etapas

Acesse o repositório GitHub do conector do SQL Spark.

Para obter mais informações sobre os níveis de isolamento, confira SET TRANSACTION ISOLATION LEVEL (Transact-SQL).