Skip to main content

How to Implement Change Data Capture (CDC)/SCD using Spark & Hive — Big Data?

In databases, Change Data Capture (CDC) refers to a set of software design patterns used to determine and track the data that has changed so that action can be taken using the changed data. While there are many online implementations available, in this post, I’ll cover a Pyspark based implementation of Slowly Changing Dimension (type-4) where the underlying data is stored in Hive tables (Snapshot + History tables) and in Parquet format.

Overview

The main objective of this process is to compare incoming data with the existing data, and identify records in the the below mentioned fashion and then write them into history and snapshot tables.

Row Operations:

| Record Type | row_opern |
| ----------- | --------- |
|
No Change | N |
|
Updated | U |
|
Inserted | I |
|
Deleted | D |

Overall process is explained with the help of a flow diagram below:

Process Flow

Implementation

The implementation is based on the below mentioned basic principles utilizing the Pyspark library functions such as: MD5, concat_ws and some important data structures such as: List, Array, Set, etc.

  • Define:
    Key_List: As primary key for joining purposes when determining Updated(U) and No Change(N) records.
    Ignore_List: List of columns not to be considered for capturing changes.
    SCD_Cols: List of columns to be used for auditing, ex: rec_eff_dt, row_opern.
  • Calculate MD5 hash of incoming data and compare it against the MD5 hash of existing data to determine Updated(U) and No Change(N) records.
  • Perform left_anti (incoming, existing) and left_anti (existing, incoming) joins to identify Inserted(I) and Deleted(D) records.
  • Day-0 run: Mark all records as Inserted(I) in both the tables i.e., snapshot table and partitioned history table.
  • Day-n run: Union Inserted(I), Deleted (D) and Updated(U) records and write into day-n partition of history table. Union Inserted(I), No Change (N) and Updated(U) and write into the snapshot table.

Results

Day-0 Run

Incoming Data

Day-0 Incoming Data

History Table

Day-0 History Table

Snapshot Table

Day-0 Snapshot Table

Day-1 Run

Incoming Data

Day-1 Incoming Data

History Table

Day-1 History Table

Snapshot Table

Day-1 Snapshot Table

Source Code

The entire source code for the above implementation is available at Github: https://github.com/akashmehta10/cdc_pyspark_hive

Conclusion

While the above implementation has been validated to work fine over 50M+ records utilizing a big Spark cluster, further optimizations that can be performed are as follows:

  • Usage of broadcast joins can be implemented in case of smaller incoming data for efficiency.
  • Code can be enhanced to persist MD5 hash of the existing data to boost performance, although this may limit some functionality when adding more columns to the Ignore_List.
  • Code can be optimized to calculate MD5 hash only once instead of calculating at two places: get_no_change_records() and get_updated_records().

References



DUNGLT NOTE:

CDC (Change Data Capture) và SCD (Slowly Changing Dimensions) là hai khái niệm quan trọng trong lĩnh vực quản lý dữ liệu.

CDC là quá trình nhận biết và ghi lại các thay đổi dữ liệu xảy ra trong hệ thống thông tin. Nó theo dõi và ghi lại các thay đổi dữ liệu từ nguồn gốc của chúng, giúp duy trì các phiên bản lịch sử của dữ liệu. CDC thường được sử dụng trong các hệ thống quản lý dữ liệu thời gian thực, nơi các thay đổi dữ liệu cần được phát hiện và xử lý ngay lập tức.

SCD, là một khái niệm trong việc thiết kế cơ sở dữ liệu để quản lý các thay đổi trong các thuộc tính của một thực thể theo thời gian. Khi dữ liệu thay đổi, SCD cho phép lưu trữ và theo dõi lịch sử của các giá trị thuộc tính để có được một cái nhìn tổng thể về sự thay đổi dữ liệu theo thời gian. SCD thường được sử dụng trong việc phân tích dữ liệu và xây dựng các báo cáo lịch sử.

Tóm lại, CDC là quá trình ghi lại các thay đổi dữ liệu xảy ra trong hệ thống, trong khi SCD là một phương pháp thiết kế cơ sở dữ liệu để quản lý sự thay đổi dữ liệu theo thời gian. CDC tập trung vào việc ghi lại thay đổi dữ liệu, trong khi SCD tập trung vào việc theo dõi và lưu trữ lịch sử của các giá trị thuộc tính.

Comments

Popular posts from this blog

Apache Spark Discretized Streams (DStreams) with Pyspark

Apache Spark Discretized Streams (DStreams) with Pyspark SPARK STREAMING What is Streaming ? Try to imagine this; in every single second , nearly 9,000 tweets are sent , 1000 photos are uploaded on instagram, over 2,000,000 emails are sent and again nearly 80,000 searches are performed according to Internet Live Stats. So many data is generated without stopping from many sources and sent to another sources simultaneously in small packages. Many applications also generate consistently-updated data like sensors used in robotics, vehicles and many other industrial and electronical devices stream data for monitoring the progress and the performance. That’s why great numbers of generated data in every second have to be processed and analyzed rapidly in real time which means “ Streaming ”. DStreams Spark DStream (Discretized Stream) is the basic concept of Spark Streaming. DStream is a continuous stream of data.The data stream receives input from different kind of sources like Kafka, Kinesis...

6 Rules of Thumb for MongoDB Schema Design

“I have lots of experience with SQL and normalized databases, but I’m just a beginner with MongoDB. How do I model a one-to-N relationship?” This is one of the more common questions I get from users attending MongoDB office hours. I don’t have a short answer to this question, because there isn’t just one way, there’s a whole rainbow’s worth of ways. MongoDB has a rich and nuanced vocabulary for expressing what, in SQL, gets flattened into the term “One-to-N.” Let me take you on a tour of your choices in modeling One-to-N relationships. There’s so much to talk about here, In this post, I’ll talk about the three basic ways to model One-to-N relationships. I’ll also cover more sophisticated schema designs, including denormalization and two-way referencing. And I’ll review the entire rainbow of choices, and give you some suggestions for choosing among the thousands (really, thousands) of choices that you may consider when modeling a single One-to-N relationship. Jump the end of the post ...

Khác nhau giữa các chế độ triển khai giữa Local, Standalone và YARN trong Spark

Trong Apache Spark, có ba chế độ triển khai chính: Local, Standalone và YARN. Dưới đây là sự khác biệt giữa chúng: Chế độ triển khai Local: Chế độ triển khai Local là chế độ đơn giản nhất và được sử dụng cho môi trường phát triển và kiểm thử. Khi chạy trong chế độ Local, Spark sẽ chạy trên một máy tính duy nhất bằng cách sử dụng tất cả các luồng CPU có sẵn trên máy đó. Đây là chế độ phù hợp cho các tác vụ nhỏ và không yêu cầu phân tán dữ liệu. Chế độ triển khai Standalone: Chế độ triển khai Standalone cho phép bạn triển khai một cụm Spark độc lập bao gồm nhiều máy tính. Trong chế độ này, một máy tính được chọn làm "Spark Master" và các máy tính khác được kết nối với Spark Master như là "Spark Workers". Spark Master quản lý việc phân phối công việc và quản lý tài nguyên giữa các Spark Workers. Chế độ Standalone phù hợp cho triển khai Spark trên các cụm máy tính riêng lẻ mà không có hệ thống quản lý cụm chuyên dụng. Chế độ triển khai YARN: YARN (Yet Another Resource N...