CREATE CHANGELOG AS SELECT

Syntax

CREATE CHANGELOG changelog_name
[WITH (changelog_parameter = value [, ... ])]
AS select_statement;

Description

CREATE CHANGELOG AS is essentially a combination of two statements:

  • A DDL statement that creates a new changelog.

  • An INSERT INTO statement that runs a SELECT statement and adds the results into the newly-created changelog.

Arguments

changelog_name

Specifies the name of the new changelog. Optionally, use <database_name>.<schema_name> as a prefix to the name to create the relation in that scope. If the name is case-sensitive, you must wrap it in double quotes; otherwise, the system uses the lowercase name.

WITH (changelog_parameter = value [, …​ ])

Optionally, this clause specifies the changelog parameters described below.

select_statement

Specifies the SELECT statement whose results populate the newly created changelog.

Changelog Parameters


topic

The name of the entity that holds the data for the newly created sink changelog. If an entity with this name doesn’t exist in the store, it is created. If this parameter isn’t set, an entity named after the changelog_name is created in the corresponding store.

Required: No
Default value: Lowercase changelog_name
Type: String
Valid values: See LIST ENTITIES

store

The name of the store that hosts the entity for this changelog.

Required: No
Default value: User’s default store
Type: String
Valid values: See LIST STORES

value.format

Format of the message value in the entity. For more information regarding serialization formats, see Data Formats (Serialization).

Required: No
Default value: Value format from the leftmost source relation
Type: VALUE_FORMAT
Valid values: JSON, AVRO, PROTOBUF, PRIMITIVE

timestamp

Name of the column in the changelog to use as the timestamp. If not set, the timestamp of the message is used for time-based operations such as window aggregations and joins.

Required: No
Default value: Record’s timestamp
Type: String
Valid values: Must be of type BIGINT or TIMESTAMP. See Data Types.

timestamp.format

The format to use for TIMESTAMP-typed fields. See Data Types.

Required: No
Default value: sql
Type: String
Valid values: sql, iso8601
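For instance, a sink changelog could serialize its TIMESTAMP-typed fields in ISO-8601 form as sketched below (userslog is an illustrative source relation assumed to contain a TIMESTAMP column):

```sql
CREATE CHANGELOG users_iso
  WITH ('timestamp.format' = 'iso8601') AS
SELECT * FROM userslog;
```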

Kafka Specific Parameters


topic.partitions

The number of partitions to use when creating the Kafka topic, if applicable.

Required: Yes, unless the entity already exists
Default value: Leftmost source relation entity’s partition count
Type: Integer
Valid values: [1, ...]

topic.replicas

The number of replicas to use when creating the Kafka topic, if applicable.

Required: Yes, unless the entity already exists
Default value: Leftmost source relation entity’s replica count
Type: Integer
Valid values: [1, ...]

key.format

The format of a message key in the entity. For more information regarding serialization formats, see Data Formats (Serialization).

Required: No
Default value: Key format from the leftmost source relation’s key (if any), or the same as value.format
Type: KEY_FORMAT
Valid values: JSON, AVRO, PROTOBUF, PRIMITIVE

key.type

Declares the names and data types of key columns. The type is a STRUCT when key.format is a non-primitive format — for example, 'key.type'='STRUCT<id BIGINT, name VARCHAR>'. For primitive formats, the type is one of the Primitive Data Types, e.g. 'key.type'='VARCHAR'.

Required: No, unless key.format is set and there is no default value
Default value: For certain query semantics (such as queries using JOIN or GROUP BY), a generated key type is used by default. For queries that do not generate a key type, the key type from the leftmost source relation’s key (if any) is used. See Row Key Definition.
Type: String
Valid values: See STRUCT in Data Types

delivery.guarantee

The fault tolerance guarantees applied when producing to this changelog.

Required: No
Default value: at_least_once
Type: String
Valid values:

  • exactly_once: Produces to the changelog using Kafka transactions. These transactions commit when the query takes a checkpoint. On the consumer side, when the Kafka consumer isolation.level configuration is set to read_committed, only committed records are visible. Since records aren't committed until the query takes a checkpoint, expect some additional delay with this setting.

  • at_least_once: Ensures that records are output to the changelog at least once. During query checkpointing, the query waits to receive a confirmation of successful writes from the Kafka broker. If there are issues with the query then duplicate records are possible as the query attempts to reprocess old data.

  • none: There is no fault tolerance guarantee when producing to the changelog. If there are issues on the Kafka broker then records may be lost; if there are issues with the query then output records may be duplicated.

sink.timestamp.strategy

Determines how timestamp values are set for records written to the Kafka sink’s entity.

Required: No
Default value: proc.time
Type: String
Valid values:

  • event.time: Uses the timestamp of the source records coming from the source topic.

  • proc.time: Uses the current time of the Kafka producer when writing into the sink's entity. Note that the final timestamp used by Kafka depends on the timestamp type configured for the Kafka topic.
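As an illustration of the key.format and key.type parameters above, the key columns for an Avro-keyed sink could be declared explicitly as sketched here (the relation and column names are hypothetical):

```sql
CREATE CHANGELOG users_keyed
  WITH ('key.format' = 'AVRO',
        'key.type' = 'STRUCT<userid VARCHAR>') AS
SELECT * FROM users_log;
```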

Kinesis Specific Parameters


topic.shards

The number of shards to use when creating the Kinesis stream, if applicable.

Required: Yes, unless the entity already exists
Default value: Leftmost source relation entity’s shard count
Type: Integer
Valid values: [1, ...]
Alias: kinesis.shards

Kinesis stores provide a delivery guarantee of at_least_once when producing events into a sink entity.

Format-Specific Properties

Examples

Create a copy changelog

The following creates a replica of the source changelog.

CREATE CHANGELOG users_clog AS SELECT * FROM users_log;

Create a changelog in a specific schema within a default database

The following creates a replica of the source changelog, but the new relation belongs to the schema named schema2 in the session’s current database.

CREATE CHANGELOG schema2.users_log_copy AS SELECT * FROM users_log;

Create a changelog in a specific schema and database

The following creates a replica of the source changelog, but the new relation belongs to the schema named schema2 in the database named db.

CREATE CHANGELOG db.schema2.users_log_copy AS SELECT * FROM users_log;

Create a case-sensitive changelog

The following creates a replica of the source changelog, and the new sink relation has a case-sensitive name.

CREATE CHANGELOG "Users" AS SELECT * FROM users_log;

Create a case-sensitive changelog in a case-sensitive schema and database

The following creates a replica of the source changelog. The new sink relation has a case-sensitive name and is in a case-sensitive database and schema.

CREATE CHANGELOG "DataBase"."Schema"."Users" AS SELECT * FROM users_log;

Create a new changelog backed by a specific entity

The following creates a replica of the source changelog, but the new changelog is associated with the specified entity called userstwo.

CREATE CHANGELOG
  users2
  WITH ('topic' = 'userstwo')
AS SELECT * FROM users_log;

Copy data from one store to another

The following creates a replica of the source changelog, but this new changelog is associated with the specified store called kinesis_store.

CREATE CHANGELOG
  users_kinesis
  WITH ('store' = 'kinesis_store')
AS SELECT * FROM users_kafka;

Convert data from JSON to Avro

The following creates a replica of the source changelog that has a data format of JSON, but the new sink changelog has a data format of Avro for its value and key.

CREATE CHANGELOG users_avro
  WITH ('value.format' = 'avro', 'key.format' = 'AVRO') AS 
SELECT * FROM users_json;

Simple projection to a Kafka topic with a specific number of partitions and replicas

The following is a simple projection query where the sink Kafka topic has a specific number of partitions and replicas set.

CREATE CHANGELOG users2
  WITH ('topic.partitions' = '5', 'topic.replicas' = '3') AS 
SELECT
  registertime,
  userid AS uid,
  interests[1] AS top_interest
FROM users_log;

Simple projection to a Kinesis stream with a specific number of shards

The following is a simple projection query where the sink Kinesis stream has a specific number of shards set.

CREATE CHANGELOG
  users2
  WITH ('topic.shards' = '4')
AS SELECT
  registertime,
  userid AS uid,
  interests[1] AS top_interest
FROM users_log;

Create a changelog from aggregation

Aggregations of data on streams result in a CHANGELOG output relation type. The PRIMARY KEY for the following would be (userid).

CREATE CHANGELOG
  visitlogs
  WITH ('topic' = 'pglogs')
AS SELECT
  userid,
  count(pageid) AS pgcount
FROM pageviews
GROUP BY userid;

Create a changelog from HOP window aggregation

Aggregations of data on streams result in a CHANGELOG output relation type. The PRIMARY KEY for the following would be (window_start, window_end, userid, pageid).

CREATE CHANGELOG
  averagetime
AS SELECT 
  window_start, 
  window_end, 
  userid, 
  pageid, 
  avg(viewtime) AS avgtime 
FROM HOP(pageviews, size 8 second, advance by 4 seconds)
GROUP BY
  window_start, 
  window_end, 
  userid,
  pageid;

Create a changelog with specifying the timestamp column

The following statement creates a new changelog, called userslogs2, from the existing changelog userslog. The timestamp changelog parameter, specified in the WITH clause, marks the registertime column in userslogs2 as the timestamp column. Any subsequent query that refers to userslogs2 in its FROM clause uses this column for time-based operations.

CREATE CHANGELOG userslogs2
  WITH ('timestamp' = 'registertime') AS
SELECT userid, registertime, contactInfo['city'] AS city 
FROM userslog;

Create a changelog with specifying the Kafka delivery guarantee

The following statement creates a new changelog, called users_exactly_once, from the existing changelog userslog. The delivery.guarantee changelog parameter, specified in the WITH clause, overrides the default delivery.guarantee of at_least_once with exactly_once. You may wish to use this configuration if your application can tolerate higher latencies but cannot tolerate duplicate outputs.

CREATE CHANGELOG users_exactly_once 
  WITH ('delivery.guarantee'='exactly_once') AS
SELECT *
FROM userslog;

Create a changelog with `event.time` sink timestamp strategy

The following statement creates a new changelog, called users_contact, from the existing changelog userslog by selecting each user's primary key (for example, userid) and phone number from the contactinfo struct. The sink timestamp strategy is set to event.time; this way, when writing to the sink's Kafka topic, the timestamp of each record in users_contact is set to the timestamp of its source record from the entity backing userslog.

-- Assuming the sink Store is Kafka
CREATE CHANGELOG users_contact 
  WITH ('sink.timestamp.strategy' = 'event.time') AS
SELECT userid, contactinfo->phone
FROM userslog;

Create a changelog with format-specific properties for Avro

The following creates a new changelog, usersInfo, by selecting records' key and value from the existing changelog users_avro. Assuming the key and value of users_avro are in Avro format, two subjects are provided to generate the Avro schemas for usersInfo's key and value. These subjects are stored in the schema registry of a store called sr_store: the users_data-key subject is used to generate the key's Avro schema, and the users_data-value subject is used to generate the value's Avro schema for the records written into usersInfo.

CREATE CHANGELOG "usersInfo" 
WITH ('topic'='usersInfo', 
      'avro.base.store.name' = 'sr_store',
      'avro.base.subject.key' = 'users_data-key',
      'avro.base.subject.value' = 'users_data-value') AS
SELECT * FROM users_avro;  
