CREATE CHANGELOG AS SELECT
Syntax
CREATE CHANGELOG changelog_name
[WITH (changelog_parameter = value [, ... ])]
AS select_statement;
Description
CREATE CHANGELOG AS is essentially a combination of two statements:
- A DDL statement that creates a new changelog.
- An INSERT INTO statement that runs a SELECT statement and adds the results into the newly created changelog.
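To make the two-statement description above concrete, the first example in the Examples section could be expressed roughly as the pair of statements below. This is a conceptual sketch, not an exact equivalent: the column list and primary key declared for users_clog are hypothetical and stand in for whatever schema users_log actually has, and the CREATE CHANGELOG and INSERT INTO reference pages give the precise syntax. CREATE CHANGELOG AS SELECT spares you from declaring that schema because it derives it from the query.

```sql
-- Conceptual sketch only: the columns and primary key below are hypothetical.
-- Step 1 (DDL): define the sink changelog with an explicit schema.
CREATE CHANGELOG users_clog (
  registertime BIGINT,
  userid VARCHAR,
  regionid VARCHAR,
  PRIMARY KEY(userid)
) WITH ('topic' = 'users_clog', 'value.format' = 'json');

-- Step 2: run the SELECT and continuously write its results into users_clog.
INSERT INTO users_clog SELECT * FROM users_log;
```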
Arguments
changelog_name
Specifies the name of the new changelog. Optionally, use <database_name>.<schema_name> as a prefix to the name to create the relation in that scope. If the name is case-sensitive, you must wrap it in double quotes; otherwise, the system uses the lowercase name.
WITH (changelog_parameter = value [, ... ])
Optionally, this clause specifies changelog parameters for the new changelog. See Changelog Parameters.
select_statement
This statement specifies the SELECT statement to run.
Changelog Parameters
topic
The name of the entity that holds the data for the newly created sink changelog. If the entity doesn’t exist in the store, an entity with this name is created in the corresponding store.
Required: No
Default value: Lowercase changelog_name.
Type: String
Valid values: See LIST ENTITIES
store
The name of the store that hosts the entity for this changelog.
Required: No
Default value: User’s default Data Store.
Type: String
Valid values: See LIST STORES
value.format
Format of the message value in the Data Store. For more information regarding serialization formats, see Data Formats (Serialization).
Required: No
Default value: Value format from the leftmost source relation.
Type: VALUE_FORMAT
Valid values: JSON, AVRO, PROTOBUF, PRIMITIVE
timestamp
Name of the column in the changelog to use as the timestamp. If not set, the timestamp of the message is used for time-based operations such as window aggregations and joins.
Required: No
Default value: Record’s timestamp.
Type: String
Valid values: Must be of type BIGINT, TIMESTAMP, or TIMESTAMP_LTZ. See Data Types.
timestamp.format
The format to use for TIMESTAMP-typed fields. See Data Types.
Required: No
Default value: sql
Type: String
Valid values: sql, iso8601
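As an illustration, the following sketch combines the timestamp and timestamp.format parameters. It assumes a hypothetical source changelog orders_log whose value contains a TIMESTAMP column named order_ts; neither name appears elsewhere on this page.

```sql
-- Hypothetical source: orders_log, with a TIMESTAMP column order_ts.
-- 'timestamp' marks order_ts as the time column for downstream queries.
-- 'timestamp.format' selects iso8601 instead of the default sql format
-- for TIMESTAMP-typed fields.
CREATE CHANGELOG orders_iso
  WITH ('timestamp' = 'order_ts', 'timestamp.format' = 'iso8601') AS
SELECT * FROM orders_log;
```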
Kafka-Specific Parameters
topic.partitions
The number of partitions to use when creating the Kafka topic, if applicable. Note that the number of partitions must be compatible with the Kafka cluster's configuration. If there's any mismatch, a query may time out when communicating with the cluster at record creation time.
Required: Yes, unless the entity already exists.
Default value: Leftmost source relation entity’s partition count.
Type: Integer
Valid values: [1, ...]
topic.replicas
The number of replicas to use when creating the Kafka topic, if applicable. Note that the number of replicas must be compatible with the Kafka cluster's configuration. If there's any mismatch, a query may time out when communicating with the cluster at record creation time.
Required: Yes, unless the entity already exists.
Default value: Leftmost source relation entity’s replica count.
Type: Integer
Valid values: [1, ...]
kafka.topic.*
A configuration specific to the topic being created, for example, Kafka Entity Configuration for Confluent Platform.
Required: No
Default value: None
Type: String
Valid values: Kafka topic configuration specific to the underlying Data Store type.
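The kafka.topic.* prefix passes configuration through to the new topic, as the retention example in the Examples section does with kafka.topic.retention.ms. The sketch below assumes the same mechanism applies to the standard Kafka topic setting cleanup.policy, so that the topic is log-compacted and keeps the latest record for each key. Because Kafka rejects records without a key on a compacted topic, the sketch also sets key.format and key.columns. The changelog name users_compacted is hypothetical.

```sql
-- Assumption: 'kafka.topic.cleanup.policy' is passed to Kafka as the
-- topic config cleanup.policy, like 'kafka.topic.retention.ms'.
-- key.columns ensures every record has a key, which compaction requires.
CREATE CHANGELOG users_compacted WITH (
  'store' = 'kafka_store',
  'topic.partitions' = 1,
  'topic.replicas' = 2,
  'key.format' = 'json',
  'key.columns' = 'userid',
  'kafka.topic.cleanup.policy' = 'compact')
AS SELECT * FROM users_log;
```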
key.format
The format of a message key in the Data Store. For more information regarding serialization formats, see Data Formats (Serialization).
Required: No
Default value: None
Type: String 
Valid values: JSON, AVRO, PROTOBUF, PRIMITIVE
key.columns
Specifies the name(s) of the value columns, separated by commas, that are used to construct the record key.
For a non-primitive key.format, the record key is created as a STRUCT whose fields are the columns listed in this property.
For a primitive key.format, this property must contain exactly one column with a primitive data type.
Required: No
Default value: None
Type: String
Valid values: One or more valid column names from the sink relation’s column list, separated by commas.
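For a primitive key.format, a sketch might look like the following. It assumes that userid in pageviews has a primitive type (for example, VARCHAR); the sink name visits_by_user is hypothetical. Compare it with the JSON-keyed example in the Examples section, where the key is a STRUCT.

```sql
-- Assumption: userid in pageviews has a primitive type such as VARCHAR.
-- With a primitive key.format, key.columns must name exactly one such
-- column, and the record key is that column's value rather than a STRUCT.
CREATE CHANGELOG visits_by_user
  WITH ('key.format' = 'primitive', 'key.columns' = 'userid') AS
SELECT userid, count(*) AS cnt
FROM pageviews
GROUP BY userid;
```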
value.columns.exclude
Specifies the name(s) of the columns, separated by commas, that should be excluded from the record’s value and included only in its key.
This property can only be set if key.columns is already defined.
The excluded columns must appear at the end of the relation’s column list and must also be listed in key.columns.
Required: No
Default value: None
Type: String
Valid values: One or more valid column names from the sink relation’s column list, separated by commas.
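The following sketch keeps userid only in the record key. It assumes the pageviews stream used in the Examples section; the sink name visits_key_only is hypothetical. userid is selected last so that it satisfies the rule that excluded columns appear at the end of the column list. Each record's key is then a STRUCT with a userid field, and its value contains only cnt.

```sql
-- userid is listed in key.columns (required) and is the last selected
-- column (required), so it can be excluded from the record value.
CREATE CHANGELOG visits_key_only
  WITH ('key.format' = 'json',
        'key.columns' = 'userid',
        'value.columns.exclude' = 'userid') AS
SELECT count(*) AS cnt, userid
FROM pageviews
GROUP BY userid;
```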
delivery.guarantee
The fault tolerance guarantees applied when producing to this changelog.
Required: No
Default value: at_least_once
Type: String
Valid values:
- exactly_once: Produces to the changelog using Kafka transactions. These transactions commit when the query takes a checkpoint. On the consumer side, if you set the Kafka consumer isolation.level configuration to read_committed, only committed records are visible. Because records aren't committed until the query takes a checkpoint, this setting adds some delay.
- at_least_once: Ensures that records are output to the changelog at least once. During query checkpointing, the query waits to receive confirmation of successful writes from the Kafka broker. If there are issues with the query, duplicate records are possible as the query attempts to reprocess old data.
- none: There is no fault tolerance guarantee when producing to the changelog. If there are issues on the Kafka broker, records may be lost; if there are issues with the query, output records may be duplicated.
sink.timestamp.column
Specifies the name of the value column whose value sets the Kafka record’s timestamp when you write to the Kafka sink’s entity.
If you do not specify a timestamp column, the system creates the Kafka producer record without an explicit timestamp. This allows the sink’s store to assign a timestamp according to its configured policy.
Required: No
Default value: None
Type: String
Valid values: One of the column names from the sink relation’s column list. Must be of type BIGINT, TIMESTAMP, or TIMESTAMP_LTZ. See Data Types.
Kinesis-Specific Parameters
topic.shards
The number of shards to use when creating the Kinesis stream, if applicable.
Required: Yes, unless entity already exists.
Default value: Leftmost source relation entity’s shard count.
Type: Integer
Valid values: [1, ...]
Alias: kinesis.shards
Kinesis stores provide a delivery guarantee of at_least_once when producing events into a sink Data Store.
Format-Specific Properties
All format-specific properties applicable to a changelog can be provided as a changelog_parameter. See Format-Specific Parameters in CREATE CHANGELOG for more details.
Examples
Create a copy changelog
The following creates a replica of the source changelog.
CREATE CHANGELOG users_clog AS SELECT * FROM users_log;
Create a changelog with passthrough configuration for retention
CREATE CHANGELOG us_customers_log WITH (
  'store' = 'kafka_store',
  'topic.partitions' = 1, 
  'topic.replicas' = 2, 
  'kafka.topic.retention.ms' = '172800000') 
AS SELECT * FROM customers_log WHERE region = 'US';
Create a changelog in a specific schema within the default database
The following creates a replica of the source changelog, but the new relation belongs to the schema named schema2 in the session’s current database.
CREATE CHANGELOG schema2.users_log_copy AS SELECT * FROM users_log;
Create a changelog in a specific schema and database
The following creates a replica of the source changelog, but the new relation belongs to the schema named schema2 in the database named db.
CREATE CHANGELOG db.schema2.users_log_copy AS SELECT * FROM users_log;
Create a case-sensitive changelog
The following creates a replica of the source changelog, and the new sink relation has a case-sensitive name.
CREATE CHANGELOG "Users" AS SELECT * FROM users_log;
Create a case-sensitive changelog in a case-sensitive schema and database
The following creates a replica of the source changelog. The new sink relation has a case-sensitive name and is in a case-sensitive database and schema.
CREATE CHANGELOG "DataBase"."Schema"."Users" AS SELECT * FROM users_log;
Create a new changelog backed by a specific entity
The following creates a replica of the source changelog, but this new changelog is associated with the specified entity called userstwo.
CREATE CHANGELOG
  users2
  WITH ('topic' = 'userstwo')
AS SELECT * FROM users_log;
Copy data from one store to another
The following moves data from a Kafka store to a Kinesis store. It creates a replica of the source changelog, but this new changelog is associated with the specified store called kinesis_store.
CREATE CHANGELOG
  users_kinesis
  WITH ('store' = 'kinesis_store')
AS SELECT * FROM users_kafka;
Convert data from JSON to Avro
The following creates a replica of the source changelog that has a data format of JSON, but the new sink changelog has a data format of Avro for its value and key.
CREATE CHANGELOG users_avro
  WITH ('value.format' = 'avro', 'key.format' = 'AVRO') AS 
SELECT * FROM users_json;
Simple projection to a Kafka topic with a specific number of partitions and replicas
The following is a simple projection query where the sink Kafka topic has a specific number of partitions and replicas set.
CREATE CHANGELOG users2
  WITH ('topic.partitions' = '5', 'topic.replicas' = '3') AS 
SELECT
  registertime,
  userid AS uid,
  interests[1] AS top_interest
FROM users_log;
Simple projection to a Kinesis stream with a specific number of shards
The following is a simple projection query where the sink Kinesis stream has a specific number of shards set.
CREATE CHANGELOG
  users2
  WITH ('topic.shards' = '4')
AS SELECT
  registertime,
  userid AS uid,
  interests[1] AS top_interest
FROM users_log;
Create a changelog from aggregation
Aggregations of data on streams result in a CHANGELOG output relation type. The PRIMARY KEY for the following would be (userid).
CREATE CHANGELOG
  visitlogs
  WITH ('topic' = 'pglogs')
AS SELECT
  userid,
  count(pageid) AS pgcount
FROM pageviews
GROUP BY userid;
Create a changelog from HOP window aggregation
Aggregations of data on streams result in a CHANGELOG output relation type. The PRIMARY KEY for the following would be (window_start, window_end, userid, pageid).
CREATE CHANGELOG
  averagetime
AS SELECT 
  window_start, 
  window_end, 
  userid, 
  pageid, 
  avg(viewtime) AS avgtime 
FROM HOP(pageviews, size 8 second, advance by 4 seconds)
GROUP BY
  window_start, 
  window_end, 
  userid,
  pageid;
Create a changelog specifying the timestamp column
The following statement creates a new changelog, called userslogs2, from the existing changelog userslog. The timestamp changelog parameter, specified in the WITH clause, marks the registertime column in userslogs2 as the timestamp column. Any subsequent query that refers to userslogs2 in its FROM clause uses this column for time-based operations.
CREATE CHANGELOG userslogs2
  WITH ('timestamp' = 'registertime') AS
SELECT userid, registertime, contactInfo['city'] AS city 
FROM userslog;
Create a changelog specifying the Kafka delivery guarantee
The following statement creates a new changelog, called users_exactly_once, from the existing changelog userslog. The delivery.guarantee changelog parameter, specified in the WITH clause, overrides the default delivery.guarantee of at_least_once with exactly_once. You may wish to use this configuration if your application can tolerate higher latencies but cannot tolerate duplicate outputs.
CREATE CHANGELOG users_exactly_once 
  WITH ('delivery.guarantee'='exactly_once') AS
SELECT *
FROM userslog;
Create a changelog with a sink timestamp column
The following statement creates a new changelog, called users_contact, from the existing changelog userslog. It selects the primary key (userid), each user's phone number from the contactinfo struct, and the time the user registered. The value of the registertime column sets the timestamp for records written to the Kafka sink.
-- Assuming the sink Store is Kafka
CREATE CHANGELOG users_contact 
  WITH ('sink.timestamp.column' = 'registertime') AS
SELECT registertime, userid, contactinfo->phone
FROM userslog;
Create a changelog with format-specific properties for Avro
The following creates a new changelog, usersInfo, by selecting the key and value of records from the existing changelog users_avro. Assuming the key and value of users_avro are in Avro, two subjects are provided to generate the Avro schemas for the key and value of usersInfo. These subjects are stored in the schema registry of a store called sr_store. The users_data-key subject is used to generate the key's Avro schema, and the users_data-value subject is used to generate the value's Avro schema for the records written into usersInfo.
CREATE CHANGELOG "usersInfo" 
WITH ('topic'='usersInfo', 
      'avro.base.store.name' = 'sr_store',
      'avro.base.subject.key' = 'users_data-key',
      'avro.base.subject.value' = 'users_data-value') AS
SELECT * FROM users_avro;
Create a changelog with key columns for the sink
The following creates a new changelog, keyed_aggr, by grouping records in the pageviews stream by userid and counting how many pages each user has visited so far. Because key.format for the sink is set to json and userid is chosen as the key column, each record in the sink has a key of STRUCT type with a field named userid. The value of each record has two columns: userid and cnt.
CREATE CHANGELOG keyed_aggr
WITH ('key.format'='json', 
      'key.columns' = 'userid') AS
SELECT userid, count(*) AS cnt  
FROM pageviews
GROUP BY userid;

