CREATE CHANGELOG AS SELECT
Syntax
Description
CREATE CHANGELOG AS
is essentially a combination of two statements:
A DDL statement that creates a new #_changelog.
An INSERT INTO statement that runs a SELECT statement and adds the results into the newly created Changelog.
Arguments
changelog_name
This specifies the name of the new Changelog. Optionally, use <database_name>.<schema_name>
as the prefix to the name to create the Relation in that scope. For case-sensitive names, the name must be wrapped in double quotes; otherwise, the lowercase name will be used.
WITH (changelog_parameter = value [, … ])
Optionally, this clause specifies the #_changelog_parameters.
select_statement
This statement specifies the SELECT statement to run.
Changelog Parameters
Parameter Name | Description |
---|---|
| The name of the Entity that has the data for a newly created sink Changelog. If the Entity doesn’t exist in the Store, an Entity with this name is created in the Store, and an Entity with the Required: No
Default value: Lowercase |
| The name of the Store that hosts the Entity for this Changelog. Required: No Default value: User’s default Store. Type: String Valid values: See LIST STORES |
| |
| Name of the column in the Changelog to use as the timestamp. If not set, the timestamp of the message is used for time based operations such as window aggregations and joins. Required: No
Default value: Record’s timestamp.
Type: String
Valid values: Must be of type |
| The format to use for Required: No
Default value: |
Kafka Specific Parameters
Parameter Name | Description |
---|---|
| The number of partitions to use when creating the Kafka topic, if applicable. Required: Yes, unless Entity already exists. Default value: Leftmost source Relation Entity’s partition count. Type: Integer Valid values: [1, ...] |
| The number of replicas to use when creating the Kafka topic, if applicable. Required: Yes, unless Entity already exists. Default values: Leftmost source Relation Entity’s replica count. Type: Integer Valid values: [1, ...] |
| Required: No
Default value: Key format from the leftmost source Relation’s key (if any) or the same as |
| Required: No, unless |
| The fault tolerance guarantees applied when producing to this Changelog. Required: No
Default value:
|
| Determines how the timestamp values for records, written to a Kafka sink's Entity, are set.
Required: No
Default value:
|
Kinesis Specific Parameters
Parameter Name | Description |
---|---|
| The number of shards to use when creating the Kinesis stream, if applicable. Required: Yes, unless Entity already exists.
Default values: Leftmost source Relation Entity’s shard count.
Type: Integer
Valid values: [1, ...]
Alias: |
Kinesis stores provide a delivery guarantee of at_least_once
when producing events into a sink Entity.
Format Specific Properties
Examples
Create a copy Changelog
The following creates a replica of the source Changelog.
Create a Changelog in a specific Schema within a default Database
The following creates a replica of the source Changelog, but the new Relation belongs to the Schema named schema2
in the session’s current Database.
Create a Changelog in a specific Schema and Database
The following creates a replica of the source Changelog, but the new Relation belongs to the Schema named schema2
in the Database named db
.
Create a case-sensitive Changelog
The following creates a replica of the source Changelog, and the new sink Relation has a case-sensitive name.
Create a case-sensitive Changelog in a case-sensitive Schema and Database
The following creates a replica of the source Changelog. The new sink Relation has a case-sensitive name and is in a case-sensitive Database and Schema.
Create a new Changelog that is backed by a specific Entity
The following moves data from a Kafka Store to a Kinesis Store. The query creates a replica of the source Changelog, but this new Changelog is associated with the specified Entity called userstwo
.
Copy data from one Store to another
The following creates a replica of the source Changelog, but this new Changelog is associated with the specified Store called kinesis_store
.
Convert data from JSON to Avro
The following creates a replica of the source Changelog that has a data format of JSON, but the new sink Changelog has a data format of Avro for its value and key.
Simple projection to a Kafka topic with a specific number of partitions and replicas
The following is a simple projection query where the sink Kafka topic has a specific number of partitions and replicas set.
Simple projection to a Kinesis stream with a specific number of shards
The following is a simple projection query where the sink Kinesis stream has a specific number of shards set.
Create a Changelog from aggregation
Aggregations of data on Streams result in a CHANGELOG
output Relation type. The PRIMARY KEY
for the following would be (userid)
.
Create a Changelog from HOP window aggregation
Aggregations of data on Streams result in a CHANGELOG
output Relation type. The PRIMARY KEY
for the following would be (window_start, window_end, userid, pageid)
.
Create a Changelog with specifying the timestamp column
The below statement creates a new Changelog, called userslogs2
, from an already existing Changelog with the name userslogs
. The timestamp
Changelog parameter, specified in the WITH
clause, is used to mark the registertime
column in userslogs2
as the timestamp column. Therefore, any subsequent query that refers to userslogs2
in its FROM
clause will use this column for time based operations.
Create a Changelog with specifying the Kafka delivery guarantee
The below statement creates a new Changelog, called users_exactly_once
, from the already existing Changelog userslog
. The delivery.guarantee
Changelog parameter, specified in the WITH
clause, is used to override the default delivery.guarantee
of at_least_once
to exactly_once
. A user may want to use this configuration if their application can tolerate higher latencies but cannot tolerate duplicate outputs.
Create a Changelog with `event.time` Sink timestamp strategy
The below statement creates a new Changelog, called users_contact
, from the already existing Changelog userslog
by selecting its primary key (i.e. userid
) and phone number for each user from the contactinfo
Struct. The sink timestamp strategy is set to event.time
; this way when writing to the sink's Kafka topic, the timestamp for each record in users_contact
is set to its source record's timestamp, coming from the Entity backing userslog
.
Create a Changelog with format specific properties for Avro
The following creates a new Changelog, usersInfo,
by selecting records' key and value from another given Changelog users_avro
. Assuming users_avro
key and value are in avro
, two subjects are provided to generate the Avro schemas for userInfo
's key and value. These subjects are stored in the Schema Registry of a Store called sr_store
, and users_data-key
subject is used to generate key's Avro schema and users_data-value
subject is used to generate value's Avro schema for the records written into usersInfo.
Last updated