# CDC Pipeline Using a Single Slot

Debezium can perform change data capture (CDC) for multiple tables within the same PostgreSQL database using just **one replication slot**. Rather than maintaining one slot per table, all relevant changes are multiplexed through a single slot, which minimizes replication slot overhead, simplifies resource management, and ensures consistent log consumption across the subscribed tables.
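
Since Step 2 below names the slot `cdc`, you can confirm this behavior on the PostgreSQL side once the pipeline is running: a single logical slot serves both captured tables. A minimal check, assuming you can query the source database directly:

```sql
-- Run against the source PostgreSQL database.
-- Expect exactly one row: the single logical replication slot
-- (named 'cdc' in Step 2) that streams changes for all captured tables.
SELECT slot_name, plugin, slot_type, active
FROM pg_replication_slots
WHERE slot_name = 'cdc';
```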

## Pipeline creation in DeltaStream <a href="#pipeline-creation-in-deltastream" id="pipeline-creation-in-deltastream"></a>

### Example Scenario

Create a CDC pipeline that captures changes from two PostgreSQL tables—`public.customers` and `test.users`—and writes each table’s changes into a corresponding Snowflake table.

### Step 1

Run a DDL statement to define a single CDC source that reads the changes Debezium captures for all source tables.

When capturing changes from multiple tables with different schemas, the `before` and `after` fields, representing the state of a row before and after a change, are defined as `BYTES`. This choice allows flexibility, as using a fixed schema would not accommodate the variety in column names and data types across different source tables. The `BYTES` data type enables a generic representation of these changes until they are transformed and written into a destination with defined schemas.

```sql
CREATE STREAM cdc_raw_source (
  op VARCHAR,
  ts_ms BIGINT,
  `before` BYTES,
  `after`  BYTES,
  `source` STRUCT<`schema` VARCHAR, `table` VARCHAR>
) WITH (
  'store'='postgres_store',
  'value.format'='json',
  'postgresql.cdc.table.list'='public.customers,test.users');
```

When capturing changes from multiple tables, set the `postgresql.cdc.table.list` source property to a comma-separated list of fully qualified Postgres table names in the form `<schema>.<table>`.

### Step 2

Run a query to write the CDC records for all tables into a shared Kafka topic. In the query below, the `postgresql.slot.name` source property names the single replication slot through which all the listed tables are streamed, and the `WHERE` clause drops delete events, for which Debezium sets `after` to `NULL`.

```sql
CREATE STREAM cdc_raw_sink
WITH ('store'='kafka_store', 'topic.partitions' = 1, 'topic.replicas' = 3) AS
SELECT
  op,
  source->`schema` AS src_schema,
  source->`table` AS src_table,
  ts_ms,
  `before`,
  `after`
FROM cdc_raw_source WITH ('postgresql.slot.name' = 'cdc')
WHERE `after` IS NOT NULL;
```

### Step 3

Run one query per table to write its captured changes into its own dedicated Kafka topic.

{% code overflow="wrap" %}

```sql
CREATE STREAM cdc_raw_customers WITH ('store'='kafka_store', 'topic.partitions' = 1, 'topic.replicas' = 3) AS
SELECT * FROM cdc_raw_sink WITH ('starting.position'='earliest')
WHERE src_schema = 'public' AND src_table = 'customers';
```

{% endcode %}

{% code overflow="wrap" %}

```sql
CREATE STREAM cdc_raw_users WITH ('store'='kafka_store', 'topic.partitions' = 1, 'topic.replicas' = 3) AS
SELECT * FROM cdc_raw_sink WITH ('starting.position'='earliest')
WHERE src_schema = 'test' AND src_table = 'users';
```

{% endcode %}
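
Before moving on, you can sanity-check each per-table topic with an interactive query against its stream. A quick spot check, reusing the `starting.position` property from the queries above:

```sql
-- Read the customers topic from the beginning; every record should
-- carry src_schema = 'public' and src_table = 'customers'.
SELECT src_schema, src_table, op, ts_ms
FROM cdc_raw_customers WITH ('starting.position' = 'earliest');
```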

### Step 4

If an input table's schema is complex, you can use the `GENERATE STREAM DDL` command to obtain the table's schema instead of writing it out by hand. The generated column definitions can then be adapted for the table-specific DDLs in Step 5.

{% code overflow="wrap" %}

```sql
GENERATE STREAM DDL customers_tb WITH ('store'='postgres_store',
  'value.format'='json', 'postgresql.table.name' = 'customers',
  'postgresql.schema.name' = 'public', 'postgresql.db.name' = 'dbname');
```

{% endcode %}

{% code overflow="wrap" %}

```sql
GENERATE STREAM DDL users_tb WITH ('store'='postgres_store',
  'value.format'='json', 'postgresql.table.name' = 'users',
  'postgresql.schema.name' = 'test', 'postgresql.db.name' = 'dbname');
```

{% endcode %}

### Step 5

Run one DDL statement per topic to define a table-specific CDC source. At this point the `before` and `after` payloads, carried generically as `BYTES` until now, take on the concrete schema of their source table.

```sql
CREATE STREAM customers_cdc (
  op STRING,
  ts_ms BIGINT,
  "before" STRUCT<
    `id` BIGINT,
    first_name STRING,
    last_name STRING,
    `email` STRING,
    biography STRING>,
  "after" STRUCT<
    `id` BIGINT,
    first_name STRING,
    last_name STRING,
    `email` STRING,
    biography STRING>
) WITH (
  'value.format' = 'json',
  'store' = 'kafka_store',
  'topic' = 'cdc_raw_customers'
);
```

```sql
CREATE STREAM users_cdc (
  op STRING,
  ts_ms BIGINT,
  "before" STRUCT<
    uid VARCHAR,
    `name` VARCHAR,
    city VARCHAR,
    balance BIGINT>,
  "after" STRUCT<
    uid VARCHAR,
    `name` VARCHAR,
    city VARCHAR,
    balance BIGINT>
) WITH (
  'value.format' = 'json',
  'store' = 'kafka_store',
  'topic' = 'cdc_raw_users'
);
```

### Step 6

Run a query for each source to sink its changes into a Snowflake table.

```sql
CREATE TABLE customers_cdc_sflk WITH (
  'store' = 'snowflake_store',
  'snowflake.db.name' = 'sflk_db',
  'snowflake.schema.name' = 'PUBLIC'
) AS
SELECT
  after->id AS id,
  after->first_name AS first_name,
  after->last_name AS last_name,
  after->`email` AS `email`,
  after->biography AS biography,
  ts_ms AS event_write_time,
  op
FROM customers_cdc WITH ('starting.position'='earliest');
```

```sql
CREATE TABLE users_cdc_sflk WITH (
  'store' = 'snowflake_store',
  'snowflake.db.name' = 'sflk_db',
  'snowflake.schema.name' = 'PUBLIC'
) AS
SELECT
  after->uid AS uid,
  after->`name` AS `name`,
  after->city AS city,
  after->balance AS balance,
  ts_ms AS event_write_time,
  op
FROM users_cdc WITH ('starting.position'='earliest');
```
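
Once both queries are running, you can confirm delivery from the Snowflake side. A minimal check to run in Snowflake itself, assuming the `sflk_db.PUBLIC` location above (if DeltaStream created the table with a case-sensitive name, quote the identifier accordingly):

```sql
-- Count delivered change events per Debezium operation type
-- (c = create, u = update, r = snapshot read; deletes were filtered out in Step 2).
SELECT op, COUNT(*) AS events
FROM sflk_db.PUBLIC.customers_cdc_sflk
GROUP BY op;
```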
