I have been working with Apache Spark for around two years now and considered myself to have a fairly good knowledge of Spark. But one day, somebody asked me about closures in Spark and I drew a blank!
Maybe one of the reasons I was unaware of closures in Spark is that I had been working with the Spark DataFrame API for too long, and operations at the RDD level were like a black box to me. Well, that's still no excuse to ignore a concept as important as closures. And trust me, it is IMPORTANT to understand closures in Spark so that you don't spend hours debugging perfectly good-looking code, trying to figure out why it is not working as expected. Well, enough talk, let's get started.
All right, my Big Data friends, what is wrong with the following piece of code?
total_sum = 0
rdd3 = sc.parallelize([1, 2, 3, 4, 5])

def increment_counter(x):
    global total_sum
    total_sum += x

rdd3.foreach(increment_counter)
print("total_sum value: ", total_sum)
Looks simple enough, right? But this code is WRONG as it might give unexpected results.
The reason behind this is the life cycle of the variable total_sum. Let's say we have a Spark cluster with 1 driver and 2 executor nodes. To process the above code, Spark breaks it into tasks to be run on the executors. To run those tasks, the executors also need the variables and methods referenced in them, right? So Spark serializes this information and sends it to each executor, and this is known as the CLOSURE.
Now each executor has its own copy of the variables and methods, and all the updates take place in that copy only. So, if my task is summing the elements of an RDD, the sum in each executor's copy will be updated for the data that executor is processing. But total_sum on the driver will still be zero, because the executors were modifying their own copies and the variable on the driver was untouched!
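To see closure capture from the other side, note that simply reading a driver-side variable inside a function you pass to an RDD operation works just fine: each executor gets its own serialized copy of that variable along with the function. The trouble only starts when you try to mutate it. Here is a small illustration (the names offset, rdd and shifted are mine, and it assumes the same sc as in the snippet above):

offset = 10
rdd = sc.parallelize([1, 2, 3])
# Each executor receives a serialized copy of `offset` together with the lambda,
# so reading it inside the closure behaves as expected.
shifted = rdd.map(lambda x: x + offset).collect()
print(shifted)  # [11, 12, 13]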
Now consider that we are working on a single-node cluster, where both the driver and the executor are on the same machine. In this scenario, the foreach
function might execute within the same JVM as the driver, in which case the executor could reference the driver's total_sum, update it, and we might get the actual sum of the elements of the RDD.
But there are a lot of ifs and mights in the above scenario, and that is not how we expect our code to run. So, please don't write your code like this!
What to do instead?
For scenarios where updates to a variable are split across nodes in a cluster and we need well-defined behaviour from our code, we can use Accumulators. As per the Spark docs,
In general, closures — constructs like loops or locally defined methods, should not be used to mutate some global state. Spark does not define or guarantee the behavior of mutations to objects referenced from outside of closures. Some code that does this may work in local mode, but that’s just by accident and such code will not behave as expected in distributed mode. Use an Accumulator instead if some global aggregation is needed.
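As a quick preview of the fix (the next article covers Accumulators in detail), here is a minimal sketch of the same sum written with an Accumulator, reusing the names from the snippet above. For a plain sum like this, the built-in actions rdd3.sum() or rdd3.reduce(lambda a, b: a + b) would also do the job.

total_sum = sc.accumulator(0)            # driver-side accumulator, starts at 0
rdd3 = sc.parallelize([1, 2, 3, 4, 5])

def increment_counter(x):
    total_sum.add(x)                     # each task sends its updates back to the driver

rdd3.foreach(increment_counter)
print("total_sum value: ", total_sum.value)   # 15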
Summing up, the closure is the set of variables and methods that must be visible for an executor to perform its computations on the RDD. This closure is serialized and sent to each executor. Understanding closures is important to avoid unexpected behaviour in your code. So, next time, before updating a variable in a loop while dealing with Spark, think about its scope and how Spark will break down the execution.
In the next article, I will discuss Accumulators and how they can be used to fix the code above and guarantee well-defined behaviour for the sum.