Spark でデータを分析する

完了

Spark を使用する利点の 1 つは、さまざまなプログラミング言語でコードを記述して実行できることです。これにより、既にお持ちのプログラミング スキルを活用し、特定のタスクに最適な言語を使用することができます。 新しい Azure Synapse Analytics Spark ノートブックの既定の言語は PySpark です。これは、データ操作と視覚化に対する強力なサポートにより、データ サイエンティストやアナリストが一般的に使用する Python の Spark 最適化バージョンです。 さらに、Scala (対話形式で使用できる Java 派生言語) やSQL (Spark SQL ライブラリに含まれる一般的に使用される SQL 言語のバリアント) などの言語を使用して、リレーショナル データ構造を操作できます。 ソフトウェア エンジニアは、JavaMicrosoft .NET などのフレームワークを使用して Spark 上で実行されるコンパイル済みソリューションを作成することもできます。

データフレームを使用してデータを探索する

Spark では、"耐障害性分散データセット" (RDD) と呼ばれるデータ構造がネイティブで使用されます。ただし、RDD で直接動作するコードを記述 "できる" ものの、Spark で構造化データを操作するために最もよく使用されるデータ構造は、Spark SQL ライブラリの一部として提供される "データフレーム"です。 Spark のデータフレームは、ユビキタス Pandas Python ライブラリのデータフレームと似ていますが、Spark の分散処理環境で動作するように最適化されています。

注意

データフレーム API に加えて、Spark SQL では、Java と Scala でサポートされている厳密に型指定された "データセット" API が提供されます。 このモジュールでは、Dataframe API に焦点を当てます。

データフレームにデータを読み込む

仮説の例を見て、データフレームを使用してデータを操作する方法を確認しましょう。 Azure Synapse Analytics ワークスペースのプライマリ ストレージ アカウントで、products.csv という名前のコンマ区切りのテキスト ファイルに次のデータが含まれるとします。

ProductID,ProductName,Category,ListPrice
771,"Mountain-100 Silver, 38",Mountain Bikes,3399.9900
772,"Mountain-100 Silver, 42",Mountain Bikes,3399.9900
773,"Mountain-100 Silver, 44",Mountain Bikes,3399.9900
...

Spark ノートブックでは、次の PySpark コードを使用してデータフレームにデータを読み込み、最初の 10 行を表示できます。

%%pyspark
df = spark.read.load('abfss://container@store.dfs.core.windows.net/products.csv',
    format='csv',
    header=True
)
display(df.limit(10))

先頭の %%pyspark 行は "マジック" と呼ばれ、このセルで使用される言語が PySpark であることを Spark に伝えます。 ノートブック インターフェイスのツール バーで既定として使用する言語を選択し、マジックを使用して特定のセルの選択をオーバーライドできます。 たとえば、製品データの同等の Scala コードの例を次に示します。

%%spark
val df = spark.read.format("csv").option("header", "true").load("abfss://container@store.dfs.core.windows.net/products.csv")
display(df.limit(10))

マジック %%spark は Scala を指定するために使用されます。

これらのコード サンプルの両方で、次のような出力が生成されます。

ProductID ProductName カテゴリ ListPrice
771 Mountain-100 Silver, 38 マウンテン バイク 3399.9900
772 Mountain-100 Silver, 42 マウンテン バイク 3399.9900
773 Mountain-100 Silver, 44 マウンテン バイク 3399.9900
... ... ... ...

データフレーム スキーマを指定する

前の例では、CSV ファイルの最初の行に列名が含まれており、Spark により、含まれているデータから各列のデータ型を推論できました。 また、データの明示的なスキーマを指定することもできます。これは、次の CSV の例のように、データ ファイルに列名が含まれていない場合に便利です。

771,"Mountain-100 Silver, 38",Mountain Bikes,3399.9900
772,"Mountain-100 Silver, 42",Mountain Bikes,3399.9900
773,"Mountain-100 Silver, 44",Mountain Bikes,3399.9900
...

次の PySpark の例は、product-data.csv という名前のファイルからデータフレームを読み込むスキーマをこの形式で指定する方法を示しています。

from pyspark.sql.types import *
from pyspark.sql.functions import *

productSchema = StructType([
    StructField("ProductID", IntegerType()),
    StructField("ProductName", StringType()),
    StructField("Category", StringType()),
    StructField("ListPrice", FloatType())
    ])

df = spark.read.load('abfss://container@store.dfs.core.windows.net/product-data.csv',
    format='csv',
    schema=productSchema,
    header=False)
display(df.limit(10))

ここでも、結果は次のようになります。

ProductID ProductName カテゴリ ListPrice
771 Mountain-100 Silver, 38 マウンテン バイク 3399.9900
772 Mountain-100 Silver, 42 マウンテン バイク 3399.9900
773 Mountain-100 Silver, 44 マウンテン バイク 3399.9900
... ... ... ...

データフレームのフィルター処理とグループ化を行う

Dataframe クラスのメソッドを使用して、含まれているデータをフィルター処理、並べ替え、グループ化、操作できます。 たとえば、次のコード例では、select メソッドを使用して、前の例の製品データを含む df データフレームから ProductName 列と ListPrice 列を取得します。

pricelist_df = df.select("ProductID", "ListPrice")

このコード例の結果は次のようになります。

ProductID ListPrice
771 3399.9900
772 3399.9900
773 3399.9900
... ...

ほとんどのデータ操作メソッドと同様に、select は新しいデータフレーム オブジェクトを返します。

ヒント

データフレームから列のサブセットを選択することは一般的な操作であり、次の短い構文を使用して実現することもできます。

pricelist_df = df["ProductID", "ListPrice"]

メソッドを "チェーン" して、変換されたデータフレームを作成する一連の操作を実行できます。 たとえば、次のコード例では、selectメソッドと where メソッドをチェーンして、Mountain Bikes または Road Bikes のカテゴリを持つ製品に対して ProductName 列と ListPrice 列を含む新しいデータフレームを作成します。

bikes_df = df.select("ProductName", "ListPrice").where((df["Category"]=="Mountain Bikes") | (df["Category"]=="Road Bikes"))
display(bikes_df)

このコード例の結果は次のようになります。

ProductName ListPrice
Mountain-100 Silver, 38 3399.9900
Road-750 Black, 52 539.9900
... ...

データをグループ化して集計するには、groupBy メソッドと集計関数を使用します。 たとえば、次の PySpark コードでは、各カテゴリの製品数をカウントします。

counts_df = df.select("ProductID", "Category").groupBy("Category").count()
display(counts_df)

このコード例の結果は次のようになります。

カテゴリ count
ヘッドセット 3
ホイール 14
マウンテン バイク 32
... ...

Spark で SQL 式を使用する

Dataframe API は Spark SQL という名前の Spark ライブラリの一部であり、データ アナリストは SQL 式を使用してデータのクエリと操作を行います。

Spark カタログでデータベース オブジェクトを作成する

Spark カタログは、ビューやテーブルなどのリレーショナル データ オブジェクトのメタストアです。 Spark ランタイムでは、このカタログを使用して、任意の Spark 対応言語で記述されたコードと、一部のデータ アナリストや開発者にとってより自然な SQL 式をシームレスに統合できます。

Spark カタログでクエリを実行するためにデータフレーム内のデータを使用できるようにする最も簡単な方法の 1 つは、次のコード例に示すように、一時ビューを作成することです。

df.createOrReplaceTempView("products")

"ビュー" は一時的なもので、現在のセッションの終了時に自動的に削除されます。 また、カタログに保持される "テーブル" を作成して、Spark SQL を使用してクエリを実行できるデータベースを定義することもできます。

注意

このモジュールでは Spark カタログ テーブルについて詳しく説明しませんが、いくつかの重要な点を確認しておくことをお勧めします。

  • spark.catalog.createTable メソッドを使用して、空のテーブルを作成できます。 テーブルは、カタログに関連付けられているストレージの場所に、基になるデータを格納するメタデータ構造です。 テーブルを削除すると、基になるデータも削除されます。
  • データフレームをテーブルとして保存するには、saveAsTable メソッドを使用します。
  • spark.catalog.createExternalTable メソッドを使用して "外部" テーブルを作成できます。 外部テーブルではカタログ内のメタデータが定義されますが、外部ストレージの場所 (通常は、データ レイク内のフォルダー) から基になるデータが取得されます。 外部テーブルを削除しても、基になるデータは削除されません。

Spark SQL API を使用してデータのクエリを実行する

任意の言語で記述されたコードで Spark SQL API を使用して、カタログ内のデータに対してクエリを実行できます。 たとえば、次の PySpark コードでは、SQL クエリを使用して、products ビューからデータフレームとしてデータを返します。

bikes_df = spark.sql("SELECT ProductID, ProductName, ListPrice \
                      FROM products \
                      WHERE Category IN ('Mountain Bikes', 'Road Bikes')")
display(bikes_df)

このコード例の結果は、次の表のようになります。

ProductID ProductName ListPrice
38 Mountain-100 Silver, 38 3399.9900
52 Road-750 Black, 52 539.9900
... ... ...

SQL コードを使用する

前の例では、Spark SQL API を使用して、Spark コードに SQL 式を埋め込む方法を示しました。 また、ノートブックで %%sql マジックを使用して、次のようにカタログ内のオブジェクトに対してクエリを行う SQL コードを実行することもできます。

%%sql

SELECT Category, COUNT(ProductID) AS ProductCount
FROM products
GROUP BY Category
ORDER BY Category

この SQL コード例では、次のように、ノートブックにテーブルとして自動的に表示される結果セットが返されます。

カテゴリ ProductCount
ビブショーツ 3
バイク ラック 1
バイク スタンド 1
... ...