CDC Pipeline Using a Single Slot

Debezium can perform change data capture (CDC) for multiple tables within the same PostgreSQL database using just one replication slot. Rather than creating a separate slot for each table, it streams all table changes through a single slot, which reduces overhead and makes replication more efficient.

Instead of maintaining one replication slot per table, all relevant changes are multiplexed through a single slot, minimizing replication slot overhead, simplifying resource management, and ensuring consistent log consumption across the subscribed tables.
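The multiplexing idea can be sketched in plain Python: change events from all tables arrive on one stream, each tagged with its source schema and table, and consumers route them by that tag. The record shapes and table names below are illustrative, not a Debezium API.

```python
# Sketch: demultiplexing a single multiplexed CDC stream by source schema/table.
# Record shapes and table names are illustrative, not a Debezium API.
from collections import defaultdict

def demux(records):
    """Route change events from one multiplexed stream into per-table buckets."""
    per_table = defaultdict(list)
    for rec in records:
        key = (rec["source"]["schema"], rec["source"]["table"])
        per_table[key].append(rec)
    return per_table

# One stream carrying changes from two different tables.
stream = [
    {"op": "c", "source": {"schema": "public", "table": "customers"}, "after": b'{"id":1}'},
    {"op": "u", "source": {"schema": "test", "table": "users"}, "after": b'{"uid":"a"}'},
    {"op": "c", "source": {"schema": "public", "table": "customers"}, "after": b'{"id":2}'},
]

routed = demux(stream)
print(len(routed[("public", "customers")]))  # 2
```

This is the same fan-out the pipeline below performs with per-table queries filtering on the source schema and table fields.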

Advantages

  1. Reduced Overhead – One slot serves many tables, eliminating the need to manage multiple slots and lowering the risk of hitting PostgreSQL’s replication slot limits.

  2. Simplified Resource Management – Easier monitoring, configuration, and slot lifecycle handling.

  3. Consistent WAL Stream – Ensures changes across multiple tables are captured in a single ordered stream, which can help with cross-table consistency in downstream processing.

Trade-offs & Operational Considerations

  1. Slot Bloat / WAL Retention

    • PostgreSQL retains WAL segments required by active slots until they are consumed.

    • If Debezium lags, WAL segments can accumulate, consuming disk space.

    • Mitigation: monitor consumer lag, configure alerting, and size storage appropriately.

  2. Single Point of Bottleneck

    • Since one slot handles all tables, a slowdown in Debezium processing (or a paused connector) can stall WAL replay for all tables.

    • Mitigation: deploy multiple connectors (with separate slots) if isolating workloads is necessary.

  3. Failover Handling

    • Replication slots are bound to a specific PostgreSQL instance. In a failover scenario, slots do not automatically transfer to a standby.

    • Mitigation: use tooling (e.g., pglogical, Patroni, or custom failover slot synchronization) to recreate slots on the new primary.

  4. Throughput vs. Complexity

    • A single slot simplifies the architecture, but in very high-volume environments, splitting workloads across multiple slots/connectors may improve throughput and fault isolation.
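To make the WAL-retention concern concrete, slot lag can be computed from the LSNs PostgreSQL exposes (e.g. `pg_current_wal_lsn()` and `pg_replication_slots.confirmed_flush_lsn`). A minimal sketch of the arithmetic, using illustrative LSN values:

```python
def lsn_to_int(lsn: str) -> int:
    """Convert a PostgreSQL pg_lsn string ('hi/lo' in hex) to a byte offset."""
    hi, lo = lsn.split("/")
    return (int(hi, 16) << 32) | int(lo, 16)

def slot_lag_bytes(current_wal_lsn: str, confirmed_flush_lsn: str) -> int:
    """Bytes of WAL the slot still pins, i.e. disk PostgreSQL cannot recycle."""
    return lsn_to_int(current_wal_lsn) - lsn_to_int(confirmed_flush_lsn)

# Illustrative values: 16 MiB of retained WAL warrants attention
# if it keeps growing between checks.
print(slot_lag_bytes("0/3001000", "0/2001000"))  # 16777216
```

Alerting on this value (growing lag means Debezium is falling behind, or the connector is paused) is the practical form of the mitigation listed above.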

Pipeline creation in DeltaStream

Example Scenario

Create a CDC pipeline that captures changes from two PostgreSQL tables, public.customers and test.users, and writes each table's changes into a corresponding Snowflake table.

Step 1

Run a DDL statement to define a single CDC source that reads the changes Debezium captures for all source tables.

When capturing changes from multiple tables with different schemas, the before and after fields, representing the state of a row before and after a change, are defined as BYTES. This choice allows flexibility, as using a fixed schema would not accommodate the variety in column names and data types across different source tables. The BYTES data type enables a generic representation of these changes until they are transformed and written into a destination with defined schemas.

CREATE STREAM cdc_raw_source (
  op VARCHAR,
  ts_ms BIGINT,
  `before` BYTES,
  `after`  BYTES,
  `source` STRUCT<schema VARCHAR, table VARCHAR>
) WITH (
  'store'='ec2pg',
  'value.format'='json',
  'postgresql.cdc.table.list'='public.customers,test.users');

When capturing changes from multiple tables, set the postgresql.cdc.table.list source property to a comma-separated list of fully qualified table names in Postgres in the form <schema>.<table>.
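Downstream, the generic BYTES payloads can be decoded once the destination table's schema is known. A hypothetical Python sketch, assuming the payload is JSON-encoded to match the stream's 'value.format'='json' (the sample row is illustrative):

```python
import json

def decode_change(payload: bytes) -> dict:
    """Decode a generic BYTES before/after payload into a dict once the
    destination schema is known. Assumes JSON encoding ('value.format'='json')."""
    return json.loads(payload)

# Illustrative 'after' payload for a public.customers change event.
after = b'{"id": 1, "first_name": "Ada", "last_name": "Lovelace"}'
row = decode_change(after)
print(row["first_name"])  # Ada
```

This is the role the later table-specific sources play declaratively: they reinterpret the opaque bytes using each table's concrete column definitions.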

Step 2

Run a query to write the CDC records for all tables into a shared Kafka topic.

CREATE STREAM cdc_raw_sink
WITH ('store'='pubmsk', 'topic.partitions' = 1, 'topic.replicas' = 3) AS
SELECT
  op,
  source->schema AS src_schema,
  source->table AS src_table,
  ts_ms,
  before,
  after
FROM cdc_raw_source WITH ('postgresql.slot.name' = 'cdc')
WHERE after IS NOT NULL;

Step 3

Run one query per table to write its captured changes into a dedicated Kafka topic.

CREATE STREAM cdc_raw_customers WITH ('store'='pubmsk', 'topic.partitions' = 1, 'topic.replicas' = 3) AS
SELECT * FROM cdc_raw_sink WITH ('starting.position'='earliest')
WHERE src_schema = 'public' AND src_table = 'customers';
CREATE STREAM cdc_raw_users WITH ('store'='pubmsk', 'topic.partitions' = 1, 'topic.replicas' = 3) AS
SELECT * FROM cdc_raw_sink WITH ('starting.position'='earliest')
WHERE src_schema = 'test' AND src_table = 'users';

Step 4

If the input tables' schemas are complex, you can use the GENERATE STREAM DDL command to obtain each table's schema.

GENERATE STREAM DDL customer_tb WITH ('store'='pgstore',
  'value.format'='json', 'postgresql.table.name'='customers', 'postgresql.schema.name'='public', 'postgresql.db.name'='dbname');
GENERATE STREAM DDL users_tb WITH ('store'='pgstore',
  'value.format'='json', 'postgresql.table.name'='users', 'postgresql.schema.name'='test', 'postgresql.db.name'='dbname');

Step 5

Run a DDL statement per topic to define a table-specific CDC source.

CREATE STREAM customers_cdc (
  op STRING,
  ts_ms BIGINT,
  `before` STRUCT<
    `id` BIGINT,
    first_name STRING,
    last_name STRING,
    `email` STRING,
    biography STRING>,
  `after` STRUCT<
    `id` BIGINT,
    first_name STRING,
    last_name STRING,
    `email` STRING,
    biography STRING>
) WITH (
  'value.format' = 'json',
  'store' = 'pubmsk',
  'topic' = 'cdc_raw_customers'
);
CREATE STREAM users_cdc (
  op STRING,
  ts_ms BIGINT,
  `before` STRUCT<
    uid VARCHAR,
    `name` VARCHAR,
    city VARCHAR,
    balance BIGINT>,
  `after` STRUCT<
    uid VARCHAR,
    `name` VARCHAR,
    city VARCHAR,
    balance BIGINT>
) WITH (
  'value.format' = 'json',
  'store' = 'pubmsk',
  'topic' = 'cdc_raw_users'
);

Step 6

Write a query for each source to sink its changes into a Snowflake table.

CREATE TABLE customers_cdc_sflk WITH (
  'store'='snowflake_store',
  'snowflake.db.name'='customers_cdc',
  'snowflake.schema.name'='PUBLIC'
) AS
SELECT
  after->id AS id,
  after->first_name AS first_name,
  after->last_name AS last_name,
  after->`email` AS `email`,
  after->biography AS biography,
  ts_ms AS event_write_time,
  op
FROM customers_cdc WITH ('starting.position'='earliest');
CREATE TABLE users_cdc_sflk WITH (
  'store'='snowflake_store',
  'snowflake.db.name'='users_cdc',
  'snowflake.schema.name'='PUBLIC'
) AS
SELECT
  after->uid AS uid,
  after->`name` AS `name`,
  after->city AS city,
  after->balance AS balance,
  ts_ms AS event_write_time,
  op
FROM users_cdc WITH ('starting.position'='earliest');