
Closure in Apache Spark

 

I have been working with Apache Spark for around two years now and considered myself to have a fairly good knowledge of it. But one day, somebody asked me about closures in Spark and I drew a blank!

Maybe one of the reasons I was unaware of closures in Spark is that I had been working with the Spark DataFrame APIs for too long, and operations at the RDD level had become a black box to me. Still, that's no excuse to ignore a concept as important as closure. And trust me, it is IMPORTANT to understand closures in Spark so that you don't spend hours debugging perfectly fine-looking code, trying to figure out why it is not working as expected. Well, enough talk, let's get started.

All right, my Big Data friends, what is wrong with the following piece of code?

total_sum = 0                             # variable defined on the driver
rdd3 = sc.parallelize([1, 2, 3, 4, 5])    # sc is the SparkContext (e.g. from the pyspark shell)

def increment_counter(x):
    global total_sum
    total_sum += x

rdd3.foreach(increment_counter)
print("total_sum value: ", total_sum)

Looks simple enough, right? But this code is WRONG as it might give unexpected results.

The reason behind this is the life cycle of the variable total_sum. Let's say we have a Spark cluster with 1 driver and 2 executor nodes. To process the above code, Spark breaks it into tasks to be run on the executors. To run those tasks, the executors also need the variables and methods referenced in them, right? So Spark serializes this information and sends it to each executor, and it is known as the CLOSURE.

Now each executor has its own copy of the variables and methods, and all updates take place in that copy only. So, if my task is summing the elements of an RDD, each executor updates its own copy of total_sum according to the partition of data it is processing. But total_sum on the driver will still be zero, because the executors were modifying their own copies and the variable on the driver is untouched!

Now consider that we are working on a single-node cluster, where both the driver and the executor are on the same machine. In that scenario the foreach function might execute within the same JVM as the driver, so the executor might reference the driver's total_sum, actually update it, and we might get the correct sum of the elements of the RDD.

But there are a lot of ifs and mights in that scenario, and that is not how we expect our code to run. So, please don't write your code like this!
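As an aside: for a plain sum like this one, no shared variable is needed at all. Actions such as sum() or reduce() aggregate the partial results from the executors and return the final value to the driver. A minimal sketch, assuming the same rdd3 as above:

rdd3 = sc.parallelize([1, 2, 3, 4, 5])
print("sum via action: ", rdd3.sum())                        # returns 15 to the driver
print("sum via reduce: ", rdd3.reduce(lambda a, b: a + b))   # also 15

But what about the general case, where we really do want to update some shared state from the executors?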

What to do instead?

For scenarios where updates to a variable are spread across the nodes of a cluster and we need well-defined behaviour from our code, we can use Accumulators. As per the Spark docs:

In general, closures — constructs like loops or locally defined methods, should not be used to mutate some global state. Spark does not define or guarantee the behavior of mutations to objects referenced from outside of closures. Some code that does this may work in local mode, but that’s just by accident and such code will not behave as expected in distributed mode. Use an Accumulator instead if some global aggregation is needed.
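For a quick taste, here is a minimal sketch of how an accumulator could fix the example above, again assuming sc is the SparkContext:

total_sum = sc.accumulator(0)           # accumulator created on the driver
rdd3 = sc.parallelize([1, 2, 3, 4, 5])

def increment_counter(x):
    total_sum.add(x)                    # tasks only add to it; Spark merges the updates

rdd3.foreach(increment_counter)
print("total_sum value: ", total_sum.value)   # the driver now reliably sees 15

Tasks running on the executors can only add to the accumulator, and only the driver program reads its value, which gives us exactly the well-defined behaviour we were missing.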

Summing up, the closure is the set of variables and methods that must be visible to an executor for it to perform its computations on the RDD. This closure is serialized and sent to each executor. Understanding closures is important to avoid unexpected behaviour in your code. So, the next time you update a variable in a loop while dealing with Spark, think about its scope and how Spark will break down the execution.

In the next article, I will discuss Accumulators in detail and how they can be used to fix the code above and guarantee well-defined behaviour for the counter.
