Change data capture

nipun Agarwal
8 min readApr 22, 2020

CDC using hive

Introduction

With the event driven architecture and the need to get the data from transactional and RDBMS systems in data lake, a lot of tools and technologies have been build across them to make this happen. I know, with the platforms like Delta lake from Databricks and HUDI from Uber, things are made more simple for the data engineers to bring in the changes from these systems. I would like to share my design before these systems actually came into production. I know I am a bit tad late in writing this blog, but this will give you an overall picture and understanding of what all challenges were faced while building these systems and how are they averted with some limitations and design considerations.

I will try to explain each component taking a dummy table example. The design has some limitations which I will talk about and how that can be averted.

Change data capture in data lake

Let me first explain how things are moving from left to right and then I will go into the details of each one of them.

  • Change data (update/delete/add) is captured using Debezium which is a kafka connect.
  • Related schema of the tables are also captured in the schema registry using the same tool.
  • Kafka sink connectors send the changes incrementally to the data store ( S3 ) as incremental files in parquet format
  • Hive on EMR runs compaction job on top of these incremental files and merges the changes to the base table file after regular intervals
  • Schema extractor is responsible for creating tables in hive using the schema from schema registry

Ingestion phase

Data Ingestion

Debezium is an open source tool to capture change stream from databases like mysql, postgres etc and sends it to kafka. Here is how the data is being ingested

  • Debezium reads the transaction logs in real time to capture the changes and sends it to kafka topics. Filters can be present in the debezium properties file as to which tables should be tracked.
  • Every table resembles a topic in kafka where the changes are pushed
  • Schema is pushed to the schema registry with proper versioning.
  • Schema is present in popular AVRO format.
  • Full table snapshot is pushed to the kafka at the start of the debezium connect and after that only changes are pushed.

Change Data storage phase

Change data is captured in kafka and pushed to S3

Data continuously flows in kafka topics which is consumed by the kafka sink connectors.

  • Sink connectors are configured to save the data after every interval ( 10 min ) to S3.
  • The incremental files are saved as parquet format. Parquet is a columnar format and gives very good query times when querying on few columns.
  • No base file is present at the beginning of the whole thing and is build subsequently by the compaction process ( which I will explain later ), which is also in parquet.

Data Processing Phase

Hive compaction and query processor

Here comes the beast, hive tables and compaction to get the latest changes in the file. I will explain later with a dummy table, before that let me list down the steps as to what all things are happening here

  • External table is created on top of table directory (base + incremental files) in S3. The table is created using the Schema extractor component which extracts the table schema in AVRO format from the schema registry and converts it into the Create table statement in Hive.
  • View is created on the same table which dedupe rows with same id using the latest timestamp ( I will explain with an example later )
  • Compaction is run hourly which removes incremental files and dedupe rows with same id ( primary key or row identification key ) on timestamps and soft deletes rows which are actually deleted in the table.

Let us take a simple Student table and see how the whole flow ( as explained above ) will bring the table in S3 with incremental updates.

Student table with id and name as fields

Debezium Properties file

check this to get more info about the properties file

  • This is a simple Student table with `id` as primary key and full_name as name of the student.
  • Changes of the table are pushed in the topic defined by debezium properties file.
  • The op field in the debezium payload represents c : create, u : update or d : delete event. We will use this field in the parquet to represent whether a row was deleted, updated or created.
  • ts_ms field in the payload is the timestamp and use this as the timestamp field in your parquet table. This timestamp will be used to take the most latest record in case of updates. You can also use kafka_offset here, which is added by kafka once the payload is pushed in kafka.
  • My final parquet file will consists of id, full_name, op and ts_ms as the columns.
  • Incremental files are dumped in hourly folders inside the parent directory where base directory is also present
.
|_student
|_base_dir
|_base_file
|_incremental
|_increment_YYYYMMDDHH
|_increment_YYYYMMDDHH

Compaction

credit : this defines a 4 step process for incremental updates which I have followed

Once the incremental files start flowing in, in the S3 storage, we will schedule our hourly compaction job on it. Remember compaction is an expensive process and the bigger the size of the table, more time it takes to compact.

  • Create external table in hive student pointing to the location where both base table and incremental files are present i.e the student folder
Create external table student (
id int,
full_name string,
op char,
timestamp int)
stored as parquet
location 's3://.....'
  • Create an external hive table student_base table pointing to only the base directory
  • Create an external hive table student_incremental_yymmddhh table pointing to the incremental directory for that particular hour
  • Create an external hive table student_compactor table pointing to a temp directory. Delete any temp directory of previous runs.
  • Create a compaction query using the student_base and student_incremental_yymmddhh tables created earlier. What this query will do is, use the latest row based on timestamp from group of rows with same id
select
*
from
(
select
*,
row_number() over (partition by id
order by
timestamp desc ) row_number
from
(
select
*
from
student_base
union
select
*
from
student_incremental_yymmddhh A
)
B
where
row_number = 1
  • Do an insert overwrite in the student_compactor table using the query above.
  • Copy the file created by the student_compactor table into the base table directory student_base
  • Delete the student_incremental_yymmddhh files or you can archive these files at some place in S3 for historical purposes or recreating the table if anything goes wrong.

In order to query the student table, we will create a view student_view which will be doing dedup at run_time when the queries are fired. You can use the same above compactor query to create a view using student table. So compaction will reduce the number of files to be scanned when a query is fired on that table.

Downsides

  • As the size of the base table grows, the compaction time will also increase. To circumvent this, base table can be partitioned based on range (id) and then compaction on individual partitions.
  • You have to take care of exceptions at every step in your program. Copying, deleting, compaction can all raise exceptions and proper exception handling should be done. Retrying logic should be placed incase of errors or exceptions
  • S3 eventual consistency should be kept in mind after copying, deleting or any other activity on S3, as reads immediately after writes may not give latest results.
  • An extra view needs to be created on top of every table to get the deduped view of the table.
  • Incremental changes are pushed after 10 minutes. You can change the frequency which will change the file counts and hence change the query time. More the files, more the query time.
  • This will be done for every table that you need to replicate in data lake. With the number of table to be present in storage grows, compaction job on hive grows.

A quick note, the solution is not limited to hive but you can use spark or other processing engines. This is a just one way to get the changes of tables in the lake

Edit 1

Schema Evolution

One important aspect I missed is schema changes and affect of that in the above design

  • Will the above design fail if there is a change in the schema in the source system?
  • What will happen if a new column gets added in the table?
  • What will happen if a column is deleted in the table ( rarely happens in a production systems )?
  • What happens if a column type is changed ( very rare.. should not happen in a production system. This can break a lot of things, will not talk about this )

In case of a change in schema, downstream systems should be informed about the change. In this design, a change in the schema will eventually push a new version of schema in the schema registry and hence can be a triggering point to notify that there is a change.

The above design will not fail incase columns are added or removed as it is a parquet file and will not just take them into considerations and process as is. For newly added columns compaction will not use those columns and hence will not appear in the final parquet and for deleted columns, will update it with nulls in the final parquet.

There are 2 ways to handle this in production

  • If there is a change, inform the downstream systems and the engineers update their process to include it in further processing
  • Automatically digest the schema change in the downstream systems

Using the example above, I will try to explain how you can do it, without any manual intervention.

Compaction — ReVisited

Incremental parquet files written by kafka connect in S3 will contain the schema changes without any modification to the properties file as they are independent of that. It is in the compaction phase where schema evolution needs to be considered.

With the hourly compaction the tables can be created using the schema from the schema registry and hence will include/exclude the columns added/deleted in the changes respectively. These are the tables created at every compaction job student_base , student_incremental_yymmddhh and student_compactor. By just creating these tables before every compaction job using the latest schema will make sure we integrate the schema changes in the table compaction.

The table which is not created every time is the student table which can be altered once the trigger is received from the registry incase of change in schema.

I have tried my best to explain the process on getting the changes to lake. Should look at delta lakes and HUDI which provides data ingestion at scale and real time.

Hope you liked it. Lets connect on Linkedin

--

--