R で DataFrame とテーブルを操作する
この記事では、 SparkR、 sparklyr、 dplyr などの R パッケージを使用して、R data.frame
s、 Spark DataFrame、およびメモリ内テーブルを操作する方法について説明します。
SparkR、sparklyr、dplyr を使用すると、これらのすべてのパッケージで特定の操作を完了でき、最も使い慣れたパッケージを使用できることに注意してください。 たとえば、クエリを実行するには、SparkR::sql
、sparklyr::sdf_sql
、および dplyr::select
のような関数呼び出すことができます。 場合によっては、これらのパッケージの 1 つまたは 2 つだけで操作を完了できる場合があり、選択する操作は使用シナリオによって異なります。 たとえば、sparklyr::sdf_quantile
を呼び出す方法は dplyr::percentile_approx
を呼び出す方法とは少し異なりますが、どちらの関数も分位数を計算します。
SparkR と sparklyr の間のブリッジとして SQL を使用できます。 たとえば、SparkR::sql
を使用して、sparklyr で作成したテーブルにクエリを実行できます。 sparklyr::sdf_sql
を使用して、SparkR で作成したテーブルにクエリを実行できます。 また、 dplyr
コードは実行前に常にメモリ内の SQL に変換されます。 API の相互運用性と SQL 変換も参照してください。
SparkR、sparklyr、dplyr を読み込む
SparkR、sparklyr、dplyr のパッケージは、Azure Databricks クラスターにインストールされている Databricks ランタイムに含まれています。 したがって、これらのパッケージの呼び出しを開始する前に、通常の install.package
の呼び出しを行う必要はありません。 ただし、最初に library
を使用したこれらのパッケージを 読み込む必要があります。 たとえば、Azure Databricks ワークスペースの R ノートブック 内から、ノートブック セルで次のコードを実行して、SparkR、sparklyr、dplyr を読み込みます。
library(SparkR)
library(sparklyr)
library(dplyr)
sparklyr をクラスターに接続する
sparklyr を読み込んだ後、sparklyr::spark_connect
接続方法を指定してクラスターに接続するために databricks
を呼び出す必要があります。 たとえば、ノートブック セルで次のコードを実行して、ノートブックをホストするクラスターに接続します。
sc <- spark_connect(method = "databricks")
これに対し、Azure Databricks ノートブックでは、SparkR で使用するためにクラスター上に既に SparkSession
が確立されているため、SparkR の呼び出しを開始する前に SparkR::sparkR.session
を呼び出す必要はありません。
JSON データ ファイルをワークスペースにアップロードする
この記事のコード例の多くは、Azure Databricks ワークスペース内の特定の場所にある特定の場所のデータに基づいており、特定の列名とデータ型が使用されています。 このコード例のデータは、GitHub 内から book.json
という名前の JSON ファイルで生成されます。 このファイルを取得してワークスペースにアップロードするには:
- GitHub の books.json ファイルに移動し、テキスト エディタを使用してその内容をローカル マシン上の
books.json
という名前のファイルにコピーします。 - Azure Databricks ワークスペースのサイドバーで、カタログ をクリックします。
- [テーブルの作成] をクリックします。
- [ファイルのアップロード] タブで、ローカル コンピューターから
books.json
[アップロードするファイルのドロップ] ボックスにファイルをドロップします。 または、[クリックして参照] を選択し、ローカル マシンからbooks.json
ファイルを参照します。
既定では、Azure Databricks はローカルの books.json
ファイルをワークスペースの DBFS の場所にパス /FileStore/tables/books.json
でアップロードします。
UI でテーブルを作成 または ノートブックでテーブルを作成 をクリックしないでください。 この記事のコード例では、この DBFS の場所にあるアップロードされた books.json
ファイル内のデータを使用します。
DataFrame に JSON データを読み取る
アップロードされた JSON ファイルを DataFrame に読み取り、接続、JSON ファイルへのパス、およびデータの内部テーブル表現の名前を指定するには、sparklyr::spark_read_json
を使用します。 この例では、ファイルに複数の行が book.json
含まれていることを指定する必要があります。 ここで列のスキーマを指定することは省略可能です。 それ以外の場合、sparklyr は既定で列のスキーマを推論します。 たとえば、ノートブック セルで次のコードを実行して、アップロードされた JSON ファイルのデータを jsonDF
という名前の DataFrame に読み込みます。
jsonDF <- spark_read_json(
sc = sc,
name = "jsonTable",
path = "/FileStore/tables/books.json",
options = list("multiLine" = TRUE),
columns = c(
author = "character",
country = "character",
imageLink = "character",
language = "character",
link = "character",
pages = "integer",
title = "character",
year = "integer"
)
)
DataFrame の最初の数行を印刷する
DataFrame の最初の行を印刷するには、SparkR::head
、SparkR::show
、または sparklyr::collect
を使用することもできます。 既定では、 head
は最初の 6 行を出力されます。 show
および collect
は最初の 10 行を印刷します。 たとえば、ノートブック セルで次のコードを実行して、次の jsonDF
という名前の DataFrame の最初の行を出力します。
head(jsonDF)
# Source: spark<?> [?? x 8]
# author country image…¹ langu…² link pages title year
# <chr> <chr> <chr> <chr> <chr> <int> <chr> <int>
# 1 Chinua Achebe Nigeria images… English "htt… 209 Thin… 1958
# 2 Hans Christian Andersen Denmark images… Danish "htt… 784 Fair… 1836
# 3 Dante Alighieri Italy images… Italian "htt… 928 The … 1315
# 4 Unknown Sumer and Akk… images… Akkadi… "htt… 160 The … -1700
# 5 Unknown Achaemenid Em… images… Hebrew "htt… 176 The … -600
# 6 Unknown India/Iran/Ir… images… Arabic "htt… 288 One … 1200
# … with abbreviated variable names ¹imageLink, ²language
show(jsonDF)
# Source: spark<jsonTable> [?? x 8]
# author country image…¹ langu…² link pages title year
# <chr> <chr> <chr> <chr> <chr> <int> <chr> <int>
# 1 Chinua Achebe Nigeria images… English "htt… 209 Thin… 1958
# 2 Hans Christian Andersen Denmark images… Danish "htt… 784 Fair… 1836
# 3 Dante Alighieri Italy images… Italian "htt… 928 The … 1315
# 4 Unknown Sumer and Ak… images… Akkadi… "htt… 160 The … -1700
# 5 Unknown Achaemenid E… images… Hebrew "htt… 176 The … -600
# 6 Unknown India/Iran/I… images… Arabic "htt… 288 One … 1200
# 7 Unknown Iceland images… Old No… "htt… 384 Njál… 1350
# 8 Jane Austen United Kingd… images… English "htt… 226 Prid… 1813
# 9 Honoré de Balzac France images… French "htt… 443 Le P… 1835
# 10 Samuel Beckett Republic of … images… French… "htt… 256 Moll… 1952
# … with more rows, and abbreviated variable names ¹imageLink, ²language
# ℹ Use `print(n = ...)` to see more rows
collect(jsonDF)
# A tibble: 100 × 8
# author country image…¹ langu…² link pages title year
# <chr> <chr> <chr> <chr> <chr> <int> <chr> <int>
# 1 Chinua Achebe Nigeria images… English "htt… 209 Thin… 1958
# 2 Hans Christian Andersen Denmark images… Danish "htt… 784 Fair… 1836
# 3 Dante Alighieri Italy images… Italian "htt… 928 The … 1315
# 4 Unknown Sumer and Ak… images… Akkadi… "htt… 160 The … -1700
# 5 Unknown Achaemenid E… images… Hebrew "htt… 176 The … -600
# 6 Unknown India/Iran/I… images… Arabic "htt… 288 One … 1200
# 7 Unknown Iceland images… Old No… "htt… 384 Njál… 1350
# 8 Jane Austen United Kingd… images… English "htt… 226 Prid… 1813
# 9 Honoré de Balzac France images… French "htt… 443 Le P… 1835
# 10 Samuel Beckett Republic of … images… French… "htt… 256 Moll… 1952
# … with 90 more rows, and abbreviated variable names ¹imageLink, ²language
# ℹ Use `print(n = ...)` to see more rows
SQL クエリを実行し、テーブルへの書き込みとテーブルからの読み取りを行う
dplyr 関数を使用して、DataFrame で SQL クエリを実行できます。 たとえば、ノートブック セルで次のコードを実行して、dplyr::group_by
と dployr::count
を使用し、jsonDF
という名前のデータ フレームから作成者別のカウントを取得します。 dplyr::arrange
と dplyr::desc
を使用して、結果をカウントの降順で並べ替えます。 次に、既定で最初の 10 行を印刷します。
group_by(jsonDF, author) %>%
count() %>%
arrange(desc(n))
# Source: spark<?> [?? x 2]
# Ordered by: desc(n)
# author n
# <chr> <dbl>
# 1 Fyodor Dostoevsky 4
# 2 Unknown 4
# 3 Leo Tolstoy 3
# 4 Franz Kafka 3
# 5 William Shakespeare 3
# 6 William Faulkner 2
# 7 Gustave Flaubert 2
# 8 Homer 2
# 9 Gabriel García Márquez 2
# 10 Thomas Mann 2
# … with more rows
# ℹ Use `print(n = ...)` to see more rows
その後、Azure Databricks のテーブルに結果を書き込むのに sparklyr::spark_write_table
を使用できます。 たとえば、ノートブック セルで次のコードを実行してクエリを再実行し、結果を次の json_books_agg
という名前のテーブルに書き込みます。
group_by(jsonDF, author) %>%
count() %>%
arrange(desc(n)) %>%
spark_write_table(
name = "json_books_agg",
mode = "overwrite"
)
テーブルが作成されたことを確認するには、sparklyr::sdf_sql
を SparkR::showDF
とともに使用してテーブルのデータを表示します。 たとえば、ノートブック セルで次のコードを実行してテーブルを DataFrame に照会し、既定で DataFrame の最初の 10 行を印刷するには sparklyr::collect
を使用します。
collect(sdf_sql(sc, "SELECT * FROM json_books_agg"))
# A tibble: 82 × 2
# author n
# <chr> <dbl>
# 1 Fyodor Dostoevsky 4
# 2 Unknown 4
# 3 Leo Tolstoy 3
# 4 Franz Kafka 3
# 5 William Shakespeare 3
# 6 William Faulkner 2
# 7 Homer 2
# 8 Gustave Flaubert 2
# 9 Gabriel García Márquez 2
# 10 Thomas Mann 2
# … with 72 more rows
# ℹ Use `print(n = ...)` to see more rows
また、同様の操作を行うためには sparklyr::spark_read_table
を使用できます。 たとえば、ノートブック セルで次のコードを実行して、jsonDF
という名前の前の DataFrame を DataFrame にクエリし、sparklyr::collect
を使用して DataFrame の最初の 10 行をデフォルトで出力します。
fromTable <- spark_read_table(
sc = sc,
name = "json_books_agg"
)
collect(fromTable)
# A tibble: 82 × 2
# author n
# <chr> <dbl>
# 1 Fyodor Dostoevsky 4
# 2 Unknown 4
# 3 Leo Tolstoy 3
# 4 Franz Kafka 3
# 5 William Shakespeare 3
# 6 William Faulkner 2
# 7 Homer 2
# 8 Gustave Flaubert 2
# 9 Gabriel García Márquez 2
# 10 Thomas Mann 2
# … with 72 more rows
# ℹ Use `print(n = ...)` to see more rows
DataFrame に列と計算列の値を追加する
dplyr 関数を使用して、DataFrames に列を追加したり、列の値を計算したりできます。
たとえば、ノートブック セルで次のコードを実行して、jsonDF
という名前の DataFrame の内容を取得します。 dplyr::mutate
を使用して today
という名前の列を追加し、この新しい列に現在のタイムスタンプを入力します。 次に、これらのコンテンツを withDate
という名前の新しい DataFrame に書き込み、dplyr::collect
を使用して新しい DataFrame の最初の 10 行をデフォルトで出力します。
注意
dplyr::mutate
は、Hive の組み込み関数 (UDF とも呼ばれます) および組み込み集約関数 (UDAF とも呼ばれます) に準拠する引数のみを受け入れます。 一般的な情報については、「Hive 関数」を参照してください。 このセクションの日付関連関数の詳細については、「日付関数」を参照してください。
withDate <- jsonDF %>%
mutate(today = current_timestamp())
collect(withDate)
# A tibble: 100 × 9
# author country image…¹ langu…² link pages title year today
# <chr> <chr> <chr> <chr> <chr> <int> <chr> <int> <dttm>
# 1 Chinua A… Nigeria images… English "htt… 209 Thin… 1958 2022-09-27 21:32:59
# 2 Hans Chr… Denmark images… Danish "htt… 784 Fair… 1836 2022-09-27 21:32:59
# 3 Dante Al… Italy images… Italian "htt… 928 The … 1315 2022-09-27 21:32:59
# 4 Unknown Sumer … images… Akkadi… "htt… 160 The … -1700 2022-09-27 21:32:59
# 5 Unknown Achaem… images… Hebrew "htt… 176 The … -600 2022-09-27 21:32:59
# 6 Unknown India/… images… Arabic "htt… 288 One … 1200 2022-09-27 21:32:59
# 7 Unknown Iceland images… Old No… "htt… 384 Njál… 1350 2022-09-27 21:32:59
# 8 Jane Aus… United… images… English "htt… 226 Prid… 1813 2022-09-27 21:32:59
# 9 Honoré d… France images… French "htt… 443 Le P… 1835 2022-09-27 21:32:59
# 10 Samuel B… Republ… images… French… "htt… 256 Moll… 1952 2022-09-27 21:32:59
# … with 90 more rows, and abbreviated variable names ¹imageLink, ²language
# ℹ Use `print(n = ...)` to see more rows
さて、dplyr::mutate
を使用して、withDate
DataFrame のコンテンツにさらに 2 つの列を追加します。 新しい month
列と year
列には、today
列の月と年の数値が含まれています。 次に、これらの内容を withMMyyyy
という名前の新しい DataFrame に書き込み、dplyr::select
と dplyr::collect
を使用して、既定で新しい DataFrame の最初の 10 行の author
、title
、month
、および year
列を出力します。
withMMyyyy <- withDate %>%
mutate(month = month(today),
year = year(today))
collect(select(withMMyyyy, c("author", "title", "month", "year")))
# A tibble: 100 × 4
# author title month year
# <chr> <chr> <int> <int>
# 1 Chinua Achebe Things Fall Apart 9 2022
# 2 Hans Christian Andersen Fairy tales 9 2022
# 3 Dante Alighieri The Divine Comedy 9 2022
# 4 Unknown The Epic Of Gilgamesh 9 2022
# 5 Unknown The Book Of Job 9 2022
# 6 Unknown One Thousand and One Nights 9 2022
# 7 Unknown Njál's Saga 9 2022
# 8 Jane Austen Pride and Prejudice 9 2022
# 9 Honoré de Balzac Le Père Goriot 9 2022
# 10 Samuel Beckett Molloy, Malone Dies, The Unnamable, the … 9 2022
# … with 90 more rows
# ℹ Use `print(n = ...)` to see more rows
dplyr::mutate
を使用して、withMMyyyy
DataFrame のコンテンツにさらに 2 つの列を追加します。 新しい formatted_date
列には today
列の yyyy-MM-dd
部分が含まれ、新しい day
列には新しい formatted_date
列の数字の日が含まれます。 次に、これらのコンテンツを withUnixTimestamp
という名前の新しい DataFrame に書き込み、dplyr::select
と dplyr::collect
を使用して、新しい DataFrame の最初の 10 行の title
、formatted_date
、および day
列をデフォルトで出力します。
withUnixTimestamp <- withMMyyyy %>%
mutate(formatted_date = date_format(today, "yyyy-MM-dd"),
day = dayofmonth(formatted_date))
collect(select(withUnixTimestamp, c("title", "formatted_date", "day")))
# A tibble: 100 × 3
# title formatted_date day
# <chr> <chr> <int>
# 1 Things Fall Apart 2022-09-27 27
# 2 Fairy tales 2022-09-27 27
# 3 The Divine Comedy 2022-09-27 27
# 4 The Epic Of Gilgamesh 2022-09-27 27
# 5 The Book Of Job 2022-09-27 27
# 6 One Thousand and One Nights 2022-09-27 27
# 7 Njál's Saga 2022-09-27 27
# 8 Pride and Prejudice 2022-09-27 27
# 9 Le Père Goriot 2022-09-27 27
# 10 Molloy, Malone Dies, The Unnamable, the trilogy 2022-09-27 27
# … with 90 more rows
# ℹ Use `print(n = ...)` to see more rows
一時ビューを作成する
既存の DataFrame に基づく名前付き一時ビューをメモリ内に作成できます。 たとえば、ノートブック セルで次のコードを実行して、SparkR::createOrReplaceTempView
を使用して、jsonTable
という名前の前の DataFrame の内容を取得し、timestampTable
という名前の一時的なビューを作成します。 次に、sparklyr::spark_read_table
を使用して一時ビューの内容を読み取ります。 sparklyr::collect
を使用して、デフォルトで一時テーブルの最初の 10 行を出力します。
createOrReplaceTempView(withTimestampDF, viewName = "timestampTable")
spark_read_table(
sc = sc,
name = "timestampTable"
) %>% collect()
# A tibble: 100 × 10
# author country image…¹ langu…² link pages title year today
# <chr> <chr> <chr> <chr> <chr> <int> <chr> <int> <dttm>
# 1 Chinua A… Nigeria images… English "htt… 209 Thin… 1958 2022-09-27 21:11:56
# 2 Hans Chr… Denmark images… Danish "htt… 784 Fair… 1836 2022-09-27 21:11:56
# 3 Dante Al… Italy images… Italian "htt… 928 The … 1315 2022-09-27 21:11:56
# 4 Unknown Sumer … images… Akkadi… "htt… 160 The … -1700 2022-09-27 21:11:56
# 5 Unknown Achaem… images… Hebrew "htt… 176 The … -600 2022-09-27 21:11:56
# 6 Unknown India/… images… Arabic "htt… 288 One … 1200 2022-09-27 21:11:56
# 7 Unknown Iceland images… Old No… "htt… 384 Njál… 1350 2022-09-27 21:11:56
# 8 Jane Aus… United… images… English "htt… 226 Prid… 1813 2022-09-27 21:11:56
# 9 Honoré d… France images… French "htt… 443 Le P… 1835 2022-09-27 21:11:56
# 10 Samuel B… Republ… images… French… "htt… 256 Moll… 1952 2022-09-27 21:11:56
# … with 90 more rows, 1 more variable: month <chr>, and abbreviated variable
# names ¹imageLink, ²language
# ℹ Use `print(n = ...)` to see more rows, and `colnames()` to see all variable names
DataFrame で統計分析を実行する
sparklyr と dplyr を統計分析に使用できます。
たとえば、統計を実行する DataFrame を作成します。 これを行うには、ノートブック セルで次のコードを実行し、sparklyr::sdf_copy_to
を使用して、R に組み込まれている iris
データセットの内容を iris
という名前の DataFrame に書き込みます。 sparklyr::sdf_collect
を使用して、デフォルトで一時テーブルの最初の 10 行を出力します。
irisDF <- sdf_copy_to(
sc = sc,
x = iris,
name = "iris",
overwrite = TRUE
)
sdf_collect(irisDF, "row-wise")
# A tibble: 150 × 5
# Sepal_Length Sepal_Width Petal_Length Petal_Width Species
# <dbl> <dbl> <dbl> <dbl> <chr>
# 1 5.1 3.5 1.4 0.2 setosa
# 2 4.9 3 1.4 0.2 setosa
# 3 4.7 3.2 1.3 0.2 setosa
# 4 4.6 3.1 1.5 0.2 setosa
# 5 5 3.6 1.4 0.2 setosa
# 6 5.4 3.9 1.7 0.4 setosa
# 7 4.6 3.4 1.4 0.3 setosa
# 8 5 3.4 1.5 0.2 setosa
# 9 4.4 2.9 1.4 0.2 setosa
# 10 4.9 3.1 1.5 0.1 setosa
# … with 140 more rows
# ℹ Use `print(n = ...)` to see more rows
次に、dplyr::group_by
を使用して、Species
列で行をグループ化します。 dplyr::summarize
を dplyr::percentile_approx
とともに使用して、Sepal_Length
列の 25、50、75、および 100 番目の分位数による要約統計量を Species
で計算します。 sparklyr::collect
を使用して結果を印刷します。
注意
dplyr::summarize
は、Hive の組み込み関数 (UDF とも呼ばれます) および組み込み集約関数 (UDAF とも呼ばれます) に準拠する引数のみを受け入れます。 一般的な情報については、「Hive 関数」を参照してください。 詳細については percentile_approx
、「組み込みの集計関数 (UDAF)」を参照してください。
quantileDF <- irisDF %>%
group_by(Species) %>%
summarize(
quantile_25th = percentile_approx(
Sepal_Length,
0.25
),
quantile_50th = percentile_approx(
Sepal_Length,
0.50
),
quantile_75th = percentile_approx(
Sepal_Length,
0.75
),
quantile_100th = percentile_approx(
Sepal_Length,
1.0
)
)
collect(quantileDF)
# A tibble: 3 × 5
# Species quantile_25th quantile_50th quantile_75th quantile_100th
# <chr> <dbl> <dbl> <dbl> <dbl>
# 1 virginica 6.2 6.5 6.9 7.9
# 2 versicolor 5.6 5.9 6.3 7
# 3 setosa 4.8 5 5.2 5.8
同様の結果は、たとえば次の sparklyr::sdf_quantile
を使用して計算できます。
print(sdf_quantile(
x = irisDF %>%
filter(Species == "virginica"),
column = "Sepal_Length",
probabilities = c(0.25, 0.5, 0.75, 1.0)
))
# 25% 50% 75% 100%
# 6.2 6.5 6.9 7.9