CDC Pipeline Using a Single Slot
Debezium can capture change data (CDC) from multiple tables in the same PostgreSQL database through just one replication slot. Rather than maintaining one slot per table, all relevant changes are multiplexed through a single slot, which minimizes replication slot overhead, simplifies resource management, and ensures consistent log consumption across the subscribed tables.
Advantages
Reduced Overhead – One slot serves many tables, eliminating the need to manage multiple slots and lowering the risk of hitting PostgreSQL’s replication slot limits.
Simplified Resource Management – Easier monitoring, configuration, and slot lifecycle handling.
Consistent WAL Stream – Ensures changes across multiple tables are captured in a single ordered stream, which can help with cross-table consistency in downstream processing.
Trade-offs & Operational Considerations
Slot Bloat / WAL Retention
PostgreSQL retains WAL segments required by active slots until they are consumed.
If Debezium lags, WAL segments can accumulate, consuming disk space.
Mitigation: monitor consumer lag, configure alerting, and size storage appropriately.
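Standard PostgreSQL catalog views make this lag straightforward to observe. As a sketch, the following query (run on the source database with sufficient privileges, PostgreSQL 10+) reports how much WAL each slot is currently retaining:

```sql
-- Amount of WAL retained on behalf of each replication slot.
-- A large value indicates a lagging or inactive consumer and is a
-- good candidate for alerting.
SELECT slot_name,
       active,
       pg_size_pretty(
         pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn)
       ) AS retained_wal
FROM pg_replication_slots;
```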
Single Point of Bottleneck
Since one slot handles all tables, a slowdown in Debezium processing (or a paused connector) can stall WAL replay for all tables.
Mitigation: deploy multiple connectors (with separate slots) if isolating workloads is necessary.
Failover Handling
Replication slots are bound to a specific PostgreSQL instance. In a failover scenario, slots do not automatically transfer to a standby.
Mitigation: use tooling (e.g., pglogical, Patroni, or custom failover slot synchronization) to recreate slots on the new primary.
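If a slot must be recreated by hand on the new primary, it can be done with a single call; note that a freshly created slot starts at the current WAL position, so any changes committed between the failover and the recreation are not captured and may need to be backfilled. The slot name and plugin below are assumptions and should match your connector's configuration:

```sql
-- Recreate the logical replication slot on the new primary.
-- 'cdc' and the pgoutput plugin are illustrative; use the values
-- your Debezium connector is configured with.
SELECT pg_create_logical_replication_slot('cdc', 'pgoutput');
```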
Throughput vs. Complexity
A single slot simplifies the architecture, but for very high-volume environments, splitting workloads across multiple slots/connectors may improve throughput and fault isolation.
Pipeline creation in DeltaStream
Example Scenario
Create a CDC pipeline that captures changes from two PostgreSQL tables, public.customers and test.users, and writes each table's changes into a corresponding Snowflake table.
Step 1
Run a DDL statement to define a single CDC source that reads the changes Debezium captures for all source tables.
When capturing changes from multiple tables with different schemas, the before and after fields, which represent the state of a row before and after a change, are defined as BYTES. A fixed schema could not accommodate the variety of column names and data types across the source tables, so the BYTES type provides a generic representation of these changes until they are transformed and written into destinations with defined schemas.
CREATE STREAM cdc_raw_source (
op VARCHAR,
ts_ms BIGINT,
`before` BYTES,
`after` BYTES,
`source` STRUCT<schema VARCHAR, table VARCHAR>
) WITH (
'store'='ec2pg',
'value.format'='json',
'postgresql.cdc.table.list'='public.customers,test.users');
When capturing changes from multiple tables, set the postgresql.cdc.table.list source property to a comma-separated list of fully qualified Postgres table names in the form <schema>.<table>.
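One related PostgreSQL-side detail (an assumption about your source table configuration, not part of the DDL above): for the before field to carry a full previous row image rather than only key columns, the source tables generally need REPLICA IDENTITY FULL:

```sql
-- Without REPLICA IDENTITY FULL, UPDATE/DELETE events typically expose
-- only the primary-key columns (or NULL) in the "before" image.
ALTER TABLE public.customers REPLICA IDENTITY FULL;
ALTER TABLE test.users REPLICA IDENTITY FULL;
```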
Step 2
Run a query to write the CDC records for all tables into a shared Kafka topic.
CREATE STREAM cdc_raw_sink
WITH ('store'='pubmsk', 'topic.partitions' = 1, 'topic.replicas' = 3) AS
SELECT
op,
source->schema AS src_schema,
source->table AS src_table,
ts_ms,
before,
after
FROM cdc_raw_source WITH ('postgresql.slot.name' = 'cdc')
WHERE after IS NOT NULL;
Step 3
Run one query per table to write its captured changes into its own dedicated Kafka topic.
CREATE STREAM cdc_raw_customers WITH ('store'='pubmsk', 'topic.partitions' = 1, 'topic.replicas' = 3) AS
SELECT * FROM cdc_raw_sink WITH ('starting.position'='earliest')
WHERE src_schema = 'public' AND src_table = 'customers';
CREATE STREAM cdc_raw_users WITH ('store'='pubmsk', 'topic.partitions' = 1, 'topic.replicas' = 3) AS
SELECT * FROM cdc_raw_sink WITH ('starting.position'='earliest')
WHERE src_schema = 'test' AND src_table = 'users';
Step 4
If the source tables' schemas are complex, you can use the "Generate Stream DDL" command to obtain each table's schema.
GENERATE STREAM DDL customer_tb WITH (
  'store' = 'pgstore',
  'value.format' = 'json',
  'postgresql.table.name' = 'customers',
  'postgresql.schema.name' = 'public',
  'postgresql.db.name' = 'dbname');
GENERATE STREAM DDL users_tb WITH (
  'store' = 'pgstore',
  'value.format' = 'json',
  'postgresql.table.name' = 'users',
  'postgresql.schema.name' = 'test',
  'postgresql.db.name' = 'dbname');
Step 5
Run a DDL statement per topic to define a table-specific CDC source.
CREATE STREAM customers_cdc (
op STRING,
ts_ms BIGINT,
"before" STRUCT<
`id` BIGINT,
first_name STRING,
last_name STRING,
`email` STRING,
biography STRING>,
"after" STRUCT<
`id` BIGINT,
first_name STRING,
last_name STRING,
`email` STRING,
biography STRING>
) WITH (
'value.format' = 'json',
'store' = 'pubmsk',
'topic' = 'cdc_raw_customers'
);
CREATE STREAM users_cdc (
op STRING,
ts_ms BIGINT,
"before" STRUCT<
uid VARCHAR,
`name` VARCHAR,
city VARCHAR,
balance BIGINT>,
"after" STRUCT<
uid VARCHAR,
`name` VARCHAR,
city VARCHAR,
balance BIGINT>
) WITH (
'value.format' = 'json',
'store' = 'pubmsk',
'topic' = 'cdc_raw_users'
);
Step 6
Run a query for each source to sink its changes into a Snowflake table.
CREATE TABLE customers_cdc_sflk WITH (
'store' ='snowflake_store',
'snowflake.db.name'='customers_cdc',
'snowflake.schema.name'='PUBLIC'
) AS
SELECT
after->id AS id,
after->first_name AS first_name,
after->last_name AS last_name,
after->`email` AS `email`,
after->biography AS biography,
ts_ms AS event_write_time,
op
FROM customers_cdc WITH ('starting.position'='earliest');
CREATE TABLE users_cdc_sflk WITH (
'store' ='snowflake_store',
'snowflake.db.name'='users_cdc',
'snowflake.schema.name'='PUBLIC'
) AS
SELECT
after->uid AS uid,
after->`name` AS `name`,
after->city AS city,
after->balance AS balance,
ts_ms AS event_write_time,
op
FROM users_cdc WITH ('starting.position'='earliest');