Change Data Capture (CDC)


Description

Change Data Capture (CDC) enables you to ingest data changes from tables in a relational database and publish them to a sink. In a CDC pipeline, changes in the source relational table are captured as row-level operations (INSERT, UPDATE, DELETE), identified by the table's PRIMARY KEY, and pushed to the downstream sink in real time.

Currently, you can create a CDC pipeline with a source backed by a table in a PostgreSQL Store.

How a CDC Pipeline Works

Source Record Structure

DeltaStream uses Debezium to capture changes in the source relational table. This means that when you define a CDC source backed by a given table in a POSTGRESQL store, its records follow this JSON skeleton:

{
 "op": ...,
 "ts_ms": ...,
 "before": {...},
 "after": {...},
 "system": {...}
}
  • op: Describes the row-level operation that caused a change in the source. It can be c for CREATE, u for UPDATE, d for DELETE, or r for READ.

  • ts_ms: Shows the time, in epoch milliseconds, at which the change event was processed in the source.

  • before and after: These capture the state of the changed row before and/or after the change, depending on the semantics of the operation, as illustrated in the sample record below this list. For example, a CREATE (c) event carries a null before.

  • source: Shows metadata about the operation from the source. Some of its included fields are:

    • db (String): The relational database name containing the source table.

    • schema (String): The relational schema name containing the source table.

    • table (String): The relational source table name.

    • lsn (BigInt): The log sequence number of the change.
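
For example, an UPDATE that changes the pageid of an existing row in the pageviews table used later on this page would surface with both before and after populated (the values here are illustrative):

{
 "op":"u",
 "ts_ms":1693430412345,
 "before":{"viewtime":1693430399292,"userid":"User_5","pageid":"Page_94"},
 "after":{"viewtime":1693430399292,"userid":"User_5","pageid":"Page_12"},
 "source":{"db":"report","schema":"public","table":"pageviews","lsn":38990128}
}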

Requirements for a PostgreSQL Source Store

Debezium uses a PostgreSQL logical decoding slot to stream changes from the given relational source table.

  • The source PostgreSQL store must use logical decoding with the write-ahead log (WAL). This means you should set wal_level to logical in the source relational database. (For more information, see the PostgreSQL ALTER SYSTEM docs.)

  • When you create the CDC pipeline, you can specify the name of an existing slot to use for the job. If you do not, DeltaStream creates a new replication slot. (See Working with PostgreSQL replication slots below.)

  • When you create the source PostgreSQL Store, the username you provide must have enough privileges to use an existing replication slot or create a new one.
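
A minimal sketch, in psql, of preparing a source database to meet these requirements (deltastream_user is a hypothetical name for the user your Store connects as; changing wal_level requires a server restart):

-- Check the current WAL level; it must be 'logical' for CDC
SHOW wal_level;

-- Switch the WAL level to logical (takes effect after a PostgreSQL restart)
ALTER SYSTEM SET wal_level = logical;

-- Let the connecting user create and use replication slots
ALTER ROLE deltastream_user WITH REPLICATION;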

Working with PostgreSQL replication slots

Each DeltaStream query that reads rows from a PostgreSQL table must specify a replication slot. Replication slots in PostgreSQL represent a stream of changes that can be replayed to a client in the order in which they were made on the original server. Each slot has the following characteristics:

  • Is uniquely named across all databases in a PostgreSQL instance

  • Persists independently of the clients using it

  • Is crash-safe

  • Contains its own independent state

You cannot assign a single replication slot to multiple simultaneously running DeltaStream queries. By default, PostgreSQL instances limit the number of replication slots to 10.

In the PostgreSQL CLI psql you can list, create, or drop replication slots with the following queries:

-- Check the list of existing replication slots
SELECT * FROM pg_replication_slots;

-- Create a new logical replication slot
SELECT pg_create_logical_replication_slot('ds_cdc', 'pgoutput');

-- Delete an existing replication slot
-- Note: For logical slots, this must be called while connected to the same
-- database the slot was created on.
SELECT pg_drop_replication_slot('ds_cdc');
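
If you need more concurrent CDC queries than the default slot limit allows, you can inspect and raise that limit as sketched below (changes to max_replication_slots take effect only after a PostgreSQL server restart):

-- Check the configured maximum number of replication slots
SHOW max_replication_slots;

-- Raise the limit; requires a server restart to take effect
ALTER SYSTEM SET max_replication_slots = 20;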

Note For more information on replication slots, please see the following sections of the PostgreSQL documentation:

  • Replication

  • Replication Slots

  • Logical Decoding and Replication Slots

Considerations

Storage TOAST

PostgreSQL uses a fixed page size (commonly 8 kB), so rows with large field values are compressed and/or broken up into multiple physical rows. This technique is referred to as TOAST. See the PostgreSQL documentation on the subject to learn more.

DeltaStream follows the Debezium default behavior when generating CDC events containing TOASTed fields. If a TOASTed field is unchanged, its value in the CDC event is replaced with __debezium_unavailable_value.
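
If your pipeline needs real values for unchanged TOASTed columns, one commonly used mitigation is to have PostgreSQL log complete old row images for the table, at the cost of extra WAL volume (a sketch; public.pageviews refers to the example table used later on this page):

-- Log the complete old row for UPDATE and DELETE operations,
-- so before images include TOASTed column values
ALTER TABLE public.pageviews REPLICA IDENTITY FULL;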

How to Create a CDC Pipeline

You create a CDC pipeline in two steps:

  1. Define the CDC source

  2. Define the sink and the query to write CDC changes into it

Step 1. Define DDL for CDC source

Using CREATE STREAM, define a Stream backed by the source relational table. The parameters below are used as CDC parameters in the WITH clause in CREATE STREAM:

Parameter Name
Description

postgresql.db.name

Name of the database in the POSTGRESQL store containing the source table.

Required: Yes Type: String Valid values: See LIST ENTITIES

postgresql.schema.name

Name of the schema in the POSTGRESQL store containing the source table.

Required: Yes Type: String Valid values: See LIST ENTITIES

postgresql.table.name

Name of the source table in the POSTGRESQL store.

Required: Yes Type: String Valid values: See LIST ENTITIES

value.format

Format of the CDC record coming from the source.

Required: Yes Type: String Valid values: JSON

store

Name of the POSTGRESQL store that hosts the relational table backing the CDC source.

Required: Yes Type: String Valid values: See LIST STORES

Step 2. Define the CSAS for CDC Sink Stream and Query

Using CREATE STREAM AS SELECT, define a Stream backed by the sink and the query to insert CDC records into it.

Sink parameters

The CDC sink can write into an existing entity in the sink store or create a new entity. The below parameters are used in the WITH clause for the sink in CSAS to create the desired behavior:

Parameter Name
Description

topic

Name of the Streaming Entity into which the data for the CDC sink is written. If the entity doesn't exist, an entity with this name is created in the corresponding store.

Required: No Default value: Lowercase sink name Type: String

topic.partitions

The number of partitions to use when creating the sink entity, if applicable. If the entity already exists, this value must equal the number of partitions in the existing entity.

Required: Yes, unless topic already exists Type: Integer Valid values: [1, ...]

topic.replicas

The number of replicas to use when creating the sink entity, if applicable. If the entity already exists, this value must equal the number of replicas in the existing entity.

Required: Yes, unless topic already exists Type: Integer Valid values: [1, ...]

Source parameters

The source stream in the CDC pipeline captures the changes from a relational table in the source POSTGRESQL Store. Here are the parameters you can specify in the WITH clause for the source in CSAS:

Parameter Name
Description

postgresql.slot.name

Name of the existing PostgreSQL replication slot to use for the CDC pipeline. Note that the username you provide for the source PostgreSQL store should have the required privileges to use this slot. Required: No Default value: ds_cdc Type: String

postgresql.decoding.plugin.name

Name of the Postgres logical decoding plug-in on the source store. Supported values are decoderbufs, pgoutput, wal2json, wal2json_rds, wal2json_streaming, and wal2json_rds_streaming. Required: No Default value: pgoutput Type: String
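
As a sketch of overriding these source parameters, the following CSAS attaches them to the source relation in the FROM clause. This assumes a pre-created replication slot named ds_cdc_2 (the slot and topic names are hypothetical), and pageviews_cdc is the CDC source stream defined in the example below:

CREATE STREAM cdc_sink WITH ('topic' = 'cdc_logs') AS
SELECT *
FROM pageviews_cdc WITH (
  'postgresql.slot.name' = 'ds_cdc_2',
  'postgresql.decoding.plugin.name' = 'pgoutput');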

Example

Assume a POSTGRESQL Store named pgstore that has a table named pageviews under the schema named public in the report database. Here is how the data displays in pageviews. Each row has 3 columns (viewtime, userid, and pageid) and shows when a given page was visited by a specific user:

demodb.public/pgstore# PRINT ENTITY "public".pageviews;
+-----------------+-------------+-------------+
| viewtime        | userid      | pageid      |
+=================+=============+=============+
| 1694124853651   | User_4      | Page_29     |
+-----------------+-------------+-------------+
| 1694124856731   | User_1      | Page_59     |
+-----------------+-------------+-------------+
| 1694124857732   | User_1      | Page_63     |
+-----------------+-------------+-------------+

Use the DDL statement below to create a Stream named pageviews_cdc, backed by the pageviews table, to capture its CDC records:
CREATE STREAM pageviews_cdc(
  op VARCHAR,
  ts_ms BIGINT,
  `before` STRUCT<viewtime BIGINT, userid VARCHAR, pageid VARCHAR>, 
  `after`  STRUCT<viewtime BIGINT, userid VARCHAR, pageid VARCHAR>, 
  `source` STRUCT<db VARCHAR, `schema` VARCHAR, `table` VARCHAR, `lsn` BIGINT>)
WITH (
  'store'='pgstore', 
  'value.format'='json',
  'postgresql.db.name'='report',
  'postgresql.schema.name'='public',
  'postgresql.table.name'='pageviews');

The pageviews_cdc stream is used as the source in the CDC pipeline. Whenever an INSERT, DELETE, or UPDATE happens on pageviews in the relational database, a corresponding record is generated in pageviews_cdc to capture the change. Now use the CSAS statement below to define a stream as the sink and run a query that writes those changes into it:

CREATE STREAM cdc_sink WITH (
  'topic.partitions' = 1,
  'topic.replicas' = 1) AS 
SELECT * FROM pageviews_cdc;

Note that in the above CSAS you must provide the topic properties, as a new topic is created in the sink store. To use a pre-existing topic such as cdc_logs in the sink store, you can replace topic properties with the topic name in the WITH clause:

CREATE STREAM cdc_sink WITH ('topic' = 'cdc_logs') AS 
SELECT * FROM pageviews_cdc;

As an example, assume a new record is added to pageviews showing that User_5 visited Page_94. Following this INSERT operation, you see a record similar to the below in cdc_sink published via the CDC pipeline defined above:

{
 "op":"c",
 "ts_ms":1693430399726,
 "before":null,
 "after":{"viewtime":1693430399292,"userid":"User_5","pageid":"Page_94"},
 "source":{"db":"report","schema":"public","table":"pageviews","lsn":38990016}
}
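
For comparison, if that same row were later deleted, the resulting DELETE event would carry the row's state in before and a null after (the values here are illustrative; with the default replica identity, before may contain only the table's PRIMARY KEY columns):

{
 "op":"d",
 "ts_ms":1693430501234,
 "before":{"viewtime":1693430399292,"userid":"User_5","pageid":"Page_94"},
 "after":null,
 "source":{"db":"report","schema":"public","table":"pageviews","lsn":38990200}
}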

Now imagine you're only interested in the DELETE events in the source and wish to record the event time, userid, and lsn for each DELETE. You can use the query below to create such a CDC pipeline:

CREATE STREAM cdc_sink WITH ('topic' = 'cdc_logs') AS 
SELECT ts_ms AS event_ts,
       `before`->userid AS uid,
       `source`->`lsn` AS seq_num
FROM pageviews_cdc
WHERE op = 'd';
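
With this pipeline running, the DELETE event shown earlier would yield a sink record shaped like the following (illustrative values, projected and renamed by the query above):

{"event_ts":1693430501234,"uid":"User_5","seq_num":38990200}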
