Funções escalares definidas pelo usuário – Scala
Este artigo contém exemplos de UDF (função definida pelo usuário) do Scala. Ele mostra como registrar UDFs, como invocar UDFs e advertências sobre a ordem de avaliação de subexpressões no Spark SQL. Consulte Fnções escalares definidas pelo usuário (UDFs) para obter mais detalhes.
Observação
UDFs Scala em recursos de computação habilitados para o Catálogo do Unity com modo de acesso compartilhado requer o Databricks Runtime 14.2 e superior.
Registrar uma função como uma UDF
val squared = (s: Long) => {
s * s
}
spark.udf.register("square", squared)
Chamar a UDF no Spark SQL
spark.range(1, 20).createOrReplaceTempView("test")
%sql select id, square(id) as id_squared from test
Usar UDF com DataFrames
import org.apache.spark.sql.functions.{col, udf}
val squared = udf((s: Long) => s * s)
display(spark.range(1, 20).select(squared(col("id")) as "id_squared"))
Ordem de avaliação e verificação nula
O Spark SQL (inclusive a SQL, as APIs do conjunto de dados e do DataFrame) não garante a ordem de avaliação de subexpressões. Em especial, as entradas de um operador ou função não são necessariamente avaliadas da esquerda para a direita ou em qualquer outra ordem fixa. Por exemplo, as expressões lógicas AND
e OR
não têm semântica de "curto-circuito" da esquerda para a direita.
Portanto, é perigoso confiar nos efeitos colaterais ou na ordem de avaliação de expressões booleanas, e na ordem das cláusulas WHERE
e HAVING
, pois essas expressões e cláusulas podem ser reordenadas durante a otimização e o planejamento da consulta. Especificamente, se uma UDF depender de semântica de curto-circuito no SQL para verificação nula, não há nenhuma garantia de que a verificação nula ocorrerá antes de invocar a UDF. Por exemplo,
spark.udf.register("strlen", (s: String) => s.length)
spark.sql("select s from test1 where s is not null and strlen(s) > 1") // no guarantee
Essa cláusula WHERE
não garante que a UDF strlen
seja invocada após a filtragem de nulos.
Para executar a verificação nula adequada, é recomendável que você faça o seguinte:
- Fazer a própria UDF reconhecer nulos e verificar nulos dentro da própria UDF
- Usar
IF
expressões ouCASE WHEN
para fazer a verificação nula e invocar a UDF em uma ramificação condicional
spark.udf.register("strlen_nullsafe", (s: String) => if (s != null) s.length else -1)
spark.sql("select s from test1 where s is not null and strlen_nullsafe(s) > 1") // ok
spark.sql("select s from test1 where if(s is not null, strlen(s), null) > 1") // ok
APIs do conjunto de dados tipado
Observação
O suporte para UDFs Scala em clusters habilitados para o Catálogo do Unity com modo de acesso compartilhado requer o Databricks Runtime 15.4 e superior.
As APIs do conjunto de dados tipado permitem a você executar transformações como mapa, filtro e agregações nos conjuntos de dados resultantes com uma função definida pelo usuário.
Por exemplo, o aplicativo Scala a seguir usa a API map()
para modificar um número em uma coluna de resultado para uma sequência de caracteres prefixada.
spark.range(3).map(f => s"row-$f").show()
Embora este exemplo use a API map()
, ele também se aplica a outras APIs do conjunto de dados tipado, como filter()
, mapPartitions()
, foreach()
, foreachPartition()
, reduce()
e flatMap()
.