PostgreSQL
PostgreSQL, also known as Postgres, is an open-source relational database management system that uses and extends the SQL language. It is free to use, highly extensible, and aims to conform to the SQL standard.
This document will walk through setting up Postgres to be used as a source data Store in DeltaStream.
Setting up PostgreSQL
Prerequisites
Create a user in the PostgreSQL instance (see PostgreSQL docs)
Ensure the PostgreSQL instance uses logical decoding with the write-ahead log (WAL). This means that wal_level should be set to logical in the source database (see PostgreSQL ALTER SYSTEM docs).
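On a self-managed instance, a superuser can check and change this setting with the standard PostgreSQL statements below; a server restart is required before a new wal_level takes effect, and managed PostgreSQL services usually expose this setting through their own configuration interface instead:

```sql
-- Check the current WAL level; it must report 'logical' for CDC to work
SHOW wal_level;

-- Requires superuser privileges; takes effect only after a server restart
ALTER SYSTEM SET wal_level = logical;
```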
Working with PostgreSQL replication slots
Each DeltaStream query that reads rows from a PostgreSQL table needs to specify a replication slot. Replication slots in PostgreSQL represent a stream of changes that can be replayed to a client in the order they were made on the original server. Each slot is uniquely named across all databases in a PostgreSQL instance, persists independently from the clients using them, is crash proof, and contains its own independent state. In DeltaStream, a single replication slot cannot be assigned to multiple simultaneously running queries. By default, PostgreSQL instances limit the number of replication slots to 10 (see PostgreSQL docs).
In the PostgreSQL CLI, psql, you can list, create, or drop replication slots with the following queries:
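For example, using PostgreSQL's built-in replication management functions (my_slot is an illustrative slot name, and pgoutput is PostgreSQL's built-in logical decoding plugin):

```sql
-- List existing replication slots and whether each is in use
SELECT slot_name, plugin, active FROM pg_replication_slots;

-- Create a logical replication slot named 'my_slot' using the pgoutput plugin
SELECT pg_create_logical_replication_slot('my_slot', 'pgoutput');

-- Drop a replication slot that is no longer needed
SELECT pg_drop_replication_slot('my_slot');
```

Remember that a slot that is created but never consumed retains WAL on the server, so drop slots you no longer need.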
See the PostgreSQL documentation on replication slots for more detail.
Adding PostgreSQL as a DeltaStream Store
Navigate to the DeltaStream web console
Select "Stores" in the left panel then select "+ New Store" in the top right
Enter the following information
Store Name – A name to identify your DeltaStream Store (See Store)
Store Type – POSTGRESQL
Access Region – DeltaStream access region to associate with the Store (See Region)
URL – URL for the PostgreSQL database with /<database_name> appended. For example, given a Postgres URI of my.postgresql.uri and an open port on the database of 5432, the URL to connect DeltaStream to the "demo" database would look like the following: postgresql://my.postgresql.uri:5432/demo
Username – Username associated with the user in the PostgreSQL database that DeltaStream should assume
Password – The password associated with the username
Click "Save" to create the Store
For instructions on creating the Store using DSQL, see CREATE STORE.
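As a rough sketch, a CREATE STORE statement for the example above might look like the following. The property names, region, URL, and credentials here are illustrative placeholders; consult the CREATE STORE reference for the exact syntax:

```sql
CREATE STORE psql_store WITH (
  'type' = POSTGRESQL,
  'access_region' = "AWS us-east-1",
  'uris' = 'postgresql://my.postgresql.uri:5432/demo',
  'postgres.username' = 'deltastream_user',
  'postgres.password' = '<password>'
);
```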
Inspect the PostgreSQL store
Navigate to the "Stores" tab in the left side panel. This will display a list of the existing Stores.
Select the Store called "psql_store" then select "Entities". This will bring up a list of the existing schemas in your PostgreSQL database.
(Optional) Create a new schema
Click on the 3 vertical dots next to your store name and select "Create Entity"
In the popup, enter a name for the new schema and select "Create". You should now be able to see the new schema in the entities list.
Click on a schema name to see the tables that exist under that schema
Click on a table in a schema and select "Print" to print out a sample of rows from that table
Process PostgreSQL CDC data and sink to Kafka
For the following steps let's assume that there is a PostgreSQL Store called 'psql_store' (See Adding PostgreSQL as a DeltaStream Store) and a Kafka Store called 'kafka_store'. We will first define a DeltaStream Stream as our source data from PostgreSQL, then we will write a query to process this data and sink it to a Kafka topic.
Defining a DeltaStream Stream on a PostgreSQL Table
In this step we'll create a Stream called pageviews_cdc that is backed by data in a PostgreSQL table. This Stream will represent change data capture (CDC) events from the PostgreSQL table. DeltaStream uses Debezium to capture changes from the source table. To learn more about how CDC works with DeltaStream, see Change Data Capture (CDC).
First, let's print the data for our source, which will be the pageviews PostgreSQL table. You can print sample rows from the table in DeltaStream by inspecting your Store and navigating to the table you want to print (see Inspect the PostgreSQL store).
Below, we can see how to create a Stream on our pageviews data. The fields match the Debezium standard, so any insert, update, or delete to the pageviews table will become an event for our pageviews_cdc Stream.
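A sketch of such a CREATE STREAM statement is shown below. It assumes a pageviews table with viewtime, userid, and pageid columns, and the WITH properties (store, format, and database/schema/table names) are illustrative; consult the CREATE STREAM reference for the exact property names:

```sql
CREATE STREAM pageviews_cdc (
  op VARCHAR,    -- Debezium operation: 'c' (create), 'u' (update), 'd' (delete), 'r' (snapshot read)
  ts_ms BIGINT,  -- time at which the change event was processed
  "before" STRUCT<viewtime BIGINT, userid VARCHAR, pageid VARCHAR>,  -- row state before the change
  "after"  STRUCT<viewtime BIGINT, userid VARCHAR, pageid VARCHAR>   -- row state after the change
) WITH (
  'store' = 'psql_store',
  'value.format' = 'json',
  'postgresql.db.name' = 'demo',
  'postgresql.schema.name' = 'public',
  'postgresql.table.name' = 'pageviews'
);
```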
Write a CSAS query to sink data into Kafka
Navigate to the "SQL Editor" tab in the left side panel
In the SQL editor box, write the CREATE STREAM AS SELECT (CSAS) query to ingest from pageviews_cdc and output to a new Stream called pageviews_cdc_sink. In this query, we are filtering for records whose op field is CREATE or UPDATE to represent a feed of upsert events.
Select "Run"
Navigate to the "Queries" tab in the left side panel to see the existing queries, including the query you just created. It may take a little time for the query to transition into the 'Running' state.
Refresh until you see the query is in the 'Running' state
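The CSAS query described in the steps above might look like the sketch below. Debezium encodes create and update operations as op values 'c' and 'u'; the sink Store and topic property names here are illustrative, so check the CREATE STREAM AS SELECT reference for the exact syntax:

```sql
CREATE STREAM pageviews_cdc_sink WITH (
  'store' = 'kafka_store',        -- sink the results to the Kafka Store
  'topic' = 'pageviews_cdc_sink'  -- Kafka topic to write to
) AS
SELECT *
FROM pageviews_cdc
WHERE op = 'c' OR op = 'u';       -- keep only create and update events
```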
We can verify that the query is working properly by running an interactive SELECT query.
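For example, running a simple interactive query in the SQL editor should show upsert events flowing into the sink Stream:

```sql
SELECT * FROM pageviews_cdc_sink;
```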