ユーザー定義関数 (UDF) とは

ユーザー定義関数 (UDF) はユーザーによって定義された関数であり、ユーザー環境でカスタム ロジックを再利用できます。 Azure Databricks では、拡張可能なロジックを配布できるように、さまざまな種類の UDF がサポートされています。 この記事では、UDF の一般的な長所と制限事項をいくつか紹介します。

注意

Azure Databricks 上のすべての実行環境ですべての形式の UDF を使用できるわけではありません。 Unity Catalog を使用している場合は、「Unity Catalog のユーザー定義関数 (UDF)」を参照してください。

シリアル化のペナルティを伴わないカスタム ロジックの定義

Azure Databricks は、その UDF 動作の多くを Apache Spark から継承します。これには、多くの UDF の種類についての効率性に関する制限事項が含まれます。 「最も効率的な UDF はどれか」を参照してください。

UDF に関連する潜在的な効率性のトレードオフを気にすることなく、コードを安全にモジュール化できます。 そのためには、SQL または Spark DataFrames を使用して、ロジックを一連の Spark 組み込みメソッドとして定義する必要があります。 たとえば、次の SQL と Python の関数は、Spark の組み込みメソッドを組み合わせて、ユニット変換を再利用可能な関数として定義します。

SQL

CREATE FUNCTION convert_f_to_c(unit STRING, temp DOUBLE)
RETURNS DOUBLE
RETURN CASE
  WHEN unit = "F" THEN (temp - 32) * (5/9)
  ELSE temp
END;

SELECT convert_f_to_c(unit, temp) AS c_temp
FROM tv_temp;

Python

def convertFtoC(unitCol, tempCol):
  from pyspark.sql.functions import when
  return when(unitCol == "F", (tempCol - 32) * (5/9)).otherwise(tempCol)

from pyspark.sql.functions import col

df_query = df.select(convertFtoC(col("unit"), col("temp"))).toDF("c_temp")
display(df_query)

上記の UDF を実行するために、サンプル データを作成できます。

最も効率的な UDF はどれか

UDF により、コードの実行に重大な処理ボトルネックが発生する可能性があります。 Azure Databricks では、含まれている Apache Spark、SQL、Delta Lake 構文を使用して記述されたコードに対してさまざまなオプティマイザーが自動的に使用されます。 カスタム ロジックが UDF によって導入されると、これらのオプティマイザーには、このカスタム ロジックに関するタスクを効率的に計画する機能がありません。 さらに、JVM の外部で実行されるロジックには、データのシリアル化に関する追加コストが発生します。

注意

Photon 対応コンピューティングを使用する場合、Azure Databricks では Photon を使用して多くの関数が最適化されます。 Photon によって最適化できるのは、DataFrame コマンドの Spark SQL を連結する関数のみです。

UDF の中には、他のものよりも効率的なものもあります。 パフォーマンスの観点から:

  • Azure Databricks オプティマイザーにより、組み込みの関数が最も高速になります。
  • JVM (Scala、Java、Hive UDF) で実行されるコードは、Python UDF よりも高速になります。
  • Pandas UDF は、Arrow を使用して、Python UDF に関連するシリアル化コストを削減します。
  • Python UDF は手続き型のロジックには適していますが、大規模なデータセットでの運用 ETL ワークロードでは避ける必要があります。

Note

Databricks Runtime 12.2 LTS 以前では、Python スカラー UDF と Pandas UDF は、共有アクセス モードを使用するクラスターの Unity Catalog ではサポートされていません。 これらの UDF は、Databricks Runtime 13.3 LTS 以降では、すべてのアクセス モードでサポートされています。

Databricks Runtime 14.1 以下では、Scala スカラー UDF は、共有アクセス モードを使用するクラスターの Unity Catalog ではサポートされていません。 これらの UDF は、Databricks Runtime 14.2 以降では、すべてのアクセス モードでサポートされています。

Databricks Runtime 13.3 LTS 以降では、SQL 構文を使用してスカラー Python UDF を Unity Catalog に登録できます。 「Unity Catalog のユーザー定義関数 (UDF)」を参照してください。

Type 最適化 実行環境
Hive UDF いいえ JVM
Python UDF いいえ Python
Pandas UDF いいえ Python (Arrow)
Scala UDF いいえ JVM
Spark SQL はい JVM (Photon)
Spark DataFrame はい JVM (Photon)

UDF を使用する必要がある場合

UDF の主な利点は、ユーザーが使い慣れた言語でロジックを表現できるため、コードのリファクタリングに関連する人的コストを削減できることです。 アドホック クエリ、手動データ クレンジング、探索的データ分析、小規模または中規模のデータセットに対するほとんどの操作では、UDF に関連する待機時間のオーバーヘッド コストが、コードのリファクタリングに関連するコストを上回ることはほとんどありません。

ETL ジョブ、ストリーミング操作、非常に大規模なデータセットに対する操作、定期的または継続的に実行されるその他のワークロードの場合、ロジックをリファクタリングしてネイティブ Apache Spark メソッドを使用すると、すぐに利益が得られます。

サンプル UDF のサンプル データ

この記事のコード例では、UDF を使用して摂氏と華氏の間で温度を変換しています。 これらの関数を実行する場合は、次の Python コードを使用してサンプル データセットを作成できます。

import numpy as np
import pandas as pd

Fdf = pd.DataFrame(np.random.normal(55, 25, 10000000), columns=["temp"])
Fdf["unit"] = "F"

Cdf = pd.DataFrame(np.random.normal(10, 10, 10000000), columns=["temp"])
Cdf["unit"] = "C"

df = spark.createDataFrame(pd.concat([Fdf, Cdf]).sample(frac=1))

df.cache().count()
df.createOrReplaceTempView("tv_temp")