Change Data Capture (CDC)
Description
Change Data Capture (CDC) enables you to ingest data changes from tables in a relational database and publish them to a sink. In a CDC pipeline, data changes in the source relational table are captured as row-level operations (INSERT, DELETE, UPDATE) based on the table's PRIMARY KEY. They're then pushed to the downstream sink in real time.
Currently, you can create a CDC pipeline with a source backed by one or more tables in a PostgreSQL Data Store.
How a CDC Pipeline Works
Source Record Structure
DeltaStream uses Debezium to capture changes in the source relational table. This means when you define a CDC source backed by a given table in a POSTGRESQL store, the records in that source follow this JSON skeleton:
{
"op": ...,
"ts_ms": ...,
"before": {...},
"after": {...},
"system": {...}
}
op: Describes the row-level operation that caused a change in the source. It can be c for CREATE, u for UPDATE, d for DELETE, or r for READ.
ts_ms: Shows the time, in milliseconds, at which the change event was processed in the source.
before and after: Specify the state of the row before and/or after the change, depending on the semantics of the operation.
source: Shows the metadata in the source about the operation. Some of its included fields are:
db (String): The relational database name containing the source table.
schema (String): The relational schema name containing the source table.
table (String): The relational source table name.
lsn (BigInt): The log sequence number of the change.
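For illustration, here is what a change event might look like for a hypothetical UPDATE that modifies the email column of a customers table (the table, columns, and values here are made up):
{
"op": "u",
"ts_ms": 1694124860123,
"before": {"id": 42, "email": "old@example.com"},
"after": {"id": 42, "email": "new@example.com"},
"source": {"db": "report", "schema": "public", "table": "customers", "lsn": 38990128}
}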
Requirements for a PostgreSQL Source Store
Debezium uses PostgreSQL's logical decoding slot for streaming changes from the given relational source table.
When you create the CDC pipeline, you need to specify a name for the replication slot. If a slot already exists with that name, the system uses it for the CDC job. If there's no pre-existing slot, the system creates a new replication slot with that name and uses it for the job. See below for more details.
When you create the source PostgreSQL Data Store:
the username you provide must have enough privileges to use an existing replication slot or create a new one.
the source PostgreSQL store must use logical decoding with the write-ahead log (WAL). This means you should set wal_level to logical in the source relational database. (For more information, see the PostgreSQL ALTER SYSTEM docs.)
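For example, assuming a role with superuser privileges on the source database, you can enable logical decoding with the statements below. Note that a change to wal_level only takes effect after a PostgreSQL server restart:
-- Enable logical decoding via the write-ahead log
ALTER SYSTEM SET wal_level = logical;

-- After restarting the server, verify the active setting
SHOW wal_level;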
Working with PostgreSQL replication slots
Each DeltaStream query that reads rows from a PostgreSQL source for a CDC pipeline must specify a replication slot. Replication slots in PostgreSQL represent a stream of changes that can be replayed to a client in the order in which the changes were made on the original server. Each slot has the following characteristics:
uniquely named across all databases in a PostgreSQL instance
persists independently from the clients using it
crash-proof
contains its own independent state
You cannot assign a single replication slot in DeltaStream to multiple simultaneously running queries. By default, PostgreSQL instances limit the number of replication slots to 10.
To list, create, or drop replication slots in the PostgreSQL CLI psql, use the following queries:
-- Check the list of existing replication slots
SELECT * FROM pg_replication_slots;

-- Create a new logical replication slot
SELECT pg_create_logical_replication_slot('ds_cdc', 'pgoutput');

-- Delete an existing replication slot
-- Note: For logical slots, this must be called while connected to the same database the slot was created on.
SELECT pg_drop_replication_slot('ds_cdc');
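You can also check how close you are to the slot limit. For example:
-- Show the configured maximum number of replication slots (default: 10)
SHOW max_replication_slots;

-- Show which slots currently have a client attached
SELECT slot_name, active FROM pg_replication_slots;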
Considerations
Storage TOAST
PostgreSQL uses a fixed page size (commonly 8 KB), so rows with large field values are compressed and/or broken up into multiple physical rows. This is the TOAST (The Oversized-Attribute Storage Technique) mechanism. See the PostgreSQL documentation on the subject to learn more.
DeltaStream follows the Debezium default behavior when generating CDC events containing TOASTed fields. If a TOASTed field is unchanged, DeltaStream replaces its value in the CDC event with the placeholder __debezium_unavailable_value.
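As a sketch of how a downstream query might handle this, the hypothetical example below assumes a CDC source stream named products_cdc whose description column is large enough to be TOASTed, and filters out events in which that field was unchanged:
SELECT *
FROM products_cdc
WHERE `after`->description <> '__debezium_unavailable_value';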
How to Create a CDC Pipeline
You create a CDC pipeline in 2 steps:
Define the CDC source
Define the sink and the query to write CDC changes into it
Step 1. Define DDL for the CDC source
Using CREATE STREAM, define a Stream backed by the source relational table. The parameters below are used as CDC parameters in the WITH clause of CREATE STREAM:
postgresql.db.name
Name of the database in the POSTGRESQL store containing the source table.
Required: No
Type: String
Valid values: See LIST ENTITIES
postgresql.schema.name
Name of the schema in the POSTGRESQL store containing the source table.
Required: No
Type: String
Valid values: See LIST ENTITIES
postgresql.table.name
Name of the source table in the POSTGRESQL store.
Required: No
Type: String
Valid values: See LIST ENTITIES
postgresql.cdc.table.list
Comma-separated list of fully qualified source tables to capture using CDC. Each entry must be specified as schema_name.table_name. If a table name contains special characters (such as a dot) or uppercase letters, enclose it in double quotes; for example, public."orders.v1". Regex patterns are supported to match multiple tables dynamically. (For example, public.orders_.* matches all tables in the public schema with names starting with orders_.)
Required: No
Type: String
Valid values: See LIST ENTITIES
value.format
Format of the CDC record coming from the source.
Required: Yes
Type: String
Valid values: JSON
store
Name of the POSTGRESQL store that hosts the relational table backing the CDC source.
Required: Yes
Type: String
Valid values: See LIST STORES
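Putting these parameters together, here is a minimal sketch of a DDL for a single-table CDC source; the store, database, schema, table, and column names are placeholders:
CREATE STREAM orders_cdc(
op VARCHAR,
ts_ms BIGINT,
`before` STRUCT<id BIGINT, amount BIGINT>,
`after` STRUCT<id BIGINT, amount BIGINT>,
`source` STRUCT<db VARCHAR, `schema` VARCHAR, `table` VARCHAR, `lsn` BIGINT>)
WITH (
'store'='my_pgstore',
'value.format'='json',
'postgresql.db.name'='mydb',
'postgresql.schema.name'='public',
'postgresql.table.name'='orders');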
Step 2. Define the CREATE STREAM AS SELECT (CSAS) for CDC Sink Stream and Query
Using CREATE STREAM AS SELECT, define a Stream backed by the sink Streaming Entity and the query to insert CDC records into it.
Sink parameters
The CDC sink can write into an existing entity in the sink store or create a new entity. The below parameters are used in the WITH clause for the sink in CSAS to create the desired behavior:
topic
Name of the Streaming Entity into which the data for the CDC sink is written. If the entity doesn't exist, the system creates an entity with this name in the corresponding store.
Required: No
Default value: Lowercase sink name
Type: String
topic.partitions
The number of partitions to use when creating the sink entity, if applicable. If the entity already exists, this value must equal the number of partitions in the existing entity.
Required: Yes, unless topic already exists
Type: Integer
Valid values: [1, ...]
topic.replicas
The number of replicas to use when creating the sink entity, if applicable. If the entity already exists, this value must equal the number of replicas in the existing entity.
Required: Yes, unless topic already exists
Type: Integer
Valid values: [1, ...]
Source parameters
The source stream in the CDC pipeline captures the changes from a relational table in the source POSTGRESQL Data Store. Here are the parameters you can specify in the WITH clause for the source in CSAS:
postgresql.slot.name
Name of the PostgreSQL replication slot to use for the CDC pipeline. Note that the username you provide for the source PostgreSQL store should have the required privileges to use this slot.
Required: Yes
Type: String
postgresql.decoding.plugin.name
Name of the PostgreSQL logical decoding plug-in on the source store. Supported values are decoderbufs, pgoutput, wal2json, wal2json_rds, wal2json_streaming, and wal2json_rds_streaming.
Required: No
Default value: pgoutput
Type: String
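Combining the sink and source parameters, a CSAS skeleton might look like the following; the sink name, source stream, and slot name are placeholders, and pgoutput is spelled out even though it is the default:
CREATE STREAM orders_cdc_sink WITH (
'topic.partitions' = 1,
'topic.replicas' = 1) AS
SELECT * FROM orders_cdc WITH (
'postgresql.slot.name' = 'my_slot',
'postgresql.decoding.plugin.name' = 'pgoutput');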
Example
Create CDC pipeline capturing changes from a Postgres table
Assume a POSTGRESQL Data Store is defined with the name pgstore, and it has a table named pageviews under the schema named public in the report database. Here is how the data displays in pageviews. Each row has three columns (viewtime, userid, and pageid) and shows when a given page was visited by a specific user:
demodb.public/pgstore# PRINT ENTITY "public".pageviews;
+-----------------+-------------+-------------+
| viewtime | userid | pageid |
+=================+=============+=============+
| 1694124853651 | User_4 | Page_29 |
+-----------------+-------------+-------------+
| 1694124856731 | User_1 | Page_59 |
+-----------------+-------------+-------------+
| 1694124857732 | User_1 | Page_63 |
+-----------------+-------------+-------------+
Use the DDL statement below to create a Stream named pageviews_cdc, backed by the pageviews table, to capture its CDC records:
CREATE STREAM pageviews_cdc(
op VARCHAR,
ts_ms BIGINT,
`before` STRUCT<viewtime BIGINT, userid VARCHAR, pageid VARCHAR>,
`after` STRUCT<viewtime BIGINT, userid VARCHAR, pageid VARCHAR>,
`source` STRUCT<db VARCHAR, `schema` VARCHAR, `table` VARCHAR, `lsn` BIGINT>)
WITH (
'store'='pgstore',
'value.format'='json',
'postgresql.db.name'='report',
'postgresql.schema.name'='public',
'postgresql.table.name'='pageviews');
The pageviews_cdc stream is used as the source in the CDC pipeline. Whenever an INSERT, DELETE, or UPDATE happens on pageviews in the relational database, a corresponding record is generated in pageviews_cdc to capture the change. Now use the below CSAS statement to define a stream as the sink and run a query to write those changes into it:
CREATE STREAM cdc_sink WITH (
'topic.partitions' = 1,
'topic.replicas' = 1) AS
SELECT * FROM pageviews_cdc WITH ('postgresql.slot.name' = 'slot001');
Note that in the above CSAS you must provide the topic properties, as a new topic is created in the sink store. To use a pre-existing topic such as cdc_logs in the sink store, you can replace the topic properties with the topic name in the WITH clause:
CREATE STREAM cdc_sink WITH ('topic' = 'cdc_logs') AS
SELECT * FROM pageviews_cdc WITH ('postgresql.slot.name' = 'slot002');
As an example, assume a new record is added to pageviews showing that User_5 visited Page_94. Following this INSERT operation, you see a record similar to the one below in cdc_sink, published via the CDC pipeline defined above:
{
"op":"c",
"ts_ms":1693430399726,
"before":null,
"after":{"viewtime":1693430399292,"userid":"User_5","pageid":"Page_94"},
"source":{"db":"report","schema":"public","table":"pageviews","lsn":38990016}
}
Now imagine you're only interested in the DELETE events in the source and wish to write out the userid and lsn for each DELETE. You can use the query below to create such a CDC pipeline:
CREATE STREAM cdc_sink WITH ('topic' = 'cdc_logs') AS
SELECT ts_ms AS event_ts,
`before`->userid AS uid,
`source`->`lsn` AS seq_num
FROM pageviews_cdc WITH ('postgresql.slot.name' = 'slot003')
WHERE op = 'd';
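For instance, if the row for User_5 were later deleted, this pipeline might emit a record along these lines (values are illustrative):
{
"event_ts": 1693430405123,
"uid": "User_5",
"seq_num": 38990208
}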
Create CDC pipeline capturing changes from multiple tables
Now consider a scenario similar to the previous example, except that besides the pageviews table there is another table called users under the staff schema within the report database in the pgstore store.
Suppose you want to create a CDC pipeline to capture changes for both tables using a single replication slot. First, define a stream as the CDC source:
CREATE STREAM cdc_source(
op VARCHAR,
ts_ms BIGINT,
`before` BYTES,
`after` BYTES,
`source` STRUCT<`schema` VARCHAR, `table` VARCHAR>)
WITH (
'store'='pgstore',
'value.format'='json',
'postgresql.cdc.table.list'='public.pageviews,staff.users');
The cdc_source stream acts as the CDC source in your pipeline. Whenever an INSERT, DELETE, or UPDATE happens on either the pageviews or users table in the source database, a corresponding change event is captured and written into this stream.
Now, use the following CSAS statement to define a downstream stream as the sink and run a query to write these changes into it. Each output record includes the operation type and timestamp (op and ts_ms), along with the source schema and table name, which indicate where the changes came from. You can use these fields to filter or route changes for each table in subsequent queries that read from cdc_sink, as shown in the sketch after the statement below.
CREATE STREAM cdc_sink WITH (
'topic.partitions' = 1,
'topic.replicas' = 1) AS
SELECT
`source`->`schema` AS src_schema,
`source`->`table` AS src_table,
`before`,
`after`,
`op`,
`ts_ms`
FROM cdc_source WITH ('postgresql.slot.name' = 'slot004');
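As a sketch of the routing mentioned above, a follow-on query reading from cdc_sink could isolate the changes belonging to the users table; the stream name users_changes and its topic are placeholders:
CREATE STREAM users_changes WITH ('topic' = 'users_changes') AS
SELECT *
FROM cdc_sink
WHERE src_schema = 'staff' AND src_table = 'users';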