CREATE CHANGELOG AS SELECT
Syntax
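The general shape of the statement, reconstructed as a sketch from the arguments described below (the exact grammar may differ):

```sql
CREATE CHANGELOG changelog_name
[WITH (changelog_parameter = value [, ...])]
AS select_statement;
```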
Description
CREATE CHANGELOG AS SELECT is essentially a combination of two statements:
A DDL statement that creates a new changelog.
An INSERT INTO statement that runs a SELECT statement and adds the results into the newly-created changelog.
Arguments
changelog_name
Specifies the name of the new changelog. Optionally, use <database_name>.<schema_name> as a prefix to the name to create the relation in that scope. If the name is case sensitive, you must wrap it in double quotes; otherwise, the system uses the lowercase name.
WITH (changelog_parameter = value [, … ])
Optionally, this clause specifies the changelog parameters (see Changelog Parameters below).
select_statement
Specifies the SELECT statement to run.
Changelog Parameters
topic
The name of the entity that holds the data for the newly created sink changelog. If the entity doesn’t exist in the store, an entity with this name is created. If this parameter isn’t set, an entity named after the lowercase changelog_name is created in the corresponding store.
Required: No
Default value: Lowercase changelog_name.
Type: String
Valid values: See LIST ENTITIES
store
The name of the store that hosts the entity for this changelog.
Required: No
Default value: User’s default store.
Type: String
Valid values: See LIST STORES
value.format
Format of the message value in the entity. For more information regarding serialization formats, see Data Formats (Serialization).
Required: No
Default value: Value format from the leftmost source relation.
Type: VALUE_FORMAT
Valid values: JSON, AVRO, PROTOBUF, PRIMITIVE
timestamp
Name of the column in the changelog to use as the timestamp. If not set, the timestamp of the message is used for time-based operations such as window aggregations and joins.
Required: No
Default value: Record’s timestamp.
Type: String
Valid values: Must be of type BIGINT or TIMESTAMP. See Data Types.
timestamp.format
The format to use for TIMESTAMP-typed fields. See Data Types.
Required: No
Default value: sql
Type: String
Valid values: sql, iso8601
Kafka Specific Parameters
topic.partitions
The number of partitions to use when creating the Kafka topic, if applicable.
Required: Yes, unless the entity already exists.
Default value: Leftmost source relation entity’s partition count.
Type: Integer
Valid values: [1, ...]
topic.replicas
The number of replicas to use when creating the Kafka topic, if applicable.
Required: Yes, unless the entity already exists.
Default value: Leftmost source relation entity’s replica count.
Type: Integer
Valid values: [1, ...]
key.format
The format of a message key in the entity. For more information regarding serialization formats, see Data Formats (Serialization).
Required: No
Default value: Key format from the leftmost source relation’s key (if any); otherwise, the same as value.format.
Type: KEY_FORMAT
Valid values: JSON, AVRO, PROTOBUF, PRIMITIVE
key.type
Declares the names and data types of key columns. The type is a STRUCT when key.format is a non-primitive value; for example, 'key.type'='STRUCT<id BIGINT, name VARCHAR>'. For primitive values, the type is one of the primitive data types; for example, 'key.type'='VARCHAR'.
Required: No, unless key.format is set and there is no default value.
Default value: For certain query semantics (such as queries using JOIN or GROUP BY), a generated key type is used by default. For queries that do not generate a key type, the key type from the leftmost source relation’s key (if any) is used by default. See Row Key Definition.
Type: String
Valid values: See STRUCT in Data Types.
delivery.guarantee
The fault tolerance guarantees applied when producing to this changelog.
Required: No
Default value: at_least_once
Type: String
Valid values:
exactly_once: Produces to the changelog using Kafka transactions. These transactions commit when the query takes a checkpoint. On the consumer side, when the Kafka consumer isolation.level configuration is set to read_committed, only committed records are visible. Because records aren’t committed until the query takes a checkpoint, there is some additional delay when you use this setting.
at_least_once: Ensures that records are output to the changelog at least once. During query checkpointing, the query waits for confirmation of successful writes from the Kafka broker. If there are issues with the query, duplicate records are possible as the query reprocesses old data.
none: There is no fault tolerance guarantee when producing to the changelog. If there are issues on the Kafka broker, records may be lost; if there are issues with the query, output records may be duplicated.
sink.timestamp.strategy
Determines how timestamp values are set for records written to a Kafka sink’s entity.
Required: No
Default value: proc.time
Type: String
Valid values:
event.time: Uses the timestamp of the records coming from the source topic.
proc.time: Uses the current time of the Kafka producer when writing into the sink’s entity. Note that the final timestamp used by Kafka depends on the timestamp type configured for the Kafka topic.
Kinesis Specific Parameters
topic.shards
The number of shards to use when creating the Kinesis stream, if applicable.
Required: Yes, unless the entity already exists.
Default value: Leftmost source relation entity’s shard count.
Type: Integer
Valid values: [1, ...]
Alias: kinesis.shards
Kinesis stores provide an at_least_once delivery guarantee when producing events into a sink entity.
Format-Specific Properties
Examples
Create a copy changelog
The following creates a replica of the source changelog.
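A minimal sketch, assuming an existing source changelog named userslog; the target name userslog_copy is illustrative:

```sql
CREATE CHANGELOG userslog_copy
AS SELECT * FROM userslog;
```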
Create a changelog in a specific schema within a default database
The following creates a replica of the source changelog, but the new relation belongs to the schema named schema2 in the session’s current database.
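A sketch of such a statement; schema2 comes from this example, while userslog and userslog_copy are illustrative names:

```sql
CREATE CHANGELOG schema2.userslog_copy
AS SELECT * FROM userslog;
```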
Create a changelog in a specific schema and database
The following creates a replica of the source changelog, but the new relation belongs to the schema named schema2 in the database named db.
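A sketch with illustrative names, using the fully qualified <database_name>.<schema_name> prefix:

```sql
CREATE CHANGELOG db.schema2.userslog_copy
AS SELECT * FROM userslog;
```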
Create a case-sensitive changelog
The following creates a replica of the source changelog, and the new sink relation has a case-sensitive name.
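A sketch with an illustrative case-sensitive sink name; per the changelog_name rules above, a case-sensitive name must be wrapped in double quotes:

```sql
CREATE CHANGELOG "UsersLogCopy"
AS SELECT * FROM userslog;
```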
Create a case-sensitive changelog in a case-sensitive schema and database
The following creates a replica of the source changelog. The new sink relation has a case-sensitive name and is in a case-sensitive database and schema.
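A sketch with illustrative names; each case-sensitive part of the qualified name is wrapped in double quotes:

```sql
CREATE CHANGELOG "DB"."Schema2"."UsersLogCopy"
AS SELECT * FROM userslog;
```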
Create a new changelog backed by a specific entity
The following moves data from a Kafka store to a Kinesis store. The query creates a replica of the source changelog, but this new changelog is associated with the specified entity called userstwo.
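A sketch of such a statement, assuming a source changelog named userslog and a Kinesis store named kinesis_store; the topic and store parameters are described above, and the sink name users2 is illustrative:

```sql
CREATE CHANGELOG users2
WITH ('store' = 'kinesis_store', 'topic' = 'userstwo')
AS SELECT * FROM userslog;
```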
Copy data from one store to another
The following creates a replica of the source changelog, but this new changelog is associated with the specified store called kinesis_store.
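A sketch, assuming a source changelog named userslog; the sink name is illustrative:

```sql
CREATE CHANGELOG userslog_kinesis
WITH ('store' = 'kinesis_store')
AS SELECT * FROM userslog;
```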
Convert data from JSON to Avro
The following creates a replica of the source changelog that has a data format of JSON, but the new sink changelog has a data format of Avro for its value and key.
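A sketch, assuming a JSON-formatted source changelog named userslog; the sink name is illustrative, and value.format and key.format are the parameters described above:

```sql
CREATE CHANGELOG userslog_avro
WITH ('value.format' = 'avro', 'key.format' = 'avro')
AS SELECT * FROM userslog;
```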
Simple projection to a Kafka topic with a specific number of partitions and replicas
The following is a simple projection query where the sink Kafka topic has a specific number of partitions and replicas set.
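A sketch, assuming a source changelog userslog with columns userid and pageid; whether integer parameter values must be quoted may differ in practice:

```sql
CREATE CHANGELOG userslog2
WITH ('topic.partitions' = 2, 'topic.replicas' = 3)
AS SELECT userid, pageid FROM userslog;
```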
Simple projection to a Kinesis stream with a specific number of shards
The following is a simple projection query where the sink Kinesis stream has a specific number of shards set.
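A sketch, assuming a source changelog userslog and a Kinesis store named kinesis_store:

```sql
CREATE CHANGELOG userslog_ks
WITH ('store' = 'kinesis_store', 'topic.shards' = 2)
AS SELECT userid, pageid FROM userslog;
```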
Create a changelog from aggregation
Aggregations of data on streams result in a CHANGELOG output relation type. The PRIMARY KEY for the following would be (userid).
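A sketch of such an aggregation, assuming a source stream named pageviews with columns userid and pageid; grouping by userid yields (userid) as the PRIMARY KEY:

```sql
CREATE CHANGELOG visit_counts
AS SELECT userid, COUNT(pageid) AS pgcount
FROM pageviews
GROUP BY userid;
```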
Create a changelog from HOP window aggregation
Aggregations of data on streams result in a CHANGELOG output relation type. The PRIMARY KEY for the following would be (window_start, window_end, userid, pageid).
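A sketch, assuming a source stream named pageviews; the HOP windowing syntax shown (window size and advance interval) is illustrative and may differ from the actual grammar:

```sql
CREATE CHANGELOG hopping_visits
AS SELECT window_start, window_end, userid, pageid, COUNT(*) AS cnt
FROM HOP(pageviews, SIZE 5 MINUTES, ADVANCE BY 1 MINUTE)
GROUP BY window_start, window_end, userid, pageid;
```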
Create a changelog specifying the timestamp column
The statement below creates a new changelog, called userslogs2, from an existing changelog named userslogs. The timestamp changelog parameter, specified in the WITH clause, marks the registertime column in userslogs2 as the timestamp column. Any subsequent query that refers to userslogs2 in its FROM clause uses this column for time-based operations.
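A sketch of the statement described above, using the names from this example:

```sql
CREATE CHANGELOG userslogs2
WITH ('timestamp' = 'registertime')
AS SELECT * FROM userslogs;
```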
Create a changelog specifying the Kafka delivery guarantee
The statement below creates a new changelog, called users_exactly_once, from the existing changelog userslog. The delivery.guarantee changelog parameter, specified in the WITH clause, overrides the default delivery.guarantee of at_least_once to exactly_once. You may wish to use this configuration if your application can tolerate higher latencies but cannot tolerate duplicate outputs.
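A sketch using the names from this example:

```sql
CREATE CHANGELOG users_exactly_once
WITH ('delivery.guarantee' = 'exactly_once')
AS SELECT * FROM userslog;
```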
Create a changelog with `event.time` sink timestamp strategy
The statement below creates a new changelog, called users_contact, from the existing changelog userslog by selecting its primary key (for example, userid) and the phone number for each user from the contactinfo struct. The sink timestamp strategy is set to event.time; this way, when writing to the sink’s Kafka topic, the timestamp for each record in users_contact is set to its source record’s timestamp, coming from the entity backing userslog.
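A sketch using the names from this example; the -> operator for struct field access is an assumption:

```sql
CREATE CHANGELOG users_contact
WITH ('sink.timestamp.strategy' = 'event.time')
AS SELECT userid, contactinfo->phone AS phone
FROM userslog;
```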
Create a changelog with format-specific properties for Avro
The following creates a new changelog, usersInfo,
by selecting records' key and value from another given changelog users_avro
. Assuming users_avro
key and value are in avro
, two subjects are provided to generate the Avro schemas for userInfo
's key and value. These subjects are stored in the schema registry of a store called sr_store
. users_data-key
subject is used to generate key's Avro schema and users_data-value
subject is used to generate value's Avro schema for the records written into usersInfo.
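A sketch using the names from this example; the subject-related property names below are placeholders, since the actual names are defined under Format-Specific Properties:

```sql
CREATE CHANGELOG "usersInfo"
WITH (
  'key.format' = 'avro',
  'value.format' = 'avro',
  -- placeholder property names; see Format-Specific Properties
  -- for the actual Avro subject settings backed by sr_store
  'avro.key.subject' = 'users_data-key',
  'avro.value.subject' = 'users_data-value'
)
AS SELECT * FROM users_avro;
```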