CREATE CHANGELOG
```sql
CREATE CHANGELOG changelog_name (
    column_name data_type [, ...],
    PRIMARY KEY (column_name [, ...])
) WITH (changelog_parameter = value [, ...]);
```
A Changelog defines a PRIMARY KEY for its records, which is used to represent how records with the same PRIMARY KEY change over time. Records in a Changelog are correlated with each other by their PRIMARY KEY: a record is an insert if it is the first record with a given PRIMARY KEY to be appended to the Changelog, and an upsert if a previous record with the same PRIMARY KEY has already been inserted into the Changelog.

Arguments

changelog_name
This specifies the name of the new Changelog. For case-sensitive names, wrap the name in double quotes; otherwise, the lowercase name is used.
column_name
This is the name of a column to be created in the new Changelog. For case-sensitive names, wrap the name in double quotes; otherwise, the lowercase name is used.

data_type
This is the data type of the column, which can include array specifiers. For more information on the data types supported by DeltaStream, refer to the data types reference.
PRIMARY KEY
The PRIMARY KEY constraint specifies that the key column(s) can contain only unique (non-duplicate), non-null values.

Changelog parameters

Parameter Name | Description | Required | Default value | Type | Valid values |
---|---|---|---|---|---|
topic | Name of the Topic that has the data for this Changelog. If the Topic doesn't exist, a Topic with this name is created in the corresponding store. | No | Lowercase changelog_name | String | |
store | Name of the store that hosts the Topic for this Changelog. | | | | |
value.format | Format of the message value in the Topic. See Data Formats (Serialization) for more information regarding serialization formats. | Yes | | String | JSON, AVRO, PROTOBUF, PRIMITIVE |
timestamp | Name of the column in the Changelog to use as the timestamp. If not set, the timestamp of the message is used for time-based operations such as window aggregations and joins. If this column is of type BIGINT, its values are expected to be epoch milliseconds (UTC). | No | Record's timestamp | String | Must be of type BIGINT or TIMESTAMP. See Data Types. |
timestamp.format | | No | sql | String | sql, iso8601 |
Kafka-specific parameters

Parameter Name | Description | Required | Default value | Type | Valid values |
---|---|---|---|---|---|
topic.partitions | The number of partitions to use when creating the Topic, if applicable. If the Topic already exists, this value must equal the number of partitions in the existing Kafka Topic. | Yes, unless the Topic already exists | Leftmost source Relation Topic's partition count | Integer | [1, ...] |
topic.replicas | The number of replicas to use when creating the Topic, if applicable. If the Topic already exists, this value must equal the number of replicas in the existing Kafka Topic. | Yes, unless the Topic already exists | Leftmost source Relation Topic's replica count | Integer | [1, ...] |
key.format | Format of the message key in the Topic. This value can be the same as or different from value.format. See Data Formats (Serialization) for more information regarding serialization formats. | No, unless key.type is set | None | String | JSON, AVRO, PROTOBUF, PRIMITIVE |
key.type | Declares the names and data types of key columns. The type is a STRUCT when key.format is a non-primitive value, e.g. 'key.type'='STRUCT<id BIGINT, name VARCHAR>'. For primitive values, the type is one of the Primitive Data Types, e.g. 'key.type'='VARCHAR'. | No, unless key.format is set | None | String | See STRUCT in Data Types. |
delivery.guarantee | The fault tolerance guarantees applied when producing to this Changelog. | No | at_least_once | String | Includes at_least_once and exactly_once |
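As a sketch of how these Kafka-specific parameters fit together, the following DDL assumes that a Topic named user_clicks does not yet exist in the current Store, so topic.partitions and topic.replicas are supplied to create it, and it also declares a primitive VARCHAR message key. The Changelog name, Topic name, columns, and partition/replica counts are hypothetical, and integer parameter values are assumed to be written as unquoted literals.

```sql
-- Hypothetical Changelog; assumes the Topic 'user_clicks' does not exist yet,
-- so the partition and replica counts below are used to create it.
CREATE CHANGELOG user_clicks (
    clicktime BIGINT,
    userid VARCHAR,
    url VARCHAR,
    PRIMARY KEY(userid)
) WITH (
    'topic'='user_clicks',
    'value.format'='json',
    'topic.partitions'=3,       -- must equal the existing count if the Topic already exists
    'topic.replicas'=2,         -- same rule for the replica count
    'key.format'='primitive',   -- key.format and key.type are required together
    'key.type'='VARCHAR'        -- a primitive key type, as in the key.type description
);
```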
Kinesis-specific parameters

Parameter Name | Description | Required | Default value | Type | Valid values |
---|---|---|---|---|---|
topic.shards | The number of shards to use when creating the Topic, if applicable. If the Topic already exists, this value must equal the number of shards in the existing Kinesis Data Stream. Alias: kinesis.shards | Yes, unless the Topic already exists | Leftmost source Relation Topic's shard count | Integer | [1, ...] |
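The following hypothetical sketch shows topic.shards in use. It assumes a Kinesis Store named kinesis_store in which a Kinesis Data Stream named clickstream does not yet exist, so the shard count is used to create it; the Store name, stream name, columns, and shard count are illustrative only.

```sql
-- Hypothetical Kinesis-backed Changelog; 'kinesis_store' and 'clickstream'
-- are illustrative names, not objects assumed to exist.
CREATE CHANGELOG clickstream (
    clicktime BIGINT,
    userid VARCHAR,
    url VARCHAR,
    PRIMARY KEY(userid)
) WITH (
    'store'='kinesis_store',
    'topic'='clickstream',
    'value.format'='json',
    'topic.shards'=2   -- alias kinesis.shards; must equal the shard count if the stream exists
);
```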
Kinesis stores provide a delivery guarantee of at_least_once when producing events into a sink Topic.

Examples

The following creates a new Changelog, user_last_page. This Changelog reads from a Topic named pageviews and has a value.format of JSON. Note that this query also specifies userid as the PRIMARY KEY for the Changelog:

```sql
CREATE CHANGELOG user_last_page (
    viewtime BIGINT,
    userid VARCHAR,
    pageid VARCHAR,
    PRIMARY KEY(userid)
)
WITH (
    'topic'='pageviews',
    'value.format'='json'
);
```
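To make the insert and upsert behavior concrete, the following annotates a hypothetical sequence of JSON records appended to the pageviews Topic and how user_last_page, keyed on userid, interprets each one. The record values are illustrative only.

```sql
-- Hypothetical records appended to the pageviews Topic, in order:
--   1. {"viewtime": 1000, "userid": "User_1", "pageid": "Page_10"}  insert: first record for User_1
--   2. {"viewtime": 1005, "userid": "User_2", "pageid": "Page_21"}  insert: first record for User_2
--   3. {"viewtime": 1010, "userid": "User_1", "pageid": "Page_34"}  upsert: replaces record 1
--
-- Latest record per PRIMARY KEY (userid) after all three records:
--   User_1 -> Page_34 (viewtime 1010)
--   User_2 -> Page_21 (viewtime 1005)
```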
The following creates a new users Changelog for the existing users Topic in the current Store. Because no topic parameter is specified, the name of the Changelog is used as the name of the Topic that hosts the records. The DDL also declares the structure of the records in the users Topic, with a PRIMARY KEY for updates:

```sql
CREATE CHANGELOG "users" (
    registertime BIGINT,
    userid VARCHAR,
    regionid VARCHAR,
    gender VARCHAR,
    interests ARRAY<VARCHAR>,
    contactinfo STRUCT<
        phone VARCHAR,
        city VARCHAR,
        "state" VARCHAR,
        zipcode VARCHAR>,
    PRIMARY KEY(userid)
) WITH ( 'value.format'='json' );
```
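Because topic defaults to the lowercase changelog_name, the DDL above could be written with the Topic name spelled out, as in the following sketch. This equivalence is inferred from the documented default value of the topic parameter; the two statements are alternatives, not meant to be run together.

```sql
-- Alternative to the "users" example above, with the default Topic name made explicit.
CREATE CHANGELOG "users" (
    registertime BIGINT,
    userid VARCHAR,
    regionid VARCHAR,
    gender VARCHAR,
    interests ARRAY<VARCHAR>,
    contactinfo STRUCT<
        phone VARCHAR,
        city VARCHAR,
        "state" VARCHAR,
        zipcode VARCHAR>,
    PRIMARY KEY(userid)
) WITH (
    'topic'='users',       -- same as the default: lowercase changelog_name
    'value.format'='json'
);
```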
The following creates a new Changelog, pagevisits. This Changelog reads from a Topic named pageviews and has a value.format of JSON. Note that this query also specifies (userid, pageid) as the PRIMARY KEY for the Changelog:

```sql
CREATE CHANGELOG pagevisits (
    viewtime BIGINT,
    userid VARCHAR,
    pageid VARCHAR,
    PRIMARY KEY(userid, pageid)
) WITH ( 'topic'='pageviews', 'value.format'='json' );
```
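When the message key is also declared for a multi-column PRIMARY KEY, key.type takes the STRUCT form described for the key.type parameter. The following sketch assumes that each message key in pageviews is a JSON object holding both userid and pageid; that key layout and the Changelog name pagevisits_keyed are hypothetical.

```sql
-- Hypothetical variant of pagevisits that also declares a JSON message key.
-- Assumes each message key in pageviews is a JSON object with userid and pageid.
CREATE CHANGELOG pagevisits_keyed (
    viewtime BIGINT,
    userid VARCHAR,
    pageid VARCHAR,
    PRIMARY KEY(userid, pageid)
) WITH (
    'topic'='pageviews',
    'value.format'='json',
    'key.format'='json',   -- non-primitive key format, so key.type is a STRUCT
    'key.type'='STRUCT<userid VARCHAR, pageid VARCHAR>'
);
```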
The following creates a new Changelog, LatestPageVisitor, in the Database DataBase and the Schema Schema2. This Changelog reads from a Topic named case_sensitive_pageviews in the Store OtherStore, and has a value.format of AVRO and a key.format of PROTOBUF. Because key.format is included, key.type must also be provided; in this example, its value is STRUCT<"PageId" VARCHAR>. Note that this query also specifies PageId as the PRIMARY KEY for the Changelog, and many of the column names are in double quotes, indicating that they are case-sensitive. The case-insensitive column named CaseInsensitiveCol is converted to lowercase, caseinsensitivecol, when the Relation is created. The parameters also specify the timestamp for this Relation, so queries that use this Relation as a source treat the ViewTime column as the event's timestamp:

```sql
CREATE CHANGELOG "DataBase"."Schema2"."LatestPageVisitor" (
    "ViewTime" BIGINT,
    "userID" VARCHAR,
    "PageId" VARCHAR,
    "CaseSensitiveCol" BIGINT,
    CaseInsensitiveCol BIGINT,
    PRIMARY KEY("PageId")
) WITH (
    'topic'='case_sensitive_pageviews',
    'store'='OtherStore',
    'value.format'='avro',
    'key.format'='protobuf',
    'key.type'='STRUCT<"PageId" VARCHAR>',
    'timestamp'='ViewTime'
);
```
The following creates a new Changelog, user_exactly_once. This Changelog reads from a Topic named users and has a delivery.guarantee of exactly_once, which overrides the default value of at_least_once. You may want this configuration if your application can tolerate higher latency but cannot tolerate duplicate records. When this Changelog is used as the sink in an INSERT INTO query, the query uses the delivery.guarantee specified here:

```sql
CREATE CHANGELOG user_exactly_once (
    viewtime BIGINT,
    userid VARCHAR,
    pageid VARCHAR,
    PRIMARY KEY(userid)
)
WITH (
    'topic'='users',
    'value.format'='json',
    'delivery.guarantee'='exactly_once'
);
```
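As a hypothetical sketch of that sink case, the following INSERT INTO query reads from user_last_page (from the first example, which has the same columns) and writes into user_exactly_once, so it would produce with the exactly_once guarantee declared on the sink. Whether a Changelog-to-Changelog INSERT INTO with a plain projection is accepted is an assumption here, not something this page states.

```sql
-- Hypothetical: assumes user_last_page exists and has the same columns as
-- user_exactly_once. Because the sink is user_exactly_once, this query
-- uses that Changelog's 'exactly_once' delivery.guarantee.
INSERT INTO user_exactly_once
SELECT viewtime, userid, pageid
FROM user_last_page;
```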