# PostgreSQL

## Description

Change Data Capture (CDC) enables you to ingest data changes from tables in a PostgreSQL [Data Store](/overview/core-concepts/store.md) and publish them to a sink. In a CDC pipeline, data changes in the source relational table are captured as row-level operations (`INSERT`, `DELETE`, `UPDATE`) based on the table's `PRIMARY KEY`, then pushed to the downstream sink in real time.

## How a CDC Pipeline Works

### Source Record Structure

DeltaStream uses [Debezium](https://debezium.io/) to capture changes in the source relational table. This means that when you define a CDC source backed by a given table in a POSTGRESQL store, you can describe its records with this JSON skeleton:

```
{
 "op": ...,
 "ts_ms": ...,
 "before": {...},
 "after": {...},
 "system": {...}
}
```

* **op:** Describes the row-level operation that caused a change in the source. It can be `c` for `CREATE`, `u` for `UPDATE`, `d` for `DELETE` or `r` for `READ`.
* **ts\_ms:** Shows the time at which the change event in the source is processed by the CDC pipeline.
* **before** and **after**: These specify the state of the row in which the change occurred, before and/or after the change, depending on the semantics of the operation.
* **system**: Shows the metadata in the source about the operation. Some of its included fields are:
  * **db** (String): The relational database name containing the source table.
  * **schema** (String): The relational schema name containing the source table.
  * **table** (String): The relational source table name.
  * **lsn** (BigInt): The log sequence number of the change.

### Requirements for a PostgreSQL Source Store

Debezium uses PostgreSQL's logical decoding slot for streaming changes from the given relational source table.

* When you create the CDC pipeline, you need to specify a name for the replication slot. If a slot with that name already exists, the system uses it for the CDC job. If there's no pre-existing slot, the system creates a new replication slot with that name and uses it for the job. See below for more details.
* When you create the source PostgreSQL [Data Store](/overview/core-concepts/store.md):
  * the username you provide must have enough privileges to use an existing replication slot or create a new one.
  * the source PostgreSQL store must use logical decoding with the write-ahead log (WAL). This means you should set `wal_level` to `logical` in the source relational database. (For more information, see [PostgreSQL ALTER SYSTEM docs](https://www.postgresql.org/docs/current/sql-altersystem.html).)
  * PostgreSQL needs to have a publication created for streaming changes. If this publication does not already exist, it is created at start-up and includes all tables. To create it, the Postgres user must have superuser permissions. Another option is to create a publication named `dbz_publication` for all tables in PostgreSQL before starting the CDC query, so the connector uses that.
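
The requirements above can be checked and prepared from `psql` roughly as follows. This is a minimal sketch rather than a definitive setup: the role name `cdc_user` is hypothetical, a changed `wal_level` only takes effect after a server restart, and managed PostgreSQL services may expose these settings through their own configuration tools instead.

```sql
-- Check the current WAL level; a CDC source needs 'logical'.
SHOW wal_level;

-- Set the WAL level to logical (superuser only; requires a server restart).
ALTER SYSTEM SET wal_level = logical;

-- Let the (hypothetical) role used by the store work with replication slots.
ALTER ROLE cdc_user WITH REPLICATION;

-- Optionally pre-create the publication for all tables before starting the CDC query.
CREATE PUBLICATION dbz_publication FOR ALL TABLES;
```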

### Working with PostgreSQL replication slots

Each DeltaStream query that reads rows from a PostgreSQL source for a CDC pipeline must specify a **replication slot**. Replication slots in PostgreSQL represent a stream of changes that can be replayed to a client in the order in which the changes were made on the original server. Each slot has the following characteristics:

* uniquely named across all databases in a PostgreSQL instance
* persists independently from the clients using it
* crash-proof
* contains its own independent state

You cannot assign a single replication slot in DeltaStream to multiple simultaneously-running queries. By default, PostgreSQL instances limit the number of replication slots to 10.
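
To see how close an instance is to that limit, and which slots are already in use, a quick check along these lines can help. Both the `max_replication_slots` setting and the `pg_replication_slots` view are standard PostgreSQL.

```sql
-- Maximum number of replication slots the instance allows.
SHOW max_replication_slots;

-- Existing slots, and whether a client is currently connected to each one.
SELECT slot_name, plugin, database, active
FROM pg_replication_slots;
```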

To list, create, or drop replication slots in the PostgreSQL CLI `psql`, use the following queries:

```sql
-- List existing replication slots
SELECT * FROM pg_replication_slots;

-- Create a new logical replication slot
SELECT pg_create_logical_replication_slot('ds_cdc', 'pgoutput');

-- Drop an existing replication slot
-- Note: For logical slots, this must be called while connected to the same database the slot was created on.
SELECT pg_drop_replication_slot('ds_cdc');
```

{% hint style="info" %}
**Note** For more information on replication slots, please see the following sections of the PostgreSQL documentation:

* [Replication](https://www.postgresql.org/docs/current/runtime-config-replication.html)
* [Replication Slots](https://www.postgresql.org/docs/9.4/warm-standby.html#STREAMING-REPLICATION-SLOTS)
* [Logical Decoding and Replication Slots](https://www.postgresql.org/docs/9.4/logicaldecoding-explanation.html#AEN67208)
  {% endhint %}

### Considerations

#### Storage TOAST

PostgreSQL uses a fixed page size (commonly 8 kB), so rows with large field values are compressed and/or broken up into multiple physical rows. This technique is known as TOAST. See the [PostgreSQL documentation](https://www.postgresql.org/docs/current/storage-toast.html#STORAGE-TOAST) on the subject to learn more.

DeltaStream follows the Debezium default behavior when generating CDC events containing TOASTed fields. If the TOASTed field is unchanged, DeltaStream replaces the value in the CDC event with `__debezium_unavailable_value`.
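
If the placeholder is a problem downstream, one option described in Debezium's documentation is to change the table's replica identity so that full row images are written to the WAL. The sketch below assumes a hypothetical table `public.pageviews`. It is a PostgreSQL-side setting, not a DeltaStream parameter, and it increases WAL volume, so whether it is appropriate depends on the workload.

```sql
-- Inspect the current replica identity ('d' = default, 'f' = full).
SELECT relname, relreplident
FROM pg_class
WHERE oid = 'public.pageviews'::regclass;

-- Log the full row on UPDATE and DELETE, so unchanged TOASTed values are available.
ALTER TABLE public.pageviews REPLICA IDENTITY FULL;
```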

## How to Create a CDC Pipeline

You create a CDC pipeline in 2 steps:

1. Define the CDC source
2. Define the sink and the query to write CDC changes into it

### Step 1. Define DDL for the CDC source

Using [CREATE STREAM](/reference/sql-syntax/ddl/create-stream.md), define a [Stream](/overview/core-concepts/databases.md#stream) backed by the source relational table. The parameters below are used as CDC parameters in the `WITH` clause of [CREATE STREAM](/reference/sql-syntax/ddl/create-stream.md):

| Parameter Name              | Description                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                               |
| --------------------------- | ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| `postgresql.db.name`        | <p>Name of the database in the POSTGRESQL store containing the source table.</p><p><br><strong>Required:</strong> No<br><strong>Type:</strong> String<br><strong>Valid values:</strong> See <a data-mention href="/pages/ZVNhODGeUbLHKEa7VuaP">/pages/ZVNhODGeUbLHKEa7VuaP</a></p>                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                        |
| `postgresql.schema.name`    | <p>Name of the schema in the POSTGRESQL store containing the source table.</p><p><br><strong>Required:</strong> No<br><strong>Type:</strong> String<br><strong>Valid values:</strong> See <a data-mention href="/pages/ZVNhODGeUbLHKEa7VuaP">/pages/ZVNhODGeUbLHKEa7VuaP</a></p>                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                          |
| `postgresql.table.name`     | <p>Name of the source table in the POSTGRESQL store.</p><p><br><strong>Required:</strong> No<br><strong>Type:</strong> String<br><strong>Valid values:</strong> See <a data-mention href="/pages/ZVNhODGeUbLHKEa7VuaP">/pages/ZVNhODGeUbLHKEa7VuaP</a></p>                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                |
| `postgresql.cdc.table.list` | <p>Comma-separated list of fully qualified source tables to capture using CDC. Each entry must be specified as <code>schema\_name.table\_name</code>. If a table name contains special characters (such as a dot) or uppercase letters, enclose it in double quotes — for example, <code>public."orders.v1"</code>. Regex patterns are supported to match multiple tables dynamically. (For example, <code>public.orders\_.\*</code> matches all tables in the <code>public</code> schema with names starting with <code>orders\_</code>).</p><p><strong>Note:</strong> This property can be specified either in the DDL when defining the CDC source, or as a source property at query time. If it is provided in both places, the value specified at query time takes precedence.</p><p><br><strong>Required:</strong> No<br><strong>Type:</strong> String<br><strong>Valid values:</strong> See <a data-mention href="/pages/ZVNhODGeUbLHKEa7VuaP">/pages/ZVNhODGeUbLHKEa7VuaP</a></p> |
| `value.format`              | <p>Format of the CDC record coming from the source.</p><p><br><strong>Required:</strong> Yes<br><strong>Type:</strong> String<br><strong>Valid values:</strong> <code>JSON</code></p>                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                     |
| `store`                     | <p>Name of the POSTGRESQL store that hosts the relational table backing the CDC source.</p><p><br><strong>Required:</strong> Yes<br><strong>Type:</strong> String<br><strong>Valid values:</strong> See <a data-mention href="/pages/yGCSID06tqSuDcvkGevU">/pages/yGCSID06tqSuDcvkGevU</a></p>                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                            |
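
As an illustration of how these parameters fit together, the sketch below defines a CDC source whose columns mirror the record skeleton described earlier. All names are hypothetical (store `pgstore`, database `visits`, table `public.pageviews` and its columns). The backticks around `before`, `after`, `system`, `schema`, and `table` are an assumption, made in case those identifiers are reserved words.

```sql
-- Hypothetical CDC source over the PostgreSQL table visits.public.pageviews.
CREATE STREAM pageviews_cdc (
  op VARCHAR,
  ts_ms BIGINT,
  `before` STRUCT<viewtime BIGINT, userid VARCHAR, pageid VARCHAR>,
  `after`  STRUCT<viewtime BIGINT, userid VARCHAR, pageid VARCHAR>,
  `system` STRUCT<db VARCHAR, `schema` VARCHAR, `table` VARCHAR, lsn BIGINT>
) WITH (
  'store' = 'pgstore',
  'value.format' = 'JSON',
  'postgresql.db.name' = 'visits',
  'postgresql.schema.name' = 'public',
  'postgresql.table.name' = 'pageviews'
);
```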

### Step 2. Define the CREATE STREAM AS SELECT (CSAS) for CDC Sink Stream and Query

Using [CREATE STREAM AS SELECT](/reference/sql-syntax/query/create-stream-as.md), define a [Stream](/overview/core-concepts/databases.md#stream) backed by the sink [Data Store](/overview/core-concepts/store.md#streaming-entity) and the query to insert CDC records into it.

#### Sink parameters

The CDC sink can write into an existing entity in the sink store or create a new entity. Use the parameters below in the `WITH` clause for the sink in CSAS to get the desired behavior:

| Parameter Name     | Description                                                                                                                                                                                                                                                                                                                                                                                                                             |
| ------------------ | --------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| `topic`            | <p>Name of the <a data-mention href="/pages/khAma1ENd997ICjDGcJW#streaming-entity">/pages/khAma1ENd997ICjDGcJW#streaming-entity</a> into which the data for the CDC sink is written. If the entity doesn't exist, the system creates an entity with this name in the corresponding <code>store</code>.<br><br><strong>Required:</strong> No<br><strong>Default value:</strong> Lowercase sink name<br><strong>Type:</strong> String</p> |
| `topic.partitions` | <p>The number of partitions to use when creating the sink entity, if applicable. If the entity already exists, this value must equal the number of partitions in the existing entity.<br></p><p><strong>Required:</strong> Yes, unless topic already exists<br><strong>Type:</strong> Integer<br><strong>Valid values:</strong> \[1, ...]</p>                                                                                           |
| `topic.replicas`   | <p>The number of replicas to use when creating the sink entity, if applicable. If the entity already exists, this value must equal the number of replicas in the existing entity.<br></p><p><strong>Required:</strong> Yes, unless topic already exists<br><strong>Type:</strong> Integer<br><strong>Valid values:</strong> \[1, ...]</p>                                                                                               |

#### Source parameters

The source stream in the CDC pipeline captures the changes from a relational table in the source POSTGRESQL [Data Store](/overview/core-concepts/store.md). Here are the parameters you can specify in the WITH clause for the source in CSAS:

| Parameter Name                    | Description                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                              |
| --------------------------------- | ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| `postgresql.slot.name`            | <p>Name of the PostgreSQL replication slot to use for the CDC pipeline. Note that the username you provide for the source PostgreSQL store should have the required privileges to use this slot.<br><br><strong>Required:</strong> Yes<br><strong>Type:</strong> String</p>                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                              |
| `scan.startup.option`             | <p>Startup mode for Postgres CDC consumer when starting.<br><br><strong>Required:</strong> No<br><strong>Default value:</strong> <code>initial</code><br><strong>Type:</strong> String<br><strong>Valid values:</strong> <code>initial</code>, <code>latest-offset</code>, <code>committed-offset</code>, <code>snapshot</code></p>                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                      |
| `postgresql.decoding.plugin.name` | <p>Name of the Postgres logical decoding plug-in on the source store. Supported values are <code>decoderbufs</code>, <code>pgoutput</code>, <code>wal2json</code>, <code>wal2json\_rds</code>, <code>wal2json\_streaming</code>, and <code>wal2json\_rds\_streaming.</code><br><br><strong>Required:</strong> No<br><strong>Default value:</strong> <code>pgoutput</code><br><strong>Type:</strong> String</p>                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                           |
| `postgresql.cdc.table.list`       | <p>Comma-separated list of fully qualified source tables to capture using CDC. Each entry must be specified as <code>schema\_name.table\_name</code>. If a table name contains special characters (such as a dot) or uppercase letters, enclose it in double quotes, for example, <code>public."orders.v1"</code>. Regex patterns are supported to match multiple tables dynamically. (For example, <code>public.orders\_.\*</code> matches all tables in the <code>public</code> schema with names starting with <code>orders\_</code>).</p><p><strong>Note:</strong> This property can be specified either in the DDL when defining the CDC source, or as a source property at query time. If it is provided in both places, the value specified at query time takes precedence.</p><p><br><strong>Required:</strong> No<br><strong>Type:</strong> String<br><strong>Valid values:</strong> See <a data-mention href="/pages/ZVNhODGeUbLHKEa7VuaP">/pages/ZVNhODGeUbLHKEa7VuaP</a></p> |
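
A minimal CSAS sketch that combines sink and source parameters might look like the following. The stream names `pageviews_cdc` and `pageviews_cdc_sink` and the slot name `ds_cdc` are hypothetical. Since `topic` is omitted, the sink entity name falls back to the lowercase sink name, as described in the sink parameters table.

```sql
-- Write all change events from the CDC source into a new sink stream.
CREATE STREAM pageviews_cdc_sink WITH (
  'topic.partitions' = 1,
  'topic.replicas' = 1
) AS
SELECT *
FROM pageviews_cdc WITH (
  'postgresql.slot.name' = 'ds_cdc',   -- required: replication slot for this query
  'scan.startup.option' = 'initial'    -- optional: the default startup mode
);
```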

#### Passthrough Source parameters

**Passthrough CDC properties**

Passthrough CDC properties are supported for the CDC source stream. These properties must be specified using the `cdc.` prefix in their names and must be assigned a string value. They are listed in the table below. Refer to the [CDC connector options](https://nightlies.apache.org/flink/flink-cdc-docs-release-3.5/docs/connectors/flink-sources/postgres-cdc/#incremental-snapshot-options) for more details.

| Parameter Name (with prefix)                         | Description                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                      |
| ---------------------------------------------------- | ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| `cdc.split.size`                                     | <p>The split size, in number of rows, of the table snapshot. Captured tables are divided into multiple splits when the table snapshot is read.<br><br><strong>Required:</strong> No<br><strong>Default value:</strong> <code>'1000'</code><br><strong>Type:</strong> String</p> |
| `cdc.fetch.size`                                     | <p>The maximum fetch size per poll when reading the table snapshot.<br><br><strong>Required:</strong> No<br><strong>Default value:</strong> <code>'1024'</code><br><strong>Type:</strong> String</p> |
| `cdc.scan.incremental.close-idle-reader.enabled`     | <p>Whether to close idle readers at the end of the snapshot phase.<br><br><strong>Required:</strong> No<br><strong>Default value:</strong> <code>'false'</code><br><strong>Type:</strong> String</p>                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                             |
| `cdc.heartbeat.interval.ms`                          | <p>The interval at which heartbeat events are sent to track the latest available replication slot offsets.<br><br><strong>Required:</strong> No<br><strong>Default value:</strong> <code>'30s'</code><br><strong>Type:</strong> String</p> |
| `cdc.chunk-key.even-distribution.factor.lower-bound` | <p>The lower bound of the chunk key distribution factor. The distribution factor is used to determine whether the table is evenly distributed. Table chunks are computed with an even-split optimization when the data distribution is even, and a splitting query is run when it is uneven. The distribution factor is calculated as (MAX(id) - MIN(id) + 1) / rowCount.<br><br><strong>Required:</strong> No<br><strong>Default value:</strong> <code>'0.05d'</code><br><strong>Type:</strong> String</p> |
| `cdc.chunk-key.even-distribution.factor.upper-bound` | <p>The upper bound of the chunk key distribution factor. The distribution factor is used to determine whether the table is evenly distributed. Table chunks are computed with an even-split optimization when the data distribution is even, and a splitting query is run when it is uneven. The distribution factor is calculated as (MAX(id) - MIN(id) + 1) / rowCount.<br><br><strong>Required:</strong> No<br><strong>Default value:</strong> <code>'1000.0d'</code><br><strong>Type:</strong> String</p> |
| `cdc.connect.pool.size`                              | <p>The connection pool size.<br><br><strong>Required:</strong> No<br><strong>Default value:</strong> <code>'30'</code><br><strong>Type:</strong> String</p>                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                      |
| `cdc.connect.max-retries`                            | <p>The maximum number of times the connector retries establishing a connection to the database server.<br><br><strong>Required:</strong> No<br><strong>Default value:</strong> <code>'3'</code><br><strong>Type:</strong> String</p> |
| `cdc.connect.timeout`                                | <p>The maximum time that the connector should wait after trying to connect to the PostgreSQL database server before timing out.<br><br><strong>Required:</strong> No<br><strong>Default value:</strong> <code>'30s'</code><br><strong>Type:</strong> String</p>                                                                                                                                                                                                                                                                                                                                                                                                                                                  |
| `cdc.scan.newly-added-table.enabled`                 | <p>Enables automatic discovery of new tables when resuming a Postgres CDC pipeline. When set to <code>true</code>, any tables that match the configured tables list patterns, but were not part of the previous run, are automatically detected. When set to <code>false</code> (default), the pipeline only continues reading the tables that were part of the original state and ignores any newly created matching tables. This option has an effect only when resuming a CDC pipeline from a previous run's state and does not change the behavior of a fresh initial run.<br><br><strong>Required:</strong> No<br><strong>Default value:</strong> <code>'false'</code><br><strong>Type:</strong> String</p> |
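
As a sketch of how such properties might be passed, the query below sets two of them as string values alongside the slot name. It assumes the passthrough properties go in the source `WITH` clause of the CSAS, next to the other source parameters. The stream and slot names are hypothetical.

```sql
-- Passthrough CDC properties use the `cdc.` prefix and string values.
CREATE STREAM orders_cdc_sink WITH (
  'topic.partitions' = 1,
  'topic.replicas' = 1
) AS
SELECT *
FROM orders_cdc WITH (
  'postgresql.slot.name' = 'ds_orders_cdc',
  'cdc.split.size' = '2000',                      -- larger snapshot splits
  'cdc.scan.newly-added-table.enabled' = 'true'   -- pick up new matching tables on resume
);
```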

**Passthrough Debezium properties**

A CDC source also supports passthrough Debezium properties. These properties must be specified using the `cdc.debezium.` prefix. For additional details on Debezium passthrough options, refer to [Debezium's Postgres Connector properties](https://debezium.io/documentation/reference/1.9/connectors/postgresql.html#postgresql-connector-properties).
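As a minimal sketch of how a passthrough property might be supplied, the query below assumes a CDC source stream named `pageviews_cdc` (such as the one defined in the Example section) and passes Debezium's `decimal.handling.mode` connector property with the `cdc.debezium.` prefix alongside the other source properties. The sink name `cdc_sink_dbz` and the slot name `slot_dbz` are hypothetical, and placing the property in the source `WITH` clause is an assumption based on how other `cdc.` properties are passed on this page.

```sql
-- Sketch only: sink name and slot name are hypothetical.
-- 'cdc.debezium.decimal.handling.mode' forwards Debezium's
-- decimal.handling.mode property to the underlying connector.
CREATE STREAM cdc_sink_dbz WITH (
  'topic.partitions' = 1,
  'topic.replicas' = 1) AS
SELECT * FROM pageviews_cdc WITH (
  'postgresql.slot.name' = 'slot_dbz',
  'cdc.debezium.decimal.handling.mode' = 'string');
```

Whether a given Debezium property takes effect may depend on the connector version in use, so it is worth checking the linked Debezium reference before relying on one.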

## Supported Data Types in a Postgres CDC Pipeline

| PostgreSQL type                                                              | DeltaStream SQL type |
| ---------------------------------------------------------------------------- | -------------------- |
| BOOLEAN                                                                      | BOOLEAN              |
| <p>SMALLINT<br>INT2<br>SMALLSERIAL</p>                                       | SMALLINT             |
| <p>INTEGER</p><p>INT</p><p>INT4<br>SERIAL</p>                                | INTEGER              |
| <p>BIGINT</p><p>INT8<br>BIGSERIAL</p>                                        | BIGINT               |
| <p>REAL<br>FLOAT4</p>                                                        | FLOAT                |
| <p>FLOAT8<br>DOUBLE PRECISION</p>                                            | DOUBLE               |
| <p>NUMERIC(p, s)<br>DECIMAL(p, s)</p>                                        | DECIMAL(p,s)         |
| DATE                                                                         | DATE                 |
| TIME \[(p)] \[WITHOUT TIMEZONE]                                              | TIME\[(p)]           |
| TIMESTAMP \[(p)] \[WITHOUT TIMEZONE]                                         | TIMESTAMP \[(p)]     |
| <p>CHAR(n)<br>CHARACTER(n)<br>VARCHAR(n)<br>CHARACTER VARYING(n)<br>TEXT</p> | VARCHAR              |
| <p>BYTEA<br></p>                                                             | BYTES                |

***Attention***: PostgreSQL data types that are not listed in the table above are not supported by the DeltaStream Flink CDC Connector.
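To illustrate how the mapping above could be applied, the sketch below declares a CDC source for a hypothetical PostgreSQL table `public.products` (not part of this page's examples), assumed to live in the `report` database of a store named `pgstore`. Each column in the `before` and `after` structs uses the DeltaStream SQL type listed for its PostgreSQL type in the table.

```sql
-- Hypothetical PostgreSQL table, shown for reference only:
--   CREATE TABLE public.products (
--     product_id BIGSERIAL PRIMARY KEY,  -- maps to BIGINT
--     price      NUMERIC(10, 2),         -- maps to DECIMAL(10,2)
--     rating     REAL,                   -- maps to FLOAT
--     in_stock   BOOLEAN,                -- maps to BOOLEAN
--     note       TEXT                    -- maps to VARCHAR
--   );
CREATE STREAM products_cdc(
  op VARCHAR,
  ts_ms BIGINT,
  `before` STRUCT<product_id BIGINT, price DECIMAL(10,2), rating FLOAT, in_stock BOOLEAN, note VARCHAR>,
  `after`  STRUCT<product_id BIGINT, price DECIMAL(10,2), rating FLOAT, in_stock BOOLEAN, note VARCHAR>)
WITH (
  'store'='pgstore',
  'value.format'='json',
  'postgresql.db.name'='report',
  'postgresql.schema.name'='public',
  'postgresql.table.name'='products');
```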

## Example

**Create CDC pipeline capturing changes from a Postgres table**

Assume a POSTGRESQL [Data Store](/overview/core-concepts/store.md) is defined with the name `pgstore` and it has a table named `pageviews` under the schema named `public` in the `report` database. The output below shows sample data in `pageviews`. Each row has three columns (`viewtime`, `userid`, and `pageid`) and shows when a specific user visited a given page:

```sh
demodb.public/pgstore# PRINT ENTITY "public".pageviews;
+-----------------+-------------+-------------+
| viewtime        | userid      | pageid      |
+=================+=============+=============+
| 1694124853651   | User_4      | Page_29     |
+-----------------+-------------+-------------+
| 1694124856731   | User_1      | Page_59     |
+-----------------+-------------+-------------+
| 1694124857732   | User_1      | Page_63     |
+-----------------+-------------+-------------+
```

Use the DDL statement below to create a [Stream](/overview/core-concepts/databases.md#stream) named `pageviews_cdc`, backed by the `pageviews` table, to capture its CDC records:

```sql
CREATE STREAM pageviews_cdc(
  op VARCHAR,
  ts_ms BIGINT,
  `before` STRUCT<viewtime BIGINT, userid VARCHAR, pageid VARCHAR>, 
  `after`  STRUCT<viewtime BIGINT, userid VARCHAR, pageid VARCHAR>, 
  `source` STRUCT<db VARCHAR, `schema` VARCHAR, `table` VARCHAR, `lsn` BIGINT>)
WITH (
  'store'='pgstore', 
  'value.format'='json',
  'postgresql.db.name'='report',
  'postgresql.schema.name'='public',
  'postgresql.table.name'='pageviews');
```

The `pageviews_cdc` stream is used as the source in the CDC pipeline. Whenever an `INSERT`, `DELETE`, or `UPDATE` happens on `pageviews` in the relational database, a corresponding record is generated in `pageviews_cdc` to capture the change. Now use the CSAS statement below to define a stream as the sink and run a query that writes those changes into it:

```sql
CREATE STREAM cdc_sink WITH (
  'topic.partitions' = 1,
  'topic.replicas' = 1) AS 
SELECT * FROM pageviews_cdc WITH ('postgresql.slot.name' = 'slot001');
```

Note that in the above CSAS you must provide the topic properties, as a new topic is created in the sink store. To use a pre-existing topic such as `cdc_logs` in the sink store, you can replace topic properties with the topic name in the `WITH` clause:

```sql
CREATE STREAM cdc_sink WITH ('topic' = 'cdc_logs') AS 
SELECT * FROM pageviews_cdc WITH ('postgresql.slot.name' = 'slot002');
```

As an example, assume a new record is added to `pageviews` showing that `User_5` visited `Page_94`. Following this `INSERT` operation, you see a record similar to the below in `cdc_sink` published via the CDC pipeline defined above:

```json
{
 "op":"c",
 "ts_ms":1693430399726,
 "before":null,
 "after":{"viewtime":1693430399292,"userid":"User_5","pageid":"Page_94"},
 "source":{"db":"report","schema":"public","table":"pageviews","lsn":38990016}
}
```

Now imagine you're only interested in the `DELETE` events in the source and wish to write the `userid` and `lsn` for each `DELETE`. You can use the query below to create such a CDC pipeline:

```sql
CREATE STREAM cdc_sink WITH ('topic' = 'cdc_logs') AS 
SELECT ts_ms AS event_ts,
       `before`->userid AS uid,
       `source`->`lsn` AS seq_num
FROM pageviews_cdc WITH ('postgresql.slot.name' = 'slot003')
WHERE op = 'd';
```
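A similar pattern can be sketched for `UPDATE` events, where both the `before` and `after` structs may be populated. The query below is an illustration only: the sink name `cdc_updates` and the slot name `slot_updates` are hypothetical, and it assumes the `pageviews_cdc` source defined earlier.

```sql
-- Sketch only: sink name and slot name are hypothetical.
-- Keeps UPDATE events and records the page before and after the change.
CREATE STREAM cdc_updates WITH (
  'topic.partitions' = 1,
  'topic.replicas' = 1) AS
SELECT ts_ms AS event_ts,
       `after`->userid AS uid,
       `before`->pageid AS old_pageid,
       `after`->pageid AS new_pageid
FROM pageviews_cdc WITH ('postgresql.slot.name' = 'slot_updates')
WHERE op = 'u';
```

Whether `before` carries the full previous row for an update depends on the table's replica identity setting in PostgreSQL, so `old_pageid` may be empty under the default configuration.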

**Create CDC pipeline capturing changes from multiple tables**

Now consider a scenario similar to the previous example, except that besides the `pageviews` table there is another table called `users` under the `staff` schema within the `report` database in the `pgstore` store.

Suppose you want to create a CDC pipeline to capture changes for **both tables** using a single replication slot. First, define a stream as the CDC source:

```sql
CREATE STREAM cdc_source(
  op VARCHAR,
  ts_ms BIGINT,
  `before` BYTES,
  `after`  BYTES, 
  `source` STRUCT<`schema` VARCHAR, `table` VARCHAR>)
WITH (
  'store'='pgstore', 
  'value.format'='json',
  'postgresql.cdc.table.list'='public.pageviews,staff.users');
```

The `cdc_source` stream acts as the CDC source in your pipeline. Whenever an `INSERT`, `DELETE`, or `UPDATE` happens on either the `pageviews` or `users` table in the source database, a corresponding change event is captured and written into this stream.

Now, use the following CSAS statement to define a downstream stream as the sink and run a query to write these changes into it. Each output record includes the operation type and timestamp (`op` and `ts_ms`), along with the source schema and table name, which indicate where the changes came from.

You can use these fields to filter or route changes for each table in subsequent queries that read from `cdc_sink`.

```sql
CREATE STREAM cdc_sink WITH (
  'topic.partitions' = 1,
  'topic.replicas' = 1) AS 
SELECT
  `source`->`schema` AS src_schema,
  `source`->`table` AS src_table,
  `before`,
  `after`,
  `op`,
  `ts_ms`
FROM cdc_source WITH ('postgresql.slot.name' = 'slot004');
```

Run one query per table to write that table's captured changes into its own dedicated Kafka topic.

<pre class="language-sql"><code class="lang-sql">-- Query 1
CREATE STREAM pageviews_raw_cdc WITH ('store'='kafka_store') AS
SELECT `before`,
    `after`,
    `op`,
    `ts_ms`
FROM cdc_sink
WHERE src_schema = 'public' AND src_table = 'pageviews';

-- Query 2
CREATE STREAM orders_raw_cdc WITH ('store'='kafka_store') AS
SELECT `before`,
    `after`,
    `op`,
    `ts_ms`
FROM cdc_sink
WHERE src_schema = 'public' AND src_table = 'orders';
</code></pre>

Now run one DDL statement per topic to define a table-specific CDC source on it.

```sql
CREATE STREAM pageviews_cdc(
    op VARCHAR,
    ts_ms BIGINT,
    `before` STRUCT<viewtime BIGINT, userid VARCHAR, pageid VARCHAR>,
    `after` STRUCT<viewtime BIGINT, userid VARCHAR, pageid VARCHAR>
) WITH (
    'store'='kafka_store',
    'value.format'='json',
    'topic'='pageviews_raw_cdc');
```

```sql
CREATE STREAM orders_cdc(
    op VARCHAR,
    ts_ms BIGINT,
    `before` STRUCT<ts BIGINT, order_id VARCHAR, price INTEGER>,
    `after` STRUCT<ts BIGINT, order_id VARCHAR, price INTEGER>
) WITH (
    'store'='kafka_store',
    'value.format'='json',
    'topic'='orders_raw_cdc');
```

You can write further queries on the above source relations, each defined per table on a dedicated topic, to process the changes and write results into the sink of your choice, such as a Snowflake table.

**Create CDC pipeline with an updated list of source tables**

Assume a CDC query is already running with the query ID `0196fa3a-3851-7307-a54f-4330e17a91f9`. This query uses the PostgreSQL replication slot `slot004` and captures changes from the following tables: `'public.pageviews,staff.users'`.

Now suppose you want to add a new table to the CDC source and re-run the query such that:

* Existing tables continue consuming changes from where they left off, and
* Newly added tables go through an initial snapshot, followed by streaming CDC events.

To achieve this, start a new CDC query with the updated table list passed as a source property. The new query must resume from the original query and use the same replication slot. Follow these steps:

* Terminate the original CDC query before starting the new one (required to reuse the replication slot).
* Reuse the same replication slot (`slot004`).
* Resume from the previous query using `resume.from.query.id`.
* Enable snapshotting for newly added tables by setting the passthrough CDC property\
  `cdc.scan.newly-added-table.enabled = true`

```sql
CREATE STREAM cdc_sink WITH (
  'topic.partitions' = 1,
  'topic.replicas' = 1) AS 
SELECT
  `source`->`schema` AS src_schema,
  `source`->`table` AS src_table,
  `before`,
  `after`,
  `op`,
  `ts_ms`
FROM cdc_source WITH (
 'postgresql.slot.name' = 'slot004',
 'postgresql.cdc.table.list'='public.pageviews,staff.users,public.users',
 'cdc.scan.newly-added-table.enabled' = 'true')
QUERY WITH ('resume.from.query.id' = '0196fa3a-3851-7307-a54f-4330e17a91f9');
```

