PySpark RDD | map method
PySpark RDD's map(~) method applies a function to each element of the RDD.
Parameters
1. f | function
The function to apply to each element.
2. preservesPartitioning | boolean | optional
Whether or not to let Spark assume that the partitioning is still valid. This is only relevant for pair RDDs. Consult the examples below for clarification. By default, preservesPartitioning=False.
Return Value
A PySpark RDD (pyspark.rdd.PipelinedRDD).
Examples
Applying a function to each element of RDD
To make all values in the RDD lowercase:
# Create a RDD with 5 partitions
rdd = sc.parallelize(['A', 'B', 'C', 'D', 'E', 'F'], numSlices=5)
new_rdd = rdd.map(lambda x: x.lower())
new_rdd.collect()
['a', 'b', 'c', 'd', 'e', 'f']
Preserving the partitioning while applying the map method to a RDD
The preservesPartitioning parameter only comes into play when the RDD contains a list of tuples (pair RDD).
When a RDD is re-partitioned via partitionBy(~) (which uses a hash partitioner), tuples with the same key are guaranteed to end up in the same partition:
rdd = sc.parallelize([('A', 1), ('B', 1), ('C', 1), ('A', 1), ('D', 1)])
new_rdd = rdd.partitionBy(numPartitions=2)
new_rdd.glom().collect()
[[('C', 1)], [('A', 1), ('B', 1), ('A', 1), ('D', 1)]]
Indeed, we see that the two ('A', 1) tuples lie in the same partition.
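As a rough mental model (plain Python, not Spark's actual implementation), a hash partitioner routes each tuple to the partition at index hash(key) % numPartitions, which is why equal keys always land together:

```python
# A pure-Python sketch of how a hash partitioner routes tuples. This is a
# simplified model; PySpark's partitionBy(~) uses its own portable hash
# function, so the exact partition indices may differ from Spark's.
def assign_partitions(pairs, num_partitions):
    partitions = [[] for _ in range(num_partitions)]
    for key, value in pairs:
        # Same key => same hash => same partition index, every time.
        partitions[hash(key) % num_partitions].append((key, value))
    return partitions

pairs = [('A', 1), ('B', 1), ('C', 1), ('A', 1), ('D', 1)]
partitioned = assign_partitions(pairs, 2)
```

Because the partition index depends only on the key, both ('A', 1) tuples are guaranteed to end up in the same sublist.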
Let us now perform a map(~) operation with preservesPartitioning set to False (the default):
mapped_rdd = new_rdd.map(lambda my_tuple: (my_tuple[0], my_tuple[1]+3))
mapped_rdd.glom().collect()
[[('C', 4)], [('A', 4), ('B', 4), ('A', 4), ('D', 4)]]
Here, we apply a map(~) that returns a tuple with the same key but a different value. We can see that the partitioning has not changed. Behind the scenes, however, Spark keeps an internal flag indicating whether or not the partitioning has been destroyed, and this flag has now been set to True (i.e. the partitioning is considered destroyed) because preservesPartitioning=False by default. This is naive of Spark, since the tuples' keys have not changed, so the partitioning should still be valid.
We can confirm that Spark is now naively unaware that the data is still partitioned by the tuple key by performing a shuffling operation like reduceByKey(~):
mapped_rdd_reduced = mapped_rdd.reduceByKey(lambda x, y: x + y)
print(mapped_rdd_reduced.toDebugString().decode())
(2) PythonRDD[238] at RDD at PythonRDD.scala:58 []
 |  MapPartitionsRDD[237] at mapPartitions at PythonRDD.scala:183 []
 |  ShuffledRDD[236] at partitionBy at <unknown>:0 []
 +-(2) PairwiseRDD[235] at reduceByKey at <command-1339085475381822>:1 []
    |  PythonRDD[234] at reduceByKey at <command-1339085475381822>:1 []
    |  MapPartitionsRDD[223] at mapPartitions at PythonRDD.scala:183 []
    |  ShuffledRDD[222] at partitionBy at <unknown>:0 []
    +-(3) PairwiseRDD[221] at partitionBy at <command-1339085475381815>:2 []
       |  PythonRDD[220] at partitionBy at <command-1339085475381815>:2 []
       |  ParallelCollectionRDD[219] at readRDDFromInputStream at PythonRDD.scala:413 []
You can see that a shuffle has indeed occurred. However, this is completely unnecessary: we know that tuples with the same key reside in the same partition (machine), so this operation could have been done locally.
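To see why this shuffle is avoidable, here is a plain-Python sketch (not the Spark API) of reduceByKey computed independently within each partition, assuming every occurrence of a key already lives in a single partition, as is the case for a hash-partitioned layout like the one above:

```python
# Pure-Python sketch: if all occurrences of a key sit in one partition,
# reduceByKey can run per-partition with no data movement (no shuffle).
def reduce_by_key_locally(partitions, fn):
    result = []
    for partition in partitions:
        acc = {}
        for key, value in partition:
            # Fold each value into the running result for its key.
            acc[key] = fn(acc[key], value) if key in acc else value
        result.append(sorted(acc.items()))
    return result

# A partition layout with duplicate keys confined to one partition.
partitions = [[('C', 4)], [('A', 4), ('B', 4), ('A', 4), ('D', 4)]]
reduced = reduce_by_key_locally(partitions, lambda x, y: x + y)
# reduced == [[('C', 4)], [('A', 8), ('B', 4), ('D', 4)]]
```

Each partition is reduced on its own: the two ('A', 4) tuples collapse to ('A', 8) without any data crossing partition boundaries.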
Now, consider the case when we set preservesPartitioning to True:
mapped_rdd_preserved = new_rdd.map(lambda my_tuple: (my_tuple[0], my_tuple[1]+3), preservesPartitioning=True)
mapped_rdd_preserved_reduced = mapped_rdd_preserved.reduceByKey(lambda x, y: x + y)
print(mapped_rdd_preserved_reduced.toDebugString().decode())
(2) PythonRDD[239] at RDD at PythonRDD.scala:58 []
 |  MapPartitionsRDD[223] at mapPartitions at PythonRDD.scala:183 []
 |  ShuffledRDD[222] at partitionBy at <unknown>:0 []
 +-(3) PairwiseRDD[221] at partitionBy at <command-1339085475381815>:2 []
    |  PythonRDD[220] at partitionBy at <command-1339085475381815>:2 []
    |  ParallelCollectionRDD[219] at readRDDFromInputStream at PythonRDD.scala:413 []
We can see that no shuffling has occurred. This is because we have told Spark that only the value of each tuple was changed, not the key, so Spark assumes that the original partitioning is still intact.