In databases, Change Data Capture (CDC) refers to a set of software design patterns used to determine and track the data that has changed so that action can be taken using the changed data. While there are many online implementations available, in this post, I’ll cover a Pyspark based implementation of Slowly Changing Dimension (type-4) where the underlying data is stored in Hive tables (Snapshot + History tables) and in Parquet format.
Overview
The main objective of this process is to compare incoming data with the existing data, and identify records in the the below mentioned fashion and then write them into history and snapshot tables.
Row Operations:
| Record Type | row_opern |
| ----------- | --------- |
| No Change | N |
| Updated | U |
| Inserted | I |
| Deleted | D |
Overall process is explained with the help of a flow diagram below:

Implementation
The implementation is based on the below mentioned basic principles utilizing the Pyspark library functions such as: MD5, concat_ws and some important data structures such as: List, Array, Set, etc.
- Define:
Key_List: As primary key for joining purposes when determining Updated(U) and No Change(N) records.
Ignore_List: List of columns not to be considered for capturing changes.
SCD_Cols: List of columns to be used for auditing, ex: rec_eff_dt, row_opern. - Calculate MD5 hash of incoming data and compare it against the MD5 hash of existing data to determine Updated(U) and No Change(N) records.
- Perform left_anti (incoming, existing) and left_anti (existing, incoming) joins to identify Inserted(I) and Deleted(D) records.
- Day-0 run: Mark all records as Inserted(I) in both the tables i.e., snapshot table and partitioned history table.
- Day-n run: Union Inserted(I), Deleted (D) and Updated(U) records and write into day-n partition of history table. Union Inserted(I), No Change (N) and Updated(U) and write into the snapshot table.
Results
Day-0 Run
Incoming Data

History Table

Snapshot Table

Day-1 Run
Incoming Data

History Table

Snapshot Table

Source Code
The entire source code for the above implementation is available at Github: https://github.com/akashmehta10/cdc_pyspark_hive
Conclusion
While the above implementation has been validated to work fine over 50M+ records utilizing a big Spark cluster, further optimizations that can be performed are as follows:
- Usage of broadcast joins can be implemented in case of smaller incoming data for efficiency.
- Code can be enhanced to persist MD5 hash of the existing data to boost performance, although this may limit some functionality when adding more columns to the Ignore_List.
- Code can be optimized to calculate MD5 hash only once instead of calculating at two places: get_no_change_records() and get_updated_records().
References
- https://cloudywithachanceofbigdata.com/change-data-capture-at-scale-using-spark/
- https://towardsdatascience.com/slowly-changing-dimension-type-2-in-spark-7d5865ac915b
- https://en.wikipedia.org/wiki/Slowly_changing_dimension
- https://en.wikipedia.org/wiki/Change_data_capture
CDC (Change Data Capture) và SCD (Slowly Changing Dimensions) là hai khái niệm quan trọng trong lĩnh vực quản lý dữ liệu.
CDC là quá trình nhận biết và ghi lại các thay đổi dữ liệu xảy ra trong hệ thống thông tin. Nó theo dõi và ghi lại các thay đổi dữ liệu từ nguồn gốc của chúng, giúp duy trì các phiên bản lịch sử của dữ liệu. CDC thường được sử dụng trong các hệ thống quản lý dữ liệu thời gian thực, nơi các thay đổi dữ liệu cần được phát hiện và xử lý ngay lập tức.
SCD, là một khái niệm trong việc thiết kế cơ sở dữ liệu để quản lý các thay đổi trong các thuộc tính của một thực thể theo thời gian. Khi dữ liệu thay đổi, SCD cho phép lưu trữ và theo dõi lịch sử của các giá trị thuộc tính để có được một cái nhìn tổng thể về sự thay đổi dữ liệu theo thời gian. SCD thường được sử dụng trong việc phân tích dữ liệu và xây dựng các báo cáo lịch sử.
Tóm lại, CDC là quá trình ghi lại các thay đổi dữ liệu xảy ra trong hệ thống, trong khi SCD là một phương pháp thiết kế cơ sở dữ liệu để quản lý sự thay đổi dữ liệu theo thời gian. CDC tập trung vào việc ghi lại thay đổi dữ liệu, trong khi SCD tập trung vào việc theo dõi và lưu trữ lịch sử của các giá trị thuộc tính.
Comments
Post a Comment