Использование sparklyr

sparklyr — это интерфейс R для Apache Spark. Он предоставляет механизм взаимодействия с Spark с помощью знакомых интерфейсов R. С помощью sparklyr можно использовать определения пакетного задания Spark или интерактивные записные книжки Microsoft Fabric.

sparklyr используется вместе с другими пакетами tidyverse , такими как dplyr. Microsoft Fabric распространяет последнюю стабильную версию sparklyr и tidyverse с каждым выпуском среды выполнения. Их можно импортировать и начать использовать API.

Необходимые компоненты

  • Получение подписки Microsoft Fabric. Или зарегистрируйте бесплатную пробную версию Microsoft Fabric.

  • Войдите в Microsoft Fabric.

  • Используйте переключатель интерфейса в левой части домашней страницы, чтобы перейти на интерфейс Synapse Обработка и анализ данных.

    Screenshot of the experience switcher menu, showing where to select Data Science.

  • Откройте или создайте записную книжку. Узнайте, как использовать записные книжки Microsoft Fabric.

  • Задайте для параметра языка значение SparkR (R), чтобы изменить основной язык.

  • Подключите записную книжку к lakehouse. В левой части нажмите кнопку "Добавить ", чтобы добавить существующее озеро или создать озеро.

Подключение sparklyr в кластер Synapse Spark

Чтобы установить sparklyr соединение, используйте следующий метод spark_connect() подключения. Мы поддерживаем новый метод synapseподключения, который позволяет подключаться к существующему сеансу Spark. Это значительно сокращает время начала сеанса sparklyr . Кроме того, мы внесли этот метод подключения в проект открытый код sparklyr. В method = "synapse"этом случае можно использовать оба sparklyr сеанса и SparkR легко обмениваться данными.

# connect sparklyr to your spark cluster
spark_version <- sparkR.version()
config <- spark_config()
sc <- spark_connect(master = "yarn", version = spark_version, spark_home = "/opt/spark", method = "synapse", config = config)

Использование sparklyr для чтения данных

Новый сеанс Spark не содержит данных. Первым шагом является загрузка данных в память сеанса Spark или указание Spark на расположение данных, чтобы получить доступ к данным по запросу.

# load the sparklyr package
library(sparklyr)

# copy data from R environment to the Spark session's memory
mtcars_tbl <- copy_to(sc, mtcars, "spark_mtcars", overwrite = TRUE)

head(mtcars_tbl)

С помощью sparklyrможно также write и read данные из файла Lakehouse с помощью пути ABFS. Чтобы прочитать и записать в Lakehouse, сначала добавьте его в сеанс. В левой части записной книжки нажмите кнопку "Добавить", чтобы добавить существующий Lakehouse или создать Lakehouse.

Чтобы найти путь ABFS, щелкните правой кнопкой мыши папку "Файлы " в Lakehouse, а затем выберите путь Копировать ABFS. Вставьте путь для замены abfss://xxxx@onelake.dfs.fabric.microsoft.com/xxxx/Files в этом коде:

temp_csv = "abfss://xxxx@onelake.dfs.fabric.microsoft.com/xxxx/Files/data/mtcars.csv"

# write the table to your lakehouse using the ABFS path
spark_write_csv(mtcars_tbl, temp_csv, header = TRUE, mode = 'overwrite')

# read the data as CSV from lakehouse using the ABFS path
mtcarsDF <- spark_read_csv(sc, temp_csv) 
head(mtcarsDF)

Использование sparklyr для управления данными

sparklyr предоставляет несколько методов обработки данных внутри Spark с помощью следующих методов:

  • Команды dplyr.
  • SparkSQL
  • Преобразователи функций Spark

Использование dplyr

Для подготовки данных в Spark можно использовать знакомые dplyr команды. Команды выполняются внутри Spark, поэтому нет ненужных передач данных между R и Spark.

Щелкните элемент "Управление данными", dplyr чтобы просмотреть дополнительную документацию по использованию dplyr с Spark.

# count cars by the number of cylinders the engine contains (cyl), order the results descendingly
library(dplyr)

cargroup <- group_by(mtcars_tbl, cyl) %>%
  count() %>%
  arrange(desc(n))

cargroup

sparklyr и dplyr преобразуйте команды R в Spark SQL для нас. Чтобы просмотреть результирующий запрос, выполните show_query()следующие действия:

# show the dplyr commands that are to run against the Spark connection
dplyr::show_query(cargroup)

Использование SQL

Кроме того, можно выполнять sql-запросы непосредственно в таблицах в кластере Spark. Объект spark_connection() реализует интерфейс DBI для Spark, поэтому можно использовать dbGetQuery() для выполнения SQL и возврата результата в виде кадра данных R:

library(DBI)
dbGetQuery(sc, "select cyl, count(*) as n from spark_mtcars
GROUP BY cyl
ORDER BY n DESC")

Использование преобразователей компонентов

Оба предыдущих метода используют инструкции SQL. Spark предоставляет команды, которые упрощают преобразование данных и не используют SQL.

Например, ft_binarizer() команда упрощает создание нового столбца, указывающего, превышает ли значение другого столбца определенное пороговое значение.

Полный список преобразователей компонентов Spark можно найти по sparklyr ссылке -FT.

mtcars_tbl %>% 
  ft_binarizer("mpg", "over_20", threshold = 20) %>% 
  select(mpg, over_20) %>% 
  head(5)

Совместное использование данных между sparklyr и SparkR

При подключении sparklyr к кластеруmethod = "synapse" synapse spark с помощью можно использовать оба sparklyr сеанса и SparkR легко обмениваться данными между ними. Вы можете создать таблицу sparklyr Spark и прочитать ее из SparkR.

# load the sparklyr package
library(sparklyr)

# Create table in `sparklyr`
mtcars_sparklyr <- copy_to(sc, df = mtcars, name = "mtcars_tbl", overwrite = TRUE, repartition = 3L)

# Read table from `SparkR`
mtcars_sparklr <- SparkR::sql("select cyl, count(*) as n
from mtcars_tbl
GROUP BY cyl
ORDER BY n DESC")

head(mtcars_sparklr)

Машинное обучение

Ниже приведен пример, в котором мы используем ml_linear_regression() для соответствия модели линейной регрессии. Мы используем встроенный mtcars набор данных и видим, можно ли прогнозировать потребление топлива автомобиля (mpg) на основе его веса (wt), а также количество цилиндров, содержащих (cyl). Мы предполагаем в каждом случае, что связь между mpg каждым из наших признаков является линейной.

Создание наборов данных тестирования и обучения

Используйте разделение, 70 % для обучения и 30 % для тестирования модели. Игра с этим соотношением приводит к различным моделям.

# split the dataframe into test and training dataframes

partitions <- mtcars_tbl %>%
  select(mpg, wt, cyl) %>% 
  sdf_random_split(training = 0.7, test = 0.3, seed = 2023)

Обучение модели

Обучение модели логистической регрессии.

fit <- partitions$training %>%
  ml_linear_regression(mpg ~ .)

fit

Теперь используйте summary() , чтобы узнать больше о качестве нашей модели, и статистической важности каждого из наших прогнозаторов.

summary(fit)

Использование модели

Модель можно применить к тестовом набору данных, вызвав вызов ml_predict().

pred <- ml_predict(fit, partitions$test)

head(pred)

Список моделей машинного обучения Spark, доступных с помощью sparklyr, см . в справочнике по ml

Отключение от кластера Spark

Вы можете вызвать spark_disconnect() или нажать кнопку "Остановить сеанс " в верхней части ленты записной книжки, завершаемой сеансом Spark.

spark_disconnect(sc)

Дополнительные сведения о функциях R: