Запросы к базам данных с помощью JDBC
Azure Databricks поддерживает подключение к внешним базам данных с помощью JDBC. В этой статье представлен базовый синтаксис для настройки и использования этих подключений с примерами в Python, SQL и Scala.
Внимание
Конфигурации, описанные в этой статье, являются экспериментальными. Экспериментальные функции предоставляются как есть и не поддерживаются Databricks через техническую поддержку клиентов. Чтобы получить полную поддержку федерации запросов, следует вместо этого использовать федерацию Lakehouse, которая позволяет пользователям Azure Databricks воспользоваться синтаксисом каталога Unity и средствами управления данными.
Partner Connect предоставляет оптимизированные интеграции для синхронизации данных со многими внешними источниками данных. См. статью Что такое Databricks Partner Connect?.
Внимание
Примеры в этой статье не содержат имен пользователей и паролей в URL-адресах JDBC. Databricks рекомендует использовать секреты для хранения учетных данных базы данных. Например:
Python
username = dbutils.secrets.get(scope = "jdbc", key = "username")
password = dbutils.secrets.get(scope = "jdbc", key = "password")
Scala
val username = dbutils.secrets.get(scope = "jdbc", key = "username")
val password = dbutils.secrets.get(scope = "jdbc", key = "password")
Чтобы ссылаться на секреты Databricks с помощью SQL, необходимо настроить свойство конфигурации Spark во время инициализации кластера.
Полный пример управления секретами см. в разделе Пример рабочего процесса с секретами.
Чтение данных с помощью JDBC
Необходимо настроить ряд параметров для чтения данных с помощью JDBC. Обратите внимание, что для каждой базы данных используется другой формат <jdbc-url>
.
Python
employees_table = (spark.read
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<table-name>")
.option("user", "<username>")
.option("password", "<password>")
.load()
)
SQL
CREATE TEMPORARY VIEW employees_table_vw
USING JDBC
OPTIONS (
url "<jdbc-url>",
dbtable "<table-name>",
user '<username>',
password '<password>'
)
Scala
val employees_table = spark.read
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<table-name>")
.option("user", "<username>")
.option("password", "<password>")
.load()
Spark автоматически считывает схему из таблицы базы данных и сопоставляет ее типы с типами Spark SQL.
Python
employees_table.printSchema
SQL
DESCRIBE employees_table_vw
Scala
employees_table.printSchema
Вы можете выполнять к этой таблице JDBC различные запросы:
Python
display(employees_table.select("age", "salary").groupBy("age").avg("salary"))
SQL
SELECT age, avg(salary) as salary
FROM employees_table_vw
GROUP BY age
Scala
display(employees_table.select("age", "salary").groupBy("age").avg("salary"))
Запись данных с помощью JDBC
Сохранение данных в таблицы с помощью JDBC использует аналогичные конфигурации для чтения. См. следующий пример.
Python
(employees_table.write
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<new-table-name>")
.option("user", "<username>")
.option("password", "<password>")
.save()
)
SQL
CREATE TABLE new_employees_table
USING JDBC
OPTIONS (
url "<jdbc-url>",
dbtable "<table-name>",
user '<username>',
password '<password>'
) AS
SELECT * FROM employees_table_vw
Scala
employees_table.write
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<new-table-name>")
.option("user", "<username>")
.option("password", "<password>")
.save()
Поведение по умолчанию пытается создать новую таблицу и выдает ошибку, если таблица с этим именем уже существует.
Можно добавить данные в существующую таблицу с помощью следующего синтаксиса:
Python
(employees_table.write
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<new-table-name>")
.option("user", "<username>")
.option("password", "<password>")
.mode("append")
.save()
)
SQL
CREATE TABLE IF NOT EXISTS new_employees_table
USING JDBC
OPTIONS (
url "<jdbc-url>",
dbtable "<table-name>",
user '<username>',
password '<password>'
);
INSERT INTO new_employees_table
SELECT * FROM employees_table_vw;
Scala
employees_table.write
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<new-table-name>")
.option("user", "<username>")
.option("password", "<password>")
.mode("append")
.save()
Вы можете перезаписать существующую таблицу с помощью следующего синтаксиса:
Python
(employees_table.write
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<new-table-name>")
.option("user", "<username>")
.option("password", "<password>")
.mode("overwrite")
.save()
)
SQL
CREATE OR REPLACE TABLE new_employees_table
USING JDBC
OPTIONS (
url "<jdbc-url>",
dbtable "<table-name>",
user '<username>',
password '<password>'
) AS
SELECT * FROM employees_table_vw;
Scala
employees_table.write
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<new-table-name>")
.option("user", "<username>")
.option("password", "<password>")
.mode("overwrite")
.save()
Управление параллелизмом для запросов JDBC
По умолчанию драйвер JDBC запрашивает исходную базу данных только с одним потоком. Чтобы повысить производительность операций чтения, необходимо указать ряд параметров для управления количеством одновременных запросов Azure Databricks в базу данных. Для небольших кластеров установка numPartitions
параметра равно количеству ядер исполнителя в кластере гарантирует, что все узлы запрашивают данные параллельно.
Предупреждение
Установка numPartitions
высокого значения в большом кластере может привести к отрицательной производительности для удаленной базы данных, так как слишком много одновременных запросов может перегружать службу. Это особенно сложно для баз данных приложений. Будьте осторожны, чтобы задать это значение выше 50.
Примечание.
Ускоряйте запросы, выбрав столбец с индексом, вычисляемым в исходной базе данных.partitionColumn
В следующем примере кода демонстрируется настройка параллелизма для кластера с восемью ядрами:
Python
employees_table = (spark.read
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<table-name>")
.option("user", "<username>")
.option("password", "<password>")
# a column that can be used that has a uniformly distributed range of values that can be used for parallelization
.option("partitionColumn", "<partition-key>")
# lowest value to pull data for with the partitionColumn
.option("lowerBound", "<min-value>")
# max value to pull data for with the partitionColumn
.option("upperBound", "<max-value>")
# number of partitions to distribute the data into. Do not set this very large (~hundreds)
.option("numPartitions", 8)
.load()
)
SQL
CREATE TEMPORARY VIEW employees_table_vw
USING JDBC
OPTIONS (
url "<jdbc-url>",
dbtable "<table-name>",
user '<username>',
password '<password>',
partitionColumn "<partition-key>",
lowerBound "<min-value>",
upperBound "<max-value>",
numPartitions 8
)
Scala
val employees_table = spark.read
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<table-name>")
.option("user", "<username>")
.option("password", "<password>")
// a column that can be used that has a uniformly distributed range of values that can be used for parallelization
.option("partitionColumn", "<partition-key>")
// lowest value to pull data for with the partitionColumn
.option("lowerBound", "<min-value>")
// max value to pull data for with the partitionColumn
.option("upperBound", "<max-value>")
// number of partitions to distribute the data into. Do not set this very large (~hundreds)
.option("numPartitions", 8)
.load()
Примечание.
Azure Databricks поддерживает все параметры Apache Spark для настройки JDBC.
При записи в базы данных с помощью JDBC Apache Spark использует количество секций в памяти для управления параллелизмом. Перед записью для управления параллелизмом можно пересекать данные. Избегайте большого количества секций в больших кластерах, чтобы избежать перегрузки удаленной базы данных. В следующем примере демонстрируется повторное разделение до восьми секций перед записью:
Python
(employees_table.repartition(8)
.write
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<new-table-name>")
.option("user", "<username>")
.option("password", "<password>")
.save()
)
SQL
CREATE TABLE new_employees_table
USING JDBC
OPTIONS (
url "<jdbc-url>",
dbtable "<table-name>",
user '<username>',
password '<password>'
) AS
SELECT /*+ REPARTITION(8) */ * FROM employees_table_vw
Scala
employees_table.repartition(8)
.write
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<new-table-name>")
.option("user", "<username>")
.option("password", "<password>")
.save()
Передача запроса в ядро СУБД
Вы можете передать в базу данных весь запрос и возвратить только результат. Параметр table
определяет таблицу JDBC, из которой нужно считать данные. В предложении FROM
запроса SQL можно использовать все допустимые элементы.
Python
pushdown_query = "(select * from employees where emp_no < 10008) as emp_alias"
employees_table = (spark.read
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", pushdown_query)
.option("user", "<username>")
.option("password", "<password>")
.load()
)
SQL
CREATE TEMPORARY VIEW employees_table_vw
USING JDBC
OPTIONS (
url "<jdbc-url>",
dbtable "(select * from employees where emp_no < 10008) as emp_alias",
user '<username>',
password '<password>'
)
Scala
val pushdown_query = "(select * from employees where emp_no < 10008) as emp_alias"
val employees_table = spark.read
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", pushdown_query)
.option("user", "<username>")
.option("password", "<password>")
.load()
Контрольный номер строк, извлекаемых на запрос
Драйверы JDBC имеют fetchSize
параметр, который управляет количеством строк, полученных за раз из удаленной базы данных.
Параметр | Результат |
---|---|
Слишком низкая | Высокая задержка из-за множества циклов (несколько строк, возвращаемых на запрос) |
Слишком высокая | Ошибка нехватки памяти (слишком много данных, возвращаемых в одном запросе) |
Оптимальное значение зависит от рабочей нагрузки. С этим связаны такие аспекты:
- Сколько столбцов возвращает запрос?
- Какие типы данных возвращаются?
- Сколько времени возвращаются строки в каждом столбце?
В системах может быть очень мало значений по умолчанию и преимущества настройки. Например, значение по умолчанию fetchSize
Oracle равно 10. Увеличение его до 100 уменьшает количество общих запросов, которые должны выполняться в 10 раз. Результаты JDBC — это сетевой трафик, поэтому избежать очень большого числа, но оптимальные значения могут находиться в тысячах для многих наборов данных.
fetchSize
Используйте этот параметр, как показано в следующем примере:
Python
employees_table = (spark.read
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<table-name>")
.option("user", "<username>")
.option("password", "<password>")
.option("fetchSize", "100")
.load()
)
SQL
CREATE TEMPORARY VIEW employees_table_vw
USING JDBC
OPTIONS (
url "<jdbc-url>",
dbtable "<table-name>",
user '<username>',
password '<password>'.
fetchSize 100
)
Scala
val employees_table = spark.read
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<table-name>")
.option("user", "<username>")
.option("password", "<password>")
.option("fetchSize", "100")
.load()