Использование SparkR
SparkR — это пакет R, который предоставляет интерфейс с легким весом для использования Apache Spark из R. SparkR предоставляет реализацию распределенного кадра данных, которая поддерживает такие операции, как выбор, фильтрация, агрегирование и т. д. SparkR также поддерживает распределенное машинное обучение с помощью MLlib.
Используйте SparkR с помощью определений пакетного задания Spark или интерактивных записных книжек Microsoft Fabric.
Поддержка R доступна только в Spark3.1 или более поздней версии. R в Spark 2.4 не поддерживается.
Необходимые компоненты
Получение подписки Microsoft Fabric. Или зарегистрируйте бесплатную пробную версию Microsoft Fabric.
Войдите в Microsoft Fabric.
Используйте переключатель интерфейса в левой части домашней страницы, чтобы перейти на интерфейс Synapse Обработка и анализ данных.
Откройте или создайте записную книжку. Узнайте, как использовать записные книжки Microsoft Fabric.
Задайте для параметра языка значение SparkR (R), чтобы изменить основной язык.
Подключите записную книжку к lakehouse. В левой части нажмите кнопку "Добавить ", чтобы добавить существующее озеро или создать озеро.
Чтение и запись кадров данных SparkR
Чтение кадра данных SparkR из локального кадра данных R.frame
Самый простой способ создания кадра данных — преобразовать локальный кадр данных R.frame в кадр данных Spark.
# load SparkR pacakge
library(SparkR)
# read a SparkR DataFrame from a local R data.frame
df <- createDataFrame(faithful)
# displays the content of the DataFrame
display(df)
Чтение и запись Кадра данных SparkR из Lakehouse
Данные можно хранить в локальной файловой системе узлов кластера. Общие методы для чтения и записи кадра данных SparkR из Lakehouse и read.df
write.df
. Эти методы позволяют загрузить файл и тип источника данных. SparkR поддерживает чтение файлов в формате CSV, JSON, файлов текстовом формате и файлов Parquet по умолчанию.
Чтобы прочитать и записать в Lakehouse, сначала добавьте его в сеанс. В левой части записной книжки нажмите кнопку "Добавить", чтобы добавить существующий Lakehouse или создать Lakehouse.
Примечание.
Чтобы получить доступ к файлам Lakehouse с помощью пакетов Spark, таких как read.df
или write.df
, используйте его путь ADFS или относительный путь для Spark. В обозревателе Lakehouse щелкните правой кнопкой мыши файлы или папку, к которым вы хотите получить доступ, и скопируйте путь ADFS или относительный путь к Spark из контекстного меню.
# write data in CSV using relative path for Spark
temp_csv_spark<-"Files/data/faithful.csv"
write.df(df, temp_csv_spark ,source="csv", mode = "overwrite", header = "true")
# read data in CSV using relative path for Spark
faithfulDF_csv <- read.df(temp_csv_spark, source= "csv", header = "true", inferSchema = "true")
# displays the content of the DataFrame
display(faithfulDF_csv)
# write data in parquet using ADFS path
temp_parquet_spark<-"abfss://xxx/xxx/data/faithful.parquet"
write.df(df, temp_parquet_spark ,source="parquet", mode = "overwrite", header = "true")
# read data in parquet uxing ADFS path
faithfulDF_pq <- read.df(temp_parquet_spark, source= "parquet", header = "true", inferSchema = "true")
# displays the content of the DataFrame
display(faithfulDF_pq)
Microsoft Fabric предварительно tidyverse
установлен. Вы можете получить доступ к файлам Lakehouse в знакомых пакетах R, таких как чтение и запись файлов Lakehouse с помощью readr::read_csv()
и readr::write_csv()
.
Примечание.
Чтобы получить доступ к файлам Lakehouse с помощью пакетов R, необходимо использовать путь к API файлов. В обозревателе Lakehouse щелкните правой кнопкой мыши файл или папку, к которой требуется получить доступ, и скопируйте путь к API файла из контекстного меню.
# read data in CSV using API path
# To find the path, navigate to the csv file, right click, and Copy File API path.
temp_csv_api<-'/lakehouse/default/Files/data/faithful.csv/part-00000-d8e09a34-bd63-41bd-8cf8-f4ed2ef90e6c-c000.csv'
faithfulDF_API <- readr::read_csv(temp_csv_api)
# display the content of the R data.frame
head(faithfulDF_API)
Вы также можете прочитать кадр данных SparkR в Lakehouse с помощью запросов SparkSQL.
# Regsiter ealier df as temp view
createOrReplaceTempView(df, "eruptions")
# Create a df using a SparkSQL query
waiting <- sql("SELECT * FROM eruptions")
head(waiting)
Чтение и запись таблиц SQL с помощью RODBC
Используйте RODBC для подключения к базам данных на основе SQL через интерфейс ODBC. Например, можно подключиться к выделенному пулу SQL Synapse, как показано в следующем примере кода. Замените собственные сведения о подключении для <database>
, <uid>
и <password>
<table>
.
# load RODBC package
library(RODBC)
# config connection string
DriverVersion <- substr(system("apt list --installed *msodbc*", intern=TRUE, ignore.stderr=TRUE)[2],10,11)
ServerName <- "your-server-name"
DatabaseName <- "your-database-name"
Uid <- "your-user-id-list"
Password <- "your-password"
ConnectionString = sprintf("Driver={ODBC Driver %s for SQL Server};
Server=%s;
Database=%s;
Uid=%s;
Pwd=%s;
Encrypt=yes;
TrustServerCertificate=yes;
Connection Timeout=30;",DriverVersion,ServerName,DatabaseName,Uid,Password)
print(ConnectionString)
# connect to driver
channel <-odbcDriverConnect(ConnectionString)
# query from existing tables
Rdf <- sqlQuery(channel, "select * from <table>")
class(Rdf)
# use SparkR::as.DataFrame to convert R data.frame to SparkR DataFrame.
spark_df <- as.DataFrame(Rdf)
class(spark_df)
head(spark_df)
Операции с кадрами данных
Кадры данных SparkR поддерживают многие функции для структурированной обработки данных. Ниже приводятся несколько простых примеров. Полный список можно найти в документации по API SparkR.
Выбор строк и столбцов
# Select only the "waiting" column
head(select(df,df$waiting))
# Pass in column name as strings
head(select(df, "waiting"))
# Filter to only retain rows with waiting times longer than 70 mins
head(filter(df, df$waiting > 70))
Группирование и статистический анализ
Кадры данных SparkR поддерживают множество часто используемых функций для агрегирования данных после группировки. Например, можно вычислить гистограмму времени ожидания в верном наборе данных, как показано ниже.
# we use the `n` operator to count the number of times each waiting time appears
head(summarize(groupBy(df, df$waiting), count = n(df$waiting)))
# we can also sort the output from the aggregation to get the most common waiting times
waiting_counts <- summarize(groupBy(df, df$waiting), count = n(df$waiting))
head(arrange(waiting_counts, desc(waiting_counts$count)))
Операции столбцов
SparkR предоставляет множество функций, которые можно напрямую применять к столбцам для обработки и агрегирования данных. В следующем примере показано применение базовых арифметических функций.
# convert waiting time from hours to seconds.
# you can assign this to a new column in the same DataFrame
df$waiting_secs <- df$waiting * 60
head(df)
Применение определяемой пользователем функции
SparkR поддерживает несколько типов определяемых пользователем функций:
Запуск функции в большом наборе данных с dapply
помощью или dapplyCollect
dapply
Примените функцию к каждой секции объекта SparkDataFrame
. Функция, применяемая к каждой SparkDataFrame
секции и должна иметь только один параметр, к которому будет передана функция data.frame, соответствующая каждой секции. Выходные данные функции должны быть data.frame
. Схема задает формат строки результирующего SparkDataFrame
объекта. Он должен соответствовать типам данных возвращаемого значения.
# convert waiting time from hours to seconds
df <- createDataFrame(faithful)
schema <- structType(structField("eruptions", "double"), structField("waiting", "double"),
structField("waiting_secs", "double"))
# apply UDF to DataFrame
df1 <- dapply(df, function(x) { x <- cbind(x, x$waiting * 60) }, schema)
head(collect(df1))
dapplyCollect
Как и dapply, примените функцию к каждой секции и SparkDataFrame
соберите результат обратно. Выходные данные функции должны быть data.frame
. Но на этот раз схема не требуется передавать. Обратите внимание, что может завершиться ошибкой, dapplyCollect
если выходные данные функции выполняются во всех разделах, не могут быть извлечены в драйвер и помещаются в память драйвера.
# convert waiting time from hours to seconds
# apply UDF to DataFrame and return a R's data.frame
ldf <- dapplyCollect(
df,
function(x) {
x <- cbind(x, "waiting_secs" = x$waiting * 60)
})
head(ldf, 3)
Выполнение функции в большом наборе данных сгруппированием по входным столбцам или gapply
gapplyCollect
gapply
Примените функцию к каждой группе объекта SparkDataFrame
. Функция должна применяться к каждой SparkDataFrame
группе и должна иметь только два параметра: группирование ключей и R data.frame
, соответствующих этому ключу. Группы выбираются из SparkDataFrames
столбцов. Выходные данные функции должны быть data.frame
. Схема задает формат строки результирующего SparkDataFrame
объекта. Она должна представлять выходную схему функции R из типов данных Spark. Имена возвращаемых data.frame
столбцов задаются пользователем.
# determine six waiting times with the largest eruption time in minutes.
schema <- structType(structField("waiting", "double"), structField("max_eruption", "double"))
result <- gapply(
df,
"waiting",
function(key, x) {
y <- data.frame(key, max(x$eruptions))
},
schema)
head(collect(arrange(result, "max_eruption", decreasing = TRUE)))
gapplyCollect
Например gapply
, применяет функцию к каждой SparkDataFrame
группе и собирает результат обратно в R data.frame
. Выходные данные функции должны быть data.frame
. Но для прохождения схемы не требуется. Обратите внимание, что может завершиться ошибкой, gapplyCollect
если выходные данные функции выполняются во всех разделах, не могут быть извлечены в драйвер и помещаются в память драйвера.
# determine six waiting times with the largest eruption time in minutes.
result <- gapplyCollect(
df,
"waiting",
function(key, x) {
y <- data.frame(key, max(x$eruptions))
colnames(y) <- c("waiting", "max_eruption")
y
})
head(result[order(result$max_eruption, decreasing = TRUE), ])
Выполнение локальных функций R, распределенных с помощью spark.lapply
spark.lapply
lapply
Как и в машинном коде R, spark.lapply
выполняет функцию по списку элементов и распределяет вычисления с помощью Spark. Применяет функцию таким образом, как и doParallel
lapply
к элементам списка. Результаты всех вычислений должны соответствовать одному компьютеру. Если это не так, они могут сделать что-то подобное df <- createDataFrame(list)
, а затем использовать dapply
.
# perform distributed training of multiple models with spark.lapply. Here, we pass
# a read-only list of arguments which specifies family the generalized linear model should be.
families <- c("gaussian", "poisson")
train <- function(family) {
model <- glm(Sepal.Length ~ Sepal.Width + Species, iris, family = family)
summary(model)
}
# return a list of model's summaries
model.summaries <- spark.lapply(families, train)
# print the summary of each model
print(model.summaries)
Выполнение запросов SQL из SparkR
Кадр данных SparkR также можно зарегистрировать в качестве временного представления, позволяющего выполнять запросы SQL по своим данным. Функция SQL позволяет приложениям выполнять sql-запросы программным способом и возвращать результат в виде кадра данных SparkR.
# Register earlier df as temp view
createOrReplaceTempView(df, "eruptions")
# Create a df using a SparkSQL query
waiting <- sql("SELECT waiting FROM eruptions where waiting>70 ")
head(waiting)
Машинное обучение
SparkR включает большую часть алгоритмов MLLib. SparkR применяет MLlib для обучения модели.
В следующем примере показано, как создать модель GLM Gaussian с помощью SparkR. Чтобы выполнить линейную регрессию, установите для параметра family значение "gaussian"
. Чтобы выполнить логистическую регрессию, установите для параметра family значение "binomial"
. При использовании SparkML GLM
SparkR автоматически выполняет одноохотливную кодировку категориальных функций, чтобы ее не нужно было выполнять вручную. Помимо функций строкового и двойного типа, также можно разместить над функциями MLlib Vector для совместимости с другими компонентами MLlib.
Дополнительные сведения о поддерживаемых алгоритмах машинного обучения см. в документации по SparkR и MLlib.
# create the DataFrame
cars <- cbind(model = rownames(mtcars), mtcars)
carsDF <- createDataFrame(cars)
# fit a linear model over the dataset.
model <- spark.glm(carsDF, mpg ~ wt + cyl, family = "gaussian")
# model coefficients are returned in a similar format to R's native glm().
summary(model)