PostgreSQL

PostgreSQL or Postgres is an open source relational database management system that uses and extends the SQL language. It is free to use, highly extensible, and tries to conform with the SQL standard.

This document will walk through setting up Postgres to be used as a source data Store in DeltaStream.

Setting up PostgreSQL

Prerequisites

  1. Create a user in the PostgreSQL instance (see PostgreSQL docs)

  2. Ensure the PostgreSQL instance uses logical decoding with the write-ahead log (WAL). This means that wal_level should be set to logical in the source relational database (see PostgreSQL ALTER SYSTEM docs).

Working with PostgreSQL replication slots

Each DeltaStream query that reads rows from a PostgreSQL table needs to specify a replication slot. Replication slots in PostgreSQL represent a stream of changes that can be replayed to a client in the order they were made on the original server. Each slot is uniquely named across all databases in a PostgreSQL instance, persists independently from the clients using them, is crash proof, and contains its own independent state. In DeltaStream, a single replication slot cannot be assigned to multiple simultaneously running queries. By default, PostgreSQL instances limit the number of replication slots to 10 (see PostgreSQL docs).

In the PostgreSQL CLI psql you can list, create, or drop replication slots with the following queries:

// Check list of existing replication slots
SELECT * FROM pg_replication_slots;

// Create a new logical replication slot
SELECT pg_create_logical_replication_slot('ds_cdc', 'pgoutput');

// Delete an existing replication slot
// Note: For logical slots, this must be called while connected to the same database the slot was created on.
SELECT pg_drop_replication_slot('ds_cdc');

Check PostgreSQL docs on replication slots here and here

Adding PostgreSQL as a DeltaStream Store

  1. Navigate to the DeltaStream web console

  2. Select "Stores" in the left panel then select "+ New Store" in the top right

  3. Enter the following information

    • Store Name – A name to identify your DeltaStream Store (See Store)

    • Store Type – POSTGRESQL

    • Access Region – DeltaStream access region to associate with the Store (See Region)

    • URL – URL for the PostgreSQL database with /<database_name> appended For example, given a postgres URI of my.postgresql.uri and an open port on the Database of 5432, to connect DeltaStream to the "demo" database the URI would look like the following: postgresql://my.postgresql.uri:5432/demo

    • Username – Username associated with the user in the PostgreSQL database that DeltaStream should assume

    • Password – The password associated with the username

  4. Click "Save" to create the Store

For instructions on creating the Store using DSQL, see CREATE STORE.

Inspect the PostgreSQL store

  1. Navigate to the "Stores" tab in the left side panel. This will display a list of the existing Stores.

  2. Select the Store called "psql_store" then select "Entities". This will bring up a list of the existing schemas in your PostgreSQL database.

  3. (Optional) Create a new schema

    1. Click on the 3 vertical dots next to your store name and select "Create Entity"

    2. In the popup, enter a name for the new schema and select "Create". You should now be able to see the new schema in the entities list.

  4. Click on a schema name to see the tables that exist under that schema

  5. Click on a table in a schema and select "Print" to print out a sample of rows from that table

Process PostgreSQL CDC data and sink to Kafka

For the following steps let's assume that there is a PostgreSQL Store called 'psql_store' (See Adding PostgreSQL as a DeltaStream Store) and a Kafka Store called 'kafka_store'. We will first define a DeltaStream Stream as our source data from PostgreSQL, then we will write a query to process this data and sink it to a Kafka topic.

Defining a DeltaStream Stream on a PostgreSQL Table

In this step we'll be creating a Stream called pageviews_cdc that is backed by data in a PostgreSQL table. This Stream will represent change data capture (CDC) events from the PostgreSQL table. DeltaStream uses Debezium to capture changes in a source Relation table. To learn more about how CDC works with DeltaStream, see Change Data Capture (CDC).

First, let's print the data for our source, which will be the pageviews PostgreSQL table. You can print some sample rows from the table in DeltaStream by inspecting your Store and navigating to the table you want to print (see Inspect the PostgreSQL store).

Below, we can see how to create a Stream on our pageviews data. The fields match the Debezium standard, so any insert, delete, or update to the pageviews table will become an event for our pageviews_cdc Stream.

CREATE STREAM pageviews_cdc(
  op VARCHAR,
  ts_ms BIGINT,
  `before` STRUCT<viewtime BIGINT, userid VARCHAR, pageid VARCHAR>, 
  `after`  STRUCT<viewtime BIGINT, userid VARCHAR, pageid VARCHAR>, 
  `source` STRUCT<db VARCHAR, `table` VARCHAR, `lsn` BIGINT>)
WITH (
  'store'='psql_store', 
  'value.format'='json',
  'postgresql.db.name'='demo',
  'postgresql.schema.name'='public',
  'postgresql.table.name'='pageviews');

Write a CSAS query to sink data into Kafka

  1. Navigate to the "SQL Editor" tab in the left side panel

  2. In the SQL editor box, write the CREATE STREAM AS SELECT (CSAS) query to ingest from pageviews_cdc and output to a new Stream called pageviews_cdc_sink. In this query, we are filtering for records whose op field is CREATE or UPDATE to represent a feed of upsert events.

CREATE STREAM pageviews_cdc_sink WITH (
  'store' = 'kafka_store',
  'topic' = 'pageviews_cdc_sink',
  'topic.partitions' = 1,
  'topic.replicas' = 3) AS
SELECT
  *
FROM pageviews_cdc WITH ('postgresql.slot.name'='ds_cdc_demo')
WHERE op = 'c' OR op = 'u';
  1. Select "Run"

  2. Navigate to "Queries" tab in the left side panel to see the existing queries, including the query from step 2. It takes a little bit of time for the query to transition into the 'Running' state.

  3. Refresh until you see the query is in the 'Running' state

  4. We can verify that the query is properly working by writing an interactive SELECT query

Last updated