Skip to main content

Leader election and Sharding Practices at Wix microservices

Leader election and Sharding Practices at Wix microservices

Photo by Glen Carrie on Unsplash

Wix’s distributed system of 2000 clustered microservices is required to process billions of business events every day with very high speed in a highly concurrent fashion.

There is a need to balance the load between the various cluster nodes, such that no bottlenecks are created. For serving HTTP requests, this can be done by load balancers such as NGINX or Amazon’s ELB — this is out of scope for this article.

A service acting as a client may also require to load-balance its calls in certain cases. For example when it initializes an internal cache from data retrieved from a different service.

There are also many cases where events and actions have to be processed in an atomic manner such that the stored data remains valid. E.g. changing account balance or updating inventory.

In this blog post, we will explore various practices used by Wix microservices that ensure atomic operation for updating the state of some resource (e.g. a cache or a DB entry), thus keeping the data valid but without compromising on high throughput and low latency.

The following practices are divided by their operation “granularity”:

  1. Selecting a single leader service node to run a task or a bunch of tasks
  2. Sharding the retrieval of a large dataset by multiple “leader” nodes
  3. Processing events sequentially for single domain entity by any random service node

1. Selecting a Leader for scheduling tasks using ZooKeeper

Motivation
There are many services at Wix that are required to perform scheduled tasks.

Let’s consider for example Contacts Importer Service that imports Wix Site Owners contacts from external locations such as gmail.

The Importer service DB accumulates many import jobs metadata that becomes stale and can be deleted or archived once the import process is completed. Otherwise the DB will grow bigger and have slower response times.

Scheduling a cron job
A periodic cleaning job needs to be scheduled that will perform the DB deletion operations.

Note that scheduling cron jobs by a clustered microservice comprising multiple nodes, requires that only one node will be in charge of the scheduling of the same task.

Otherwise, the cleaning task can potentially run more than once at the same time, causing unintended race condition errors, like ending up with incorrect import job state and it also puts more load on the DB.

Wix has a Cron Scheduler service called Cronulla that makes sure that jobs are scheduled in just one of the client service nodes. It accepts requests to schedule a REST call to the client service with some cron expression string.

e.g.: "0 7 * * * *"
This cron expression means run once every hour on the 7th minute.

Zookeeper and Curator
In order to make sure the requested job is only sent once to Contacts Importer service every hour, Cronulla enlists the help of Apache Zookeeper. Zookeeper is a centralized service used to coordinate distributed systems.

Cronulla uses Apache Curator library, which is a high level, robust zookeeper client. It offers built-in recipes, including shared counters and locks. The relevant recipe for Cronulla’s case is leader election.

Following are steps to take In order to configure Curator to execute some task on a single leader:

  1. First the Curator client is built, including the zookeeper connection string.

2. Then a LeaderSelector, (which is the leader election recipe abstraction) is created. It is provided with the following parameters:

  • The Curator client itself.
  • The path to a unique Zookeeper ZNode representing this leadership group
  • A LeaderSelectorListenerAdapter which defines the action to take once this node becomes leader. More details on item 4.

3. The LeaderSelector is then set to autoRequeue() so that it puts itself back in the election pool after it has relinquished leadership.

4. The LeaderSelectorListenerAdapter defines a takeLeadership callback, where actions can be performed, as this node is now the leader. In our case the actions that are performed are scheduled cron tasks.

It is important to periodically check if this thread has been interrupted — an indicator that the leader needs to be relinquished and cron jobs should no longer be executed on this node.

For more information about Curator API and usage visit this detailed blog post.

Analysis
Zookeeper Server together with Curator Client provide a powerful and relatively simple way to coordinate distributed microservices. Especially for leader election and guarantee of atomic scheduled task processing.

In reality, Wix Cron scheduler service is more complex and also uses Apache Kafka and Greyhound — Wix Kafka client, in order to guarantee eventual task successful processing (using Consumer-side retries)

Having a single leader eliminates concurrency issues like race conditions and corrupted state healing, but on the other hand it introduces a single point of failure and limited possibility of scaling.

Comments

Popular posts from this blog

Apache Spark Discretized Streams (DStreams) with Pyspark

Apache Spark Discretized Streams (DStreams) with Pyspark SPARK STREAMING What is Streaming ? Try to imagine this; in every single second , nearly 9,000 tweets are sent , 1000 photos are uploaded on instagram, over 2,000,000 emails are sent and again nearly 80,000 searches are performed according to Internet Live Stats. So many data is generated without stopping from many sources and sent to another sources simultaneously in small packages. Many applications also generate consistently-updated data like sensors used in robotics, vehicles and many other industrial and electronical devices stream data for monitoring the progress and the performance. That’s why great numbers of generated data in every second have to be processed and analyzed rapidly in real time which means “ Streaming ”. DStreams Spark DStream (Discretized Stream) is the basic concept of Spark Streaming. DStream is a continuous stream of data.The data stream receives input from different kind of sources like Kafka, Kinesis

Reference Hadoop HDFS config Files

Trong Hadoop HDFS (Hadoop Distributed File System), có một số file cấu hình quan trọng để tùy chỉnh và điều chỉnh các thành phần của hệ thống. Dưới đây là một số file cấu hình quan trọng trong Hadoop HDFS và ý nghĩa của chúng: 1./ hdfs-site.xml : File này chứa cấu hình cho các thuộc tính liên quan đến HDFS. Đây là nơi bạn có thể thiết lập các cấu hình như kích thước block mặc định, số lượng bản sao dữ liệu, quyền truy cập, v.v. Điều chỉnh các giá trị trong file này có thể ảnh hưởng đến hiệu suất và tính sẵn sàng của HDFS. 2./ core-site.xml: File này chứa cấu hình cho các thuộc tính cơ bản của Hadoop. Nó bao gồm thông tin về tên miền Hadoop, địa chỉ máy chủ NameNode và các cài đặt liên quan đến mạng như cổng giao tiếp và giao thức. 3./ hdfs-default.xml : Đây là file mẫu chứa tất cả các thuộc tính có thể được cấu hình trong HDFS. File này cung cấp mô tả chi tiết và giá trị mặc định của mỗi thuộc tính. Nếu bạn muốn thay đổi một thuộc tính nào đó, bạn có thể sao chép nó vào hdfs-s

Khác nhau giữa các chế độ triển khai giữa Local, Standalone và YARN trong Spark

Trong Apache Spark, có ba chế độ triển khai chính: Local, Standalone và YARN. Dưới đây là sự khác biệt giữa chúng: Chế độ triển khai Local: Chế độ triển khai Local là chế độ đơn giản nhất và được sử dụng cho môi trường phát triển và kiểm thử. Khi chạy trong chế độ Local, Spark sẽ chạy trên một máy tính duy nhất bằng cách sử dụng tất cả các luồng CPU có sẵn trên máy đó. Đây là chế độ phù hợp cho các tác vụ nhỏ và không yêu cầu phân tán dữ liệu. Chế độ triển khai Standalone: Chế độ triển khai Standalone cho phép bạn triển khai một cụm Spark độc lập bao gồm nhiều máy tính. Trong chế độ này, một máy tính được chọn làm "Spark Master" và các máy tính khác được kết nối với Spark Master như là "Spark Workers". Spark Master quản lý việc phân phối công việc và quản lý tài nguyên giữa các Spark Workers. Chế độ Standalone phù hợp cho triển khai Spark trên các cụm máy tính riêng lẻ mà không có hệ thống quản lý cụm chuyên dụng. Chế độ triển khai YARN: YARN (Yet Another Resource N