Applying a custom function on PySpark Columns with user-defined functions

Machine Learning › PySpark › PySpark Guides
Last updated: Jul 8, 2022
Tags: PySpark

What is a user-defined function in PySpark?

PySpark comes with a rich set of built-in functions that cover most tasks, but there may be cases where you have to write your own custom function. In PySpark, we can easily register a custom function that takes a column value as input and returns an updated value. This guide will go over how to register a user-defined function (UDF) and use it to manipulate data in PySpark.

Applying a custom function on a column

Consider the following PySpark DataFrame:

df = spark.createDataFrame([['Alex',10], ['Bob',20], ['Cathy',30]], ['name','age'])
df.show()
+-----+---+
| name|age|
+-----+---+
| Alex| 10|
| Bob| 20|
|Cathy| 30|
+-----+---+

Let's define a custom function that takes in as argument a single column value:

def to_upper(some_string):
    return some_string.upper()

Here, our custom to_upper(~) function returns the uppercased version of the input string.

Now, we must register our custom function using the udf(~) function, which returns a function that can be used just like any other function in the pyspark.sql.functions library:

from pyspark.sql.functions import udf
# Register our custom function
udf_upper = udf(to_upper)
# We can use our custom function just like we would for any SQL function
df.select(udf_upper('name')).show()
+--------------+
|to_upper(name)|
+--------------+
| ALEX|
| BOB|
| CATHY|
+--------------+
WARNING

This basic example is only for demonstration - there already exists a built-in function upper(~) in the pyspark.sql.functions library that uppercases string values:

from pyspark.sql import functions as F
df.select(F.upper('name')).show()
+-----------+
|upper(name)|
+-----------+
| ALEX|
| BOB|
| CATHY|
+-----------+

User-defined functions are treated as a black box by PySpark, so they cannot be optimized under the hood. Therefore, use built-in functions whenever possible and define custom functions only when necessary.

Applying a custom function on multiple columns

We can easily extend our user-defined function so that it takes multiple columns as arguments:

# Takes in as argument two column values
def my_func(str_name, int_age):
    return f'{str_name} is {int_age} years old'

my_udf = udf(my_func)
# Pass in two columns to our my_udf
df_result = df.select(my_udf('name', 'age'))
df_result.show()
+--------------------+
| my_func(name, age)|
+--------------------+
|Alex is 10 years old|
| Bob is 20 years old|
|Cathy is 30 years...|
+--------------------+

Here, note the following:

  • our custom function my_func(~) now takes in two column values

  • when calling my_udf(~), we now pass in two columns

Specifying the resulting column type

By default, the column returned will always be of type string regardless of the actual return type of your custom function. For instance, consider the following custom function:

def my_double(int_age):
    return 2 * int_age

# Register the function
udf_double = udf(my_double)
df_result = df.select(udf_double('age'))
df_result.show()
+--------------+
|my_double(age)|
+--------------+
| 20|
| 40|
| 60|
+--------------+

Here, the return type of our function my_double(~) is obviously an integer, but the resulting column type is actually set to a string:

df_result.printSchema()
root
|-- my_double(age): string (nullable = true)

We can specify the resulting column type using the second argument in udf(~):

udf_double = udf(my_double, 'int')
df_result = df.select(udf_double('age'))
df_result.printSchema()
root
|-- my_double(age): integer (nullable = true)

Here, we have indicated that the resulting column type should be integer.

Equivalently, we could also import an explicit PySpark type like so:

from pyspark.sql.types import IntegerType
udf_double = udf(my_double, IntegerType())
df_result = df.select(udf_double('age'))
df_result.printSchema()
root
|-- my_double(age): integer (nullable = true)

Calling user-defined functions in SQL expressions

To use user-defined functions in SQL expressions, register the custom function using spark.udf.register(~):

def to_upper(some_string):
    return some_string.upper()

spark.udf.register('udf_upper', to_upper)
df.selectExpr('udf_upper(name)').show()
+---------------+
|udf_upper(name)|
+---------------+
| ALEX|
| BOB|
| CATHY|
+---------------+

Here, the selectExpr(~) method takes in a SQL expression as its argument.

We could also register the DataFrame as a SQL table so that we can run full SQL expressions like so:

# Register PySpark DataFrame as a SQL table
df.createOrReplaceTempView('my_table')
spark.sql('SELECT udf_upper(name) FROM my_table').show()
+---------------+
|udf_upper(name)|
+---------------+
| ALEX|
| BOB|
| CATHY|
+---------------+
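Note that spark.udf.register(~) also accepts a lambda directly, so short helpers do not need to be defined as named functions first. A sketch, assuming the my_table view registered above (udf_len is a hypothetical name):

```python
# A lambda can be registered directly as a SQL-callable UDF
compute_len = lambda s: len(s)
# spark.udf.register('udf_len', compute_len, 'int')
# spark.sql('SELECT udf_len(name) FROM my_table').show()
```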

Specifying the return type

Again, the type of the resulting column is string regardless of what your custom function returns. Just like we did earlier when registering with udf(~), we can specify the type of the returned column like so:

def my_double(int_age):
    return 2 * int_age

spark.udf.register('udf_double', my_double, 'int')
df.selectExpr('udf_double(age)').printSchema()
root
|-- udf_double(age): integer (nullable = true)

Equivalently, we could import the explicit type from pyspark.sql.types:

from pyspark.sql.types import IntegerType
spark.udf.register('udf_double', my_double, IntegerType())
df.selectExpr('udf_double(age)').printSchema()
root
|-- udf_double(age): integer (nullable = true)

Limitations of user-defined functions

Ordering of execution in sub-expressions is not fixed

The order in which sub-expressions inside SQL clauses (e.g. WHERE and HAVING) are evaluated is not guaranteed. As an example, consider the following:

spark.udf.register('my_double', lambda val: 2 * val, 'int')
spark.sql('SELECT * from my_table WHERE age IS NOT NULL AND my_double(age) > 5').show()

Here, the WHERE clause specifies two conditions linked by AND. There is no guarantee that Spark will evaluate age IS NOT NULL before my_double(age) > 5. This means that the input supplied to our custom function my_double(~) may be null, which can break the function unless it handles the null case explicitly.

The way to get around this problem is to use an IF expression that guarantees the ordering of execution:

spark.udf.register('my_double', lambda val: 2 * val, 'int')
spark.sql('SELECT * from my_table WHERE IF(age IS NOT NULL, my_double(age) > 5, null)').show()

Here, my_double(~) is only evaluated when age is not null. When age is null, the IF expression returns null, which the WHERE clause treats as false, so those rows are filtered out.
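Alternatively, when we control the UDF, we can make it null-safe so that the evaluation order of sub-expressions no longer matters. A minimal sketch:

```python
# Return None for null inputs instead of raising an error
def my_double_safe(val):
    return 2 * val if val is not None else None

# spark.udf.register('my_double', my_double_safe, 'int')
# WHERE my_double(age) > 5 is now safe even when age is null
```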

Slow compared to built-in PySpark functions

Since PySpark does not know how to optimize user-defined functions, these functions will always be slower compared to built-in functions. Therefore, only turn to user-defined functions when built-in functions cannot be used to achieve your task.

Published by Isshin Inada