Использование SparkR

SparkR — это пакет R, который предоставляет интерфейс с легким весом для использования Apache Spark из R. SparkR предоставляет реализацию распределенного кадра данных, которая поддерживает такие операции, как выбор, фильтрация, агрегирование и т. д. SparkR также поддерживает распределенное машинное обучение с помощью MLlib.

Используйте SparkR с помощью определений пакетного задания Spark или интерактивных записных книжек Microsoft Fabric.

Поддержка R доступна только в Spark3.1 или более поздней версии. R в Spark 2.4 не поддерживается.

Необходимые компоненты

  • Получение подписки Microsoft Fabric. Или зарегистрируйте бесплатную пробную версию Microsoft Fabric.

  • Войдите в Microsoft Fabric.

  • Используйте переключатель интерфейса в левой части домашней страницы, чтобы перейти на интерфейс Synapse Обработка и анализ данных.

    Screenshot of the experience switcher menu, showing where to select Data Science.

  • Откройте или создайте записную книжку. Узнайте, как использовать записные книжки Microsoft Fabric.

  • Задайте для параметра языка значение SparkR (R), чтобы изменить основной язык.

  • Подключите записную книжку к lakehouse. В левой части нажмите кнопку "Добавить ", чтобы добавить существующее озеро или создать озеро.

Чтение и запись кадров данных SparkR

Чтение кадра данных SparkR из локального кадра данных R.frame

Самый простой способ создания кадра данных — преобразовать локальный кадр данных R.frame в кадр данных Spark.

# load SparkR pacakge
library(SparkR)

# read a SparkR DataFrame from a local R data.frame
df <- createDataFrame(faithful)

# displays the content of the DataFrame
display(df)

Чтение и запись Кадра данных SparkR из Lakehouse

Данные можно хранить в локальной файловой системе узлов кластера. Общие методы для чтения и записи кадра данных SparkR из Lakehouse и read.df write.df. Эти методы позволяют загрузить файл и тип источника данных. SparkR поддерживает чтение файлов в формате CSV, JSON, файлов текстовом формате и файлов Parquet по умолчанию.

Чтобы прочитать и записать в Lakehouse, сначала добавьте его в сеанс. В левой части записной книжки нажмите кнопку "Добавить", чтобы добавить существующий Lakehouse или создать Lakehouse.

Примечание.

Чтобы получить доступ к файлам Lakehouse с помощью пакетов Spark, таких как read.df или write.df, используйте его путь ADFS или относительный путь для Spark. В обозревателе Lakehouse щелкните правой кнопкой мыши файлы или папку, к которым вы хотите получить доступ, и скопируйте путь ADFS или относительный путь к Spark из контекстного меню.

# write data in CSV using relative path for Spark
temp_csv_spark<-"Files/data/faithful.csv"
write.df(df, temp_csv_spark ,source="csv", mode = "overwrite", header = "true")

# read data in CSV using relative path for Spark
faithfulDF_csv <- read.df(temp_csv_spark, source= "csv", header = "true", inferSchema = "true")

# displays the content of the DataFrame
display(faithfulDF_csv)
# write data in parquet using ADFS path
temp_parquet_spark<-"abfss://xxx/xxx/data/faithful.parquet"
write.df(df, temp_parquet_spark ,source="parquet", mode = "overwrite", header = "true")

# read data in parquet uxing ADFS path
faithfulDF_pq <- read.df(temp_parquet_spark, source= "parquet", header = "true", inferSchema = "true")

# displays the content of the DataFrame
display(faithfulDF_pq)

Microsoft Fabric предварительно tidyverse установлен. Вы можете получить доступ к файлам Lakehouse в знакомых пакетах R, таких как чтение и запись файлов Lakehouse с помощью readr::read_csv() и readr::write_csv().

Примечание.

Чтобы получить доступ к файлам Lakehouse с помощью пакетов R, необходимо использовать путь к API файлов. В обозревателе Lakehouse щелкните правой кнопкой мыши файл или папку, к которой требуется получить доступ, и скопируйте путь к API файла из контекстного меню.

# read data in CSV using API path
# To find the path, navigate to the csv file, right click, and  Copy File API path.
temp_csv_api<-'/lakehouse/default/Files/data/faithful.csv/part-00000-d8e09a34-bd63-41bd-8cf8-f4ed2ef90e6c-c000.csv'
faithfulDF_API <- readr::read_csv(temp_csv_api)

# display the content of the R data.frame
head(faithfulDF_API)

Вы также можете прочитать кадр данных SparkR в Lakehouse с помощью запросов SparkSQL.

# Regsiter ealier df as temp view
createOrReplaceTempView(df, "eruptions")

# Create a df using a SparkSQL query
waiting <- sql("SELECT * FROM eruptions")

head(waiting)

Чтение и запись таблиц SQL с помощью RODBC

Используйте RODBC для подключения к базам данных на основе SQL через интерфейс ODBC. Например, можно подключиться к выделенному пулу SQL Synapse, как показано в следующем примере кода. Замените собственные сведения о подключении для <database>, <uid>и <password><table>.

# load RODBC package
library(RODBC)


# config connection string

DriverVersion <- substr(system("apt list --installed *msodbc*", intern=TRUE, ignore.stderr=TRUE)[2],10,11)
ServerName <- "your-server-name"
DatabaseName <- "your-database-name"
Uid <- "your-user-id-list"
Password <- "your-password"

ConnectionString = sprintf("Driver={ODBC Driver %s for SQL Server};
Server=%s;
Database=%s;
Uid=%s;
Pwd=%s;
Encrypt=yes;
TrustServerCertificate=yes;
Connection Timeout=30;",DriverVersion,ServerName,DatabaseName,Uid,Password)
print(ConnectionString)


# connect to driver
channel <-odbcDriverConnect(ConnectionString)

# query from existing tables
Rdf <- sqlQuery(channel, "select * from <table>")
class(Rdf)

# use SparkR::as.DataFrame to convert R data.frame to SparkR DataFrame.
spark_df <- as.DataFrame(Rdf)
class(spark_df)
head(spark_df)

Операции с кадрами данных

Кадры данных SparkR поддерживают многие функции для структурированной обработки данных. Ниже приводятся несколько простых примеров. Полный список можно найти в документации по API SparkR.

Выбор строк и столбцов

# Select only the "waiting" column
head(select(df,df$waiting))
# Pass in column name as strings
head(select(df, "waiting"))
# Filter to only retain rows with waiting times longer than 70 mins
head(filter(df, df$waiting > 70))

Группирование и статистический анализ

Кадры данных SparkR поддерживают множество часто используемых функций для агрегирования данных после группировки. Например, можно вычислить гистограмму времени ожидания в верном наборе данных, как показано ниже.

# we use the `n` operator to count the number of times each waiting time appears
head(summarize(groupBy(df, df$waiting), count = n(df$waiting)))
# we can also sort the output from the aggregation to get the most common waiting times
waiting_counts <- summarize(groupBy(df, df$waiting), count = n(df$waiting))
head(arrange(waiting_counts, desc(waiting_counts$count)))

Операции столбцов

SparkR предоставляет множество функций, которые можно напрямую применять к столбцам для обработки и агрегирования данных. В следующем примере показано применение базовых арифметических функций.

# convert waiting time from hours to seconds.
# you can assign this to a new column in the same DataFrame
df$waiting_secs <- df$waiting * 60
head(df)

Применение определяемой пользователем функции

SparkR поддерживает несколько типов определяемых пользователем функций:

Запуск функции в большом наборе данных с dapply помощью или dapplyCollect

dapply

Примените функцию к каждой секции объекта SparkDataFrame. Функция, применяемая к каждой SparkDataFrame секции и должна иметь только один параметр, к которому будет передана функция data.frame, соответствующая каждой секции. Выходные данные функции должны быть data.frame. Схема задает формат строки результирующего SparkDataFrameобъекта. Он должен соответствовать типам данных возвращаемого значения.

# convert waiting time from hours to seconds
df <- createDataFrame(faithful)
schema <- structType(structField("eruptions", "double"), structField("waiting", "double"),
                     structField("waiting_secs", "double"))

# apply UDF to DataFrame
df1 <- dapply(df, function(x) { x <- cbind(x, x$waiting * 60) }, schema)
head(collect(df1))

dapplyCollect

Как и dapply, примените функцию к каждой секции и SparkDataFrame соберите результат обратно. Выходные данные функции должны быть data.frame. Но на этот раз схема не требуется передавать. Обратите внимание, что может завершиться ошибкой, dapplyCollect если выходные данные функции выполняются во всех разделах, не могут быть извлечены в драйвер и помещаются в память драйвера.

# convert waiting time from hours to seconds
# apply UDF to DataFrame and return a R's data.frame
ldf <- dapplyCollect(
         df,
         function(x) {
           x <- cbind(x, "waiting_secs" = x$waiting * 60)
         })
head(ldf, 3)

Выполнение функции в большом наборе данных сгруппированием по входным столбцам или gapplygapplyCollect

gapply

Примените функцию к каждой группе объекта SparkDataFrame. Функция должна применяться к каждой SparkDataFrame группе и должна иметь только два параметра: группирование ключей и R data.frame , соответствующих этому ключу. Группы выбираются из SparkDataFrames столбцов. Выходные данные функции должны быть data.frame. Схема задает формат строки результирующего SparkDataFrameобъекта. Она должна представлять выходную схему функции R из типов данных Spark. Имена возвращаемых data.frame столбцов задаются пользователем.

# determine six waiting times with the largest eruption time in minutes.
schema <- structType(structField("waiting", "double"), structField("max_eruption", "double"))
result <- gapply(
    df,
    "waiting",
    function(key, x) {
        y <- data.frame(key, max(x$eruptions))
    },
    schema)
head(collect(arrange(result, "max_eruption", decreasing = TRUE)))

gapplyCollect

Например gapply, применяет функцию к каждой SparkDataFrame группе и собирает результат обратно в R data.frame. Выходные данные функции должны быть data.frame. Но для прохождения схемы не требуется. Обратите внимание, что может завершиться ошибкой, gapplyCollect если выходные данные функции выполняются во всех разделах, не могут быть извлечены в драйвер и помещаются в память драйвера.

# determine six waiting times with the largest eruption time in minutes.
result <- gapplyCollect(
    df,
    "waiting",
    function(key, x) {
        y <- data.frame(key, max(x$eruptions))
        colnames(y) <- c("waiting", "max_eruption")
        y
    })
head(result[order(result$max_eruption, decreasing = TRUE), ])

Выполнение локальных функций R, распределенных с помощью spark.lapply

spark.lapply

lapply Как и в машинном коде R, spark.lapply выполняет функцию по списку элементов и распределяет вычисления с помощью Spark. Применяет функцию таким образом, как и doParallel lapply к элементам списка. Результаты всех вычислений должны соответствовать одному компьютеру. Если это не так, они могут сделать что-то подобное df <- createDataFrame(list) , а затем использовать dapply.

# perform distributed training of multiple models with spark.lapply. Here, we pass
# a read-only list of arguments which specifies family the generalized linear model should be.
families <- c("gaussian", "poisson")
train <- function(family) {
  model <- glm(Sepal.Length ~ Sepal.Width + Species, iris, family = family)
  summary(model)
}
# return a list of model's summaries
model.summaries <- spark.lapply(families, train)

# print the summary of each model
print(model.summaries)

Выполнение запросов SQL из SparkR

Кадр данных SparkR также можно зарегистрировать в качестве временного представления, позволяющего выполнять запросы SQL по своим данным. Функция SQL позволяет приложениям выполнять sql-запросы программным способом и возвращать результат в виде кадра данных SparkR.

# Register earlier df as temp view
createOrReplaceTempView(df, "eruptions")

# Create a df using a SparkSQL query
waiting <- sql("SELECT waiting FROM eruptions where waiting>70 ")

head(waiting)

Машинное обучение

SparkR включает большую часть алгоритмов MLLib. SparkR применяет MLlib для обучения модели.

В следующем примере показано, как создать модель GLM Gaussian с помощью SparkR. Чтобы выполнить линейную регрессию, установите для параметра family значение "gaussian". Чтобы выполнить логистическую регрессию, установите для параметра family значение "binomial". При использовании SparkML GLM SparkR автоматически выполняет одноохотливную кодировку категориальных функций, чтобы ее не нужно было выполнять вручную. Помимо функций строкового и двойного типа, также можно разместить над функциями MLlib Vector для совместимости с другими компонентами MLlib.

Дополнительные сведения о поддерживаемых алгоритмах машинного обучения см. в документации по SparkR и MLlib.

# create the DataFrame
cars <- cbind(model = rownames(mtcars), mtcars)
carsDF <- createDataFrame(cars)

# fit a linear model over the dataset.
model <- spark.glm(carsDF, mpg ~ wt + cyl, family = "gaussian")

# model coefficients are returned in a similar format to R's native glm().
summary(model)