
PostgreSQL



PostgreSQL, or Postgres, is an open source relational database management system that uses and extends the SQL language. It is free to use, highly extensible, and aims to conform to the SQL standard.

This document walks you through setting up PostgreSQL for use as a source data store in DeltaStream.

Note In DeltaStream CDC pipelines, you can use PostgreSQL only as a source.

Setting up PostgreSQL

Prerequisites

  1. Have a PostgreSQL instance available.

  2. Create a user in the PostgreSQL instance (see the PostgreSQL documentation).

Important If you're creating a CDC pipeline backed by a PostgreSQL source data store, review these additional setup instructions first.
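As a rough illustration of the user-creation prerequisite, here is a minimal psql sketch for a user suitable as a CDC source. The user name and password are placeholders, and the exact privileges depend on your setup; CDC via logical replication typically requires the REPLICATION attribute.

-- Hypothetical names; adjust to your environment.
-- The REPLICATION attribute is typically required for CDC (logical replication) sources.
CREATE USER deltastream_user WITH PASSWORD 'changeme' REPLICATION;
-- Allow DeltaStream to read the tables whose changes it captures.
GRANT SELECT ON ALL TABLES IN SCHEMA public TO deltastream_user;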

Adding PostgreSQL as a DeltaStream Data Store

  1. Open DeltaStream. In the lefthand navigation, click Resources. The Resources page displays, with the Data Stores tab active.

  2. Click + Add Data Store, and from the list that displays click PostgreSQL. The Add Data Store window displays, with PostgreSQL-specific fields you must complete.

  3. Enter the following information:

    • Store Name – A name to identify your DeltaStream data store (See Data Store).

    • Store Type – POSTGRESQL.

    • URI – The URI of the PostgreSQL database, with /<database_name> appended. For example, given a PostgreSQL URI of my.postgresql.uri and an open port of 5432, to connect DeltaStream to the demo database you would enter: postgresql://my.postgresql.uri:5432/demo

    • Username – The username of the PostgreSQL database user that DeltaStream should assume.

    • Password – The password associated with the username.

  4. Click Add to create and save the data store.

Note For instructions on creating the store using DSQL, see CREATE STORE.
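For orientation, a DSQL statement along these lines creates an equivalent store. This is a minimal sketch: the store name, URI, and credentials are placeholders, and the property names shown are assumptions, so treat the CREATE STORE reference as authoritative.

-- Hypothetical sketch; property names are illustrative.
-- See CREATE STORE for the exact syntax.
CREATE STORE psql_store WITH (
  'type' = POSTGRESQL,
  'uris' = 'postgresql://my.postgresql.uri:5432/demo',
  'postgres.username' = 'deltastream_user',
  'postgres.password' = 'changeme'
);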

Inspect the PostgreSQL Data Store

  1. In the lefthand navigation, click Resources. From the list of data stores that displays, click your PostgreSQL data store (in this case, Postgres_Test_Store). The Postgres data store page opens with the Schemas tab active, listing the existing schemas in your PostgreSQL database.

  2. (Optional) Create a new schema. To do this:

    • Click + Add Schema. When the Add Schema window opens, enter a name for the new schema and then click Add. Your new schema displays in the entities list.

  3. To view the tables in a schema, click a schema name.

  4. To view a sample of rows from a table, click the table within its schema and then click Print. (A DSQL equivalent is sketched below.)
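If you prefer DSQL to the web app, the PRINT ENTITY command provides the same sample view. A hedged sketch, assuming a pageviews table in the public schema of a store named psql_store; check the PRINT ENTITY reference for the exact path syntax.

-- Hypothetical entity path and store name.
PRINT ENTITY public.pageviews IN STORE psql_store;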

Process PostgreSQL CDC Data and Sink to Kafka

To follow the steps in this section, you must already have a PostgreSQL data store labeled psql_store and a Kafka data store labeled kafka_store. You first define a DeltaStream stream over your source data in PostgreSQL, then write a query to process that data and sink it to a Kafka topic.

Note For instructions on adding the data stores, see Adding PostgreSQL as a DeltaStream Data Store, above.

Note DeltaStream uses Debezium to capture changes in a source relation. To learn more about how CDC works with DeltaStream, see Change Data Capture (CDC).

Defining a DeltaStream Stream on a PostgreSQL Table

In this step, you create a stream called pageviews_cdc that is backed by data in a PostgreSQL table. This stream represents change data capture (CDC) events from the PostgreSQL table.

First, print the data for your source, which is the pageviews PostgreSQL table. To print sample rows from the table in DeltaStream, inspect your data store and navigate to the table you wish to print. (For more details, see Inspect the PostgreSQL Data Store, above.)

Below is an example of how to create a stream on your pageviews data. The fields match the Debezium standard; any insert, delete, or update to the pageviews table becomes an event for your pageviews_cdc stream.

CREATE STREAM pageviews_cdc(
  op VARCHAR,
  ts_ms BIGINT,
  `before` STRUCT<viewtime BIGINT, userid VARCHAR, pageid VARCHAR>, 
  `after`  STRUCT<viewtime BIGINT, userid VARCHAR, pageid VARCHAR>, 
  `source` STRUCT<db VARCHAR, `table` VARCHAR, `lsn` BIGINT>)
WITH (
  'store'='psql_store', 
  'value.format'='json',
  'postgresql.db.name'='demo',
  'postgresql.schema.name'='public',
  'postgresql.table.name'='pageviews');
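Optionally, spot-check the new stream with an interactive SELECT before building the pipeline. A minimal sketch; the replication slot name is an illustrative placeholder, mirroring the 'postgresql.slot.name' source property used in the CSAS query below. A replication slot serves one consumer at a time, so pick one not in use elsewhere.

-- Hypothetical slot name; use a slot no other query is consuming.
SELECT * FROM pageviews_cdc WITH ('postgresql.slot.name'='ds_cdc_peek');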

Write a CSAS (CREATE STREAM AS SELECT) Query to Sink Data into Kafka

  1. In the lefthand navigation, click Workspace.

  2. In the SQL pane of your workspace, write the query to ingest from pageviews_cdc and output to a new stream labeled pageviews_cdc_sink. To represent a feed of upsert events, this query filters for records whose op field indicates a create ('c') or an update ('u'):

CREATE STREAM pageviews_cdc_sink WITH (
  'store' = 'kafka_store',
  'topic' = 'pageviews_cdc_sink',
  'topic.partitions' = 1,
  'topic.replicas' = 3) AS
SELECT
  *
FROM pageviews_cdc WITH ('postgresql.slot.name'='ds_cdc_demo')
WHERE op = 'c' OR op = 'u';

  3. Click Run.

  4. In the lefthand navigation, click Queries to view existing queries, including the query from step 2, above. Important It can take a short time for the query to transition into the Running state. Refresh your screen occasionally until you see the query reach that state.

  5. Verify that the query is working properly. To do this, write an interactive SELECT query, as sketched below.
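For example, a minimal verification query against the sink, assuming the CSAS query above has reached the Running state and changes have occurred on the source table:

-- Interactive query; stop it once you see CDC events arriving.
SELECT * FROM pageviews_cdc_sink;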
