Change Data Capture (CDC)

Description

Change Data Capture (CDC) ingests data changes from tables in a relational database and publishes them to a sink. In a CDC pipeline, changes to the source relational table are captured as row-level operations (INSERT, DELETE, UPDATE) based on the table's PRIMARY KEY, and are pushed to the downstream sink in real time. Currently, you can create a CDC pipeline with a source backed by a table in a POSTGRESQL Store.

How does a CDC Pipeline Work

Source Record Structure

DeltaStream uses Debezium to capture changes in the source relational table. This means that records in a CDC source backed by a table in a POSTGRESQL Store follow this JSON skeleton:

{
 "op": ...,
 "ts_ms": ...,
 "before": {...},
 "after": {...},
 "source": {...}
}

  • op: The row-level operation that caused the change in the source. It is c for CREATE, u for UPDATE, d for DELETE, or r for READ.

  • ts_ms: The time, in epoch milliseconds, at which the CDC pipeline processed the change event at the source.

  • before and after: The state of the changed row before and after the change. Depending on the operation, one of them may be null; for example, before is null for an INSERT.

  • source: Metadata from the source about the operation. It includes the following fields, among others:

    • db (String): The relational database name containing the source table.

    • schema (String): The relational schema name containing the source table.

    • table (String): The relational source table name.

    • lsn (BigInt): The log sequence number of the change.
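
As a rough illustration of how a consumer might interpret these fields, the Python sketch below maps each op code to its operation name and picks the row image that carries the data for that operation. The helper name row_image and the sample record are hypothetical. Reading before for deletes and after for the other operations follows common Debezium semantics and is an assumption about any particular pipeline.

```python
import json
from typing import Optional, Tuple

# Operation codes as described in the list above.
OP_NAMES = {"c": "CREATE", "u": "UPDATE", "d": "DELETE", "r": "READ"}


def row_image(record: dict) -> Tuple[str, Optional[dict]]:
    """Return (operation name, row state most relevant to that operation)."""
    op = record["op"]
    if op not in OP_NAMES:
        raise ValueError(f"unexpected op code: {op!r}")
    # A deleted row only exists in `before`; the other operations carry
    # the current row state in `after`.
    image = record.get("before") if op == "d" else record.get("after")
    return OP_NAMES[op], image


# Hypothetical sample event, for illustration only.
sample = json.loads(
    '{"op": "d", "ts_ms": 1693430399726,'
    ' "before": {"userid": "User_5"}, "after": null}'
)
print(row_image(sample))
```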

Source Store Requirements

Debezium uses a PostgreSQL logical decoding slot to stream changes from the source relational table. When creating the CDC pipeline, you can specify the name of an existing slot to use for the job; otherwise, DeltaStream creates a new replication slot. The username provided when creating the source POSTGRESQL Store must have sufficient privileges to use an existing replication slot or to create a new one. For more information on replication slots, see the PostgreSQL documentation. In addition, the source POSTGRESQL Store must use logical decoding with the write-ahead log (WAL), which means that wal_level must be set to logical in the source relational database.
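
Before creating the pipeline, it can help to verify these requirements on the PostgreSQL side. The sketch below is a hypothetical pre-flight check and is not part of DeltaStream. The three SQL strings use standard PostgreSQL commands and catalog views. The function check_cdc_prerequisites and its arguments are invented for illustration; it expects you to run the queries with a client of your choice and pass in the results. Treating the REPLICATION role attribute as the required privilege is an assumption, and managed PostgreSQL services may grant replication rights differently.

```python
from typing import List, Optional

# Standard PostgreSQL queries; run them with any client and feed the
# results to the check below.
SHOW_WAL_LEVEL = "SHOW wal_level;"
LIST_SLOTS = "SELECT slot_name FROM pg_replication_slots;"
CAN_REPLICATE = "SELECT rolreplication FROM pg_roles WHERE rolname = current_user;"


def check_cdc_prerequisites(
    wal_level: str,
    slot_names: List[str],
    has_replication_attr: bool,
    slot_name: Optional[str] = None,
) -> List[str]:
    """Return a list of human-readable problems; empty means all checks passed."""
    problems = []
    if wal_level != "logical":
        problems.append(f"wal_level is {wal_level!r}, expected 'logical'")
    # Only relevant when the pipeline is pointed at an existing slot.
    if slot_name is not None and slot_name not in slot_names:
        problems.append(f"replication slot {slot_name!r} does not exist")
    if not has_replication_attr:
        problems.append("current user lacks the REPLICATION attribute")
    return problems


# Hypothetical query results, for illustration only.
print(check_cdc_prerequisites("replica", [], False, slot_name="my_slot"))
```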

Considerations

Storage TOAST

PostgreSQL uses a fixed page size (commonly 8 kB), so rows with large field values are compressed and/or broken up into multiple physical rows. This technique is referred to as TOAST. See the PostgreSQL documentation on the subject to learn more.

DeltaStream follows the Debezium default behavior when generating CDC events containing TOASTed fields. If the TOASTed field is unchanged, then the value in the CDC event will be replaced with __debezium_unavailable_value.
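
A downstream consumer usually needs to handle this placeholder explicitly so that it is not stored as if it were real data. The sketch below shows one possible approach: it keeps the last known value for a column when the CDC event carries the placeholder. The function fill_unavailable, the column bio, and the idea of a consumer-side copy of the previous row are all assumptions made for illustration.

```python
from typing import Optional

# Placeholder described above for unchanged TOASTed fields.
UNAVAILABLE = "__debezium_unavailable_value"


def fill_unavailable(after: dict, last_known: Optional[dict]) -> dict:
    """Replace placeholder values with the last value seen for that column.

    If no previous value is known, the placeholder is left in place so the
    caller can decide how to handle it.
    """
    filled = dict(after)
    for column, value in after.items():
        if value == UNAVAILABLE and last_known and column in last_known:
            filled[column] = last_known[column]
    return filled


# Hypothetical rows: `bio` stands in for a large, TOASTed text column.
previous = {"userid": "User_1", "bio": "a very long text ..."}
update = {"userid": "User_1_renamed", "bio": UNAVAILABLE}
print(fill_unavailable(update, previous))
```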

How to Create a CDC Pipeline

A CDC pipeline is created in two steps. First, define the CDC source. Then, define the sink and the query that writes the CDC changes into it.

Step 1. DDL for CDC source

Parameter Name | Description

postgresql.db.name

Name of the database in the POSTGRESQL Store containing the source table.

Required: Yes Type: String Valid values: See LIST ENTITIES

postgresql.schema.name

Name of the schema in the POSTGRESQL Store containing the source table.

Required: Yes Type: String Valid values: See LIST ENTITIES

postgresql.table.name

Name of the source table in the POSTGRESQL Store.

Required: Yes Type: String Valid values: See LIST ENTITIES

value.format

Format of the CDC record coming from the source.

Required: Yes Type: String Valid values: JSON

store

Name of the POSTGRESQL Store that hosts the relational table backing the CDC source.

Required: Yes Type: String Valid values: See LIST STORES

Step 2. CSAS for CDC sink Stream and Query

Sink Parameters

The CDC sink can write into an existing Entity in the sink Store or create a new one. The following parameters are used in the WITH clause for the sink in the CSAS statement to select the desired behavior:

Parameter Name | Description

topic

Name of the Streaming Entity that the data for the CDC sink is written into. If the Entity doesn't exist, an Entity with this name is created in the corresponding Store.

Required: No Default value: Lowercase sink name Type: String

topic.partitions

The number of partitions to use when creating the sink Entity, if applicable. If the Entity already exists, then this value must be equal to the number of partitions in the existing Entity.

Required: Yes, unless topic already exists Type: Integer Valid values: [1, ...]

topic.replicas

The number of replicas to use when creating the sink Entity, if applicable. If the Entity already exists, then this value must be equal to the number of replicas in the existing Entity.

Required: Yes, unless topic already exists Type: Integer Valid values: [1, ...]

Source Parameters

The source Stream in the CDC pipeline captures changes from a relational table in the source POSTGRESQL Store. You can specify the following parameters in the WITH clause for the source in the CSAS statement:

Parameter Name | Description

postgresql.slot.name

Name of the existing PostgreSQL replication slot to use for the CDC pipeline. Note that the username provided for the source POSTGRESQL Store must have the required privileges to use this slot.

Required: No Default value: ds_cdc Type: String

postgresql.decoding.plugin.name

Name of the PostgreSQL logical decoding plug-in on the source Store. Supported values are decoderbufs, pgoutput, wal2json, wal2json_rds, wal2json_streaming, and wal2json_rds_streaming.

Required: No Default value: pgoutput Type: String

Example

Assume a POSTGRESQL Store named pgstore is defined, and it has a table named pageviews under the schema named public in the report database. Here is what the data in pageviews looks like. Each row has three columns, viewtime, userid, and pageid, and shows when a given page was visited by a specific user:

demodb.public/pgstore# PRINT ENTITY "public".pageviews;
+-----------------+-------------+-------------+
| viewtime        | userid      | pageid      |
+=================+=============+=============+
| 1694124853651   | User_4      | Page_29     |
+-----------------+-------------+-------------+
| 1694124856731   | User_1      | Page_59     |
+-----------------+-------------+-------------+
| 1694124857732   | User_1      | Page_63     |
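+-----------------+-------------+-------------+

The following DDL statement defines pageviews_cdc, a CDC source Stream backed by the pageviews table:
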
CREATE STREAM pageviews_cdc(
  op VARCHAR,
  ts_ms BIGINT,
  `before` STRUCT<viewtime BIGINT, userid VARCHAR, pageid VARCHAR>, 
  `after`  STRUCT<viewtime BIGINT, userid VARCHAR, pageid VARCHAR>, 
  `source` STRUCT<db VARCHAR, `schema` VARCHAR, `table` VARCHAR, `lsn` BIGINT>)
WITH (
  'store'='pgstore', 
  'value.format'='json',
  'postgresql.db.name'='report',
  'postgresql.schema.name'='public',
  'postgresql.table.name'='pageviews');

The pageviews_cdc Stream is used as the source in the CDC pipeline. Whenever an INSERT, DELETE, or UPDATE happens on pageviews in the relational database, a corresponding record is generated in pageviews_cdc to capture the change. The CSAS statement below defines a Stream as the sink and runs a query to write those changes into it:

CREATE STREAM cdc_sink WITH (
  'topic.partitions' = 1,
  'topic.replicas' = 1) AS 
SELECT * FROM pageviews_cdc;

Note that the above CSAS statement must provide the topic properties because a new topic is created in the sink Store. To use an existing topic in the sink Store, such as cdc_logs, replace the topic properties with the topic name in the WITH clause:

CREATE STREAM cdc_sink WITH ('topic' = 'cdc_logs') AS 
SELECT * FROM pageviews_cdc;

As an example, assume a new record is added to pageviews showing that User_5 visited Page_94. Following this INSERT operation, a record like the following appears in cdc_sink, published by the CDC pipeline defined above:

{
 "op":"c",
 "ts_ms":1693430399726,
 "before":null,
 "after":{"viewtime":1693430399292,"userid":"User_5","pageid":"Page_94"},
 "source":{"db":"report","schema":"public","table":"pageviews","lsn":38990016}
}
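
To make the timestamp fields in this record easier to read, the sketch below converts ts_ms to a UTC datetime and computes the gap between the row's viewtime and the time the change event was processed. It assumes that viewtime is also expressed in epoch milliseconds, which the sample values suggest but the page does not state. The helper from_epoch_ms is a hypothetical name.

```python
import json
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)

# The INSERT record shown above.
record = json.loads("""
{
 "op":"c",
 "ts_ms":1693430399726,
 "before":null,
 "after":{"viewtime":1693430399292,"userid":"User_5","pageid":"Page_94"},
 "source":{"db":"report","schema":"public","table":"pageviews","lsn":38990016}
}
""")


def from_epoch_ms(ms: int) -> datetime:
    """Convert epoch milliseconds to a timezone-aware UTC datetime."""
    # Integer timedelta arithmetic avoids float rounding on the millisecond part.
    return EPOCH + timedelta(milliseconds=ms)


processed_at = from_epoch_ms(record["ts_ms"])
# Assumption: viewtime is epoch milliseconds, like ts_ms.
lag_ms = record["ts_ms"] - record["after"]["viewtime"]

print(processed_at.isoformat())
print(f"lag between viewtime and processing: {lag_ms} ms")
```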

Now imagine we are only interested in the DELETE events in the source and want to write the userid and lsn for each DELETE. We can use the query below to create such a CDC pipeline:

CREATE STREAM cdc_sink WITH ('topic' = 'cdc_logs') AS 
SELECT ts_ms AS event_ts,
       `before`->userid AS uid,
       `source`->`lsn` AS seq_num
FROM pageviews_cdc
WHERE op = 'd';
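
To show what this query is expected to emit, the Python sketch below applies the same filter and projection to a small list of CDC records. It is only a consumer-side illustration of the query's semantics, not how DeltaStream executes it. The function delete_events and the second sample event are hypothetical, and the sketch assumes the before image of a DELETE includes the userid column.

```python
from typing import Iterable, Iterator

def delete_events(records: Iterable[dict]) -> Iterator[dict]:
    """Mirror of the query: keep DELETE events and project three fields."""
    for record in records:
        if record["op"] == "d":
            yield {
                "event_ts": record["ts_ms"],
                # Assumes `before` carries userid for deleted rows.
                "uid": record["before"]["userid"],
                "seq_num": record["source"]["lsn"],
            }


events = [
    # INSERT record from the example above: filtered out.
    {
        "op": "c",
        "ts_ms": 1693430399726,
        "before": None,
        "after": {"viewtime": 1693430399292, "userid": "User_5", "pageid": "Page_94"},
        "source": {"db": "report", "schema": "public", "table": "pageviews", "lsn": 38990016},
    },
    # Hypothetical DELETE of the same row, with made-up ts_ms and lsn values.
    {
        "op": "d",
        "ts_ms": 1693430500000,
        "before": {"viewtime": 1693430399292, "userid": "User_5", "pageid": "Page_94"},
        "after": None,
        "source": {"db": "report", "schema": "public", "table": "pageviews", "lsn": 38990200},
    },
]

for row in delete_events(events):
    print(row)
```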
