PySpark RDD | map method

Last updated: Aug 12, 2023
Tags: PySpark

PySpark RDD's map(~) method applies a function to each element of the RDD.

Parameters

1. f | function

The function to apply.

2. preservesPartitioning | boolean | optional

Whether Spark should assume that the existing partitioning is still valid after the function is applied. This is only relevant for pair RDDs (RDDs of key-value tuples). See the examples below for clarification. By default, preservesPartitioning=False.

Return Value

A PySpark RDD (pyspark.rdd.PipelinedRDD).

Examples

Applying a function to each element of an RDD

To convert all values in the RDD to lowercase:

# Create an RDD with 5 partitions
rdd = sc.parallelize(["A","B","C","D","E","F"], numSlices=5)
new_rdd = rdd.map(lambda x: x.lower())
new_rdd.collect()
['a', 'b', 'c', 'd', 'e', 'f']
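Conceptually, map(~) runs the function on every element inside each partition independently, so the number of partitions stays the same. The plain-Python sketch below models this with a list of lists standing in for partitions. The helpers split_into_partitions and map_partitions are hypothetical, and the even-split rule is an assumption for illustration, not necessarily how Spark slices the data.

```python
from typing import Callable, List, TypeVar

T = TypeVar("T")
U = TypeVar("U")


def split_into_partitions(data: List[T], num_slices: int) -> List[List[T]]:
    """Hypothetical helper: split data into num_slices contiguous chunks."""
    n = len(data)
    return [data[i * n // num_slices:(i + 1) * n // num_slices] for i in range(num_slices)]


def map_partitions(partitions: List[List[T]], f: Callable[[T], U]) -> List[List[U]]:
    """Toy model of map: apply f to every element, partition by partition."""
    return [[f(x) for x in part] for part in partitions]


partitions = split_into_partitions(["A", "B", "C", "D", "E", "F"], 5)
mapped = map_partitions(partitions, lambda x: x.lower())
collected = [x for part in mapped for x in part]  # like collect(): flatten in order
print(len(mapped), collected)  # 5 ['a', 'b', 'c', 'd', 'e', 'f']
```

The partition count is unchanged by the mapping; only the elements inside each partition are transformed.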

Preserving partitioning when applying map(~) to an RDD

The preservesPartitioning parameter only comes into play when the RDD consists of key-value tuples (a pair RDD).

When an RDD is repartitioned via partitionBy(~), which uses a hash partitioner by default, tuples with the same key are guaranteed to end up in the same partition:

rdd = sc.parallelize([("A",1),("B",1),("C",1),("A",1),("D",1)], numSlices=3)
new_rdd = rdd.partitionBy(numPartitions=2)
new_rdd.glom().collect()
[[('C', 1)], [('A', 1), ('B', 1), ('A', 1), ('D', 1)]]

Indeed, both ('A',1) tuples lie in the same partition.
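This guarantee follows from how a hash partitioner assigns tuples: the target partition is computed from the key alone (a hash of the key modulo the number of partitions), so equal keys always land together. The plain-Python sketch below models this. It uses zlib.crc32 as a stand-in hash so results are reproducible (PySpark's default partition function is its own portable_hash), so the exact placement may differ from the output above; hash_partition is a hypothetical helper.

```python
import zlib
from typing import Any, Dict, List, Set, Tuple

Pair = Tuple[str, Any]


def key_hash(key: str) -> int:
    # Stand-in hash (assumption): CRC32 of the UTF-8 key, stable across runs
    return zlib.crc32(key.encode("utf-8"))


def hash_partition(pairs: List[Pair], num_partitions: int) -> List[List[Pair]]:
    """Hypothetical helper: place each (key, value) pair by hashing its key."""
    partitions: List[List[Pair]] = [[] for _ in range(num_partitions)]
    for key, value in pairs:
        partitions[key_hash(key) % num_partitions].append((key, value))
    return partitions


pairs = [("A", 1), ("B", 1), ("C", 1), ("A", 1), ("D", 1)]
partitions = hash_partition(pairs, 2)

# Record which partition indices each key appears in
owners: Dict[str, Set[int]] = {}
for idx, part in enumerate(partitions):
    for key, _ in part:
        owners.setdefault(key, set()).add(idx)
print(all(len(idxs) == 1 for idxs in owners.values()))  # True: each key lives in one partition
```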

Let us now perform a map(~) operation with preservesPartitioning set to False (default):

mapped_rdd = new_rdd.map(lambda my_tuple: (my_tuple[0], my_tuple[1]+3))
mapped_rdd.glom().collect()
[[('C', 4)], [('A', 4), ('B', 4), ('A', 4), ('D', 4)]]

Here, map(~) returns a tuple with the same key but a different value, and we can see that the layout of the data has not changed. Behind the scenes, however, Spark keeps track of whether an RDD is still known to be partitioned by key, and because preservesPartitioning defaults to False, Spark now treats the partitioning as destroyed. This is overly cautious in our case: the tuples' keys have not changed, so the original partitioning is still valid.
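In PySpark, this bookkeeping is exposed through the RDD's partitioner attribute: after partitionBy(~) it holds a partitioner object, and a map(~) with preservesPartitioning=False produces an RDD whose partitioner is None. The plain-Python model below mimics that rule. ToyRDD, its map method, and the "hash(2)" tag are hypothetical stand-ins, not Spark classes.

```python
from dataclasses import dataclass
from typing import Any, Callable, List, Optional, Tuple

Pair = Tuple[str, Any]


@dataclass
class ToyRDD:
    """Hypothetical stand-in for an RDD: partitions plus an optional partitioner tag."""
    partitions: List[List[Pair]]
    partitioner: Optional[str] = None  # e.g. "hash(2)" once data is hash-partitioned

    def map(self, f: Callable[[Pair], Pair], preservesPartitioning: bool = False) -> "ToyRDD":
        new_parts = [[f(x) for x in part] for part in self.partitions]
        # Keep the partitioner tag only if the caller promises it is still valid
        return ToyRDD(new_parts, self.partitioner if preservesPartitioning else None)


partitioned = ToyRDD([[("C", 1)], [("A", 1), ("B", 1), ("A", 1), ("D", 1)]], partitioner="hash(2)")
add_three = lambda kv: (kv[0], kv[1] + 3)

mapped = partitioned.map(add_three)
preserved = partitioned.map(add_three, preservesPartitioning=True)
print(mapped.partitions == preserved.partitions)  # True: identical data layout
print(mapped.partitioner, preserved.partitioner)  # None hash(2)
```

Both results hold exactly the same data; the only difference is whether the partitioning information survives.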

We can confirm that Spark no longer knows the data is partitioned by key by running an operation that needs tuples with the same key to be co-located, such as reduceByKey(~):

mapped_rdd_reduced = mapped_rdd.reduceByKey(lambda x, y: x+y)
print(mapped_rdd_reduced.toDebugString().decode("utf-8"))
(2) PythonRDD[238] at RDD at PythonRDD.scala:58 []
| MapPartitionsRDD[237] at mapPartitions at PythonRDD.scala:183 []
| ShuffledRDD[236] at partitionBy at <unknown>:0 []
+-(2) PairwiseRDD[235] at reduceByKey at <command-1339085475381822>:1 []
| PythonRDD[234] at reduceByKey at <command-1339085475381822>:1 []
| MapPartitionsRDD[223] at mapPartitions at PythonRDD.scala:183 []
| ShuffledRDD[222] at partitionBy at <unknown>:0 []
+-(3) PairwiseRDD[221] at partitionBy at <command-1339085475381815>:2 []
| PythonRDD[220] at partitionBy at <command-1339085475381815>:2 []
| ParallelCollectionRDD[219] at readRDDFromInputStream at PythonRDD.scala:413 []

The new ShuffledRDD stage introduced by reduceByKey(~) (ShuffledRDD[236]) shows that a shuffle has indeed occurred. This shuffle is unnecessary: tuples with the same key already reside in the same partition, so the reduction could have been done locally within each partition.
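The reasoning can be modeled in plain Python: a key-based aggregation only needs to redistribute data when the input is not already known to be partitioned by key. In the sketch below, toy_reduce_by_key is a hypothetical helper that mimics this decision; the partitioner tag, the crc32-based stand-in hash, and the returned shuffled flag are illustrative assumptions, not Spark internals. Note that the reducing function takes two arguments, just like the one passed to reduceByKey(~).

```python
import zlib
from functools import reduce
from typing import Any, Callable, Dict, List, Optional, Tuple

Pair = Tuple[str, Any]


def partition_of(key: str, num_partitions: int) -> int:
    # Stand-in hash partitioner (assumption): CRC32 of the key modulo partition count
    return zlib.crc32(key.encode("utf-8")) % num_partitions


def toy_reduce_by_key(partitions: List[List[Pair]], partitioner: Optional[str],
                      func: Callable[[Any, Any], Any]) -> Tuple[List[List[Pair]], bool]:
    """Return (result partitions, whether a shuffle was needed)."""
    n = len(partitions)
    # Skip the shuffle only if the data is tagged as hash-partitioned into n partitions
    shuffled = partitioner != f"hash({n})"
    if shuffled:
        # "Shuffle": redistribute every pair so that equal keys share a partition
        regrouped: List[List[Pair]] = [[] for _ in range(n)]
        for part in partitions:
            for key, value in part:
                regrouped[partition_of(key, n)].append((key, value))
        partitions = regrouped
    result: List[List[Pair]] = []
    for part in partitions:
        # Local reduction: combine values per key within this partition only
        groups: Dict[str, List[Any]] = {}
        for key, value in part:
            groups.setdefault(key, []).append(value)
        result.append([(k, reduce(func, vs)) for k, vs in groups.items()])
    return result, shuffled


data = [[("C", 4)], [("A", 4), ("B", 4), ("A", 4), ("D", 4)]]
add = lambda x, y: x + y

_, shuffled_unknown = toy_reduce_by_key(data, None, add)
local_result, shuffled_known = toy_reduce_by_key(data, "hash(2)", add)
print(shuffled_unknown, shuffled_known)  # True False
print(local_result)  # [[('C', 4)], [('A', 8), ('B', 4), ('D', 4)]]
```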

Now, consider the case when we set preservesPartitioning to True:

mapped_rdd_preserved = new_rdd.map(lambda my_tuple: (my_tuple[0], my_tuple[1]+3), preservesPartitioning=True)
mapped_rdd_preserved_reduced = mapped_rdd_preserved.reduceByKey(lambda x, y: x+y)
print(mapped_rdd_preserved_reduced.toDebugString().decode("utf-8"))
(2) PythonRDD[239] at RDD at PythonRDD.scala:58 []
| MapPartitionsRDD[223] at mapPartitions at PythonRDD.scala:183 []
| ShuffledRDD[222] at partitionBy at <unknown>:0 []
+-(3) PairwiseRDD[221] at partitionBy at <command-1339085475381815>:2 []
| PythonRDD[220] at partitionBy at <command-1339085475381815>:2 []
| ParallelCollectionRDD[219] at readRDDFromInputStream at PythonRDD.scala:413 []

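This time, the lineage contains no new ShuffledRDD: the only shuffle is the one from the original partitionBy(~). By setting preservesPartitioning=True, we tell Spark that only the values of the tuples have changed, not the keys, so Spark can assume that the original partitioning is still intact and perform the reduction locally.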
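Because preservesPartitioning=True is a promise that Spark does not verify, it is only safe when the function leaves the keys unchanged. The plain-Python sketch below (a hypothetical reduce_locally helper, not Spark code) illustrates what can go wrong in principle: if keys are rewritten but the data is still treated as key-partitioned, a purely local per-partition reduction can leave the same key split across partitions.

```python
from functools import reduce
from typing import Any, Callable, Dict, List, Tuple

Pair = Tuple[str, Any]


def reduce_locally(partitions: List[List[Pair]], func: Callable[[Any, Any], Any]) -> List[Pair]:
    """Reduce by key within each partition only, as if no shuffle were needed."""
    out: List[Pair] = []
    for part in partitions:
        groups: Dict[str, List[Any]] = {}
        for key, value in part:
            groups.setdefault(key, []).append(value)
        out.extend((k, reduce(func, vs)) for k, vs in groups.items())
    return out


partitioned = [[("C", 1)], [("A", 1), ("B", 1), ("A", 1), ("D", 1)]]

# Safe: only values change, so equal keys remain co-located
values_only = [[(k, v + 3) for k, v in part] for part in partitioned]
print(reduce_locally(values_only, lambda x, y: x + y))
# [('C', 4), ('A', 8), ('B', 4), ('D', 4)]

# Unsafe: keys are rewritten ("C" -> "A"), but the old layout is still trusted
keys_changed = [[("A" if k == "C" else k, v) for k, v in part] for part in partitioned]
print(reduce_locally(keys_changed, lambda x, y: x + y))
# [('A', 1), ('A', 2), ('B', 1), ('D', 1)]  <- key "A" appears twice
```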

Published by Isshin Inada