# CREATE CHANGELOG AS SELECT

## Syntax <a href="#synopsis" id="synopsis"></a>

```sql
CREATE CHANGELOG changelog_name
[WITH (changelog_parameter = value [, ... ])]
AS select_statement;
```

## Description <a href="#description" id="description"></a>

`CREATE CHANGELOG AS` is essentially a combination of two statements:

* A DDL statement that creates a new [changelog](https://docs.deltastream.io/overview/core-concepts/databases#_changelog "mention").
* An [INSERT INTO](https://docs.deltastream.io/reference/sql-syntax/query/insert-into "mention") statement that runs a [SELECT](https://docs.deltastream.io/reference/sql-syntax/query/select "mention") statement and adds the results into the newly created changelog.
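
Conceptually, the statement is equivalent to issuing the DDL and the continuous query separately. The sketch below uses hypothetical relation and column names to illustrate the two-step equivalent:

```sql
-- Step 1 (DDL): declare the sink changelog with an explicit schema.
CREATE CHANGELOG pageview_counts (
  userid VARCHAR,
  cnt BIGINT,
  PRIMARY KEY (userid)
) WITH ('value.format' = 'json');

-- Step 2 (DML): launch the continuous query that populates it.
INSERT INTO pageview_counts
SELECT userid, count(*) AS cnt
FROM pageviews
GROUP BY userid;
```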

### Arguments <a href="#parameters" id="parameters"></a>

#### changelog\_name

Specifies the name of the new changelog. Optionally, use `<database_name>.<schema_name>` as a prefix to the name to create the relation in that scope. If the name is case-sensitive, you must wrap it in double quotes; otherwise, the system uses the lowercase name.

#### WITH (changelog\_parameter = value \[, …​ ])

Optionally, this clause specifies the [Changelog Parameters](#_changelog_parameters "mention").

#### select\_statement

Specifies the [SELECT](https://docs.deltastream.io/reference/sql-syntax/query/select "mention") statement whose results populate the newly created changelog.

### Upsert Mode Changelog <a href="#upsert_mode_changelog" id="upsert_mode_changelog"></a>

An [upsert mode Changelog](https://docs.deltastream.io/reference/ddl/create-changelog#changelog_parameters) can be created from another upsert mode Changelog using the `CREATE CHANGELOG AS SELECT` statement, subject to the following rules:

* When the source Changelog has `enable.upsert.mode` set to `true`, the sink Changelog automatically inherits upsert mode. The key properties (`key.format`, `key.columns`, `key.type`) are derived from the sink's PRIMARY KEY columns.
* You cannot explicitly set `enable.upsert.mode` to `false` on the sink when the source is an upsert Changelog.
* `enable.upsert.mode` cannot be supplied as a query-time source property in the `FROM` clause; it is a DDL-only property.
* When using `INSERT INTO` with an upsert source, the sink must also be an upsert Changelog.
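
As an illustration of the first two rules, assume `upsert_source` was created with `'enable.upsert.mode' = 'true'` (the relation names here are hypothetical):

```sql
-- The sink automatically inherits upsert mode from the upsert source.
CREATE CHANGELOG upsert_sink
AS SELECT userid, pageid FROM upsert_source;

-- Rejected: the sink cannot explicitly opt out of upsert mode
-- when the source is an upsert Changelog.
CREATE CHANGELOG plain_sink
  WITH ('enable.upsert.mode' = 'false')
AS SELECT userid, pageid FROM upsert_source;
```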

### Changelog Parameters <a href="#changelog_parameters" id="changelog_parameters"></a>

| Parameter Name     | Description                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                    |
| ------------------ | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------ |
| `topic`            | <p>The name of the entity that holds the data for the newly created sink changelog. If an entity with this name doesn’t exist in the store, it is created. If this parameter isn’t set, an entity named with the lowercase <code>changelog\_name</code> is created in the corresponding store.<br></p><p><strong>Required:</strong> No<br><strong>Default value:</strong> Lowercase <code>changelog\_name</code>.<br><strong>Type:</strong> String<br><strong>Valid values:</strong> See <a data-mention href="../command/list-entities">list-entities</a></p>                                                      |
| `store`            | <p>The name of the store that hosts the entity for this changelog.<br><br><strong>Required:</strong> No<br><strong>Default value:</strong> User’s default <a data-mention href="../../../overview/core-concepts/store">store</a>.</p><p><strong>Type:</strong> String<br><strong>Valid values:</strong> See <a data-mention href="../command/list-stores">list-stores</a></p>                                                                                                                                                                                                  |
| `value.format`     | <p>Format of the message value in the <a data-mention href="../../../../overview/core-concepts/store#entity">#entity</a>. For more information regarding serialization formats, see <a data-mention href="../data-format-serialization">data-format-serialization</a> .<br><br><strong>Required:</strong> No<br><strong>Default value:</strong> Value format from the leftmost source relation.<br><strong>Type:</strong> <code>VALUE\_FORMAT</code><br><strong>Valid values:</strong> <code>JSON</code>, <code>AVRO</code>, <code>PROTOBUF</code>, <code>PRIMITIVE</code></p> |
| `timestamp`        | <p>Name of the column in the changelog to use as the timestamp. If not set, the timestamp of the message is used for time-based operations such as window aggregations and joins.<br></p><p><strong>Required:</strong> No<br><strong>Default value:</strong> Record’s timestamp.<br><strong>Type:</strong> String<br><strong>Valid values:</strong> Must be of type <code>BIGINT</code> or <code>TIMESTAMP</code> or <code>TIMESTAMP\_LTZ</code>. See <a data-mention href="../data-types">data-types</a>.</p>                                                                 |
| `timestamp.format` | <p>The format to use for <code>TIMESTAMP</code>-typed fields. See <a data-mention href="../data-types">data-types</a>.<br></p><p><strong>Required:</strong> No<br><strong>Default value:</strong> <code>sql</code><br><strong>Type:</strong> String<br><strong>Valid values:</strong> <code>sql</code>, <code>iso8601</code></p>                                                                                                                                                                                                                                               |

### **Kafka-Specific Parameters**

| Parameter Name          | Description                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                 |
| ----------------------- | ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| `topic.partitions`      | <p>The number of partitions to use when creating the Kafka topic, if applicable. Note that the number of partitions must be compatible with the Kafka cluster's configuration. If there's any mismatch, a query may time out when communicating with the cluster at record creation time.<br></p><p><strong>Required:</strong> Yes, unless entity already exists.<br><strong>Default value:</strong> Leftmost source relation entity’s partition count.<br><strong>Type:</strong> Integer<br><strong>Valid values:</strong> \[1, ...]</p>                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                   |
| `topic.replicas`        | <p>The number of replicas to use when creating the Kafka topic, if applicable. Note that the number of replicas must be compatible with the Kafka cluster's configuration. If there's any mismatch, a query may time out when communicating with the cluster at record creation time.<br></p><p><strong>Required:</strong> Yes, unless entity already exists.<br><strong>Default value:</strong> Leftmost source relation entity’s replica count.<br><strong>Type:</strong> Integer<br><strong>Valid values:</strong> \[1, ...]</p>                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                        |
| `kafka.topic.*`         | <p>A configuration specific to the topic being created — for example, the topic-level configurations in the <a href="https://kafka.apache.org/documentation/#topicconfigs">Apache Kafka documentation</a>.<br><br><strong>Required:</strong> No<br><strong>Default value:</strong> None<br><strong>Type:</strong> String<br><strong>Valid values:</strong> Kafka topic configuration specific to the underlying <a data-mention href="../../../overview/core-concepts/store">store</a> type.</p>                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                       |
| `key.format`            | <p>The format of a message key in the <a data-mention href="../../../../overview/core-concepts/store#entity">#entity</a>. For more information regarding serialization formats see <a data-mention href="../data-format-serialization">data-format-serialization</a>.<br></p><p><strong>Required:</strong> No<br><strong>Default value:</strong> None<br><strong>Type:</strong> String<br><strong>Valid values:</strong> <code>JSON</code>, <code>AVRO</code>, <code>PROTOBUF</code>, <code>PRIMITIVE</code></p>                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                            |
| `key.columns`           | <p>Specifies the name(s) of the value columns, separated by commas, that are used to construct the record key.</p><p><br>For a non-primitive <code>key.format</code>, the record key is created as a <code>STRUCT</code> whose fields are the columns listed in this property.</p><p><br>For a primitive <code>key.format</code>, this property must contain exactly one column with a primitive data type.<br></p><p><strong>Required</strong>: No<br><strong>Default</strong>: None<br><strong>Type</strong>: String<br><strong>Valid values</strong>: One or more valid column names from the sink relation’s column list, separated by commas.</p>                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                |
| `value.columns.exclude` | <p>Specifies the name(s) of the columns, separated by commas, that should be excluded from the record’s value and included only in its key.</p><p><br>This property can only be set if <code>key.columns</code> is already defined.</p><p><br>The excluded columns must appear at the end of the relation’s column list and must also be listed in <code>key.columns</code>.<br></p><p><strong>Required</strong>: No<br><strong>Default</strong>: None<br><strong>Type</strong>: String<br><strong>Valid values</strong>: One or more valid column names from the object’s column list, separated by commas.</p>                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                      |
| `delivery.guarantee`    | <p>The fault tolerance guarantees applied when producing to this changelog.<br></p><p><strong>Required:</strong> No<br><strong>Default value:</strong> <code>at\_least\_once</code><br><strong>Type:</strong> String<br><strong>Valid values:</strong></p><ul><li><code>exactly\_once</code>: Produces to the changelog using Kafka transactions. These transactions commit when the query takes a checkpoint. On the consumer side, when setting the <a href="https://kafka.apache.org/documentation/#consumerconfigs_isolation.level">Kafka consumer <code>isolation.level</code> configuration</a> to <code>read\_committed</code>, only the committed records display. Since records aren't committed until the query takes a checkpoint, there is some additional delay when you use this setting.</li><li><code>at\_least\_once</code>: Ensures that records are output to the changelog at least once. During query checkpointing, the query waits to receive a confirmation of successful writes from the Kafka broker. If there are issues with the query then duplicate records are possible as the query attempts to reprocess old data.</li><li><code>none</code>: There is no fault tolerance guarantee when producing to the changelog. If there are issues on the Kafka broker then records may be lost; if there are issues with the query then output records may be duplicated.</li></ul> |
| `sink.timestamp.column` | <p>Specifies the name of the value column. Use this to set the Kafka record’s timestamp when you write to the Kafka sink’s entity.</p><p>If you do not specify a timestamp column, the system creates the Kafka producer record without an explicit timestamp. This allows the sink’s store to assign a timestamp according to its configured policy.</p><p><br><strong>Required:</strong> No<br><strong>Default value:</strong> None<br><strong>Type:</strong> String<br><strong>Valid values:</strong> One of the column names from the sink relation’s column list. Must be of type <code>BIGINT</code> or <code>TIMESTAMP</code> or <code>TIMESTAMP\_LTZ</code>. See <a data-mention href="../data-types">data-types</a>.</p>                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                           |
| `enable.upsert.mode`    | <p>Enables upsert mode for a Changelog. When set to <code>true</code>, DeltaStream automatically derives the relation key definition from the Changelog's PRIMARY KEY columns. Upsert mode is only supported on Kafka stores.<br><br><strong>Required</strong>: No<br><strong>Default value</strong>: false<br><strong>Type</strong>: Boolean<br><strong>Valid values</strong>: <code>true</code>, <code>false</code></p>                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                   |
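
As a sketch of how `key.columns` and `value.columns.exclude` interact (relation and column names are hypothetical), the following counts page views per user, keys each record by `userid`, and keeps `userid` out of the record value; note that `userid` appears last in the projection, as the exclusion rule requires:

```sql
CREATE CHANGELOG keyed_trimmed
  WITH ('key.format' = 'json',
        'key.columns' = 'userid',
        'value.columns.exclude' = 'userid') AS
SELECT count(*) AS cnt, userid
FROM pageviews
GROUP BY userid;
```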

### **Kinesis-Specific Parameters**

| Parameter Name | Description                                                                                                                                                                                                                                                                                                                                                                         |
| -------------- | ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| `topic.shards` | <p>The number of shards to use when creating the Kinesis stream, if applicable.<br></p><p><strong>Required:</strong> Yes, unless entity already exists.<br><strong>Default value:</strong> Leftmost source relation entity’s shard count.<br><strong>Type:</strong> Integer<br><strong>Valid values:</strong> \[1, ...]<br><strong>Alias:</strong> <code>kinesis.shards</code></p> |

Kinesis stores provide a delivery guarantee of `at_least_once` when producing events into a sink [entity](https://docs.deltastream.io/overview/core-concepts/store#entity "mention").

### Format-Specific Properties

All format-specific properties applicable to a changelog can be provided as a `changelog_parameter`. See [Format-Specific Parameters](https://docs.deltastream.io/reference/ddl/create-changelog#format-specific-parameters "mention") for more details.

## Examples <a href="#example" id="example"></a>

#### Create a copy changelog

The following creates a replica of the source changelog.

```sql
CREATE CHANGELOG users_clog AS SELECT * FROM users_log;
```

#### Create a changelog with passthrough configuration for retention

```sql
CREATE CHANGELOG us_customers_log WITH (
  'store' = 'kafka_store',
  'topic.partitions' = 1, 
  'topic.replicas' = 2, 
  'kafka.topic.retention.ms' = '172800000') 
AS SELECT * FROM customers_log WHERE region = 'US';
```

#### Create a changelog in a specific schema within a default database

The following creates a replica of the source changelog, but the new relation belongs to the schema named `schema2` in the session’s current database.

```sql
CREATE CHANGELOG schema2.users_log_copy AS SELECT * FROM users_log;
```

#### Create a changelog in a specific schema and database

The following creates a replica of the source changelog, but the new relation belongs to the schema named `schema2` in the database named `db`.

```sql
CREATE CHANGELOG db.schema2.users_log_copy AS SELECT * FROM users_log;
```

#### Create a case-sensitive changelog

The following creates a replica of the source changelog, and the new sink relation has a case-sensitive name.

```sql
CREATE CHANGELOG "Users" AS SELECT * FROM users_log;
```

#### Create a case-sensitive changelog in a case-sensitive schema and database

The following creates a replica of the source changelog. The new sink relation has a case-sensitive name and is in a case-sensitive database and schema.

```sql
CREATE CHANGELOG "DataBase"."Schema"."Users" AS SELECT * FROM users_log;
```

#### Create a new changelog backed by a specific entity

The following creates a replica of the source changelog, but the new changelog is associated with the specified entity called `userstwo`.

```sql
CREATE CHANGELOG
  users2
  WITH ('topic' = 'userstwo')
AS SELECT * FROM users_log;
```

#### Copy data from one store to another

The following moves data from a Kafka store to a Kinesis store. The query creates a replica of the source changelog, but the new changelog is associated with the specified store called `kinesis_store`.

```sql
CREATE CHANGELOG
  users_kinesis
  WITH ('store' = 'kinesis_store')
AS SELECT * FROM users_kafka;
```

#### Convert data from JSON to Avro

The following creates a replica of the source changelog that has a data format of JSON, but the new sink changelog has a data format of Avro for its value and key.

```sql
CREATE CHANGELOG users_avro
  WITH ('value.format' = 'avro', 'key.format' = 'avro') AS 
SELECT * FROM users_json;
```

#### Simple projection to a Kafka topic with a specific number of partitions and replicas

The following is a simple projection query where the sink Kafka topic has a specific number of partitions and replicas set.

```sql
CREATE CHANGELOG users2
  WITH ('topic.partitions' = '5', 'topic.replicas' = '3') AS 
SELECT
  registertime,
  userid AS uid,
  interests[1] AS top_interest
FROM users_log;
```

#### Simple projection to a Kinesis stream with a specific number of shards

The following is a simple projection query where the sink Kinesis stream has a specific number of shards set.

```sql
CREATE CHANGELOG
  users2
  WITH ('topic.shards' = '4')
AS SELECT
  registertime,
  userid AS uid,
  interests[1] AS top_interest
FROM users_log;
```

#### Create a changelog from aggregation

Aggregations of data on streams result in a `CHANGELOG` output relation type. The `PRIMARY KEY` for the following would be `(userid)`.

```sql
CREATE CHANGELOG
  visitlogs
  WITH ('topic' = 'pglogs')
AS SELECT
  userid,
  count(pageid) AS pgcount
FROM pageviews
GROUP BY userid;
```

#### Create a changelog from HOP window aggregation

Aggregations of data on streams result in a `CHANGELOG` output relation type. The `PRIMARY KEY` for the following would be `(window_start, window_end, userid, pageid)`.

```sql
CREATE CHANGELOG
  averagetime
AS SELECT 
  window_start, 
  window_end, 
  userid, 
  pageid, 
  avg(viewtime) AS avgtime 
FROM HOP(pageviews, size 8 seconds, advance by 4 seconds)
GROUP BY
  window_start, 
  window_end, 
  userid,
  pageid;
```

#### Create a changelog specifying the timestamp column

The statement below creates a new changelog, `userslogs2`, from the existing changelog `userslogs`. The `timestamp` changelog parameter, specified in the `WITH` clause, marks the `registertime` column in `userslogs2` as the timestamp column. Any subsequent query that refers to `userslogs2` in its `FROM` clause uses this column for time-based operations.

```sql
CREATE CHANGELOG userslogs2
  WITH ('timestamp' = 'registertime') AS
SELECT userid, registertime, contactInfo['city'] AS city 
FROM userslog;
```

#### Create a changelog specifying the Kafka delivery guarantee

The statement below creates a new changelog, `users_exactly_once`, from the existing changelog `userslog`. The `delivery.guarantee` changelog parameter, specified in the `WITH` clause, overrides the default of `at_least_once` with `exactly_once`. You may wish to use this configuration if your application can tolerate higher latencies but cannot tolerate duplicate outputs.

```sql
CREATE CHANGELOG users_exactly_once 
  WITH ('delivery.guarantee'='exactly_once') AS
SELECT *
FROM userslog;
```

#### Create a changelog with sink timestamp column

The statement below creates a new changelog, `users_contact`, from the pre-existing changelog `userslog`. The statement selects the primary key (`userid`) and each user's phone number from the `contactinfo` struct, in addition to the time the user registered. The value of the `registertime` column sets the timestamp for records written to the Kafka sink.

```sql
-- Assuming the sink Store is Kafka
CREATE CHANGELOG users_contact 
  WITH ('sink.timestamp.column' = 'registertime') AS
SELECT registertime, userid, contactinfo->phone
FROM userslog;
```

#### Create a changelog with format-specific properties for Avro

The following creates a new changelog, `usersInfo`, by selecting records' keys and values from the changelog `users_avro`. Assuming the `users_avro` key and value are in Avro, two subjects are provided to generate the Avro schemas for the key and value of `usersInfo`. These subjects are stored in the schema registry of a store called `sr_store`. The `users_data-key` subject generates the key's Avro schema, and the `users_data-value` subject generates the value's Avro schema for the records written into `usersInfo`.

```sql
CREATE CHANGELOG "usersInfo"
WITH ('topic' = 'usersInfo',
      'avro.base.store.name' = 'sr_store',
      'avro.base.subject.key' = 'users_data-key',
      'avro.base.subject.value' = 'users_data-value') AS
SELECT * FROM users_avro;
```

#### Create a changelog with key columns for sink

The following creates a new changelog, `keyed_aggr`, by grouping records in the `pageviews` stream based on `userid` and counting how many pages each user has visited so far. Since `key.format` for the sink is set to `json` and `userid` is chosen as the key column, each record in the sink has a key of `STRUCT` type with a field named `userid`. The value in each record has two columns: `userid` and `cnt`.

```sql
CREATE CHANGELOG keyed_aggr
WITH ('key.format'='json', 
      'key.columns' = 'userid') AS
SELECT userid, count(*) AS cnt  
FROM pageviews
GROUP BY userid;  
```

#### Create a changelog from an upsert source

When the source changelog has upsert mode enabled, the sink changelog automatically inherits upsert mode. The PRIMARY KEY of the sink is derived from the projected primary key columns of the source. In this example, if `upsert_log` was created with `enable.upsert.mode`=`true` and `PRIMARY KEY(userid)`, then `upsert_copy` is automatically created with upsert mode enabled and `userid` as its primary key.

```sql
CREATE CHANGELOG upsert_copy
AS SELECT userid, pageid
FROM upsert_log;
```

#### Create a filtered changelog from an upsert source with explicit store

The following creates another upsert changelog from the upsert source `upsert_orders`, filtering for orders priced over 100 and writing to the specified store, `kafka_store`.

```sql
CREATE CHANGELOG us_orders
  WITH ('store' = 'kafka_store', 'topic.partitions' = 3)
AS SELECT orderid, userid, price
FROM upsert_orders
WHERE price > 100;
```

