# CREATE CHANGELOG

## Syntax <a href="#synopsis" id="synopsis"></a>

```sql
CREATE CHANGELOG changelog_name (
   column_name data_type [NOT NULL] [, ...],
   PRIMARY KEY (column_name [, ...])
) WITH (changelog_parameter = value [, ...]);
```

```sql
CREATE CHANGELOG changelog_name WITH (changelog_parameter = value [, ...]);
```

## Description <a href="#description" id="description"></a>

A [changelog](https://docs.deltastream.io/overview/core-concepts/databases#_changelog) is a sequence of partitioned and partially-ordered events. It's a relational representation of data in streaming [stores](https://docs.deltastream.io/overview/core-concepts/store), such as the data in an Apache Kafka topic or an Amazon Kinesis stream.

{% hint style="info" %}
**Note**   DeltaStream uses the terms **events** and **records** synonymously.
{% endhint %}

A changelog defines a `PRIMARY KEY` for its records; records with the same `PRIMARY KEY` represent changes to the same entry over time. This means a record in a changelog is either an insert record or an upsert record:

* It's an insert record if it's the first record with the given `PRIMARY KEY` to be appended to the changelog.
* It's an upsert record if a previous record with the same `PRIMARY KEY` has already been inserted into the changelog.

In DeltaStream a changelog is a type of [relation](https://docs.deltastream.io/overview/core-concepts/databases#_relation). Each relation belongs to a [schema](https://docs.deltastream.io/overview/core-concepts/databases#_schema) in a [database](https://docs.deltastream.io/overview/core-concepts/databases), so the fully-qualified name of the relation is `<database>.<schema>.<relation>`.

### Alternative syntax

If the data is in Protobuf or Avro format, you can use the alternative syntax, provided the source topic has a [Schema](https://docs.deltastream.io/reference/sql-syntax/ddl/create-schema_registry) or a [Descriptor](https://docs.deltastream.io/reference/sql-syntax/ddl/create-descriptor_source) associated with it. This form creates a changelog definition that includes **all** columns from the associated schema. You must still provide the `value.format`.
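For example, assuming a topic named `pageviews` whose Avro value schema is already registered in the store's schema registry (the topic and changelog names here are illustrative), the column list can be omitted and is inferred from the schema:

```sql
CREATE CHANGELOG pageviews_log
WITH ('topic' = 'pageviews', 'value.format' = 'avro');
```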

### Arguments <a href="#parameters" id="parameters"></a>

#### changelog\_name

This specifies the name of the new changelog. If the name is case sensitive, you must wrap it in double quotes; otherwise the system uses the lowercase name.

#### column\_name

This is the name of a column to be created in the new changelog. If the name is case sensitive, you must wrap it in double quotes; otherwise the system uses the lowercase name.

#### data\_type

This refers to the data type of the column, which can include array specifiers. For more information on the data types supported by DeltaStream, [refer to the data types page](https://docs.deltastream.io/reference/sql-syntax/data-types).

#### NOT NULL

Defines a constraint on the column, ensuring it cannot contain `NULL` values.

#### PRIMARY KEY (column\_name \[, …​])

The `PRIMARY KEY` constraint specifies that the listed column(s) of the changelog can contain only unique (non-duplicate), non-null values.

#### WITH (changelog\_parameter = value \[, …​ ])

This clause specifies [#changelog\_parameters](#changelog_parameters "mention").

### Upsert Mode Changelog <a href="#upsert-mode-changelog" id="upsert-mode-changelog"></a>

An upsert Changelog uses its PRIMARY KEY to determine how record keys are serialized to and from the underlying Kafka topic. Upsert mode is only supported on Kafka stores. When `enable.upsert.mode` is set to `true`, DeltaStream automatically derives the key definition from the Changelog's PRIMARY KEY columns. The `key.columns` and `key.type` properties are inferred and must not be set explicitly. If `key.format` is not provided, it defaults to the `value.format`.\
This ensures that records with the same primary key are routed to the same Kafka partition and that downstream consumers can interpret each record as an insert or an update based on the key.

In an upsert Changelog, each record written to the underlying Kafka topic carries a key derived from the Changelog's PRIMARY KEY. The key determines how downstream consumers interpret the record:

* Normal record: A record whose value is non-null.
* Tombstone record: A record whose value is null. A tombstone signals that the entry for that key has been deleted. Downstream consumers and compacted Kafka topics use tombstones to remove the corresponding key from their materialized state.
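Putting the above together, a minimal sketch of an upsert-mode changelog might look like the following (the topic and column names are illustrative):

```sql
CREATE CHANGELOG orders_upsert (
   order_id VARCHAR,
   order_status VARCHAR,
   PRIMARY KEY (order_id)
) WITH (
   'topic' = 'orders',
   'value.format' = 'json',
   'enable.upsert.mode' = 'true'
);
```

Because `enable.upsert.mode` is `true`, the record key is derived from `order_id`; `key.columns` and `key.type` are not set, and `key.format` falls back to the `value.format` (`json`).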

### Changelog Parameters <a href="#changelog_parameters" id="changelog_parameters"></a>

| Parameter Name     | Description                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                          |
| ------------------ | ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| `topic`            | <p>Name of the <a data-mention href="../../../../overview/core-concepts/store#entity">#entity</a> that has the data for this changelog. If the entity doesn’t exist, an entity with this name is created in the corresponding <code>store</code>.<br></p><p><strong>Required:</strong> No<br><strong>Default value:</strong> Lowercase <code>changelog\_name</code><br><strong>Type:</strong> String</p>                                                                                                                                                                                             |
| `store`            | <p>Name of the store that hosts the entity for this changelog.<br></p><p><strong>Required:</strong> No<br><strong>Default value:</strong> User’s default store name<br><strong>Type:</strong> String<br><strong>Valid values:</strong> See <a data-mention href="../command/list-stores">list-stores</a>.</p>                                                                                                                                                                                                                                                                                        |
| `value.format`     | <p>Format of message value in the <a data-mention href="../../../../overview/core-concepts/store#entity">#entity</a>. See <a data-mention href="../data-format-serialization">data-format-serialization</a> for more information regarding serialization formats.<br></p><p><strong>Required:</strong> Yes<br><strong>Type:</strong> String<br><strong>Valid values:</strong> <code>JSON</code>, <code>AVRO</code>, <code>PROTOBUF</code>, <code>PRIMITIVE</code></p><p><strong>Valid values without column definition:</strong>  <code>AVRO</code>, <code>PROTOBUF</code></p>                       |
| `timestamp`        | <p>Name of the column in the changelog to use as the timestamp. If not set, the timestamp of the message is used for time-based operations such as window aggregations and joins. If the type of this timestamp field is <code>BIGINT</code>, DeltaStream expects the values in epoch milliseconds UTC.<br></p><p><strong>Required:</strong> No<br><strong>Default value:</strong> Record’s timestamp<br><strong>Type:</strong> String<br><strong>Valid values:</strong> Must be of type <code>BIGINT</code> or <code>TIMESTAMP</code>. See <a data-mention href="../data-types">data-types</a>.</p> |
| `timestamp.format` | <p>The format to use for <code>TIMESTAMP</code> typed fields. See <a data-mention href="../data-types">data-types</a>.<br></p><p><strong>Required:</strong> No<br><strong>Default value:</strong> <code>sql</code><br><strong>Type:</strong> String<br><strong>Valid values:</strong> <code>sql</code>, <code>iso8601</code></p>                                                                                                                                                                                                                                                                     |

### **Kafka-Specific Parameters**

Parameters to be used if the associated [store](https://docs.deltastream.io/overview/core-concepts/store "mention") is type `KAFKA`:

| Parameter Name          | Description                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                 |
| ----------------------- | --------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| `topic.partitions`      | <p>The number of partitions to use when creating the entity, if applicable. If the entity already exists, then this value must be equal to the number of partitions in the existing Kafka entity.<br></p><p><strong>Required:</strong> Yes, unless entity already exists<br><strong>Default value:</strong> Leftmost source relation Entity’s partition count<br><strong>Type:</strong> Integer<br><strong>Valid values:</strong> \[1, ...]</p>                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                             |
| `topic.replicas`        | <p>The number of replicas to use when creating the entity, if applicable. If the entity already exists, then this value must be equal to the number of replicas in the existing Kafka entity.<br></p><p><strong>Required:</strong> Yes, unless entity already exists<br><strong>Default values:</strong> Leftmost source relation Entity's replica count<br><strong>Type:</strong> Integer<br><strong>Valid values:</strong> \[1, ...]</p>                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                  |
| `kafka.topic.*`         | <p>A configuration specific for the topic being created — for example, <a href="https://kafka.apache.org/documentation/">Kafka Entity Configuration for Confluent Platform</a>.<br><br><strong>Required:</strong> No<br><strong>Default value:</strong> None<br><strong>Type:</strong> String<br><strong>Valid values:</strong> Kafka topic configuration specific to the underlying <a data-mention href="../../../overview/core-concepts/store">store</a> type.</p>                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                       |
| `key.format`            | <p>Format of the message key in the <a data-mention href="../../../../overview/core-concepts/store#entity">#entity</a>. This value can be the same as or different from the <code>value.format</code>. See <a data-mention href="../data-format-serialization">data-format-serialization</a> for more information regarding serialization formats.<br></p><p><strong>Required:</strong> No, unless <code>key.type</code> is set<br><strong>Default value:</strong> None<br><strong>Type:</strong> String<br><strong>Valid values:</strong> <code>JSON</code>, <code>AVRO</code>, <code>PROTOBUF</code>, <code>PRIMITIVE</code></p><p><strong>Valid values without column definition:</strong>  <code>AVRO</code>, <code>PROTOBUF</code></p>                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                 |
| `key.type`              | <p>Declares the names and data types of key columns. The type is a <code>STRUCT</code> when <code>key.format</code> is a non-primitive value — for example, <code>'key.type'='STRUCT\<id BIGINT, name VARCHAR>'</code>. For primitive values, the type is one of the <a data-mention href="../../data-types#primitive-data-types">#primitive-data-types</a> — for example, <code>'key.type'='VARCHAR'</code>.<br></p><p><strong>Required:</strong> No, unless <code>key.format</code> is set<br><strong>Default value:</strong> None<br><strong>Type:</strong> String<br><strong>Valid values:</strong> See <code>STRUCT</code> in <a data-mention href="../data-types">data-types</a>.</p> |
| `key.columns`           | <p>Specifies the name(s) of the value columns, separated by commas, that are used to construct the record key.</p><ul><li>For a non-primitive <code>key.format</code>, the record key is created as a <code>STRUCT</code> whose fields are the columns listed in this property.</li><li>For a primitive <code>key.format</code>, this property must contain exactly one column with a primitive data type.</li><li><strong>Note:</strong> <code>key.columns</code> cannot be set if <code>key.type</code> is already defined.<br></li></ul><p><strong>Required:</strong> No<br><strong>Default:</strong> None<br><strong>Type:</strong> String<br><strong>Valid values:</strong> One or more valid column names from the relation’s column list, separated by commas.</p>                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                   |
| `value.columns.exclude` | <p>Specifies the name(s) of the columns, separated by commas, that should be excluded from the record’s value and included only in its key.</p><ul><li>You can only set this property if <code>key.columns</code> is already defined.</li><li>The excluded columns must appear at the end of the object’s column list and must also be listed in <code>key.columns</code>.</li></ul><p><br><strong>Required:</strong> No<br><strong>Default:</strong> None<br><strong>Type:</strong> String<br><strong>Valid values:</strong> One or more valid column names from the relation’s column list, separated by commas.</p>                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                      |
| `delivery.guarantee`    | <p>The fault tolerance guarantees applied when producing to this changelog.<br></p><p><strong>Required:</strong> No<br><strong>Default value:</strong> <code>at\_least\_once</code><br><strong>Type:</strong> String<br><strong>Valid values:</strong></p><ul><li><code>exactly\_once</code>: Produces to the changelog using Kafka transactions. These transactions are committed when the query takes a checkpoint. On the consumer side, when setting the <a href="https://kafka.apache.org/documentation/#consumerconfigs_isolation.level">Kafka consumer <code>isolation.level</code> configuration</a> to <code>read\_committed</code>, only the committed records are displayed. Since records aren’t committed until the query takes a checkpoint, there is some additional delay when using this setting.</li><li><code>at\_least\_once</code>: Ensures that records are output to the changelog at least once. During query checkpointing, the query waits to receive a confirmation of successful writes from the Kafka broker. If there are issues with the query then duplicate records are possible as the query will try to reprocess old data.</li><li><code>none</code>: There is no fault tolerance guarantee when producing to the changelog. If there are issues on the Kafka broker, then records may be lost, and if there are issues with the query then output records may be duplicated.</li></ul> |
| `sink.timestamp.column` | <p>Specifies the name of the value column to be used to set the Kafka record’s timestamp when writing to the Kafka sink’s entity. If no timestamp column is specified, the Kafka producer record is created without an explicit timestamp, allowing the sink’s store to assign a timestamp according to its configured policy.</p><p><br><strong>Required:</strong> No<br><strong>Default value:</strong> None<br><strong>Type:</strong> String<br><strong>Valid values:</strong> One of the column names from the sink relation’s column list. Must be of type <code>BIGINT</code> or <code>TIMESTAMP</code> or <code>TIMESTAMP\_LTZ</code>. See <a data-mention href="../data-types">data-types</a>.</p>                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                  |
| `enable.upsert.mode`    | <p>Enables upsert mode for a Changelog. When set to <code>true</code>, DeltaStream automatically derives the relation key definition from the Changelog's PRIMARY KEY columns. Upsert mode is only supported on Kafka stores.<br><br><strong>Required</strong>: No <br><strong>Default value</strong>: false <br><strong>Type</strong>: Boolean <br><strong>Valid values</strong>: <code>true</code>, <code>false</code></p>                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                |
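As an illustrative sketch of how `key.columns` and `value.columns.exclude` work together (all names here are hypothetical), the following keys records by `userid` and keeps that column out of the record value. Note that `userid` appears at the end of the column list, as `value.columns.exclude` requires:

```sql
CREATE CHANGELOG user_actions (
   action_time BIGINT,
   action VARCHAR,
   userid VARCHAR,
   PRIMARY KEY (userid)
) WITH (
   'topic' = 'user_actions',
   'value.format' = 'json',
   'key.format' = 'primitive',
   'key.columns' = 'userid',
   'value.columns.exclude' = 'userid'
);
```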

### Kinesis-Specific Parameters <a href="#kinesis-specific-parameters" id="kinesis-specific-parameters"></a>

Parameters to be used if the associated [store](https://docs.deltastream.io/overview/core-concepts/store "mention") is type `KINESIS`:

| Parameter Name | Description                                                                                                                                                                                                                                                                                                                                                                                                                                                                                  |
| -------------- | -------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| `topic.shards` | <p>The number of shards to use when creating the entity, if applicable. If the entity already exists, this value must be equal to the number of shards in the existing Kinesis data stream.<br></p><p><strong>Required:</strong> Yes, unless entity already exists<br><strong>Default value:</strong> Leftmost source relation topic’s shard count<br><strong>Type:</strong> Integer<br><strong>Valid values:</strong> \[1, ...]<br><strong>Alias:</strong> <code>kinesis.shards</code></p> |

Kinesis stores provide a delivery guarantee of `at_least_once` when producing events into a sink [#entity](https://docs.deltastream.io/overview/core-concepts/store#entity "mention").
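A sketch of a changelog backed by a Kinesis data stream (the store and stream names are assumptions):

```sql
CREATE CHANGELOG clicks_log (
   ts BIGINT,
   userid VARCHAR,
   url VARCHAR,
   PRIMARY KEY (userid)
) WITH (
   'store' = 'kinesis_store',
   'topic' = 'clicks',
   'topic.shards' = 2,
   'value.format' = 'json'
);
```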

### Format-Specific Parameters

#### Avro

Use these parameters when writing records into a changelog whose associated `key.format` or `value.format` is `avro` and the default Avro schema generation should be overridden with a base schema for the key and/or value.

When generating an Avro schema for a column using a base schema:

* If the base schema has a field with the same name and data type as the column, the field's definition from the base schema is used in the generated schema. This includes retaining the base schema's `doc` and `logicalType` for the field.
* If the base schema has a field with the same name as the column but a different data type, an Avro schema type definition is generated from the column's data type, with the field's `doc` taken from its corresponding field in the base schema.

{% hint style="info" %}
**Notes**

* Currently supported schema registries are Confluent Cloud and Confluent Platform.
* Known limitation: Confluent schema registry must use the default [TopicNameStrategy](https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#subject-name-strategy) for creating subject names.

Check [create-schema\_registry](https://docs.deltastream.io/reference/sql-syntax/ddl/create-schema_registry "mention") for more details.
{% endhint %}

| Parameter Name            | Description                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                     |
| ------------------------- | --------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| `avro.base.schema.store`  | <p>Name of the store whose <a href="../../../../overview/core-concepts/store#schema-registry">schema registry</a> contains the Avro schema subject(s) to be used as the base schema for generating the Avro schema for the changelog's key and/or value.<br></p><p><strong>Required:</strong> No<br><strong>Default values:</strong> Current session's store name<br><strong>Type:</strong> Identifier<br><strong>Valid values:</strong> See <a data-mention href="../command/list-stores">list-stores</a>.</p> |
| `avro.base.subject.key`   | <p>Name of the subject in the <a href="../../../../overview/core-concepts/store#schema-registry">schema registry</a> to obtain the base schema for generating Avro schema for changelog's key.</p><p><strong>Required:</strong> No, unless <code>key.format</code> is set to <code>avro</code> and <code>key.type</code> is defined.<br><strong>Type:</strong> String</p>                                                                                                                                       |
| `avro.base.subject.value` | <p>Name of the subject in the <a href="../../../../overview/core-concepts/store#schema-registry">schema registry</a> to obtain the base schema for generating Avro schema for changelog's value columns.</p><p><strong>Required:</strong> No, unless <code>value.format</code> is set to <code>avro</code> .<br><strong>Type:</strong> String</p>                                                                                                                                                               |
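For illustration, a changelog whose value schema is generated from a base subject in the current store's schema registry might look like the following (the topic and subject names are assumptions):

```sql
CREATE CHANGELOG payments_log (
   payment_id VARCHAR,
   amount BIGINT,
   PRIMARY KEY (payment_id)
) WITH (
   'topic' = 'payments',
   'value.format' = 'avro',
   'avro.base.subject.value' = 'payments-value'
);
```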

## Examples <a href="#example" id="example"></a>

#### Create a new changelog

The following creates a new changelog, `user_last_page`. This changelog reads from a topic named `pageviews` and has a `value.format` of `JSON`. Note that this query also specifies `userid` as the `PRIMARY KEY` for the changelog:

```sql
CREATE CHANGELOG user_last_page (
   viewtime BIGINT,
   userid VARCHAR,
   pageid VARCHAR,
   PRIMARY KEY(userid)
)
WITH (
   'topic'='pageviews',
   'value.format'='json'
);
```
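Once created, the changelog can serve as the source of a query. As a sketch (assuming the `user_last_page` changelog above), an interactive query such as the following would emit an insert record the first time a `userid` appears and an upsert record for each subsequent page view by the same user:

```sql
-- For each userid, the first record is an insert; later records with the
-- same key are upserts carrying that user's latest page view.
SELECT userid, pageid, viewtime
FROM user_last_page;
```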

#### Create a new changelog for an existing entity

The following creates a new `users` changelog for the existing `users` [entity](https://docs.deltastream.io/overview/core-concepts/store#entity "mention") in the current [store](https://docs.deltastream.io/overview/core-concepts/store "mention"). Because no `topic` parameter is supplied, the changelog name doubles as the name of the entity that hosts the records. The column definitions mirror the original structure of the `users` entity, with a `PRIMARY KEY` declared for updates:

```sql
CREATE CHANGELOG "users" (
    registertime BIGINT,
    userid VARCHAR,
    regionid VARCHAR,
    gender VARCHAR,
    interests ARRAY<VARCHAR>,
    contactinfo STRUCT<
        phone VARCHAR,
        city VARCHAR,
        "state" VARCHAR,
        zipcode VARCHAR>,
    PRIMARY KEY(userid)
) WITH ( 'value.format'='json' );
```

#### Create a new changelog with passthrough configuration for retention

The following creates a new changelog, `customers_log`, on the store `kafka_store`. The `kafka.topic.retention.ms` parameter is passed through to the underlying Kafka topic configuration (`retention.ms`), here set to 172800000 ms (48 hours):

```sql
CREATE CHANGELOG customers_log (
    ts BIGINT,
    customer_id VARCHAR,
    full_name VARCHAR,
    region VARCHAR,
    PRIMARY KEY(customer_id)
) WITH (
    'store' = 'kafka_store',
    'topic.partitions' = 1,
    'topic.replicas' = 2,
    'kafka.topic.retention.ms' = '172800000'
);
```

#### Create a new changelog with a multi-column primary key

The following creates a new changelog, `pagevisits`. This changelog reads from an entity named `pageviews` and has a `value.format` of `JSON`. Note that this query also specifies `(userid, pageid)` as the `PRIMARY KEY` for the changelog:

```sql
CREATE CHANGELOG pagevisits (
   viewtime BIGINT,
   userid VARCHAR,
   pageid VARCHAR,
   PRIMARY KEY(userid, pageid)
) WITH ( 'topic'='pageviews', 'value.format'='json' );
```

#### Create a new changelog specifying the key and timestamp

The following creates a new changelog, `LatestPageVisitor`, in the database `DataBase` and schema `Schema2`. This changelog reads from a topic named `case_sensitive_pageviews` in the store `OtherStore`, with a `value.format` of `avro` and a `key.format` of `protobuf`. Because `key.format` is included, `key.type` is also required; in this example it is `STRUCT<"PageId" VARCHAR>`. The query specifies `PageId` as the `PRIMARY KEY` for the changelog. Many of the columns are quoted, marking them as case-sensitive; the unquoted column `CaseInsensitiveCol` is lowercased to `caseinsensitivecol` when the relation is created. The parameters also set the `timestamp` for this relation, so queries that process data using this relation as a source treat the `ViewTime` column as the event's timestamp:

```sql
CREATE CHANGELOG "DataBase"."Schema2"."LatestPageVisitor" (
   "ViewTime" BIGINT,
   "userID" VARCHAR,
   "PageId" VARCHAR,
   "CaseSensitiveCol" BIGINT,
   CaseInsensitiveCol BIGINT,
   PRIMARY KEY("PageId")
) WITH (
   'topic'='case_sensitive_pageviews',
   'store'='OtherStore',
   'value.format'='avro',
   'key.format'='protobuf',
   'key.type'='STRUCT<"PageId" VARCHAR>',
   'timestamp'='ViewTime'
);
```

#### Create a new changelog specifying Kafka delivery guarantee

The following creates a new changelog, `user_exactly_once`. This changelog reads from an entity named `users` and has a `delivery.guarantee` of `exactly_once`. By specifying the `delivery.guarantee`, you override the default value of `at_least_once`. You may wish to use this configuration if your application can tolerate higher latencies but cannot tolerate duplicate records. When you use this changelog as the sink in an [insert-into](https://docs.deltastream.io/reference/sql-syntax/query/insert-into "mention") query, the query uses the `delivery.guarantee` specified here.

```sql
CREATE CHANGELOG user_exactly_once (
   viewtime BIGINT,
   userid VARCHAR,
   pageid VARCHAR,
   PRIMARY KEY(userid)
)
WITH (
   'topic'='users',
   'value.format'='json',
   'delivery.guarantee'='exactly_once'
);
```
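As a sketch of the sink behavior described above, an `INSERT INTO` query writing to this changelog inherits its `exactly_once` guarantee (the `pageviews` source relation here is an assumption):

```sql
-- Hypothetical source relation `pageviews`; the sink changelog's
-- delivery.guarantee ('exactly_once') applies to this query's writes.
INSERT INTO user_exactly_once
SELECT viewtime, userid, pageid
FROM pageviews;
```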

#### Create a new changelog with a `NOT NULL` column

The following creates a new changelog, `users_log`. Two columns in this changelog are defined with the `NOT NULL` constraint: `registertime` and `contactinfo`. As a result, these two columns are not allowed to contain null values in any valid record of this changelog.

```sql
CREATE CHANGELOG users_log (
    registertime BIGINT NOT NULL,
    userid VARCHAR, 
    interests ARRAY<VARCHAR>,
    contactinfo STRUCT<phone VARCHAR, city VARCHAR, "state" VARCHAR, zipcode VARCHAR> NOT NULL,
    PRIMARY KEY(userid)
)
WITH (
   'topic'='users', 
    'key.format'='json', 
    'key.type'='STRUCT<userid VARCHAR>', 
    'value.format'='json'
);
```

#### Create a new changelog with format specific properties for Avro

The following creates a new changelog, `usersInfo`, whose record keys and values are in `avro` format. It uses subjects from the schema registry of a store named `sr_store` as the base Avro schemas for `usersInfo`'s key and value: the `users_data-key` subject generates the key's Avro schema, and the `users_data-value` subject generates the value's Avro schema for records written into `usersInfo`.

```sql
CREATE CHANGELOG "usersInfo" (
    registertime BIGINT NOT NULL,
    userid VARCHAR, 
    interests ARRAY<VARCHAR>,
    contactinfo STRUCT<phone VARCHAR, city VARCHAR, "state" VARCHAR, zipcode VARCHAR> NOT NULL,
    PRIMARY KEY(userid)
)
WITH (
    'topic'='usersInfo', 
    'key.format'='avro',
    'value.format'='avro',
    'avro.base.schema.store' = 'sr_store',
    'avro.base.subject.key' = 'users_data-key',
    'avro.base.subject.value' = 'users_data-value'
);
```

#### Create a changelog with data in S3

The following creates a changelog with data backed by an S3 store:

```sql
CREATE CHANGELOG pageviews_s3 (
    viewtime BIGINT, 
    userid VARCHAR, 
    pageid VARCHAR
) WITH (
    'store' = 's3_store', 
    's3.uri'='s3://ctan-playground-data/jsonl/', 
    's3.discovery.interval.seconds'=15, 
    'value.format'='jsonl'
);
```

**Notes:**

* `s3.uri` is required.
* `value.format`: valid options are `jsonl` and `json`.
* `s3.discovery.interval.seconds` is optional; the default is 10.

#### Create a new changelog with key columns

The following creates a `pageviews` changelog. The key and value of records in this changelog are both in `json` format.

The value consists of three columns:

1. `viewtime`
2. `userid`
3. `pageid`

The key is a `STRUCT` with two fields:

1. `userid`
2. `pageid`

The values of these fields come from the corresponding columns.

```sql
CREATE CHANGELOG pageviews (
    viewtime BIGINT, 
    userid VARCHAR, 
    pageid VARCHAR,
    PRIMARY KEY (userid)
) WITH (
    'store' = 'kafka_store',
    'topic' = 'pageviews',
    'value.format' = 'json',
    'key.format' = 'json', 
    'key.columns'='userid,pageid'
);
```

#### Create a new changelog with key columns and value exclude columns

The following creates a `pageviews` changelog. The key and value of records in this changelog are both in `json` format. The key is a `STRUCT` with two fields:

1. `userid`
2. `pageid`

The values of these fields come from the corresponding columns. Since `pageid` is excluded from the value via `value.columns.exclude`, the value in each record consists of two columns:

1. `viewtime`
2. `userid`

```sql
CREATE CHANGELOG pageviews (
    viewtime BIGINT, 
    userid VARCHAR, 
    pageid VARCHAR,
    PRIMARY KEY (userid)
) WITH (
    'store' = 'kafka_store',
    'topic' = 'pageviews',
    'value.format' = 'json',
    'key.format' = 'json', 
    'key.columns'='userid,pageid',
    'value.columns.exclude'='pageid'
);
```

#### Create a new changelog without column definitions for protobuf data

The following creates a `pageviews` changelog whose record values are in Protobuf format. The `pageviews` topic must already exist as an entity in DeltaStream and must have a Protobuf value [descriptor](https://docs.deltastream.io/reference/sql-syntax/ddl/create-descriptor_source) associated with it.

```sql
CREATE CHANGELOG pageviews PRIMARY KEY (userid) WITH (
    'store' = 'kafka_store',
    'topic' = 'pageviews',
    'value.format' = 'protobuf'
);
```

#### Create an upsert changelog

The following creates a new changelog, `upsert_log`, in upsert mode. Because `enable.upsert.mode` is set to `true`, DeltaStream automatically sets `key.format` to `json` (matching the `value.format`) and derives `key.columns` and `key.type` from the `PRIMARY KEY` column `userid`:

```sql
CREATE CHANGELOG upsert_log (
    userid VARCHAR,
    pageid VARCHAR,
    PRIMARY KEY(userid)
) WITH (
    'value.format' = 'json',
    'topic.partitions' = 1,
    'topic.replicas' = 1,
    'enable.upsert.mode' = true
);
```
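A typical use of an upsert changelog is to materialize the latest value per key from a continuous query. A minimal sketch, assuming a `pageviews` source relation with matching columns:

```sql
-- Each new pageview for a userid overwrites the previous value for that
-- key in the upsert changelog.
INSERT INTO upsert_log
SELECT userid, pageid
FROM pageviews;
```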

#### Create an upsert changelog with an explicit key format

The following creates an upsert changelog whose record key is serialized as Avro while the value is JSON. Since `enable.upsert.mode` is set to `true`, `key.columns` and `key.type` are still derived automatically from the `PRIMARY KEY`:

```sql
CREATE CHANGELOG upsert_orders (
    orderid VARCHAR,
    userid VARCHAR,
    price INTEGER,
    PRIMARY KEY(orderid)
) WITH (
    'value.format' = 'json',
    'key.format' = 'avro',
    'topic.partitions' = 3,
    'topic.replicas' = 2,
    'enable.upsert.mode' = true
);
```
