Applying a custom function on PySpark Columns with user-defined functions
What is a user-defined function in PySpark?
PySpark comes with a rich set of built-in functions that cover most tasks, but there may be cases where you have to roll your own custom function. In PySpark, we can easily register a custom function that takes a column value as input and returns an updated value. This guide will go over how to register a user-defined function and use it to manipulate data in PySpark.
Applying a custom function on a column
Consider the following PySpark DataFrame:
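One way such a DataFrame could be created (a minimal sketch assuming an active SparkSession named spark):

df = spark.createDataFrame([['Alex', 10], ['Bob', 20], ['Cathy', 30]], ['name', 'age'])
df.show()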
+-----+---+
| name|age|
+-----+---+
| Alex| 10|
|  Bob| 20|
|Cathy| 30|
+-----+---+
Let's define a custom function that takes in as argument a single column value:
def to_upper(some_string):
    return some_string.upper()
Here, our custom to_upper(~)
function returns the uppercased version of the input string.
Now, we must register our custom function using the udf(~)
function, which returns a function that can be used just like any other function in the pyspark.sql.functions
library:
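For instance, a sketch of the registration and usage, assuming the DataFrame df from above (the variable name udf_upper is just illustrative):

from pyspark.sql.functions import udf

udf_upper = udf(to_upper)
df.select(udf_upper('name')).show()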
This basic example is only for demonstration - there already exists a built-in function upper(~)
in the pyspark.sql.functions
library that uppercases string values:
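For comparison, a sketch that uses the built-in function instead:

from pyspark.sql.functions import upper

df.select(upper('name')).show()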
User-defined functions are treated as a black box by PySpark, so they cannot be optimized under the hood. Therefore, use built-in functions whenever possible and define custom functions only when necessary.
Applying a custom function on multiple columns
We can easily extend our user-defined function such that it takes multiple columns as argument:
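A sketch consistent with the output below (the names my_func and my_udf are the ones referenced in the notes that follow):

def my_func(str_name, int_age):
    return f'{str_name} is {int_age} years old'

my_udf = udf(my_func)
df.select(my_udf('name', 'age')).show()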
+--------------------+
|  my_func(name, age)|
+--------------------+
|Alex is 10 years old|
| Bob is 20 years old|
|Cathy is 30 years...|
+--------------------+
Here, note the following:
our custom function my_func(~) now takes in two column values
when calling my_udf(~), we now pass in two columns
Specifying the resulting column type
By default, the column returned will always be of type string regardless of the actual return type of your custom function. For instance, consider the following custom function:
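A definition along these lines (the same my_double is used again later in this guide):

def my_double(int_age):
    return 2 * int_age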
Here, the return type of our function my_double(~)
is obviously an integer, but the resulting column type is actually set to a string:
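For example, registering it without a return type and inspecting the schema:

udf_double = udf(my_double)
df.select(udf_double('age')).printSchema()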
root
 |-- my_double(age): string (nullable = true)
We can specify the resulting column type using the second argument in udf(~)
:
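For example, passing a type string as the second argument:

udf_double = udf(my_double, 'int')
df.select(udf_double('age')).printSchema()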
root
 |-- my_double(age): integer (nullable = true)
Here, we have indicated that the resulting column type should be integer.
Equivalently, we could also import an explicit PySpark type like so:
from pyspark.sql.types import IntegerType

udf_double = udf(my_double, IntegerType())
root
 |-- my_double(age): integer (nullable = true)
Calling user-defined functions in SQL expressions
To use user-defined functions in SQL expressions, register the custom function using spark.udf.register(~)
:
def to_upper(some_string):
    return some_string.upper()
spark.udf.register('udf_upper', to_upper)
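The registered name can then be used inside a SQL expression, for example:

df.selectExpr('udf_upper(name)').show()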
+---------------+
|udf_upper(name)|
+---------------+
|           ALEX|
|            BOB|
|          CATHY|
+---------------+
Here, the selectExpr(~) method takes in as argument a SQL expression.
We could also register the DataFrame as a SQL table so that we can run full SQL expressions like so:
# Register PySpark DataFrame as a SQL table
df.createOrReplaceTempView('my_table')
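For example, assuming the udf_upper registration from above:

spark.sql('SELECT udf_upper(name) FROM my_table').show()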
+---------------+
|udf_upper(name)|
+---------------+
|           ALEX|
|            BOB|
|          CATHY|
+---------------+
Specifying the return type
Again, the type of the resulting column is string regardless of what your custom function returns. Just like we did earlier when registering with udf(~)
, we can specify the type of the returned column like so:
def my_double(int_age):
    return 2 * int_age
spark.udf.register('udf_double', my_double, 'int')
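Inspecting the resulting column type, for example:

df.selectExpr('udf_double(age)').printSchema()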
root
 |-- udf_double(age): integer (nullable = true)
Equivalently, we could import the explicit type from pyspark.sql.types
:
from pyspark.sql.types import IntegerType

spark.udf.register('udf_double', my_double, IntegerType())
root
 |-- udf_double(age): integer (nullable = true)
Limitations of user-defined functions
Ordering of execution in sub-expressions is not fixed
The order in which sub-expressions in SQL clauses (e.g. WHERE and HAVING) are evaluated is not guaranteed. As an example, consider the following:
spark.udf.register('my_double', lambda val: 2 * val, 'int')
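The accompanying query could look something like this, with the WHERE clause combining two conditions using AND:

spark.sql('SELECT * FROM my_table WHERE age IS NOT NULL AND my_double(age) > 5').show()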
Here, we have the sub-expression defined by WHERE
that specifies two conditions linked using the AND
clause. There is no guarantee that Spark will evaluate age IS NOT NULL
before my_double(age) > 5
. This means that the input supplied to our custom function my_double(~)
may be null, which can cause your custom function to break if you do not handle this case specifically.
The way to get around this problem is to use an IF
statement that guarantees the ordering of execution:
spark.udf.register('my_double', lambda val: 2 * val, 'int')
spark.sql('SELECT * from my_table WHERE IF(age IS NOT NULL, my_double(age) > 5, null) IS NOT NULL').show()
Here, the input supplied to our my_double(~)
function is guaranteed to be not null.
Slow compared to built-in PySpark functions
Since PySpark does not know how to optimize user-defined functions, these functions will always be slower compared to built-in functions. Therefore, only turn to user-defined functions when built-in functions cannot be used to achieve your task.