CREATE CHANGELOG AS SELECT
Syntax
CREATE CHANGELOG changelog_name
[WITH (changelog_parameter = value [, ... ])]
AS select_statement;
Description
CREATE CHANGELOG AS is essentially a combination of two statements:
A DDL statement that creates a new changelog.
An INSERT INTO statement that runs a SELECT statement and adds the results into the newly created changelog, as sketched below.
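For illustration only, here is a rough sketch of those two steps written out separately. The changelog name users_copy and its column list are hypothetical, and the standalone DDL form is simplified; the combined statement performs both steps at once.
-- Hypothetical DDL step: declare the new changelog and its columns.
CREATE CHANGELOG users_copy (userid VARCHAR, registertime BIGINT, PRIMARY KEY (userid));
-- Hypothetical population step: run the SELECT and write its results into the new changelog.
INSERT INTO users_copy SELECT userid, registertime FROM users_log;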
Arguments
changelog_name
Specifies the name of the new changelog. Optionally, use <database_name>.<schema_name> as the prefix to the name to create the relation in that scope. If the name is case-sensitive, you must wrap it in double quotes; otherwise, the system uses the lowercase name.
WITH (changelog_parameter = value [, … ])
Optionally, this clause specifies the changelog parameters to apply to the new changelog. See Changelog Parameters below.
select_statement
Specifies the SELECT statement whose results populate the newly created changelog.
Changelog Parameters
topic
The name of the entity that has the data for the newly created sink changelog. If the entity doesn't exist in the store, an entity with this name is created in the corresponding store.
Required: No
Default value: Lowercase changelog_name.
Type: String
Valid values: See LIST ENTITIES
store
The name of the store that hosts the entity for this changelog.
Required: No
Default value: User’s default Data Store.
Type: String
Valid values: See LIST STORES
value.format
Format of the message value in the Data Store. For more information regarding serialization formats, see Data Formats (Serialization).
Required: No
Default value: Value format from the leftmost source relation.
Type: VALUE_FORMAT
Valid values: JSON, AVRO, PROTOBUF, PRIMITIVE
timestamp
Name of the column in the changelog to use as the timestamp. If not set, the timestamp of the message is used for time-based operations such as window aggregations and joins.
Required: No
Default value: Record’s timestamp.
Type: String
Valid values: Must be of type BIGINT or TIMESTAMP. See Data Types.
timestamp.format
The format to use for TIMESTAMP-typed fields. See Data Types.
Required: No
Default value: sql
Type: String
Valid values: sql, iso8601
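As a sketch of combining timestamp with timestamp.format, assume a hypothetical source changelog orders_log with a TIMESTAMP-typed column eventtime whose values use the ISO-8601 format; the relation and column names here are illustrative only.
CREATE CHANGELOG orders_by_event_time
  WITH ('timestamp' = 'eventtime', 'timestamp.format' = 'iso8601') AS
SELECT orderid, eventtime, amount
FROM orders_log;
Subsequent queries that read from orders_by_event_time then use eventtime for time-based operations instead of the record's timestamp.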
Kafka-Specific Parameters
topic.partitions
The number of partitions to use when creating the Kafka topic, if applicable. Note that the number of partitions must be compatible with the Kafka cluster's configuration. If there's any mismatch, a query may time out when communicating with the cluster at record creation time.
Required: Yes, unless entity already exists.
Default value: Leftmost source relation entity’s partition count.
Type: Integer
Valid values: [1, ...]
topic.replicas
The number of replicas to use when creating the Kafka topic, if applicable. Note that the number of replicas must be compatible with the Kafka cluster's configuration. If there's any mismatch, a query may time out when communicating with the cluster at record creation time.
Required: Yes, unless entity already exists.
Default value: Leftmost source relation entity’s replica count.
Type: Integer
Valid values: [1, ...]
kafka.topic.*
A configuration specific to the topic being created; for example, Kafka Entity Configuration for Confluent Platform.
Required: No
Default value: None
Type: String
Valid values: Kafka topic configuration specific to the underlying Data Store type.
key.format
The format of a message key in the Data Store. For more information regarding serialization formats, see Data Formats (Serialization).
Required: No
Default value: None
Type: String
Valid values: JSON, AVRO, PROTOBUF, PRIMITIVE
key.columns
Specifies the name(s) of the value columns, separated by commas, that are used to construct the record key.
For a non-primitive key.format, the record key is created as a STRUCT whose fields are the columns listed in this property.
For a primitive key.format, this property must contain exactly one column with a primitive data type.
Required: No
Default value: None
Type: String
Valid values: One or more valid column names from the sink relation’s column list, separated by commas.
value.columns.exclude
Specifies the name(s) of the columns, separated by commas, that should be excluded from the record’s value and included only in its key.
This property can only be set if key.columns is already defined.
The excluded columns must appear at the end of the relation’s column list and must also be listed in key.columns.
Required: No
Default value: None
Type: String
Valid values: One or more valid column names from the object’s column list, separated by commas.
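As a sketch of how key.columns and value.columns.exclude work together, the following hypothetical statement keys the sink records by userid and excludes that column from the record value; note that userid is listed last in the projection, as required above.
CREATE CHANGELOG keyed_counts
  WITH ('key.format' = 'json',
        'key.columns' = 'userid',
        'value.columns.exclude' = 'userid') AS
SELECT count(pageid) AS pgcount, userid
FROM pageviews
GROUP BY userid;
Each sink record's key is then a STRUCT with a single userid field, and its value contains only pgcount.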
delivery.guarantee
The fault tolerance guarantees applied when producing to this changelog.
Required: No
Default value: at_least_once
Type: String
Valid values:
exactly_once: Produces to the changelog using Kafka transactions. These transactions commit when the query takes a checkpoint. On the consumer side, when setting the Kafka consumer isolation.level configuration to read_committed, only the committed records display. Since records aren't committed until the query takes a checkpoint, there is some additional delay when you use this setting.
at_least_once: Ensures that records are output to the changelog at least once. During query checkpointing, the query waits to receive a confirmation of successful writes from the Kafka broker. If there are issues with the query then duplicate records are possible as the query attempts to reprocess old data.
none: There is no fault tolerance guarantee when producing to the changelog. If there are issues on the Kafka broker then records may be lost; if there are issues with the query then output records may be duplicated.
sink.timestamp.strategy
Determines how the timestamp values are set for records written to a Kafka sink's entity.
Required: No
Default value: proc.time
Type: String
Valid values:
event.time: Uses the timestamp of the records coming from the source topic.
proc.time: Uses the current time of the Kafka producer when writing into the sink's entity. Note that the final timestamp used by Kafka depends on the timestamp type configured for the Kafka topic.
Kinesis-Specific Parameters
topic.shards
The number of shards to use when creating the Kinesis stream, if applicable.
Required: Yes, unless entity already exists.
Default value: Leftmost source relation entity’s shard count.
Type: Integer
Valid values: [1, ...]
Alias: kinesis.shards
Kinesis stores provide a delivery guarantee of at_least_once when producing events into a sink Data Store.
Format-Specific Properties
All format-specific properties applicable to a changelog can be provided as a changelog_parameter. See Format-Specific Parameters for more details.
Examples
Create a copy changelog
The following creates a replica of the source changelog.
CREATE CHANGELOG users_clog AS SELECT * FROM users_log;
Create a changelog with passthrough configuration for retention
CREATE CHANGELOG us_customers_log WITH (
'store' = 'kafka_store',
'topic.partitions' = 1,
'topic.replicas' = 2,
'kafka.topic.retention.ms' = '172800000')
AS SELECT * FROM customers_log WHERE region = 'US';
Create a changelog in a specific schema within a default database
The following creates a replica of the source changelog, but the new relation belongs to the schema named schema2 in the session’s current database.
CREATE CHANGELOG schema2.users_log_copy AS SELECT * FROM users_log;
Create a changelog in a specific schema and database
The following creates a replica of the source changelog, but the new relation belongs to the schema named schema2 in the database named db.
CREATE CHANGELOG db.schema2.users_log_copy AS SELECT * FROM users_log;
Create a case-sensitive changelog
The following creates a replica of the source changelog, and the new sink relation has a case-sensitive name.
CREATE CHANGELOG "Users" AS SELECT * FROM users_log;
Create a case-sensitive changelog in a case-sensitive schema and database
The following creates a replica of the source changelog. The new sink relation has a case-sensitive name and is in a case-sensitive database and schema.
CREATE CHANGELOG "DataBase"."Schema"."Users" AS SELECT * FROM users_log;
Create a new changelog backed by a specific entity
The following creates a replica of the source changelog, but this new changelog is associated with the specified entity called userstwo.
CREATE CHANGELOG
users2
WITH ('topic' = 'userstwo')
AS SELECT * FROM users_log;
Copy data from one store to another
The following moves data from a Kafka store to a Kinesis store. The query creates a replica of the source changelog, but this new changelog is associated with the specified store called kinesis_store.
CREATE CHANGELOG
users_kinesis
WITH ('store' = 'kinesis_store')
AS SELECT * FROM users_kafka;
Convert data from JSON to Avro
The following creates a replica of the source changelog that has a data format of JSON, but the new sink changelog has a data format of Avro for its value and key.
CREATE CHANGELOG users_avro
WITH ('value.format' = 'avro', 'key.format' = 'AVRO') AS
SELECT * FROM users_json;
Simple projection to a Kafka topic with a specific number of partitions and replicas
The following is a simple projection query where the sink Kafka topic has a specific number of partitions and replicas set.
CREATE CHANGELOG users2
WITH ('topic.partitions' = '5', 'topic.replicas' = '3') AS
SELECT
registertime,
userid AS uid,
interests[1] AS top_interest
FROM users_log;
Simple projection to a Kinesis stream with a specific number of shards
The following is a simple projection query where the sink Kinesis stream has a specific number of shards set.
CREATE CHANGELOG
users2
WITH ('topic.shards' = '4')
AS SELECT
registertime,
userid AS uid,
interests[1] AS top_interest
FROM users_log;
Create a changelog from aggregation
Aggregations of data on streams result in a CHANGELOG output relation type. The PRIMARY KEY for the following would be (userid).
CREATE CHANGELOG
visitlogs
WITH ('topic' = 'pglogs')
AS SELECT
userid,
count(pageid) AS pgcount
FROM pageviews
GROUP BY userid;
Create a changelog from HOP window aggregation
Aggregations of data on streams result in a CHANGELOG output relation type. The PRIMARY KEY for the following would be (window_start, window_end, userid, pageid).
CREATE CHANGELOG
averagetime
AS SELECT
window_start,
window_end,
userid,
pageid,
avg(viewtime) AS avgtime
FROM HOP(pageviews, size 8 seconds, advance by 4 seconds)
GROUP BY
window_start,
window_end,
userid,
pageid;
Create a changelog specifying the timestamp column
The following statement creates a new changelog, called userslogs2, from an already existing changelog with the name userslogs. The timestamp changelog parameter, specified in the WITH clause, is used to mark the registertime column in userslogs2 as the timestamp column. Any subsequent query that refers to userslogs2 in its FROM clause uses this column for time-based operations.
CREATE CHANGELOG userslogs2
WITH ('timestamp' = 'registertime') AS
SELECT userid, registertime, contactInfo['city'] AS city
FROM userslog;
Create a changelog specifying the Kafka delivery guarantee
The following statement creates a new changelog, called users_exactly_once, from the already existing changelog userslog. The delivery.guarantee changelog parameter, specified in the WITH clause, overrides the default delivery.guarantee of at_least_once to exactly_once. You may wish to use this configuration if your application can tolerate higher latencies but cannot tolerate duplicate outputs.
CREATE CHANGELOG users_exactly_once
WITH ('delivery.guarantee'='exactly_once') AS
SELECT *
FROM userslog;
Create a changelog with `event.time` sink timestamp strategy
The following statement creates a new changelog, called users_contact, from the already existing changelog userslog by selecting its primary key (for example, userid) and the phone number for each user from the contactinfo struct. The sink timestamp strategy is set to event.time; this way, when writing to the sink's Kafka topic, the timestamp for each record in users_contact is set to its source record's timestamp, coming from the entity backing userslog.
-- Assuming the sink Store is Kafka
CREATE CHANGELOG users_contact
WITH ('sink.timestamp.strategy' = 'event.time') AS
SELECT userid, contactinfo->phone
FROM userslog;
Create a changelog with format-specific properties for Avro
The following creates a new changelog, usersInfo, by selecting records' key and value from another given changelog, users_avro. Assuming the users_avro key and value are in avro, two subjects are provided to generate the Avro schemas for usersInfo's key and value. These subjects are stored in the schema registry of a store called sr_store. The users_data-key subject is used to generate the key's Avro schema, and the users_data-value subject is used to generate the value's Avro schema for the records written into usersInfo.
CREATE CHANGELOG "usersInfo"
WITH ('topic'='usersInfo',
'avro.base.store.name' = 'sr_store',
'avro.base.subject.key' = 'users_data-key',
'avro.base.subject.value' = 'users_data-value') AS
SELECT * FROM users_avro;
Create a changelog with key columns for sink
The following creates a new changelog, keyed_aggr, by grouping records in the pageviews stream based on userid and counting how many pages each user has visited so far. Since key.format for the sink is set to be json and userid is chosen as the key column, each record in the sink has a key of STRUCT type with a field named userid. The value in each record has two columns: userid and cnt.
CREATE CHANGELOG keyed_aggr
WITH ('key.format'='json',
'key.columns' = 'userid') AS
SELECT userid, count(*) AS cnt
FROM pageviews
GROUP BY userid;