Запросы к базам данных с помощью JDBC

Azure Databricks поддерживает подключение к внешним базам данных с помощью JDBC. В этой статье представлен базовый синтаксис для настройки и использования этих подключений с примерами в Python, SQL и Scala.

Внимание

Конфигурации, описанные в этой статье, являются экспериментальными. Экспериментальные функции предоставляются как есть и не поддерживаются Databricks через техническую поддержку клиентов. Чтобы получить полную поддержку федерации запросов, следует вместо этого использовать федерацию Lakehouse, которая позволяет пользователям Azure Databricks воспользоваться синтаксисом каталога Unity и средствами управления данными.

Partner Connect предоставляет оптимизированные интеграции для синхронизации данных со многими внешними источниками данных. См. статью Что такое Databricks Partner Connect?.

Внимание

Примеры в этой статье не содержат имен пользователей и паролей в URL-адресах JDBC. Databricks рекомендует использовать секреты для хранения учетных данных базы данных. Например:

Python

username = dbutils.secrets.get(scope = "jdbc", key = "username")
password = dbutils.secrets.get(scope = "jdbc", key = "password")

Scala

val username = dbutils.secrets.get(scope = "jdbc", key = "username")
val password = dbutils.secrets.get(scope = "jdbc", key = "password")

Чтобы ссылаться на секреты Databricks с помощью SQL, необходимо настроить свойство конфигурации Spark во время инициализации кластера.

Полный пример управления секретами см. в разделе Пример рабочего процесса с секретами.

Чтение данных с помощью JDBC

Необходимо настроить ряд параметров для чтения данных с помощью JDBC. Обратите внимание, что для каждой базы данных используется другой формат <jdbc-url>.

Python

employees_table = (spark.read
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .load()
)

SQL

CREATE TEMPORARY VIEW employees_table_vw
USING JDBC
OPTIONS (
  url "<jdbc-url>",
  dbtable "<table-name>",
  user '<username>',
  password '<password>'
)

Scala

val employees_table = spark.read
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .load()

Spark автоматически считывает схему из таблицы базы данных и сопоставляет ее типы с типами Spark SQL.

Python

employees_table.printSchema

SQL

DESCRIBE employees_table_vw

Scala

employees_table.printSchema

Вы можете выполнять к этой таблице JDBC различные запросы:

Python

display(employees_table.select("age", "salary").groupBy("age").avg("salary"))

SQL

SELECT age, avg(salary) as salary
FROM employees_table_vw
GROUP BY age

Scala

display(employees_table.select("age", "salary").groupBy("age").avg("salary"))

Запись данных с помощью JDBC

Сохранение данных в таблицы с помощью JDBC использует аналогичные конфигурации для чтения. См. следующий пример.

Python

(employees_table.write
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<new-table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .save()
)

SQL

CREATE TABLE new_employees_table
  USING JDBC
OPTIONS (
  url "<jdbc-url>",
  dbtable "<table-name>",
  user '<username>',
  password '<password>'
) AS
SELECT * FROM employees_table_vw

Scala

employees_table.write
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<new-table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .save()

Поведение по умолчанию пытается создать новую таблицу и выдает ошибку, если таблица с этим именем уже существует.

Можно добавить данные в существующую таблицу с помощью следующего синтаксиса:

Python

(employees_table.write
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<new-table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .mode("append")
  .save()
)

SQL

CREATE TABLE IF NOT EXISTS new_employees_table
  USING JDBC
OPTIONS (
  url "<jdbc-url>",
  dbtable "<table-name>",
  user '<username>',
  password '<password>'
);

INSERT INTO new_employees_table
SELECT * FROM employees_table_vw;

Scala

employees_table.write
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<new-table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .mode("append")
  .save()

Вы можете перезаписать существующую таблицу с помощью следующего синтаксиса:

Python

(employees_table.write
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<new-table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .mode("overwrite")
  .save()
)

SQL

CREATE OR REPLACE TABLE new_employees_table
  USING JDBC
OPTIONS (
  url "<jdbc-url>",
  dbtable "<table-name>",
  user '<username>',
  password '<password>'
) AS
SELECT * FROM employees_table_vw;

Scala

employees_table.write
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<new-table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .mode("overwrite")
  .save()

Управление параллелизмом для запросов JDBC

По умолчанию драйвер JDBC запрашивает исходную базу данных только с одним потоком. Чтобы повысить производительность операций чтения, необходимо указать ряд параметров для управления количеством одновременных запросов Azure Databricks в базу данных. Для небольших кластеров установка numPartitions параметра равно количеству ядер исполнителя в кластере гарантирует, что все узлы запрашивают данные параллельно.

Предупреждение

Установка numPartitions высокого значения в большом кластере может привести к отрицательной производительности для удаленной базы данных, так как слишком много одновременных запросов может перегружать службу. Это особенно сложно для баз данных приложений. Будьте осторожны, чтобы задать это значение выше 50.

Примечание.

Ускоряйте запросы, выбрав столбец с индексом, вычисляемым в исходной базе данных.partitionColumn

В следующем примере кода демонстрируется настройка параллелизма для кластера с восемью ядрами:

Python

employees_table = (spark.read
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  # a column that can be used that has a uniformly distributed range of values that can be used for parallelization
  .option("partitionColumn", "<partition-key>")
  # lowest value to pull data for with the partitionColumn
  .option("lowerBound", "<min-value>")
  # max value to pull data for with the partitionColumn
  .option("upperBound", "<max-value>")
  # number of partitions to distribute the data into. Do not set this very large (~hundreds)
  .option("numPartitions", 8)
  .load()
)

SQL

CREATE TEMPORARY VIEW employees_table_vw
USING JDBC
OPTIONS (
  url "<jdbc-url>",
  dbtable "<table-name>",
  user '<username>',
  password '<password>',
  partitionColumn "<partition-key>",
  lowerBound "<min-value>",
  upperBound "<max-value>",
  numPartitions 8
)

Scala

val employees_table = spark.read
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  // a column that can be used that has a uniformly distributed range of values that can be used for parallelization
  .option("partitionColumn", "<partition-key>")
  // lowest value to pull data for with the partitionColumn
  .option("lowerBound", "<min-value>")
  // max value to pull data for with the partitionColumn
  .option("upperBound", "<max-value>")
  // number of partitions to distribute the data into. Do not set this very large (~hundreds)
  .option("numPartitions", 8)
  .load()

Примечание.

Azure Databricks поддерживает все параметры Apache Spark для настройки JDBC.

При записи в базы данных с помощью JDBC Apache Spark использует количество секций в памяти для управления параллелизмом. Перед записью для управления параллелизмом можно пересекать данные. Избегайте большого количества секций в больших кластерах, чтобы избежать перегрузки удаленной базы данных. В следующем примере демонстрируется повторное разделение до восьми секций перед записью:

Python

(employees_table.repartition(8)
  .write
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<new-table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .save()
)

SQL

CREATE TABLE new_employees_table
  USING JDBC
OPTIONS (
  url "<jdbc-url>",
  dbtable "<table-name>",
  user '<username>',
  password '<password>'
) AS
SELECT /*+ REPARTITION(8) */ * FROM employees_table_vw

Scala

employees_table.repartition(8)
  .write
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<new-table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .save()

Передача запроса в ядро СУБД

Вы можете передать в базу данных весь запрос и возвратить только результат. Параметр table определяет таблицу JDBC, из которой нужно считать данные. В предложении FROM запроса SQL можно использовать все допустимые элементы.

Python

pushdown_query = "(select * from employees where emp_no < 10008) as emp_alias"

employees_table = (spark.read
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", pushdown_query)
  .option("user", "<username>")
  .option("password", "<password>")
  .load()
)

SQL

CREATE TEMPORARY VIEW employees_table_vw
USING JDBC
OPTIONS (
  url "<jdbc-url>",
  dbtable "(select * from employees where emp_no < 10008) as emp_alias",
  user '<username>',
  password '<password>'
)

Scala

val pushdown_query = "(select * from employees where emp_no < 10008) as emp_alias"

val employees_table = spark.read
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", pushdown_query)
  .option("user", "<username>")
  .option("password", "<password>")
  .load()

Контрольный номер строк, извлекаемых на запрос

Драйверы JDBC имеют fetchSize параметр, который управляет количеством строк, полученных за раз из удаленной базы данных.

Параметр Результат
Слишком низкая Высокая задержка из-за множества циклов (несколько строк, возвращаемых на запрос)
Слишком высокая Ошибка нехватки памяти (слишком много данных, возвращаемых в одном запросе)

Оптимальное значение зависит от рабочей нагрузки. С этим связаны такие аспекты:

  • Сколько столбцов возвращает запрос?
  • Какие типы данных возвращаются?
  • Сколько времени возвращаются строки в каждом столбце?

В системах может быть очень мало значений по умолчанию и преимущества настройки. Например, значение по умолчанию fetchSize Oracle равно 10. Увеличение его до 100 уменьшает количество общих запросов, которые должны выполняться в 10 раз. Результаты JDBC — это сетевой трафик, поэтому избежать очень большого числа, но оптимальные значения могут находиться в тысячах для многих наборов данных.

fetchSize Используйте этот параметр, как показано в следующем примере:

Python

employees_table = (spark.read
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .option("fetchSize", "100")
  .load()
)

SQL

CREATE TEMPORARY VIEW employees_table_vw
USING JDBC
OPTIONS (
  url "<jdbc-url>",
  dbtable "<table-name>",
  user '<username>',
  password '<password>'.
  fetchSize 100
)

Scala

val employees_table = spark.read
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .option("fetchSize", "100")
  .load()