CREATE CHANGELOG AS SELECT
CREATE CHANGELOG AS
is essentially a combination of two statements:
A DDL statement that creates a new changelog.
An INSERT INTO statement that runs a SELECT statement and adds the results into the newly created changelog.
This specifies the name of the new changelog. Optionally, use <database_name>.<schema_name>
as the prefix to the name to create the relation in that scope. If the name is case sensitive, you must wrap it in double quotes; otherwise, the system uses the lowercase name.
Optionally, this clause specifies the changelog parameters described below.
This statement specifies the SELECT statement to run.
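As an illustrative sketch only (the source changelog userslog, the sink name userslog_copy, and the parameter values here are hypothetical), a complete statement combines all three clauses:

```sql
-- Create a new changelog backed by a 1-partition, 1-replica topic
-- and populate it from an existing changelog.
CREATE CHANGELOG userslog_copy
WITH ('topic.partitions' = 1, 'topic.replicas' = 1) AS
SELECT * FROM userslog;
```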
topic
The name of the entity that has the data for a newly created sink changelog. If the entity doesn't exist in the store, an entity with this name is created in the store. If no topic is specified, an entity named after the changelog_name is created in the corresponding store.
Required: No
Default value: Lowercase changelog_name.
Type: String
Valid values: See
store
The name of the store that hosts the entity for this changelog.
Required: No
Default value: User's default store.
Type: String
Valid values: See
value.format
Format of the message value in the entity. See for more information regarding serialization formats.
Required: No
Default value: Value format from the leftmost source relation.
Type: VALUE_FORMAT
Valid values: JSON, AVRO, PROTOBUF, PRIMITIVE
timestamp
Name of the column in the changelog to use as the timestamp. If not set, the timestamp of the message is used for time-based operations such as window aggregations and joins.
Required: No
Default value: Record's timestamp.
Type: String
Valid values: Must be of type BIGINT or TIMESTAMP. See .
timestamp.format
The format to use for TIMESTAMP-typed fields. See .
Required: No
Default value: sql
Type: String
Valid values: sql, iso8601
topic.partitions
The number of partitions to use when creating the Kafka topic, if applicable.
Required: Yes, unless entity already exists.
Default value: Leftmost source relation entity's partition count.
Type: Integer
Valid values: [1, ...]
topic.replicas
The number of replicas to use when creating the Kafka topic, if applicable.
Required: Yes, unless entity already exists.
Default value: Leftmost source relation entity's replica count.
Type: Integer
Valid values: [1, ...]
key.format
The format of a message key in the entity. See for more information regarding serialization formats.
Required: No
Default value: Key format from the leftmost source relation's key (if any) or the same as value.format.
Type: KEY_FORMAT
Valid values: JSON, AVRO, PROTOBUF, PRIMITIVE
key.type
Declares the names and data types of key columns. The type is a STRUCT when key.format is a non-primitive value; for example, 'key.type'='STRUCT<id BIGINT, name VARCHAR>'. For primitive values, the type is one of the primitive data types, e.g. 'key.type'='VARCHAR'.
Required: No, unless key.format is set and there is no default value.
Default value: For certain query semantics (i.e. queries using JOIN or GROUP BY), a generated key type is used by default. For queries that do not generate a key type, the key type from the leftmost source relation's key is used by default (if any). See .
Type: String
Valid values: See STRUCT in
delivery.guarantee
The fault tolerance guarantees applied when producing to this changelog.
Required: No
Default value: at_least_once
Type: String
Valid values:
at_least_once: Ensures that records are output to the changelog at least once. During query checkpointing, the query waits for confirmation of successful writes from the Kafka broker. If there are issues with the query, duplicate records are possible as the query attempts to reprocess old data.
exactly_once: Produces to the changelog using Kafka transactions. These transactions are committed when the query takes a checkpoint. On the consumer side, when the consumer isolation.level is set to read_committed, only the committed records are visible. Since records aren't committed until the query takes a checkpoint, there is some additional delay when using this setting.
none: There is no fault tolerance guarantee when producing to the changelog. If there are issues on the Kafka broker, records may be lost; if there are issues with the query, output records may be duplicated.
Kinesis stores provide a delivery guarantee of at_least_once when producing events into a sink entity.
sink.timestamp.strategy
Determines how the timestamp values are set for records written to a Kafka sink's entity.
Required: No
Default value: proc.time
Type: String
Valid values:
event.time: Uses the timestamp of the records coming from the source topic.
proc.time: Uses the current time of the Kafka producer when writing into the sink's entity. Note that the final timestamp used by Kafka depends on the timestamp type configured for the Kafka topic.
topic.shards
The number of shards to use when creating the Kinesis stream, if applicable.
Required: Yes, unless entity already exists.
Default value: Leftmost source relation entity's shard count.
Type: Integer
Valid values: [1, ...]
Alias: kinesis.shards

All format-specific properties that are applicable to a changelog can be provided as a changelog_parameter. See for more details.
The following creates a replica of the source changelog.
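A minimal sketch, assuming an existing source changelog named userslog and a hypothetical sink name users2:

```sql
CREATE CHANGELOG users2 AS
SELECT * FROM userslog;
```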
The following creates a replica of the source changelog, but the new relation belongs to the schema named schema2
in the session’s current database.
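Under the same assumptions, with the schema prefix added:

```sql
CREATE CHANGELOG schema2.users3 AS
SELECT * FROM userslog;
```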
The following creates a replica of the source changelog, but the new relation belongs to the schema named schema2
in the database named db
.
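With a fully qualified name (again, the relation names are illustrative):

```sql
CREATE CHANGELOG db.schema2.users4 AS
SELECT * FROM userslog;
```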
The following creates a replica of the source changelog, and the new sink relation has a case-sensitive name.
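For example, wrapping a hypothetical case-sensitive name in double quotes:

```sql
CREATE CHANGELOG "Users5" AS
SELECT * FROM userslog;
```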
The following creates a replica of the source changelog. The new sink relation has a case-sensitive name and is in a case-sensitive database and schema.
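For example, with hypothetical case-sensitive database, schema, and changelog names:

```sql
CREATE CHANGELOG "DataBase"."Schema2"."Users6" AS
SELECT * FROM userslog;
```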
The following moves data from a Kafka store to a Kinesis store. The query creates a replica of the source changelog, but this new changelog is associated with the specified entity called userstwo
.
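A sketch of this, assuming the source userslog lives in a Kafka store and that the target Kinesis store is named kinesis_store; the sink name users_kinesis is hypothetical:

```sql
CREATE CHANGELOG users_kinesis
WITH ('store' = 'kinesis_store', 'topic' = 'userstwo') AS
SELECT * FROM userslog;
```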
The following creates a replica of the source changelog, but this new changelog is associated with the specified store called kinesis_store
.
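A sketch, again with a hypothetical sink name:

```sql
CREATE CHANGELOG users_in_kinesis
WITH ('store' = 'kinesis_store') AS
SELECT * FROM userslog;
```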
The following creates a replica of the source changelog that has a data format of JSON, but the new sink changelog has a data format of Avro for its value and key.
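A sketch, assuming the JSON source is userslog; the sink name users_avro is chosen here only for illustration:

```sql
CREATE CHANGELOG users_avro
WITH ('value.format' = 'avro', 'key.format' = 'avro') AS
SELECT * FROM userslog;
```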
The following is a simple projection query where the sink Kafka topic has a specific number of partitions and replicas set.
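For instance, with illustrative partition and replica counts, projecting two columns from userslog:

```sql
CREATE CHANGELOG users_partitioned
WITH ('topic.partitions' = 2, 'topic.replicas' = 3) AS
SELECT userid, registertime FROM userslog;
```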
The following is a simple projection query where the sink Kinesis stream has a specific number of shards set.
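For instance, with an illustrative shard count, assuming the sink is created in a Kinesis store such as kinesis_store:

```sql
CREATE CHANGELOG users_sharded
WITH ('store' = 'kinesis_store', 'topic.shards' = 2) AS
SELECT userid, registertime FROM userslog;
```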
Aggregations of data on streams result in a CHANGELOG
output relation type. The PRIMARY KEY
for the following would be (userid)
.
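A sketch, assuming a hypothetical source stream pageviews with a userid column:

```sql
-- Grouping by userid makes (userid) the changelog's PRIMARY KEY.
CREATE CHANGELOG visits_per_user AS
SELECT userid, COUNT(*) AS pageview_count
FROM pageviews
GROUP BY userid;
```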
Aggregations of data on streams result in a CHANGELOG
output relation type. The PRIMARY KEY
for the following would be (window_start, window_end, userid, pageid)
.
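A sketch, assuming the same hypothetical pageviews stream and a tumbling-window function of the form TUMBLE(relation, SIZE ...); the window size is illustrative:

```sql
-- The window bounds plus the grouping columns form the PRIMARY KEY
-- (window_start, window_end, userid, pageid).
CREATE CHANGELOG visits_per_window AS
SELECT window_start, window_end, userid, pageid, COUNT(*) AS pageview_count
FROM TUMBLE(pageviews, SIZE 5 MINUTES)
GROUP BY window_start, window_end, userid, pageid;
```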
The below statement creates a new changelog, called userslogs2
, from an already existing changelog with the name userslogs
. The timestamp
changelog parameter, specified in the WITH
clause, is used to mark the registertime
column in userslogs2
as the timestamp column. Therefore, any subsequent query that refers to userslogs2
in its FROM
clause uses this column for time-based operations.
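A sketch of such a statement:

```sql
CREATE CHANGELOG userslogs2
WITH ('timestamp' = 'registertime') AS
SELECT * FROM userslogs;
```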
The below statement creates a new changelog, called users_exactly_once
, from the already existing changelog userslog
. The delivery.guarantee
changelog parameter, specified in the WITH
clause, is used to override the default delivery.guarantee
of at_least_once
to exactly_once
. You may wish to use this configuration if your application can tolerate higher latencies but cannot tolerate duplicate outputs.
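The statement might look like this:

```sql
CREATE CHANGELOG users_exactly_once
WITH ('delivery.guarantee' = 'exactly_once') AS
SELECT * FROM userslog;
```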
The below statement creates a new changelog, called users_contact
, from the already existing changelog userslog
by selecting its primary key (for example, userid
) and phone number for each user from the contactinfo
struct. The sink timestamp strategy is set to event.time
; this way when writing to the sink's Kafka topic, the timestamp for each record in users_contact
is set to its source record's timestamp, coming from the entity backing userslog
.
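A sketch of such a statement, assuming the phone number field inside the contactinfo struct is named phone and that struct fields are dereferenced with ->:

```sql
CREATE CHANGELOG users_contact
WITH ('sink.timestamp.strategy' = 'event.time') AS
SELECT userid, contactinfo->phone AS phone
FROM userslog;
```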
The following creates a new changelog, usersInfo, by selecting records' key and value from another given changelog, users_avro. Assuming the users_avro key and value are in avro, two subjects are provided to generate the Avro schemas for usersInfo's key and value. These subjects are stored in the schema registry of a store called sr_store; the users_data-key subject is used to generate the key's Avro schema and the users_data-value subject is used to generate the value's Avro schema for the records written into usersInfo.
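A partial sketch only: the format-specific properties that bind the users_data-key and users_data-value subjects from sr_store's schema registry are not listed in this section, so they are omitted below.

```sql
CREATE CHANGELOG usersInfo
WITH (
  'key.format' = 'avro',
  'value.format' = 'avro'
  -- plus the format-specific subject properties for users_data-key
  -- and users_data-value (see the note on format-specific properties)
) AS
SELECT * FROM users_avro;
```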