PySpark の基本

この記事では、PySpark の使用法を示す簡単な例について説明します。 基本的な Apache Spark の概念を理解しており、コンピューティングに接続された Azure Databricks ノートブックでコマンドを実行していることを前提としています。 サンプル データを使用して DataFrame を作成し、このデータに対して行操作や列操作などの基本的な変換を実行し、複数の DataFrame を結合してこのデータを集計し、このデータを視覚化して、テーブルまたはファイルに保存します。

データをアップロードする

この記事の一部の例では、Databricks が提供するサンプル データを使用して、DataFrame を使用したデータの読み込み、変換、保存を行う方法を示します。 Databricks にまだ存在していない独自のデータを使用する場合は、まずそれをアップロードし、そこから DataFrame を作成できます。 「ファイルのアップロードを使用してテーブルを作成または変更する」と「Unity Catalog ボリュームにファイルをアップロードする」をご覧ください。

Databricks サンプル データについて

Databricks は、samples カタログと /databricks-datasets ディレクトリにサンプル データを提供します。

  • samples カタログ内のサンプル データにアクセスするには、形式 samples.<schema-name>.<table-name> を使用します。 この記事では、架空の企業からのデータが含まれる samples.tpch スキーマのテーブルを使用します。 customer テーブルには顧客に関する情報が含まれており、orders にはそれらの顧客による注文に関する情報が含まれています。
  • dbutils.fs.ls を使用して、/databricks-datasets のデータを調べます。 Spark SQL または DataFrame を使用し、ファイルのパスを使ってこの場所のデータのクエリを実行します。 Databricks が提供するサンプル データの詳細については、「サンプル データセット」を参照してください。

データ型のインポート

多くの PySpark 操作では、SQL 関数を使用するか、ネイティブ Spark 型と対話する必要があります。 必要な関数と型のみを直接インポートすることも、モジュール全体をインポートすることもできます。

# import all
from pyspark.sql.types import *
from pyspark.sql.functions import *

# import select functions and types
from pyspark.sql.types import IntegerType, StringType
from pyspark.sql.functions import floor, round

インポートされた関数によっては Python の組み込み関数をオーバーライドする場合があるため、一部のユーザーはエイリアスを使用してこれらのモジュールをインポートすることを選びます。 次の例は、Apache Spark のコード例で使用される一般的なエイリアスを示しています。

import pyspark.sql.types as T
import pyspark.sql.functions as F

データ型の包括的な一覧については、Spark のデータ型に関するページを参照してください。

PySpark SQL 関数の包括的な一覧については、Spark の関数に関するページを参照してください。

DataFrame の作成

DataFrame を作成する方法はいくつかあります。 通常、テーブルやファイルのコレクションなどのデータ ソースに対して DataFrame を定義します。 次に、Apache Spark の基本概念のセクションで説明されているように、display などのアクションを使用して、実行する変換をトリガーします。 display メソッドは DataFrame を出力します。

指定した値を使用して DataFrame を作成する

指定した値を使用して DataFrame を作成するには、行がタプルのリストとして表現される createDataFrame メソッドを使用します。

df_children = spark.createDataFrame(
  data = [("Mikhail", 15), ("Zaky", 13), ("Zoya", 8)],
  schema = ['name', 'age'])
display(df_children)

出力では、df_children の列のデータ型が自動的に推論されることに注目してください。 または、スキーマを追加して型を指定することもできます。 スキーマは、名前、データ型、null 値が含まれるかどうかを示すブール値フラグを指定する StructFields で構成される StructType を使用して定義されます。 pyspark.sql.types からデータ型をインポートする必要があります。

from pyspark.sql.types import StructType, StructField, StringType, IntegerType

df_children_with_schema = spark.createDataFrame(
  data = [("Mikhail", 15), ("Zaky", 13), ("Zoya", 8)],
  schema = StructType([
    StructField('name', StringType(), True),
    StructField('age', IntegerType(), True)
  ])
)
display(df_children_with_schema)

Unity Catalog のテーブルから DataFrame を作成する

Unity Catalog 内のテーブルから DataFrame を作成するには、形式 <catalog-name>.<schema-name>.<table-name> を使用してテーブルを特定する table メソッドを使用します。 左側のナビゲーションバーの [カタログ] をクリックし、カタログ エクスプローラーを使用してテーブルに移動します。 それをクリックし、[テーブル パスのコピー] を選んでテーブル パスをノートブックに挿入します。

次の例ではテーブル samples.tpch.customer を読み込みますが、代わりに独自のテーブルへのパスを指定することもできます。

df_customer = spark.table('samples.tpch.customer')
display(df_customer)

アップロードしたファイルから DataFrame を作成する

Unity Catalog ボリュームにアップロードしたファイルから DataFrame を作成するには、read プロパティを使用します。 このメソッドは DataFrameReader を返します。これを使用して適切な形式を読み取ることができます。 左側の小さなサイド バーにあるカタログ オプションをクリックし、カタログ ブラウザーを使用してファイルを見つけます。 それを選び、[ボリュームのファイル パスをコピー] をクリックします。

次の例では *.csv ファイルから読み取りますが、DataFrameReader は他の多くの形式でのファイルのアップロードをサポートしています。 DataFrameReader メソッドに関するページを参照してください。

# Assign this variable your full volume file path
volume_file_path = ""

df_csv = (spark.read
  .format("csv")
  .option("header", True)
  .option("inferSchema", True)
  .load(volume_file_path)
)
display(df_csv)

Unity Catalog ボリュームの詳細については、「Unity Catalog ボリュームとは」を参照してください。

JSON 応答から DataFrame を作成する

REST API によって返された JSON 応答ペイロードから DataFrame を作成するには、Python requests パッケージを使用して応答のクエリと解析を行います。 パッケージを使用するには、インポートする必要があります。 この例では、米国食品医薬品局の医薬品申請データベースのデータを使用します。

import requests

# Download data from URL
url = "https://api.fda.gov/drug/drugsfda.json?limit=100"
response = requests.get(url)

# Create the DataFrame
df_drugs = spark.createDataFrame(response.json()["results"])
display(df_drugs)

Databricks での JSON やその他の半構造化データの操作については、「半構造化データをモデル化する」を参照してください。

JSON フィールドまたはオブジェクトを選択する

変換された JSON から特定のフィールドまたはオブジェクトを選ぶには、[] 表記を使用します。 たとえば、それ自体が製品の配列である products フィールドを選ぶには、次のようにします。

display(df_drugs.select(df_drugs["products"]))

メソッド呼び出しを連鎖して複数のフィールドをトラバースすることもできます。 たとえば、医薬品申請の最初の製品のブランド名を出力するには、次のようにします。

display(df_drugs.select(df_drugs["products"][0]["brand_name"]))

ファイルから DataFrame を作成する

ファイルからの DataFrame の作成を示すために、この例では /databricks-datasets ディレクトリの CSV データを読み込みます。

サンプル データセットに移動するには、Databricks Utilties ファイル システム コマンドを使用できます。 次の例では、dbutils を使用して、/databricks-datasets で使用できるデータセットを一覧表示します。

display(dbutils.fs.ls('/databricks-datasets'))

または、次の例に示すように、%fs を使用して Databricks CLI ファイル システム コマンドにアクセスできます。

%fs ls '/databricks-datasets'

ファイルまたはファイルのディレクトリから DataFrame を作成するには、load メソッドでパスを指定します。

df_population = (spark.read
  .format("csv")
  .option("header", True)
  .option("inferSchema", True)
  .load("/databricks-datasets/samples/population-vs-price/data_geo.csv")
)
display(df_population)

DataFrame を使用したデータ変換

DataFrame を使用すると、データの並べ替え、フィルター処理、集計を行う組み込みメソッドを使用してデータを簡単に変換できます。 多くの変換は DataFrame のメソッドとして指定されておらず、代わりに spark.sql.functions パッケージで提供されます。 Databricks Spark SQL 関数に関するページを参照してください。

列の操作

Spark には、多くの基本的な列操作が用意されています。

ヒント

DataFrame 内のすべての列を出力するには、columns (たとえば df_customer.columns) を使用します。

列を選択する

selectcol を使用して特定の列を選択できます。 col 関数は pyspark.sql.functions サブモジュール内にあります。

from pyspark.sql.functions import col

df_customer.select(
  col("c_custkey"),
  col("c_acctbal")
)

文字列として定義された式を受け取る expr を使用して列を参照することもできます。

from pyspark.sql.functions import expr

df_customer.select(
  expr("c_custkey"),
  expr("c_acctbal")
)

SQL 式を受け入れる selectExpr を使用することもできます。

df_customer.selectExpr(
  "c_custkey as key",
  "round(c_acctbal) as account_rounded"
)

文字列リテラルを使用して列を選択するには、以下を行います。

df_customer.select(
  "c_custkey",
  "c_acctbal"
)

特定の DataFrame から列を明示的に選択するには、[] 演算子または . 演算子を使用します (. 演算子は、整数で始まる列、またはスペースや特殊文字を含む列の選択には使用できません)。これは、一部の列が同じ名前を持つ DataFrame を結合する場合に特に役立ちます。

df_customer.select(
  df_customer["c_custkey"],
  df_customer["c_acctbal"]
)
df_customer.select(
  df_customer.c_custkey,
  df_customer.c_acctbal
)

列を作成する

新しい列を作成するには、withColumn メソッドを使用します。 次の例では、顧客の勘定残高 c_acctbal1000 を超えているかどうかに基づいて、ブール値を含む新しい列を作成します。

df_customer_flag = df_customer.withColumn("balance_flag", col("c_acctbal") > 1000)

列名の変更

列の名前を変更するには、既存の列名と新しい列名を受け入れる withColumnRenamed メソッドを使用します。

df_customer_flag_renamed = df_customer_flag.withColumnRenamed("balance_flag", "balance_flag_renamed")

alias メソッドは、集計の一部として列の名前を変更する場合に特に役立ちます。

from pyspark.sql.functions import avg

df_segment_balance = df_customer.groupBy("c_mktsegment").agg(
    avg(df_customer["c_acctbal"]).alias("avg_account_balance")
)

display(df_segment_balance)

列の型のキャスト

場合によっては、DataFrame 内の 1 つ以上の列のデータ型を変更する必要があります。 これを行うには、cast メソッドを使用して列のデータ型間で変換します。 次の例は、col メソッドを使用して列を参照し、列を整数から文字列型に変換する方法を示しています。

from pyspark.sql.functions import col

df_casted = df_customer.withColumn("c_custkey", col("c_custkey").cast(StringType()))
print(type(df_casted))

列を削除する

列を削除するには、select または select(*) except で列を省略するか、drop メソッドを使用します。

df_customer_flag_renamed.drop("balance_flag_renamed")

複数の列を一度に削除することもできます。

df_customer_flag_renamed.drop("c_phone", "balance_flag_renamed")

行の操作

Spark には、多くの基本的な行操作が用意されています。

行のフィルター処理

行をフィルター処理するには、DataFrame で filter または where メソッドを使用して、特定の行のみを返します。 フィルター処理する列を特定するには、col メソッドまたは列として評価される式を使用します。

from pyspark.sql.functions import col

df_that_one_customer = df_customer.filter(col("c_custkey") == 412449)

複数の条件でフィルター処理するには、論理演算子を使用します。 たとえば、&| は、それぞれ ANDOR の条件を有効にできます。 次の例では、c_nationkey20 に等しく、c_acctbal1000 より大きい行をフィルター処理します。

df_customer.filter((col("c_nationkey") == 20) & (col("c_acctbal") > 1000))
df_filtered_customer = df_customer.filter((col("c_custkey") == 412446) | (col("c_custkey") == 412447))

重複する行の削除

行の重複を除外するには、一意の行のみを返す distinct を使用します。

df_unique = df_customer.distinct()

null 値の処理

null 値を処理するには、na.drop メソッドを使用して null 値を含む行を削除します。 このメソッドでは、削除する行が、1 つでも null 値を含む (any) のか、すべてが null 値である (all) のかを指定できます。

あらゆる null 値を削除するには、次のいずれかの例を使用します。

df_customer_no_nulls = df_customer.na.drop()
df_customer_no_nulls = df_customer.na.drop("any")

代わりに、すべて null 値のものが含まれる行のみをフィルターで除外する場合は、次を使用します。

df_customer_no_nulls = df_customer.na.drop("all")

次に示すように、これを列のサブセットに適用するには、これを指定します。

df_customer_no_nulls = df_customer.na.drop("all", subset=["c_acctbal", "c_custkey"])

欠損値を埋めるには、fill メソッドを使用します。 これをすべての列に適用するか、列のサブセットに適用するかを選択できます。 以下の例では、勘定残高 c_acctbal の値が null である勘定残高には、0 が入力されます。

df_customer_filled = df_customer.na.fill("0", subset=["c_acctbal"])

文字列を他の値に置き換えるには、replace メソッドを使用します。 以下の例では、空のアドレス文字列は単語 UNKNOWN に置き換えられます。

df_customer_phone_filled = df_customer.na.replace([""], ["UNKNOWN"], subset=["c_phone"])

行の追加

行を追加するには、union メソッドを使用して新しい DataFrame を作成する必要があります。 次の例では、先ほど作成した DataFrame df_that_one_customerdf_filtered_customer が結合され、3 人の顧客を含む DataFrame が返されます。

df_appended_rows = df_that_one_customer.union(df_filtered_customer)

display(df_appended_rows)

Note

DataFrame をテーブルに書き込んでから新しい行を追加することで、DataFrame を結合することもできます。 運用ワークロードの場合、ターゲット テーブルへのデータ ソースの増分処理により、データのサイズが大きくなるにつれて待機時間とコンピューティング コストが大幅に削減されます。 「Databricks レイクハウスにデータを取り込む」を参照してください。

行の並べ替え

重要

並べ替えは大規模になるとコストがかかる場合があり、並べ替えられたデータを格納し、そのデータを Spark で再読み込みする場合、順序は保証されません。 並べ替えを意図的に使用していることを確認します。

1 つ以上の列で行を並べ替えるには、sort メソッドまたは orderBy メソッドを使用します。 既定では、これらのメソッドは昇順に並べ替えられます。

df_customer.orderBy(col("c_acctbal"))

降順でフィルター処理するには、desc を使用します。

df_customer.sort(col("c_custkey").desc())

次の例は、2 つの列で並べ替える方法を示しています。

df_sorted = df_customer.orderBy(col("c_acctbal").desc(), col("c_custkey").asc())
df_sorted = df_customer.sort(col("c_acctbal").desc(), col("c_custkey").asc())

DataFrame の並べ替え後に返される行数を制限するには、limit メソッドを使用します。 次の例では、上位 10 件の結果のみを表示します。

display(df_sorted.limit(10))

DataFrame を結合する

複数の DataFrame を結合するには、join メソッドを使用します。 how (結合の種類) および on (結合のベースとなる列) パラメーターで DataFrame を結合する方法を指定できます。 一般的な結合の種類は次のとおりです。

  • inner: これは既定の結合の種類であり、DataFrame 全体で on パラメーターに一致する行のみを保持する DataFrame を返します。
  • left: 最初に指定された DataFrame のすべての行と、2 番目に指定された DataFrame の行のうち、最初のものと一致する行のみが保持されます。
  • outer: 外部結合では、一致に関係なく両方の DataFrame のすべての行が保持されます。

結合の詳細については、「Azure Databricks での結合の操作」を参照してください。 PySpark でサポートされている結合の一覧については、DataFrame 結合に関するページを参照してください。

次の例は、orders DataFrame の各行が customers DataFrame の対応する行と結合された 1 つの DataFrame を返します。 すべての注文が正確に 1 人の顧客に対応することが想定されるため、内部結合が使用されます。

df_customer = spark.table('samples.tpch.customer')
df_order = spark.table('samples.tpch.orders')

df_joined = df_order.join(
  df_customer,
  on = df_order["o_custkey"] == df_customer["c_custkey"],
  how = "inner"
)

display(df_joined)

複数の条件で結合するには、&| などのブール演算子を使用して、それぞれ ANDOR を指定します。 次の例では、さらなる条件を追加して、500,000 より大きい o_totalprice を持つ行のみをフィルター処理します。

df_customer = spark.table('samples.tpch.customer')
df_order = spark.table('samples.tpch.orders')

df_complex_joined = df_order.join(
  df_customer,
  on = ((df_order["o_custkey"] == df_customer["c_custkey"]) & (df_order["o_totalprice"] > 500000)),
  how = "inner"
)

display(df_complex_joined)

データの集計

DataFrame 内のデータを集計するには、SQL の GROUP BY と同様に、groupBy メソッドを使用してグループ化する列を指定し、agg メソッドを使用して集計を指定します。 avgsummaxmin などの一般的な集計を pyspark.sql.functions からインポートします。 次の例は、市場セグメント別の平均顧客残高を示しています。

from pyspark.sql.functions import avg

# group by one column
df_segment_balance = df_customer.groupBy("c_mktsegment").agg(
    avg(df_customer["c_acctbal"])
)

display(df_segment_balance)
from pyspark.sql.functions import avg

# group by two columns
df_segment_nation_balance = df_customer.groupBy("c_mktsegment", "c_nationkey").agg(
    avg(df_customer["c_acctbal"])
)

display(df_segment_nation_balance)

一部の集計はアクションです。これは、集計が計算をトリガーすることを意味します。 この場合、結果を出力するために他のアクションを使用する必要はありません。

DataFrame 内の行をカウントするには、count メソッドを使用します。

df_customer.count()

呼び出しのチェーン

DataFrame を変換するメソッドは DataFrame を返し、Spark はアクションが呼び出されるまで変換に対して作用しません。 この遅延評価は、利便性と可読性を高めるために複数のメソッドを連鎖できることを意味します。 次の例は、フィルター処理、集計、順序付けを連鎖する方法を示しています。

from pyspark.sql.functions import count

df_chained = (
    df_order.filter(col("o_orderstatus") == "F")
    .groupBy(col("o_orderpriority"))
    .agg(count(col("o_orderkey")).alias("n_orders"))
    .sort(col("n_orders").desc())
)

display(df_chained)

DataFrame を視覚化する

ノートブックで DataFrame を視覚化するには、DataFrame の左上隅にあるテーブルの横の + 記号をクリックし、[視覚化] を選んで DataFrame に基づいて 1 つ以上のグラフを追加します。 視覚化の詳細については、「Databricks ノートブックでの視覚化」を参照してください。

display(df_order)

追加の視覚化を実行するには、Databricks では Spark 用 pandas API を使用することをお勧めします。 .pandas_api() を使用すると、Spark DataFrame の対応する Pandas API にキャストできます。 詳細については、「Spark の Pandas API」を参照してください。

データを保存する

データを変換したら、DataFrameWriter メソッドを使用してデータを保存できます。 これらのメソッドの完全な一覧は、DataFrameWriter に関するページにあります。 次のセクションでは、DataFrame をテーブルとして、およびデータ ファイルのコレクションとして保存する方法を示します。

DataFrame をテーブルとして保存する

DataFrame を Unity Catalog のテーブルとして保存するには、write.saveAsTable メソッドを使用し、<catalog-name>.<schema-name>.<table-name> の形式でパスを指定します。

df_joined.write.saveAsTable(f"{catalog_name}.{schema_name}.{table_name}")

DataFrame を CSV として書き込む

DataFrame を *.csv 形式で書き込むには、write.csv メソッドを使用して形式とオプションを指定します。 既定では、指定されたパスにデータが存在する場合、書き込み操作は失敗します。 次のモードのいずれかを指定して、別のアクションを実行できます。

  • overwrite は、ターゲット パス内のすべての既存のデータを DataFrame の内容で上書きします。
  • append は、DataFrame のコンテンツをターゲット パス内のデータに追加します。
  • データがターゲット パスに存在する場合、ignore は書き込みを警告なしで失敗します。

次の例は、CSV ファイルとして DataFrame コンテンツでデータを上書きする方法を示しています。

# Assign this variable your file path
file_path = ""

(df_joined.write
  .format("csv")
  .mode("overwrite")
  .write(file_path)
)

次のステップ

Databricks で Spark 機能を活用するには、以下を参照してください。