# PostgreSQL

[PostgreSQL](https://www.postgresql.org/about/), or Postgres, is an open source relational database management system that uses and extends the SQL language. It is free to use, highly extensible, and tries to conform with the SQL standard.

This document walks you through setting up Postgres to use as a source data [store](https://docs.deltastream.io/overview/core-concepts/store "mention") in DeltaStream.

{% hint style="info" %}
**Note** In DeltaStream CDC pipelines, you can use Postgres only as a source.
{% endhint %}

## Setting up PostgreSQL

### Prerequisites

1. [Have a PostgreSQL instance available](https://www.postgresql.org/download/).
2. Create a user in the PostgreSQL instance (see [PostgreSQL documentation](https://www.postgresql.org/docs/8.0/sql-createuser.html)).
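
As a rough sketch of step 2, the statements below create a login role and give it read access to the tables DeltaStream will inspect. The role name `deltastream_user`, the password placeholder, the `demo` database, and the `public` schema are assumptions for illustration. Run the statements in PostgreSQL as a superuser or database owner, and adapt them to your environment.

```sql
-- Hypothetical role name and password; replace with your own values.
CREATE ROLE deltastream_user WITH LOGIN PASSWORD 'change-me';

-- Allow the role to connect to the database named in the store URI.
GRANT CONNECT ON DATABASE demo TO deltastream_user;

-- Allow the role to see and read tables in the schema you plan to use.
GRANT USAGE ON SCHEMA public TO deltastream_user;
GRANT SELECT ON ALL TABLES IN SCHEMA public TO deltastream_user;
```

These grants cover read access only. A CDC pipeline typically needs further privileges, so treat this as a starting point rather than a complete setup.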

{% hint style="warning" %}
**Important** If you're creating a CDC pipeline backed by a PostgreSQL source data store, [review these additional setup instructions](https://docs.deltastream.io/reference/sql-syntax/query/change-data-capture-cdc/postgresql#requirements-for-a-postgresql-source-store).
{% endhint %}

## Adding PostgreSQL as a DeltaStream Data Store

1. Open DeltaStream. In the left-hand navigation, click **Resources** ( ![](https://1288764042-files.gitbook.io/~/files/v0/b/gitbook-x-prod.appspot.com/o/spaces%2Fdbd9e6ZJodkgF1H6AVay%2Fuploads%2F9gG1xxfNSfFRjO6aS5Ou%2F0.png?alt=media) ). The **Resources** page opens with the **Data Stores** tab active.<br>

   <div align="center"><figure><img src="https://1288764042-files.gitbook.io/~/files/v0/b/gitbook-x-prod.appspot.com/o/spaces%2Fdbd9e6ZJodkgF1H6AVay%2Fuploads%2FvU1DotsaOaJ0sQZpLHM3%2FStoreList3Stores.png?alt=media&#x26;token=bbf809f6-6e31-48fb-adda-0404ad6f3c91" alt=""><figcaption></figcaption></figure></div>
2. Click **+ Add Data Store**, and from the list that displays, click **PostgreSQL**. The **Add Data Store** window opens with Postgres-specific fields you must complete.<br>

   <div align="center"><figure><img src="https://1288764042-files.gitbook.io/~/files/v0/b/gitbook-x-prod.appspot.com/o/spaces%2Fdbd9e6ZJodkgF1H6AVay%2Fuploads%2Fv68znRD6toUj5xhijW5H%2FAddPostgresDataStore.png?alt=media&#x26;token=27588b3c-2e11-4ec3-a121-01ff618490a3" alt="" width="460"><figcaption><p>Postgres Store Details</p></figcaption></figure></div>
3. Enter the following information:
   * **Store Name** – A name to identify your DeltaStream data store (see [store](https://docs.deltastream.io/overview/core-concepts/store "mention")).
   * **Store Type** – POSTGRESQL.
   * **URI** – URI for the PostgreSQL database with `/<database_name>` appended.\
     For example, given a PostgreSQL host of `my.postgresql.uri` listening on port `5432`, the URI to connect DeltaStream to the **demo** database is:\
     `postgresql://my.postgresql.uri:5432/demo`
   * **Username** – The PostgreSQL database user that DeltaStream should connect as.
   * **Password** – The password associated with the username.
4. Click **Add** to create and save the data store.

{% hint style="info" %}
**Note** For instructions on creating the store using DSQL, *see* [create-store](https://docs.deltastream.io/reference/sql-syntax/ddl/create-store "mention").
{% endhint %}
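
As a hedged sketch of the DSQL route, the statement below creates a store with the same values as the form above. The store name `psql_store` matches the name used later in this document. The property names (`'type'`, `'uris'`, `'postgres.username'`, `'postgres.password'`) and the credentials are assumptions for illustration, so confirm them against the CREATE STORE reference before running.

```sql
-- Sketch only: property names below are assumptions; confirm them
-- in the CREATE STORE reference. Credentials are placeholders.
CREATE STORE psql_store WITH (
  'type' = POSTGRESQL,
  'uris' = 'postgresql://my.postgresql.uri:5432/demo',
  'postgres.username' = 'deltastream_user',
  'postgres.password' = 'change-me');
```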

### Inspect the PostgreSQL Data Store

1. In the left-hand navigation, click **Resources** ( ![](https://1288764042-files.gitbook.io/~/files/v0/b/gitbook-x-prod.appspot.com/o/spaces%2Fdbd9e6ZJodkgF1H6AVay%2Fuploads%2F9gG1xxfNSfFRjO6aS5Ou%2F0.png?alt=media) ). This displays a list of the existing data stores.<br>

   <div align="center"><figure><img src="https://1288764042-files.gitbook.io/~/files/v0/b/gitbook-x-prod.appspot.com/o/spaces%2Fdbd9e6ZJodkgF1H6AVay%2Fuploads%2FvU1DotsaOaJ0sQZpLHM3%2FStoreList3Stores.png?alt=media&#x26;token=bbf809f6-6e31-48fb-adda-0404ad6f3c91" alt="" width="563"><figcaption></figcaption></figure></div>
2. Click your PostgreSQL data store (in this case, **Postgres\_Test\_Store**). The Postgres data store page opens with the **Schemas** tab active, listing the existing schemas in your PostgreSQL database.<br>

   <div align="center"><figure><img src="https://1288764042-files.gitbook.io/~/files/v0/b/gitbook-x-prod.appspot.com/o/spaces%2Fdbd9e6ZJodkgF1H6AVay%2Fuploads%2Fgit-blob-3a77a547500f8e93d1bdc16c0400a0b1e36d73b1%2FPostgresTestStorePage.png?alt=media" alt="" width="375"><figcaption></figcaption></figure></div>
3. (Optional) Create a new schema. To do this:
   * Click **+ Add Schema**. When the **Add Schema** window opens, enter a name for the new schema and then click **Add**. Your new schema displays in the entities list.<br>

     <div align="center"><figure><img src="https://1288764042-files.gitbook.io/~/files/v0/b/gitbook-x-prod.appspot.com/o/spaces%2Fdbd9e6ZJodkgF1H6AVay%2Fuploads%2Fgit-blob-aa6ea964ba639ab2b589f734def1b4918a724ae1%2FNewPostgresTestStoreSchema.png?alt=media" alt="" width="375"><figcaption><p>Adding a Postgres data Store Schema</p></figcaption></figure></div>
4. To view the tables in a schema, click a schema name.
5. To view a sample of rows from a table, click the table in its schema and then click **Print**.<br>

   <div align="center"><figure><img src="https://1288764042-files.gitbook.io/~/files/v0/b/gitbook-x-prod.appspot.com/o/spaces%2Fdbd9e6ZJodkgF1H6AVay%2Fuploads%2Fgit-blob-12da751e2c65adb90c92703c93defc3e0dc3898b%2FPostgresSchemaDetails.png?alt=media" alt="" width="375"><figcaption><p>Postgres Schema Details</p></figcaption></figure></div>
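
To cross-check what the DeltaStream UI shows, you can run roughly equivalent queries directly against PostgreSQL (for example, from `psql`). This is a sketch, not DSQL; the `public` schema and `pageviews` table names are assumptions taken from the example later in this document.

```sql
-- List the schemas visible to the current user in this database.
SELECT schema_name
FROM information_schema.schemata
ORDER BY schema_name;

-- List the tables in one schema (here, the assumed "public" schema).
SELECT table_name
FROM information_schema.tables
WHERE table_schema = 'public'
  AND table_type = 'BASE TABLE'
ORDER BY table_name;

-- Sample a few rows from one table, similar in spirit to clicking Print.
SELECT * FROM public.pageviews LIMIT 10;
```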

## Process PostgreSQL CDC Data and Sink to Kafka

To follow the next few steps, you must already have a PostgreSQL data store labeled `psql_store` and a Kafka data store labeled `kafka_store`. First, define a DeltaStream [stream](https://docs.deltastream.io/overview/core-concepts/databases#stream) on your source data from PostgreSQL. Then write a query to process this data and sink it to a Kafka topic.

{% hint style="info" %}
**Note** For details on creating the PostgreSQL data store, see [#adding-postgresql-as-a-deltastream-store](#adding-postgresql-as-a-deltastream-store "mention").
{% endhint %}

### Defining a DeltaStream Stream on a PostgreSQL Table

In this step, you create a stream called `pageviews_cdc` that is backed by data in a PostgreSQL table. This stream represents change data capture (CDC) events from the PostgreSQL table.

{% hint style="info" %}
**Note** DeltaStream uses [Debezium](https://debezium.io/) to capture changes in a source relation table. To learn more about how CDC works with DeltaStream, see [postgresql](https://docs.deltastream.io/reference/sql-syntax/query/change-data-capture-cdc/postgresql "mention").
{% endhint %}

First, print the data for your source, which is the `pageviews` PostgreSQL table. To print sample rows from the table in DeltaStream, inspect your data store and navigate to the table you wish to print. For more details, see [#inspect-the-postgresql-store](#inspect-the-postgresql-store "mention").

Below is an example of how to create a stream on your `pageviews` data. The fields match the Debezium standard; any insert, delete, or update to the `pageviews` table becomes an event for your `pageviews_cdc` stream.

```sql
CREATE STREAM pageviews_cdc(
  op VARCHAR,
  ts_ms BIGINT,
  `before` STRUCT<viewtime BIGINT, userid VARCHAR, pageid VARCHAR>, 
  `after`  STRUCT<viewtime BIGINT, userid VARCHAR, pageid VARCHAR>, 
  `source` STRUCT<db VARCHAR, `table` VARCHAR, `lsn` BIGINT>)
WITH (
  'store'='psql_store', 
  'value.format'='json',
  'postgresql.db.name'='demo',
  'postgresql.schema.name'='public',
  'postgresql.table.name'='pageviews');
```
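
For context, the sketch below shows a PostgreSQL table that would fit the `before` and `after` structs above, plus a few statements that generate change events. The column types are inferred from the struct definition, and the sample values are invented. The `REPLICA IDENTITY FULL` setting is one common way to have PostgreSQL log the full old row for a table without a primary key; it is an assumption here, not a stated DeltaStream requirement. Run these statements against PostgreSQL, not in DeltaStream.

```sql
-- Assumed shape of the source table, inferred from the STRUCT fields.
CREATE TABLE public.pageviews (
  viewtime BIGINT,
  userid   VARCHAR,
  pageid   VARCHAR
);

-- Assumption: log the full old row so `before` can be populated
-- on updates and deletes.
ALTER TABLE public.pageviews REPLICA IDENTITY FULL;

-- Insert: Debezium emits op = 'c', and `before` is null.
INSERT INTO public.pageviews (viewtime, userid, pageid)
VALUES (1700000000000, 'User_1', 'Page_10');

-- Update: Debezium emits op = 'u', with the new row in `after`.
UPDATE public.pageviews SET pageid = 'Page_11' WHERE userid = 'User_1';

-- Delete: Debezium emits op = 'd', and `after` is null.
DELETE FROM public.pageviews WHERE userid = 'User_1';
```

In Debezium's event format, `op` is `c` for inserts, `u` for updates, `d` for deletes, and `r` for rows read during an initial snapshot.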

### Write a CSAS (CREATE STREAM AS SELECT) Query to Sink Data into Kafka

1. In the left-hand navigation, click **Workspace** ( ![](https://1288764042-files.gitbook.io/~/files/v0/b/gitbook-x-prod.appspot.com/o/spaces%2Fdbd9e6ZJodkgF1H6AVay%2Fuploads%2F5EudXKVDSWZLByUuGbyy%2F5.png?alt=media) ).
2. In the SQL pane of your workspace, write the [CREATE STREAM AS SELECT (CSAS)](https://docs.deltastream.io/reference/sql-syntax/query/create-stream-as) query to ingest from `pageviews_cdc` and output to a new stream labeled `pageviews_cdc_sink`. To represent a feed of upsert events, this query keeps only records whose `op` field is `c` (create) or `u` (update).

```sql
CREATE STREAM pageviews_cdc_sink WITH (
  'store' = 'kafka_store',
  'topic' = 'pageviews_cdc_sink',
  'topic.partitions' = 1,
  'topic.replicas' = 3) AS
SELECT
  *
FROM pageviews_cdc WITH ('postgresql.slot.name'='ds_cdc_demo')
WHERE op = 'c' OR op = 'u';
```

3. Click **Run**.
4. In the left-hand navigation, click **Queries** ( ![](https://1288764042-files.gitbook.io/~/files/v0/b/gitbook-x-prod.appspot.com/o/spaces%2Fdbd9e6ZJodkgF1H6AVay%2Fuploads%2FoeV9owwjCxALYll5NfSJ%2F6.png?alt=media) ) to view existing queries, including the query from step 2, above.\
   \
   **Important** It can take a short time for the query to transition into the **Running** state. Refresh your screen occasionally until the query shows as **Running**.
5. Verify that the query is working properly. To do this, write an interactive [select](https://docs.deltastream.io/reference/sql-syntax/query/select "mention") query.

<figure><img src="https://1288764042-files.gitbook.io/~/files/v0/b/gitbook-x-prod.appspot.com/o/spaces%2Fdbd9e6ZJodkgF1H6AVay%2Fuploads%2Fgit-blob-c1f5b4ab4b077923fbc4600af87e53099dbb4c68%2FPostgresQuery.png?alt=media" alt="" width="375"><figcaption><p>Verifying a Query in Postgres</p></figcaption></figure>
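
A minimal sketch of the verification queries for step 5, assuming the `pageviews_cdc_sink` stream from step 2 exists and its query has reached the **Running** state. Run either one in the SQL pane of your workspace; each insert or update on the `pageviews` table should then appear as a record.

```sql
-- Interactive query: returns full change records from the sink stream.
SELECT * FROM pageviews_cdc_sink;

-- Narrower check: only the operation code and event timestamp.
-- Given the filter in the CSAS query, op should be 'c' or 'u'.
SELECT op, ts_ms FROM pageviews_cdc_sink;
```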
