PostgreSQL

PostgreSQL, or Postgres, is an open-source relational database management system that uses and extends the SQL language. It is free to use, highly extensible, and aims to conform to the SQL standard.

This document walks you through setting up PostgreSQL for use as a source data Store in DeltaStream.

Setting up PostgreSQL

Prerequisites

  1. Create a user in the PostgreSQL instance (see PostgreSQL documentation).

  2. Ensure the PostgreSQL instance uses logical decoding with the write-ahead log (WAL). To do this, set wal_level to logical on the source PostgreSQL instance (see PostgreSQL ALTER SYSTEM docs). A change to wal_level takes effect only after the server restarts.
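
The two prerequisites above can be sketched in psql as follows. This is a minimal sketch, not a definitive setup: the role name deltastream_user, its password, and the public schema are hypothetical placeholders, and the privileges shown (LOGIN, REPLICATION, and SELECT on the captured tables) are an assumption based on what Debezium-based change capture commonly needs, not a requirement stated in this document.

```sql
-- Hypothetical role name and password; replace with your own values.
CREATE ROLE deltastream_user WITH LOGIN REPLICATION PASSWORD 'change_me';

-- Assumption: the tables to capture live in the public schema.
GRANT USAGE ON SCHEMA public TO deltastream_user;
GRANT SELECT ON ALL TABLES IN SCHEMA public TO deltastream_user;

-- Check the current WAL level; logical decoding needs 'logical'.
SHOW wal_level;

-- Requires superuser privileges; takes effect after a server restart.
ALTER SYSTEM SET wal_level = logical;
```

After the restart, running SHOW wal_level again should report logical. Managed PostgreSQL services may not permit ALTER SYSTEM and often expose this setting through their own configuration mechanism instead.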

Working with PostgreSQL replication slots

Each DeltaStream query that reads rows from a PostgreSQL table must specify a replication slot. Replication slots in PostgreSQL represent a stream of changes that can be replayed to a client in the order in which they were made on the original server. Each slot has the following characteristics:

  • uniquely named across all databases in a PostgreSQL instance

  • persists independently from the clients using it

  • crash proof

  • contains its own independent state

You cannot assign a single replication slot in DeltaStream to multiple simultaneously running queries. By default, a PostgreSQL instance limits the number of replication slots to 10 (see PostgreSQL documentation).
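
Because each running query needs its own slot and the instance caps the total, it can help to check how much headroom is left. The sketch below assumes PostgreSQL 10 or later (for pg_current_wal_lsn and pg_wal_lsn_diff) and a connection to the primary server; the column alias retained_wal is illustrative.

```sql
-- Configured upper bound on replication slots (10 by default).
SHOW max_replication_slots;

-- Logical slots currently defined, whether a client is attached,
-- and roughly how much WAL each slot is holding back.
SELECT slot_name,
       database,
       active,
       pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn)) AS retained_wal
FROM pg_replication_slots
WHERE slot_type = 'logical';
```

Since a slot persists independently of its clients, an inactive slot keeps retaining WAL until it is consumed or dropped, so slots that are no longer used are worth removing.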

In psql, the PostgreSQL command-line client, you can list, create, or drop replication slots with the following queries:

-- List the existing replication slots
SELECT * FROM pg_replication_slots;

-- Create a new logical replication slot that uses the pgoutput plugin
SELECT pg_create_logical_replication_slot('ds_cdc', 'pgoutput');

-- Drop an existing replication slot
-- Note: For logical slots, this must be called while connected to the same database the slot was created on.
SELECT pg_drop_replication_slot('ds_cdc');

For more details, see the PostgreSQL documentation on replication slots.

Adding PostgreSQL as a DeltaStream Store

  1. Click Add Store +, and from the list that displays, click PostgreSQL. The Add Store window displays, with PostgreSQL-specific fields you must complete.

  2. Enter the following information:

    • Store Name – A name to identify your DeltaStream store (see Store).

    • Store Type – POSTGRESQL.

    • Access Region – DeltaStream access region to associate with the store (see Region).

    • URI – URI for the PostgreSQL database with /<database_name> appended. For example, given a PostgreSQL host of my.postgresql.uri and an open port on the database of 5432, the URI to connect DeltaStream to the demo database is: postgresql://my.postgresql.uri:5432/demo

    • Username – Username associated with the PostgreSQL database user DeltaStream should assume.

    • Password – The password associated with the username.

  3. Click Add to create and save the store.

Note: For instructions on creating the store using DSQL, see CREATE STORE.
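
As a rough illustration of the DSQL route, the fields from the Add Store window might map onto a statement like the one below. This is a sketch under stated assumptions: the property names ('type', 'access_region', 'uris', 'postgres.username', 'postgres.password'), the region value, and the credentials are not confirmed by this document, so verify them against the CREATE STORE reference before use.

```sql
-- Assumed property names; confirm each against the CREATE STORE reference.
-- psql_store matches the store name used later in this document.
CREATE STORE psql_store WITH (
  'type' = POSTGRESQL,
  'access_region' = "AWS us-east-1",                    -- placeholder region
  'uris' = 'postgresql://my.postgresql.uri:5432/demo',
  'postgres.username' = 'deltastream_user',             -- placeholder user
  'postgres.password' = 'change_me');                   -- placeholder password
```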

Inspect the PostgreSQL Store

  1. Click your PostgreSQL store (in this case, Postgres_Test_Store). The store page opens with the Schemas tab active and lists the existing schemas in your PostgreSQL database.

  2. (Optional) Create a new schema. To do this:

    • Click + Add Schema. When the Add Schema window opens, enter a name for the new schema and then click Add. Your new schema displays in the entities list.

  3. To view the tables in a schema, click on a schema name.

  4. To view a sample of rows from a table, click the table within its schema and then click Print.
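
To cross-check what the store page shows, the same information can be read directly from PostgreSQL in psql. This is a sketch that uses the standard information_schema views; the public schema and the pageviews table are assumptions borrowed from the example later in this document.

```sql
-- List user-defined schemas in the current database.
SELECT schema_name
FROM information_schema.schemata
WHERE schema_name !~ '^pg_'
  AND schema_name <> 'information_schema';

-- List the tables in one schema (here: public).
SELECT table_name
FROM information_schema.tables
WHERE table_schema = 'public'
  AND table_type = 'BASE TABLE';

-- Sample a few rows, similar in spirit to the Print action.
SELECT * FROM public.pageviews LIMIT 10;
```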

Process PostgreSQL CDC Data and Sink to Kafka

To follow the next few steps, you must already have a PostgreSQL store labeled psql_store. You also must have a Kafka store labeled kafka_store. Define a DeltaStream stream as your source data from PostgreSQL. Then write a query to process this data and sink it to a Kafka topic.

Defining a DeltaStream Stream on a PostgreSQL Table

In this step, you create a stream called pageviews_cdc that is backed by data in a PostgreSQL table. This stream represents change data capture (CDC) events from the PostgreSQL table.

Note: DeltaStream uses Debezium to capture changes in a source relational table. To learn more about how CDC works with DeltaStream, see Change Data Capture (CDC).

First, print the data for your source, which is the pageviews PostgreSQL table. To print sample rows from the table in DeltaStream, inspect your store and navigate to the table you wish to print. (For more details, see Inspect the PostgreSQL Store).

Below is an example of how to create a stream on your pageviews data. The fields match the Debezium standard; any insert, delete, or update to the pageviews table becomes an event for your pageviews_cdc stream.

CREATE STREAM pageviews_cdc(
  op VARCHAR,
  ts_ms BIGINT,
  `before` STRUCT<viewtime BIGINT, userid VARCHAR, pageid VARCHAR>, 
  `after`  STRUCT<viewtime BIGINT, userid VARCHAR, pageid VARCHAR>, 
  `source` STRUCT<db VARCHAR, `table` VARCHAR, `lsn` BIGINT>)
WITH (
  'store'='psql_store', 
  'value.format'='json',
  'postgresql.db.name'='demo',
  'postgresql.schema.name'='public',
  'postgresql.table.name'='pageviews');
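
To make the mapping from table changes to CDC events concrete, here is a sketch of a PostgreSQL source table whose columns mirror the STRUCT fields above, followed by one change of each kind. The table definition and the sample values are hypothetical; run the statements in psql against the demo database, not in DeltaStream.

```sql
-- Hypothetical source table; columns mirror the before/after STRUCT fields.
CREATE TABLE IF NOT EXISTS public.pageviews (
  viewtime BIGINT,
  userid   VARCHAR,
  pageid   VARCHAR
);

-- Log full old-row values so updates and deletes can carry a populated `before`.
ALTER TABLE public.pageviews REPLICA IDENTITY FULL;

-- Each statement below yields one change event; Debezium's op codes are shown.
INSERT INTO public.pageviews VALUES (1700000000000, 'User_1', 'Page_10');  -- op = 'c'
UPDATE public.pageviews SET pageid = 'Page_11' WHERE userid = 'User_1';    -- op = 'u'
DELETE FROM public.pageviews WHERE userid = 'User_1';                      -- op = 'd'
```

In the Debezium event format, an insert populates only `after`, a delete populates only `before`, and an update can populate both.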

Write a CSAS (CREATE STREAM AS SELECT) Query to Sink Data into Kafka

  1. In the SQL pane of your workspace, write the CREATE STREAM AS SELECT (CSAS) query to ingest from pageviews_cdc and output to a new stream labeled pageviews_cdc_sink. To represent a feed of upsert events, this query keeps only records whose op field is c (create) or u (update).

CREATE STREAM pageviews_cdc_sink WITH (
  'store' = 'kafka_store',
  'topic' = 'pageviews_cdc_sink',
  'topic.partitions' = 1,
  'topic.replicas' = 3) AS
SELECT
  *
FROM pageviews_cdc WITH ('postgresql.slot.name'='ds_cdc_demo')
WHERE op = 'c' OR op = 'u';

  2. Click Run.

  3. Verify that the query is working properly by writing an interactive SELECT query.
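
A minimal sketch of such a verification query, assuming the sink stream was created with the name pageviews_cdc_sink as in the CSAS statement:

```sql
-- Read from the sink stream; only create and update events should appear.
SELECT op, ts_ms
FROM pageviews_cdc_sink;
```

If rows with any op value other than c or u appear, recheck the WHERE clause of the CSAS query.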
