
Spark Join Strategies — How & What?

While dealing with data, we have all dealt with different kinds of joins, be it inner, outer, left, or (maybe) left-semi. This article covers the different join strategies Spark employs to perform the join operation. Knowing Spark’s join internals comes in handy for optimizing tricky join operations, finding the root cause of some out-of-memory errors, and improving the performance of Spark jobs (we all want that, don’t we?). Please read on to find out.


Spark Join Strategies:

Broadcast Hash Join

Before getting to Spark’s Broadcast Hash Join, let’s first understand Hash Join in general:

Hash Join

As the name suggests, Hash Join is performed by first building a hash table on the join_key of the smaller relation and then looping over the larger relation to probe for matching join_key values. Also, this is only supported for ‘=’ joins.

In Spark, Hash Join operates at the per-node level: the strategy is used to join the partitions available on a node.
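
To make the general idea concrete, here is a minimal plain-Python sketch (not Spark code; the relation contents and the key name are made up) of the build-and-probe steps described above:

# Minimal hash join sketch: build a hash table on the smaller relation,
# then probe it with each row of the larger relation.
def hash_join(smaller, larger, key):
    hash_table = {}
    for row in smaller:                               # build phase
        hash_table.setdefault(row[key], []).append(row)
    joined = []
    for row in larger:                                # probe phase
        for match in hash_table.get(row[key], []):
            joined.append({**match, **row})           # combine matching rows
    return joined

# Hypothetical usage:
# users  = [{"id": 1, "name": "a"}]
# orders = [{"id": 1, "amount": 10}, {"id": 1, "amount": 20}]
# hash_join(users, orders, "id")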

Now, coming to Broadcast Hash Join.

Broadcast Hash Join

In Broadcast Hash Join, a copy of one of the join relations is sent to all the worker nodes, which saves shuffling cost. This is useful when you are joining a large relation with a much smaller one. It is also known as a map-side join (associating worker nodes with mappers).

Spark deploys this join strategy when the size of one of the join relations is below a threshold (10 MB by default). The Spark property that defines this threshold is spark.sql.autoBroadcastJoinThreshold (configurable).
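
As a quick PySpark sketch (the DataFrame and column names below are made up for illustration), the threshold can be tuned and a broadcast can also be requested explicitly:

from pyspark.sql import SparkSession
from pyspark.sql.functions import broadcast

spark = SparkSession.builder.getOrCreate()

# Raise the auto-broadcast threshold to 50 MB (default is 10 MB); -1 disables auto-broadcast.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", str(50 * 1024 * 1024))

# Tiny illustrative DataFrames (names and columns are hypothetical).
df_large = spark.createDataFrame([(1, "a"), (2, "b"), (3, "c")], ["join_key", "payload"])
df_small = spark.createDataFrame([(1, "x"), (2, "y")], ["join_key", "dim"])

# Ask for a broadcast explicitly, regardless of the threshold.
joined = df_large.join(broadcast(df_small), on="join_key", how="inner")
joined.explain()  # the physical plan should show BroadcastHashJoin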

Broadcast relations are shared among executors using a BitTorrent-like protocol. It is a peer-to-peer protocol in which blocks of a file can be shared by peers amongst each other, so they don’t need to rely on a single node. This is how the peer-to-peer protocol works:

(Figure: peer-to-peer protocol)

Things to Note:

  • The broadcast relation should fit completely into the memory of each executor, as well as of the driver, because the driver starts the data transfer.
  • Only supported for ‘=’ joins.
  • Supported for all join types (inner, left, right) except full outer joins.
  • When the broadcast size is small, it is usually faster than other join strategies.
  • A copy of the relation is broadcast over the network, so this is a network-intensive operation. It can cause out-of-memory errors or performance issues when the broadcast size is big (for instance, when broadcast join is explicitly requested or the default threshold is raised).
  • You can’t make changes to the broadcast relation after it is broadcast. Even if you do, they won’t be visible to the worker nodes (because the copy has already been shipped).

Shuffle hash join

(Figure: Shuffle Hash Join)

Shuffle Hash Join involves moving data with the same join key value to the same executor node, followed by a Hash Join (explained above). Using the join key as the output key, data is shuffled amongst the executor nodes, and in the last step the data is combined using Hash Join, since rows with the same key end up on the same executor.

Things to Note:

  • Only supported for ‘=’ join.
  • The join keys don’t need to be sortable (this will make sense below).
  • Supported for all join types except full outer joins.
  • In my opinion, it’s an expensive join in that it involves both shuffling and hashing (Hash Join, as explained above). Maintaining a hash table requires memory and computation.
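
Since Spark 3.0, you can also ask for this strategy explicitly with a join hint. A PySpark sketch (df1, df2, and join_key are hypothetical names):

# Hint Spark to use shuffle hash join for this particular join (Spark 3.0+).
# df1 and df2 are hypothetical DataFrames sharing a join_key column.
joined = df1.join(df2.hint("shuffle_hash"), on="join_key", how="inner")
joined.explain()  # the plan should show ShuffledHashJoin

# Without a hint, shuffle hash join is typically considered only when
# spark.sql.join.preferSortMergeJoin is false and one side is small enough.
spark.conf.set("spark.sql.join.preferSortMergeJoin", "false")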

Shuffle sort-merge join

Let’s first understand Sort-Merge Join:

(Figure: Sort-Merge Join)

Sort-Merge Join involves first sorting the relations based on the join keys and then merging both datasets (think of the merge step of merge sort).
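
A rough plain-Python sketch of the idea (not Spark code; for brevity it assumes each key appears at most once per side):

# Minimal sort-merge join sketch over two lists of row dicts.
def sort_merge_join(left, right, key):
    left = sorted(left, key=lambda r: r[key])     # sort phase
    right = sorted(right, key=lambda r: r[key])
    i, j, joined = 0, 0, []
    while i < len(left) and j < len(right):       # merge phase
        if left[i][key] == right[j][key]:
            joined.append({**left[i], **right[j]})
            i += 1
            j += 1
        elif left[i][key] < right[j][key]:
            i += 1
        else:
            j += 1
    return joined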

Now, let’s understand the Shuffle Sort-Merge Join strategy in Spark:

(Figure: Shuffle Sort-Merge Join)

Shuffle Sort-Merge Join involves shuffling the data so that rows with the same join_key land on the same worker, and then performing a sort-merge join at the partition level on the worker nodes.

Things to Note:

  • Since Spark 2.3, this is the default join strategy in Spark; the preference can be turned off by setting spark.sql.join.preferSortMergeJoin to false.
  • Only supported for ‘=’ join.
  • The join keys need to be sortable (obviously).
  • Supported for all join types.
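
Because it is the default, an ordinary equi-join on sortable keys usually ends up here; it can also be requested with the ‘merge’ hint (PySpark sketch, hypothetical DataFrames df1 and df2):

# Sort-merge join is the default for equi-joins; the hint makes the choice explicit (Spark 3.0+).
joined = df1.join(df2.hint("merge"), on="join_key", how="left")
joined.explain()  # the plan should show SortMergeJoin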

Cartesian Join

In this strategy, the cartesian product (similar to SQL) of the two relations is calculated to evaluate the join.
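
In the DataFrame API this corresponds to crossJoin (a PySpark sketch with the hypothetical df1 and df2 from above; in Spark 2.x you may also need to set spark.sql.crossJoin.enabled to true):

# Explicit cartesian product of two DataFrames.
pairs = df1.crossJoin(df2)
pairs.explain()  # the plan should show a CartesianProduct node

# Spark 3.0+ also accepts a hint for inner joins:
# df1.join(df2.hint("shuffle_replicate_nl"), on="join_key", how="inner")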

Broadcast nested loop join

Think of this as a nested loop comparison of both the relations:

for record_1 in relation_1:
    for record_2 in relation_2:
        # join condition is evaluated

As you can see, this can be a very slow strategy. It is generally a fallback option, used when no other join strategy can be applied. Spark handles this with the BroadcastNestedLoopJoinExec operator, which broadcasts the appropriate side of the query, so you can think of at least one side of the join being broadcast to improve performance.

Things to note:

  • Supports both ‘=’ and non-equi joins (‘<=’, ‘<’, etc.).
  • Supports all the join types.
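
For instance, a non-equi join typically falls back to this operator. A PySpark sketch (reusing the spark session from the earlier sketch; the DataFrames and columns here are made up):

# A range (non-equi) join condition; hash- and sort-based strategies don't apply.
events = spark.createDataFrame([(1, 5), (2, 12)], ["event_id", "ts"])
ranges = spark.createDataFrame([(0, 10, "low"), (10, 20, "high")], ["start", "end", "label"])

joined = events.join(
    ranges,
    (events.ts >= ranges.start) & (events.ts < ranges.end),
    how="inner",
)
joined.explain()  # typically planned as BroadcastNestedLoopJoin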

How does Spark select a join strategy?

Taken almost directly from the Spark source code, let’s see how Spark decides on a join strategy.

If it is an ‘=’ join:

Look at the join hints, in the following order:
1. Broadcast hint: Pick broadcast hash join if the join type is supported.
2. Sort merge hint: Pick sort-merge join if the join keys are sortable.
3. Shuffle hash hint: Pick shuffle hash join if the join type is supported.
4. Shuffle replicate NL hint: Pick cartesian product if the join type is inner-like.

If there is no hint or the hints are not applicable:
1. Pick broadcast hash join if one side is small enough to broadcast, and the join type is supported.
2. Pick shuffle hash join if one side is small enough to build the local hash map, and is much smaller than the other side, and spark.sql.join.preferSortMergeJoin is false.
3. Pick sort-merge join if join keys are sortable.
4. Pick cartesian product if the join type is inner.
5. Pick broadcast nested loop join as the final solution. It may OOM but there is no other choice.

If it’s not an ‘=’ join:

Look at the join hints, in the following order:
1. Broadcast hint: Pick broadcast nested loop join.
2. Shuffle replicate NL hint: Pick cartesian product if the join type is inner-like.

If there is no hint or the hints are not applicable:
1. Pick broadcast nested loop join if one side is small enough to broadcast.
2. Pick cartesian product if join type is inner like.
3. Pick broadcast nested loop join as the final solution. It may OOM but we don’t have any other choice.
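
To make the hints above concrete, the same strategies can also be requested from SQL (Spark 3.0+; the table and column names below reuse the hypothetical DataFrames from the broadcast sketch):

# Register temp views so the SQL hint has something to bind to.
df_large.createOrReplaceTempView("large_table")
df_small.createOrReplaceTempView("small_table")

# BROADCAST, MERGE, SHUFFLE_HASH and SHUFFLE_REPLICATE_NL are all accepted here;
# a hint that doesn't apply is simply ignored and the rules above take over.
spark.sql("""
    SELECT /*+ SHUFFLE_HASH(s) */ l.*, s.dim
    FROM   large_table l
    JOIN   small_table s
    ON     l.join_key = s.join_key
""").explain()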
