# CDC Pipeline Using a Single Slot

Debezium can capture change data (CDC) from multiple tables within the same PostgreSQL database using just **one replication slot**. Rather than maintaining a separate slot per table, all relevant changes are multiplexed through a single slot. This minimizes replication slot overhead, simplifies resource management, and ensures consistent log consumption across the subscribed tables.

## Pipeline creation in DeltaStream <a href="#pipeline-creation-in-deltastream" id="pipeline-creation-in-deltastream"></a>

### Example Scenario

Create a CDC pipeline that captures changes from two PostgreSQL tables—`public.customers` and `test.users`—and writes each table’s changes into a corresponding Snowflake table.

### Step 1 <a href="#heading-title-text" id="heading-title-text"></a>

Run a DDL statement to define a single CDC source that reads the changes Debezium captures for the source tables.

When capturing changes from multiple tables with different schemas, the `before` and `after` fields, representing the state of a row before and after a change, are defined as `BYTES`. This choice allows flexibility, as using a fixed schema would not accommodate the variety in column names and data types across different source tables. The `BYTES` data type enables a generic representation of these changes until they are transformed and written into a destination with defined schemas.

```sql
CREATE STREAM cdc_raw_source (
  op VARCHAR,
  ts_ms BIGINT,
  `before` BYTES,
  `after`  BYTES,
  `source` STRUCT<`schema` VARCHAR, `table` VARCHAR>
) WITH (
  'store'='postgres_store',
  'value.format'='json',
  'postgresql.cdc.table.list'='public.customers,test.users');
```

When capturing changes from multiple tables, set the `postgresql.cdc.table.list` source property to a comma-separated list of fully qualified Postgres table names in the form `<schema>.<table>`.
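
For orientation, a Debezium change event for an update to `public.customers` has roughly the following envelope (illustrative values, using the column names from the table-specific DDL in Step 5; the exact payload depends on connector configuration). Because each source table has a different row shape, the source DDL above types `before` and `after` as `BYTES` rather than a fixed `STRUCT`:

```json
{
  "op": "u",
  "ts_ms": 1712345678901,
  "before": { "id": 42, "first_name": "Ada", "last_name": "Lovelace", "email": "ada@old-example.com", "biography": "..." },
  "after":  { "id": 42, "first_name": "Ada", "last_name": "Lovelace", "email": "ada@new-example.com", "biography": "..." },
  "source": { "schema": "public", "table": "customers" }
}
```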

### Step 2

Run a query that writes the CDC records for all tables into a shared Kafka topic. Note that the `WHERE after IS NOT NULL` filter in this example excludes Debezium delete events, whose `after` field is null; drop the filter if deletes should be propagated downstream.

```sql
CREATE STREAM cdc_raw_sink
WITH ('store'='kafka_store', 'topic.partitions' = 1, 'topic.replicas' = 3) AS
SELECT
  op,
  source->`schema` AS src_schema,
  source->`table` AS src_table,
  ts_ms,
  `before`,
  `after`
FROM cdc_raw_source WITH ('postgresql.slot.name' = 'cdc')
WHERE after IS NOT NULL;
```
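
After this query runs, each record in the shared topic carries the `src_schema` and `src_table` routing columns that the per-table queries in Step 3 filter on. An update to `test.users` would land roughly as follows (illustrative values; the `before` and `after` fields remain opaque `BYTES`, shown here as placeholders since their on-wire encoding depends on the value format):

```json
{
  "op": "u",
  "src_schema": "test",
  "src_table": "users",
  "ts_ms": 1712345678901,
  "before": "<bytes>",
  "after": "<bytes>"
}
```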

### Step 3

Run one query per table to write its captured changes into its own dedicated Kafka topic.

{% code overflow="wrap" %}

```sql
CREATE STREAM cdc_raw_customers WITH ('store'='kafka_store', 'topic.partitions' = 1, 'topic.replicas' = 3) AS
SELECT * FROM cdc_raw_sink WITH ('starting.position'='earliest')
WHERE src_schema = 'public' AND src_table = 'customers';
```

{% endcode %}

{% code overflow="wrap" %}

```sql
CREATE STREAM cdc_raw_users WITH ('store'='kafka_store', 'topic.partitions' = 1, 'topic.replicas' = 3) AS
SELECT * FROM cdc_raw_sink WITH ('starting.position'='earliest')
WHERE src_schema = 'test' AND src_table = 'users';
```

{% endcode %}

### Step 4

If a source table's schema is complex, you can use the `GENERATE STREAM DDL` command to obtain the table's schema.

{% code overflow="wrap" %}

```sql
GENERATE STREAM DDL customers_tb WITH (
  'store'='postgres_store',
  'value.format'='json', 'postgresql.table.name' = 'customers', 'postgresql.schema.name' = 'public', 'postgresql.db.name' = 'dbname');
```

{% endcode %}

{% code overflow="wrap" %}

```sql
GENERATE STREAM DDL users_tb WITH (
  'store'='postgres_store',
  'value.format'='json', 'postgresql.table.name' = 'users', 'postgresql.schema.name' = 'test', 'postgresql.db.name' = 'dbname');
```

{% endcode %}

### Step 5

Run one DDL statement per topic to define a table-specific CDC source.

```sql
CREATE STREAM customers_cdc (
  op STRING,
  ts_ms BIGINT,
  `before` STRUCT<
    `id` BIGINT,
    first_name STRING,
    last_name STRING,
    `email` STRING,
    biography STRING>,
  `after` STRUCT<
    `id` BIGINT,
    first_name STRING,
    last_name STRING,
    `email` STRING,
    biography STRING>
) WITH (
  'value.format' = 'json',
  'store' = 'kafka_store',
  'topic' = 'cdc_raw_customers'
);
```

```sql
CREATE STREAM users_cdc (
  op STRING,
  ts_ms BIGINT,
  `before` STRUCT<
    uid VARCHAR,
    `name` VARCHAR,
    city VARCHAR,
    balance BIGINT>,
  `after` STRUCT<
    uid VARCHAR,
    `name` VARCHAR,
    city VARCHAR,
    balance BIGINT>
) WITH (
  'value.format' = 'json',
  'store' = 'kafka_store',
  'topic' = 'cdc_raw_users'
);
```

### Step 6

Run one query per source to sink its changes into a Snowflake table.

```sql
CREATE TABLE customers_cdc_sflk WITH (
  'store' = 'snowflake_store',
  'snowflake.db.name' = 'sflk_db',
  'snowflake.schema.name' = 'PUBLIC'
) AS
SELECT
  after->id AS id,
  after->first_name AS first_name,
  after->last_name AS last_name,
  after->`email` AS `email`,
  after->biography AS biography,
  ts_ms AS event_write_time,
  op
FROM customers_cdc WITH ('starting.position'='earliest');
```

```sql
CREATE TABLE users_cdc_sflk WITH (
  'store' = 'snowflake_store',
  'snowflake.db.name' = 'sflk_db',
  'snowflake.schema.name' = 'PUBLIC'
) AS
SELECT
  after->uid AS uid,
  after->`name` AS `name`,
  after->city AS city,
  after->balance AS balance,
  ts_ms AS event_write_time,
  op
FROM users_cdc WITH ('starting.position'='earliest');
```
