Funções escalares definidas pelo usuário – Python

Este artigo contém exemplos de UDF (função definida pelo usuário) do Python. Ele mostra como registrar UDFs, como invocar UDFs e fornece advertências sobre a ordem de avaliação de subexpressões no Spark SQL.

No Databricks Runtime 14.0 e superior, você pode usar funções de tabela definidas pelo usuário (UDTFs) do Python para registrar funções que retornam relações inteiras em vez de valores escalares. Consulte Funções de tabela definidas pelo usuário (UDTFs) em Python.

Observação

No Databricks Runtime 12.2 LTS e inferior, não há suporte para UDFs do Python e do Pandas na computação do Catálogo do Unity que usa o modo de acesso compartilhado. As UDFs escalares do Python e as UDFs do Pandas têm suporte no Databricks Runtime 13.3 LTS e superior para todos os modos de acesso.

No Databricks Runtime 13.3 LTS e versões superiores, é possível registrar UDFs escalares do Python no Catálogo do Unity usando a sintaxe SQL. Consulte UDFs (funções definidas pelo usuário) no Catálogo do Unity.

Registrar uma função como uma UDF

def squared(s):
  return s * s
spark.udf.register("squaredWithPython", squared)

Opcionalmente, você pode definir o tipo de retorno de seu UDF. O tipo de retorno padrão é StringType.

from pyspark.sql.types import LongType
def squared_typed(s):
  return s * s
spark.udf.register("squaredWithPython", squared_typed, LongType())

Chamar a UDF no Spark SQL

spark.range(1, 20).createOrReplaceTempView("test")
%sql select id, squaredWithPython(id) as id_squared from test

Usar UDF com DataFrames

from pyspark.sql.functions import udf
from pyspark.sql.types import LongType
squared_udf = udf(squared, LongType())
df = spark.table("test")
display(df.select("id", squared_udf("id").alias("id_squared")))

Como alternativa, você pode declarar o mesmo UDF usando a sintaxe de anotação:

from pyspark.sql.functions import udf
@udf("long")
def squared_udf(s):
  return s * s
df = spark.table("test")
display(df.select("id", squared_udf("id").alias("id_squared")))

Ordem de avaliação e verificação nula

SQL do Spark (incluindo SQL e o DataFrame e a API do Dataset) não garante a ordem de avaliação de subexpressões. Em especial, as entradas de um operador ou função não são necessariamente avaliadas da esquerda para a direita ou em qualquer outra ordem fixa. Por exemplo, as expressões lógicas AND e OR não têm semântica de "curto-circuito" da esquerda para a direita.

Portanto, é perigoso confiar nos efeitos colaterais ou na ordem de avaliação de expressões booleanas, e na ordem das cláusulas WHERE e HAVING, pois essas expressões e cláusulas podem ser reordenadas durante a otimização e o planejamento da consulta. Especificamente, se uma UDF depender de semântica de curto-circuito no SQL para verificação nula, não há nenhuma garantia de que a verificação nula ocorrerá antes de invocar a UDF. Por exemplo,

spark.udf.register("strlen", lambda s: len(s), "int")
spark.sql("select s from test1 where s is not null and strlen(s) > 1") # no guarantee

Essa cláusula WHERE não garante que a UDF strlen seja invocada após a filtragem de nulos.

Para executar a verificação nula adequada, é recomendável que você faça o seguinte:

  • Fazer a própria UDF reconhecer nulos e verificar nulos dentro da própria UDF
  • Usar IF expressões ou CASE WHEN para fazer a verificação nula e invocar a UDF em uma ramificação condicional
spark.udf.register("strlen_nullsafe", lambda s: len(s) if not s is None else -1, "int")
spark.sql("select s from test1 where s is not null and strlen_nullsafe(s) > 1") // ok
spark.sql("select s from test1 where if(s is not null, strlen(s), null) > 1")   // ok

Limitações

  • UDFs do PySpark em clusters compartilhados ou computação sem servidor não podem acessar pastas Git, arquivos do workspace ou Volumes UC para importar módulos no Databricks Runtime 14.2 e versões anteriores.