Setting up and Integrating PostgreSQL with your Organization
PostgreSQL, or Postgres, is an open-source relational database management system that uses and extends the SQL language. It is free to use, highly extensible, and aims to conform to the SQL standard.
- 3. Ensure the PostgreSQL instance uses logical decoding with the write-ahead log (WAL). This means that wal_level must be set to logical on the source database (see the PostgreSQL ALTER SYSTEM docs). A change to wal_level takes effect only after the PostgreSQL server is restarted.
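As a quick pre-flight check, the sketch below takes PostgreSQL setting values (for example, the rows returned by SELECT name, setting FROM pg_settings WHERE name IN ('wal_level', 'max_replication_slots');) and reports anything that would block logical decoding. The function name logical_decoding_problems is hypothetical, and fetching the settings is left to whichever client you use.

```python
"""Hypothetical pre-flight check for logical decoding.

Assumes `settings` maps PostgreSQL setting names to their string values,
e.g. rows from:
    SELECT name, setting FROM pg_settings
    WHERE name IN ('wal_level', 'max_replication_slots');
"""

from collections.abc import Mapping


def logical_decoding_problems(settings: Mapping[str, str]) -> list[str]:
    """Return human-readable problems; an empty list means the checks passed."""
    problems: list[str] = []

    wal_level = settings.get("wal_level")
    if wal_level != "logical":
        problems.append(
            f"wal_level is {wal_level!r}; run ALTER SYSTEM SET wal_level = logical; "
            "and restart the server"
        )

    # Each running DeltaStream query needs its own replication slot,
    # so the instance must allow at least one.
    max_slots = int(settings.get("max_replication_slots", "0"))
    if max_slots < 1:
        problems.append("max_replication_slots must be at least 1")

    return problems


if __name__ == "__main__":
    print(logical_decoding_problems({"wal_level": "replica", "max_replication_slots": "10"}))
    print(logical_decoding_problems({"wal_level": "logical", "max_replication_slots": "10"}))
```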
Each DeltaStream query that reads rows from a PostgreSQL table must specify a replication slot. A replication slot in PostgreSQL represents a stream of changes that can be replayed to a client in the order they were made on the original server. Each slot has a name that is unique across all databases in a PostgreSQL instance, persists independently of the clients using it, is crash-safe, and keeps its own independent state. In DeltaStream, a single replication slot cannot be assigned to multiple queries running at the same time. By default, PostgreSQL limits the number of replication slots to 10 through the max_replication_slots setting (see the PostgreSQL docs).
In the PostgreSQL CLI, psql, you can list, create, or drop replication slots with the following queries:
-- Check the list of existing replication slots
SELECT * FROM pg_replication_slots;
-- Create a new logical replication slot that uses the pgoutput plugin
SELECT pg_create_logical_replication_slot('ds_cdc', 'pgoutput');
-- Delete an existing replication slot
-- Note: For logical slots, this must be called while connected to the same database the slot was created on.
SELECT pg_drop_replication_slot('ds_cdc');
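The slot rules described above can be modeled in a few lines. The following is a hypothetical simulation, not DeltaStream or PostgreSQL code: slot names are unique per instance, the instance caps the number of slots (10 mirrors the max_replication_slots default), and a slot can serve at most one running query at a time. The class SlotRegistry and its methods are invented for illustration, and requiring a slot to exist before a query uses it is an assumption of this sketch.

```python
from typing import Dict, Optional


class SlotError(Exception):
    """Raised when an operation would break one of the slot rules."""


class SlotRegistry:
    """Simulated replication slots for a single PostgreSQL instance."""

    def __init__(self, max_slots: int = 10) -> None:
        # 10 mirrors the PostgreSQL default for max_replication_slots.
        self.max_slots = max_slots
        # Slot name -> query currently reading from it (None while idle).
        self._owner: Dict[str, Optional[str]] = {}

    def create(self, slot: str) -> None:
        if slot in self._owner:
            raise SlotError(f"slot {slot!r} already exists")
        if len(self._owner) >= self.max_slots:
            raise SlotError("max_replication_slots limit reached")
        self._owner[slot] = None

    def start_query(self, query: str, slot: str) -> None:
        # Simulation assumption: the slot must already exist.
        if slot not in self._owner:
            raise SlotError(f"slot {slot!r} does not exist")
        current = self._owner[slot]
        if current is not None:
            raise SlotError(f"slot {slot!r} is already used by {current!r}")
        self._owner[slot] = query

    def stop_query(self, slot: str) -> None:
        if slot not in self._owner:
            raise SlotError(f"slot {slot!r} does not exist")
        self._owner[slot] = None

    def drop(self, slot: str) -> None:
        if slot not in self._owner:
            raise SlotError(f"slot {slot!r} does not exist")
        if self._owner[slot] is not None:
            raise SlotError(f"slot {slot!r} is active; stop {self._owner[slot]!r} first")
        del self._owner[slot]


if __name__ == "__main__":
    registry = SlotRegistry()
    registry.create("ds_cdc")
    registry.start_query("pageviews_cdc_sink", "ds_cdc")
    try:
        registry.start_query("another_query", "ds_cdc")
    except SlotError as err:
        print("rejected:", err)
```

In practice, this means giving every concurrently running query its own slot name and keeping the total within the instance's limit.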
- 2. Select "Stores" in the left panel, then select "+ New Store" in the top right.
- 3. Enter the following information:
  - Store Type – POSTGRESQL
  - URL – URL for the PostgreSQL database with /<database_name> appended. For example, given a PostgreSQL host of my.postgresql.uri with port 5432 open, the URL for connecting DeltaStream to the "demo" database would be postgresql://my.postgresql.uri:5432/demo
  - Username – Username of the PostgreSQL database user that DeltaStream should assume
  - Password – Password associated with that username
- 4. Click "Save" to create the Store.
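The URL field combines the host, the port, and the database name. The helper below is a hypothetical sketch that assembles it, assuming the Store accepts the standard PostgreSQL connection URI form postgresql://host:port/database; confirm the exact format against the DeltaStream Store documentation before relying on it.

```python
from urllib.parse import quote


def postgres_store_url(host: str, port: int, database: str) -> str:
    """Build a PostgreSQL connection URI of the form postgresql://host:port/database.

    Hypothetical helper: the postgresql:// scheme is the standard PostgreSQL
    URI form and is assumed, not confirmed, to be what the Store expects.
    """
    if not 0 < port < 65536:
        raise ValueError(f"invalid port: {port}")
    # Percent-encode the database name in case it contains reserved characters.
    return f"postgresql://{host}:{port}/{quote(database, safe='')}"


if __name__ == "__main__":
    print(postgres_store_url("my.postgresql.uri", 5432, "demo"))
```

Username and password stay in their own fields, which is why the sketch leaves them out of the URL.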
- 1. Navigate to the "Stores" tab in the left side panel. This displays a list of the existing Stores.
- 2. Select the Store called "psql_store", then select "Entities". This brings up a list of the existing schemas in your PostgreSQL database.
- 3. (Optional) Create a new schema:
  - 1. Click the three vertical dots next to your Store name and select "Create Entity".
  - 2. In the popup, enter a name for the new schema and select "Create". The new schema should now appear in the entities list.
- 4. Click a schema name to see the tables that exist under that schema.
- 5. Click a table in a schema and select "Print" to print a sample of rows from that table.
For the following steps, let's assume that there is a PostgreSQL Store called 'psql_store' (see Adding PostgreSQL as a DeltaStream Store) and a Kafka Store called 'kafka_store'. We will first define a DeltaStream Stream as our source data from PostgreSQL, then write a query to process this data and sink it to a Kafka topic.
In this step, we'll create a Stream called pageviews_cdc that is backed by data in a PostgreSQL table. This Stream represents change data capture (CDC) events from the PostgreSQL table. DeltaStream uses Debezium to capture changes in a source relational table. To learn more about how CDC works with DeltaStream, see Change Data Capture (CDC).
First, let's print the data for our source, the pageviews PostgreSQL table. You can print some sample rows from the table in DeltaStream by inspecting your Store and navigating to the table you want to print (see Inspect the PostgreSQL store).
Below, we can see how to create a Stream on our pageviews data. The fields match the Debezium standard, so any insert, delete, or update to the pageviews table becomes an event for our pageviews_cdc Stream.
CREATE STREAM pageviews_cdc(
  op VARCHAR,
  `before` STRUCT<viewtime BIGINT, userid VARCHAR, pageid VARCHAR>,
  `after` STRUCT<viewtime BIGINT, userid VARCHAR, pageid VARCHAR>,
  `source` STRUCT<db VARCHAR, `table` VARCHAR, `lsn` BIGINT>)
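To see how the Stream's columns line up with an individual change event, the hypothetical sketch below decodes one Debezium-style JSON event into typed records whose fields mirror the pageviews STRUCT definitions. It assumes a JSON-encoded value with top-level op, before, after, and source fields; real Debezium events carry additional fields (for example, inside source), which this sketch simply ignores. The sample payload is invented.

```python
import json
from dataclasses import dataclass
from typing import Optional


@dataclass
class Pageview:  # STRUCT<viewtime BIGINT, userid VARCHAR, pageid VARCHAR>
    viewtime: int
    userid: str
    pageid: str


@dataclass
class Source:  # STRUCT<db VARCHAR, `table` VARCHAR, `lsn` BIGINT>
    db: str
    table: str
    lsn: int


@dataclass
class PageviewChange:
    op: str  # Debezium op code: 'c' insert, 'u' update, 'd' delete, 'r' snapshot read
    before: Optional[Pageview]
    after: Optional[Pageview]
    source: Source


def _row(value: Optional[dict]) -> Optional[Pageview]:
    # Pick only the declared columns; None stays None (e.g. `before` on an insert).
    if value is None:
        return None
    return Pageview(value["viewtime"], value["userid"], value["pageid"])


def decode(raw: str) -> PageviewChange:
    doc = json.loads(raw)
    src = doc["source"]
    return PageviewChange(
        op=doc["op"],
        before=_row(doc.get("before")),
        after=_row(doc.get("after")),
        source=Source(src["db"], src["table"], src["lsn"]),
    )


if __name__ == "__main__":
    sample = json.dumps({
        "op": "c",
        "before": None,
        "after": {"viewtime": 1700000000000, "userid": "User_1", "pageid": "Page_10"},
        "source": {"db": "demo", "table": "pageviews", "lsn": 24023128},
    })
    print(decode(sample))
```

Under the Debezium convention, an insert (op 'c') has a null before and the new row in after, while a delete (op 'd') has a null after.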
- 1. Navigate to the "SQL" tab in the left side panel.
- 2. In the SQL editor box, write the CREATE STREAM AS SELECT (CSAS) query to ingest from pageviews_cdc and output to a new Stream called pageviews_cdc_sink. In this query, we filter for records whose op field is CREATE (c) or UPDATE (u) to represent a feed of upsert events.
CREATE STREAM pageviews_cdc_sink WITH (
  'store' = 'kafka_store',
  'topic' = 'pageviews_cdc_sink',
  'topic.partitions' = 1,
  'topic.replicas' = 3) AS
SELECT *
FROM pageviews_cdc WITH ('postgresql.slot.name' = 'ds_cdc_demo')
WHERE op = 'c' OR op = 'u';
- 3. Select "Run".
- 4. Navigate to the "Queries" tab in the left side panel to see the existing queries, including the query from step 2. The query may take some time to transition into the 'Running' state.
- 5. Refresh until you see that the query is in the 'Running' state.
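The WHERE clause in the CSAS query keeps only insert and update events. The plain-Python simulation below (not DeltaStream code; the events are invented and trimmed to op, before, and after) shows which Debezium op codes pass that predicate. The function name upserts is hypothetical.

```python
from typing import Any, Dict, Iterable, Iterator

# Debezium op codes kept by WHERE op = 'c' OR op = 'u'.
UPSERT_OPS = frozenset({"c", "u"})


def upserts(events: Iterable[Dict[str, Any]]) -> Iterator[Dict[str, Any]]:
    """Yield only insert ('c') and update ('u') events, preserving their order."""
    for event in events:
        if event["op"] in UPSERT_OPS:
            yield event


if __name__ == "__main__":
    events = [
        {"op": "r", "before": None, "after": {"userid": "User_1", "pageid": "Page_1"}},
        {"op": "c", "before": None, "after": {"userid": "User_2", "pageid": "Page_2"}},
        {"op": "u", "before": None, "after": {"userid": "User_2", "pageid": "Page_3"}},
        {"op": "d", "before": {"userid": "User_1", "pageid": "Page_1"}, "after": None},
    ]
    for e in upserts(events):
        print(e["op"], e["after"])
```

Delete events (op 'd') are dropped, so the sink carries only inserted or updated row images. If the source also emits snapshot reads (op 'r'), this predicate drops them as well.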