CREATE CHANGELOG AS SELECT
```sql
CREATE CHANGELOG changelog_name
[WITH (changelog_parameter = value [, ... ])]
AS select_statement;
```
`CREATE CHANGELOG AS` is essentially a combination of two statements:

- A DDL statement that creates the new Changelog.
- An INSERT INTO statement that runs a SELECT statement and adds the results into the newly created Changelog.
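Conceptually, a statement such as `CREATE CHANGELOG users_clog AS SELECT * FROM users_log;` behaves like the following pair of statements. This is a sketch only; the column list for `users_clog` is an illustrative assumption, since in practice the schema is derived from the SELECT:

```sql
-- 1. DDL: create the sink Changelog (columns shown are assumed for illustration)
CREATE CHANGELOG users_clog (
  userid VARCHAR,
  registertime BIGINT,
  PRIMARY KEY (userid)
);

-- 2. DML: run the SELECT and insert its results into the new Changelog
INSERT INTO users_clog SELECT * FROM users_log;
```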
`changelog_name`

This specifies the name of the new Changelog. Optionally, use `<database_name>.<schema_name>` as the prefix to the name to create the Relation in that scope. For case-sensitive names, the name must be wrapped in double quotes; otherwise, the lowercase name will be used.

Parameter Name | Description |
---|---|
topic | The name of the Topic that has the data for a newly created sink Changelog. If the Topic doesn’t exist in the Store, a Topic with this name is created in the Store. If this parameter isn’t provided, a Topic with the changelog_name is created in the corresponding Store.
Required: No
Default value: changelog_name |
store | The name of the Store that hosts the Topic for this Changelog.
Required: No
Default value: User’s default Store. |
value.format | Format of the message value in the Topic. See Data Formats (Serialization) for more information regarding serialization formats.
Required: No
Default value: Value format from the leftmost source Relation.
Type: VALUE_FORMAT
Valid values: JSON, AVRO, PROTOBUF, PRIMITIVE |
timestamp | Name of the column in the Changelog to use as the timestamp. If not set, the timestamp of the message is used for time-based operations such as window aggregations and joins.
Required: No
Default value: Record’s timestamp.
Type: String
Valid values: Must be of type BIGINT or TIMESTAMP. See Data Types. |
timestamp.format | The format to use for TIMESTAMP typed columns.
Required: No
Default value: sql
Type: String
Valid values: sql, iso8601 |
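For instance, `timestamp.format` can be combined with `timestamp` when the chosen column holds ISO 8601 values. This is a hedged sketch; the `eventtime` column is an assumption and does not appear in the examples below:

```sql
CREATE CHANGELOG users_iso
  WITH ('timestamp' = 'eventtime', 'timestamp.format' = 'iso8601') AS
SELECT userid, eventtime
FROM users_log;
```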
The following parameters apply when the sink Changelog’s Topic is in a Kafka Store.

Parameter Name | Description |
---|---|
topic.partitions | The number of partitions to use when creating the Kafka topic, if applicable.
Required: Yes, unless Topic already exists.
Default value: Leftmost source Relation Topic’s partition count.
Type: Integer
Valid values: [1, ...] |
topic.replicas | The number of replicas to use when creating the Kafka topic, if applicable.
Required: Yes, unless Topic already exists.
Default value: Leftmost source Relation Topic’s replica count.
Type: Integer
Valid values: [1, ...] |
key.format | The format of a message key in the Topic. See Data Formats (Serialization) for more information regarding serialization formats.
Required: No
Default value: Key format from the leftmost source Relation’s key (if any) or the same as value.format.
Type: KEY_FORMAT
Valid values: JSON, AVRO, PROTOBUF, PRIMITIVE |
key.type | Declares the names and data types of key columns. The type is a STRUCT when key.format is a non-primitive value, e.g. 'key.type'='STRUCT<id BIGINT, name VARCHAR>'. For primitive values, the type is one of the Primitive Data Types, e.g. 'key.type'='VARCHAR'.
Required: No, unless key.format is set and there is no default value.
Default value: For certain query semantics (i.e. queries using JOIN or GROUP BY), a generated key type is used by default. For queries that do not generate a key type, the key type from the leftmost source Relation’s key is used by default (if any). See Row Key Definition.
Type: String
Valid values: See STRUCT in Data Types |
delivery.guarantee | The fault tolerance guarantees applied when producing to this Changelog.
Required: No
Default value: at_least_once
Type: String
Valid values: at_least_once, exactly_once |
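As an illustration of `key.format` and `key.type` used together, the sink’s key can be declared explicitly. This is a sketch under the assumption that the source `users_log` has `userid` and `registertime` columns; the names here are illustrative:

```sql
CREATE CHANGELOG users_keyed
  WITH (
    'key.format' = 'JSON',
    'key.type' = 'STRUCT<userid VARCHAR>'
  ) AS
SELECT userid, registertime
FROM users_log;
```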
The following parameter applies when the sink Changelog’s Topic is in a Kinesis Store.

Parameter Name | Description |
---|---|
topic.shards | The number of shards to use when creating the Kinesis stream, if applicable.
Required: Yes, unless Topic already exists.
Default value: Leftmost source Relation Topic’s shard count.
Type: Integer
Valid values: [1, ...]
Alias: kinesis.shards |
Kinesis stores provide a delivery guarantee of at_least_once when producing events into a sink Topic.

The following creates a replica of the source Changelog.

```sql
CREATE CHANGELOG users_clog AS SELECT * FROM users_log;
```
The following creates a replica of the source Changelog, but the new Relation belongs to the Schema named `schema2` in the session’s current Database.

```sql
CREATE CHANGELOG schema2.users_log_copy AS SELECT * FROM users_log;
```
The following creates a replica of the source Changelog, but the new Relation belongs to the Schema named `schema2` in the Database named `db`.

```sql
CREATE CHANGELOG db.schema2.users_log_copy AS SELECT * FROM users_log;
```
The following creates a replica of the source Changelog, and the new sink Relation has a case-sensitive name.

```sql
CREATE CHANGELOG "Users" AS SELECT * FROM users_log;
```
The following creates a replica of the source Changelog. The new sink Relation has a case-sensitive name and is in a case-sensitive Database and Schema.

```sql
CREATE CHANGELOG "DataBase"."Schema"."Users" AS SELECT * FROM users_log;
```
The following creates a replica of the source Changelog, but the new Changelog is associated with the specified Topic called `userstwo`.

```sql
CREATE CHANGELOG users2
  WITH ('topic' = 'userstwo')
AS SELECT * FROM users_log;
```
The following moves data from a Kafka Store to a Kinesis Store. The query creates a replica of the source Changelog, but the new Changelog is associated with the specified Store called `kinesis_store`.

```sql
CREATE CHANGELOG users_kinesis
  WITH ('store' = 'kinesis_store')
AS SELECT * FROM users_kafka;
```
The following creates a replica of the source Changelog, which has a data format of JSON, but the new sink Changelog uses Avro for both its value and key.

```sql
CREATE CHANGELOG users_avro
  WITH ('value.format' = 'avro', 'key.format' = 'AVRO') AS
SELECT * FROM users_json;
```
The following is a simple projection query where the sink Kafka topic has a specific number of partitions and replicas set.

```sql
CREATE CHANGELOG users2
  WITH ('topic.partitions' = '5', 'topic.replicas' = '3') AS
SELECT
  registertime,
  userid AS uid,
  interests[1] AS top_interest
FROM users_log;
```
The following is a simple projection query where the sink Kinesis stream has a specific number of shards set.

```sql
CREATE CHANGELOG users2
  WITH ('topic.shards' = '4')
AS SELECT
  registertime,
  userid AS uid,
  interests[1] AS top_interest
FROM users_log;
```
Aggregations of data on Streams result in a CHANGELOG output Relation type. The PRIMARY KEY for the following would be `(userid)`.

```sql
CREATE CHANGELOG visitlogs
  WITH ('topic' = 'pglogs')
AS SELECT
  userid,
  count(pageid) AS pgcount
FROM pageviews
GROUP BY userid;
```
Aggregations of data on Streams result in a CHANGELOG output Relation type. The PRIMARY KEY for the following would be `(window_start, window_end, userid, pageid)`.

```sql
CREATE CHANGELOG averagetime
AS SELECT
  window_start,
  window_end,
  userid,
  pageid,
  avg(viewtime) AS avgtime
FROM HOP(pageviews, size 8 seconds, advance by 4 seconds)
GROUP BY
  window_start,
  window_end,
  userid,
  pageid;
```
The statement below creates a new Changelog, called `userslogs2`, from the existing Changelog `userslog`. The `timestamp` Changelog parameter, specified in the WITH clause, marks the `registertime` column in `userslogs2` as the timestamp column. Therefore, any subsequent query that refers to `userslogs2` in its FROM clause will use this column for time-based operations.

```sql
CREATE CHANGELOG userslogs2
  WITH ('timestamp' = 'registertime') AS
SELECT userid, registertime, contactInfo['city'] AS city
FROM userslog;
```
The statement below creates a new Changelog, called `users_exactly_once`, from the existing Changelog `userslog`. The `delivery.guarantee` Changelog parameter, specified in the WITH clause, overrides the default `delivery.guarantee` of `at_least_once` with `exactly_once`. A user may want this configuration if their application can tolerate higher latencies but cannot tolerate duplicate outputs.

```sql
CREATE CHANGELOG users_exactly_once
  WITH ('delivery.guarantee'='exactly_once') AS
SELECT *
FROM userslog;
```