CREATE CHANGELOG
Syntax
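The statement's general shape can be outlined from the Arguments section that follows. This is a reconstructed sketch rather than a formal grammar. Square brackets mark optional parts and `...` marks repetition:

```sql
-- Reconstructed outline based on the Arguments section; optional clauses may be missing.
CREATE CHANGELOG changelog_name (
    column_name data_type [NOT NULL] [, ...],
    PRIMARY KEY (column_name [, ...])
)
WITH (changelog_parameter = value [, ...]);
```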
Description
A changelog is a sequence of partitioned and partially-ordered events. It's a relational representation of data in streaming stores, such as the data in an Apache Kafka topic or an Amazon Kinesis stream.
Note: DeltaStream uses the terms events and records synonymously.
A changelog defines a PRIMARY KEY for records, which is used to represent the change over time for records with the same PRIMARY KEY. Records in a changelog correlate with each other based on the PRIMARY KEY. This means a record in a changelog is either an insert record or an upsert record.
It's an insert record if it's the first time a record with the given PRIMARY KEY is appended to the changelog.
It's an upsert record if a previous record with the same PRIMARY KEY has already been inserted into the changelog.
In DeltaStream, a changelog is a type of relation. Each relation belongs to a schema in a database, so the fully-qualified name of the relation is <database>.<schema>.<relation>.
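For illustration, a query can reference a changelog by its fully-qualified name. The names mydb, analytics, and user_last_page below are hypothetical:

```sql
-- Hypothetical database (mydb), schema (analytics), and changelog (user_last_page).
SELECT * FROM mydb.analytics.user_last_page;
```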
Arguments
changelog_name
This specifies the name of the new changelog. If the name is case-sensitive, you must wrap it in double quotes; otherwise, the system uses the lowercase name.
column_name
This is the name of a column to be created in the new changelog. If the name is case-sensitive, you must wrap it in double quotes; otherwise, the system uses the lowercase name.
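A minimal sketch of the quoting rules for changelog and column names. All names here are hypothetical, and the WITH clause assumes the quoted 'parameter' = 'value' form:

```sql
-- "UserLog" and "UserId" are quoted, so their mixed case is preserved.
-- PageId and ViewCount are unquoted, so they are stored as pageid and viewcount.
CREATE CHANGELOG "UserLog" (
    "UserId" VARCHAR,
    PageId VARCHAR,
    ViewCount BIGINT,
    PRIMARY KEY ("UserId")
) WITH ('topic' = 'user_log', 'value.format' = 'JSON');
```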
data_type
This refers to the data type of the column, which can include array specifiers. For more information on the data types supported by DeltaStream, refer to the data types page.
NOT NULL
Defines a constraint on the column, ensuring it cannot contain NULL values.
PRIMARY KEY (column_name [, …])
The PRIMARY KEY constraint specifies that the listed column(s) of the changelog can contain only unique (non-duplicate), non-null values.
WITH (changelog_parameter = value [, … ])
This clause specifies Changelog Parameters.
Changelog Parameters
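topic
Name of the entity, such as a Kafka topic or Kinesis data stream, that holds the data for this changelog.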
Required: No
Default value: Lowercase changelog_name
Type: String
store
Name of the store that hosts the entity for this changelog.
Required: No
Default value: User’s default store name
Type: String
Valid values: See LIST STORES.
value.format
Required: Yes
Type: String
Valid values: JSON, AVRO, PROTOBUF, PRIMITIVE
timestamp
Name of the column in the changelog to use as the timestamp. If not set, the record's timestamp is used for time-based operations such as window aggregations and joins. If this column is of type BIGINT, DeltaStream expects its values to be epoch milliseconds in UTC.
Required: No
Default value: Record’s timestamp
Type: String
Valid values: Must be of type BIGINT or TIMESTAMP. See Data Types.
timestamp.format
The format to use for TIMESTAMP typed fields. See Data Types.
Required: No
Default value: sql
Type: String
Valid values: sql, iso8601
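The general parameters above might be combined as follows. This is a sketch only: the changelog, topic, store, and column names are hypothetical, and the quoted 'parameter' = 'value' form is assumed:

```sql
CREATE CHANGELOG orders_log (
    order_ts TIMESTAMP,
    order_id VARCHAR,
    status VARCHAR,
    PRIMARY KEY (order_id)
) WITH (
    'topic' = 'orders',              -- hypothetical; defaults to the lowercase changelog name
    'store' = 'my_store',            -- hypothetical; defaults to the user's default store
    'value.format' = 'JSON',         -- required
    'timestamp' = 'order_ts',        -- column must be of type BIGINT or TIMESTAMP
    'timestamp.format' = 'iso8601'   -- applies to TIMESTAMP typed fields
);
```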
Kafka Specific Parameters
Parameters to be used if the associated store is of type KAFKA:
topic.partitions
The number of partitions to use when creating the entity, if applicable. If the entity already exists, then this value must be equal to the number of partitions in the existing Kafka entity.
Required: Yes, unless entity already exists
Default value: Leftmost source relation entity’s partition count
Type: Integer
Valid values: [1, ...]
topic.replicas
The number of replicas to use when creating the entity, if applicable. If the entity already exists, then this value must be equal to the number of replicas in the existing Kafka entity.
Required: Yes, unless entity already exists
Default value: Leftmost source relation entity's replica count
Type: Integer
Valid values: [1, ...]
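A sketch of creating a changelog whose Kafka topic does not exist yet, so partition and replica counts are supplied. The names and counts are hypothetical, and unquoted integer values are an assumption:

```sql
CREATE CHANGELOG inventory_log (
    sku VARCHAR,
    quantity BIGINT,
    PRIMARY KEY (sku)
) WITH (
    'topic' = 'inventory',     -- hypothetical topic that does not exist yet
    'value.format' = 'JSON',
    'topic.partitions' = 3,    -- must match the existing count if the topic already exists
    'topic.replicas' = 2       -- same rule applies to the replica count
);
```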
key.format
Required: No, unless key.type is set
Default value: None
Type: String
Valid values: JSON, AVRO, PROTOBUF, PRIMITIVE
key.type
Required: No, unless key.format is set
Default value: None
Type: String
Valid values: See STRUCT in Data Types.
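Because key.format and key.type each require the other, they are set together. A minimal sketch with hypothetical names, assuming a single-field STRUCT key:

```sql
CREATE CHANGELOG account_balances (
    account_id VARCHAR,
    balance DOUBLE,
    PRIMARY KEY (account_id)
) WITH (
    'topic' = 'balances',                       -- hypothetical topic
    'value.format' = 'JSON',
    'key.format' = 'JSON',                      -- setting key.format requires key.type
    'key.type' = 'STRUCT<account_id VARCHAR>'   -- and setting key.type requires key.format
);
```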
delivery.guarantee
The fault tolerance guarantees applied when producing to this changelog.
Required: No
Default value: at_least_once
Type: String
Valid values:
exactly_once: Produces to the changelog using Kafka transactions. These transactions are committed when the query takes a checkpoint. On the consumer side, when the Kafka consumer isolation.level configuration is set to read_committed, only committed records are visible. Since records aren’t committed until the query takes a checkpoint, this setting adds some delay.
at_least_once: Ensures that records are output to the changelog at least once. During query checkpointing, the query waits to receive confirmation of successful writes from the Kafka broker. If the query encounters issues, duplicate records are possible because the query reprocesses old data.
none: There is no fault tolerance guarantee when producing to the changelog. If there are issues on the Kafka broker, records may be lost, and if there are issues with the query, output records may be duplicated.
Kinesis Specific Parameters
Parameters to be used if the associated store is of type KINESIS:
topic.shards
The number of shards to use when creating the entity, if applicable. If the entity already exists, this value must be equal to the number of shards in the existing Kinesis data stream.
Required: Yes, unless entity already exists
Default value: Leftmost source relation topic’s shard count
Type: Integer
Valid values: [1, ...]
Alias: kinesis.shards
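A sketch for a store of type KINESIS. The store, stream, and column names are hypothetical, and the quoted 'parameter' = 'value' form is assumed:

```sql
CREATE CHANGELOG device_state (
    device_id VARCHAR,
    state VARCHAR,
    PRIMARY KEY (device_id)
) WITH (
    'store' = 'my_kinesis_store',  -- hypothetical store of type KINESIS
    'topic' = 'device_state',      -- hypothetical Kinesis data stream
    'value.format' = 'JSON',
    'topic.shards' = 2             -- kinesis.shards is an alias
);
```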
Format Specific Parameters
Avro
Parameters to be used when writing records into a changelog if the associated key.format or value.format is avro and the default Avro schema generation must be changed by using a base schema for the key and/or value.
When generating an Avro schema for a column using a base schema:
If the base schema has a field with the same name and data type as the column, then the field's definition from the base schema is used in the generated schema. This includes retaining the base schema's doc and logicalType for the field.
If the base schema has a field with the same name as the column but a different data type, then an Avro schema type definition is generated from the column's data type, with the field's doc taken from its corresponding field in the base schema.
Note: Currently supported schema registries are Confluent Cloud and Confluent Platform.
Known limitation: The Confluent schema registry must use the default TopicNameStrategy for creating subject names.
See CREATE SCHEMA_REGISTRY for more details.
avro.base.schema.store
Name of the store whose schema registry contains the Avro schema subject(s) to be used as the base schema for generating the Avro schema for the changelog's key and/or value.
Required: No
Default value: Current session's store name
Type: Identifier
Valid values: See LIST STORES.
avro.base.subject.key
Name of the subject in the schema registry from which to obtain the base schema for generating the Avro schema for the changelog's key.
Required: No, unless key.format is set to avro and key.type is defined.
Type: String
avro.base.subject.value
Name of the subject in the schema registry from which to obtain the base schema for generating the Avro schema for the changelog's value columns.
Required: No, unless value.format is set to avro.
Type: String
Examples
Create a new changelog
The following creates a new changelog, user_last_page. This changelog reads from a topic named pageviews and has a value.format of JSON. Note that this query also specifies userid as the PRIMARY KEY for the changelog:
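A sketch of this statement. The topic, value.format, and primary key come from the description above; the viewtime and pageid columns and the quoted parameter form are assumed for illustration:

```sql
CREATE CHANGELOG user_last_page (
    viewtime BIGINT,   -- assumed column
    userid VARCHAR,
    pageid VARCHAR,    -- assumed column
    PRIMARY KEY (userid)
) WITH (
    'topic' = 'pageviews',
    'value.format' = 'JSON'
);
```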
Create a new changelog for an existing entity
Create a new changelog with a multi-column primary key
The following creates a new changelog, pagevisits. This changelog reads from an entity named pageviews and has a value.format of JSON. Note that this query also specifies (userid, pageid) as the PRIMARY KEY for the changelog:
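A sketch of this statement. The entity, format, and composite primary key come from the description; the viewtime column is an assumption:

```sql
CREATE CHANGELOG pagevisits (
    userid VARCHAR,
    pageid VARCHAR,
    viewtime BIGINT,   -- assumed column
    PRIMARY KEY (userid, pageid)
) WITH (
    'topic' = 'pageviews',
    'value.format' = 'JSON'
);
```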
Create a new changelog specifying key and timestamp
The following creates a new changelog, LatestPageVisitor, in the database DataBase and schema Schema2. This changelog reads from a topic named case_sensitive_pageviews in the store OtherStore, and has a value.format of AVRO and a key.format of PROTOBUF. Since key.format is included, key.type is also required; in this example its value is STRUCT<pageid VARCHAR>. This query also specifies PageId as the PRIMARY KEY for the changelog. Many of the column names are in quotes, indicating that they are case-sensitive. The case-insensitive column named CaseInsensitiveCol is stored in lowercase as caseinsensitivecol when the relation is created. The parameters also specify the timestamp for this relation, so queries that use this relation as a source treat the ViewTime column as the event’s timestamp:
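A sketch of this statement. The quoted names and WITH parameters follow the description above; the "UserId" column and all column types are assumptions:

```sql
CREATE CHANGELOG "DataBase"."Schema2"."LatestPageVisitor" (
    "ViewTime" BIGINT,            -- used as the event timestamp (epoch milliseconds)
    "UserId" VARCHAR,             -- assumed column
    "PageId" VARCHAR,
    CaseInsensitiveCol BIGINT,    -- unquoted, so stored as caseinsensitivecol; type assumed
    PRIMARY KEY ("PageId")
) WITH (
    'topic' = 'case_sensitive_pageviews',
    'store' = 'OtherStore',
    'value.format' = 'AVRO',
    'key.format' = 'PROTOBUF',
    'key.type' = 'STRUCT<pageid VARCHAR>',
    'timestamp' = 'ViewTime'
);
```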
Create a new changelog specifying Kafka delivery guarantee
The following creates a new changelog, user_exactly_once. This changelog reads from an entity named users and has a delivery.guarantee of exactly_once. By specifying the delivery.guarantee, you override the default value of at_least_once. You may wish to use this configuration if your application can tolerate higher latencies but cannot tolerate duplicate records. When you use this changelog as the sink in an INSERT INTO query, the query uses the delivery.guarantee specified here.
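A sketch of this statement. Only the entity name and delivery.guarantee come from the description; the columns, primary key, and JSON value.format are assumptions:

```sql
CREATE CHANGELOG user_exactly_once (
    registertime BIGINT,   -- assumed column
    userid VARCHAR,        -- assumed column and primary key
    regionid VARCHAR,      -- assumed column
    PRIMARY KEY (userid)
) WITH (
    'topic' = 'users',
    'value.format' = 'JSON',                 -- assumed format
    'delivery.guarantee' = 'exactly_once'    -- overrides the default at_least_once
);
```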
Create a new changelog with a NOT NULL column
The following creates a new changelog, users_log. Two columns in this changelog are defined with the NOT NULL constraint: registertime and contactinfo. As a result, these two columns are not allowed to contain null values in any valid record from this changelog.
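A sketch of this statement. The two NOT NULL columns come from the description; the other column, the STRUCT fields, the topic, and the format are assumptions:

```sql
CREATE CHANGELOG users_log (
    registertime BIGINT NOT NULL,
    userid VARCHAR,                                             -- assumed column and primary key
    contactinfo STRUCT<email VARCHAR, phone VARCHAR> NOT NULL,  -- STRUCT fields assumed
    PRIMARY KEY (userid)
) WITH (
    'topic' = 'users',        -- assumed topic
    'value.format' = 'JSON'   -- assumed format
);
```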
Create a new changelog with format specific properties for Avro
The following creates a new changelog, usersInfo, whose records' key and value are in avro format. It uses subjects from a store called sr_store as the base Avro schema to generate the Avro schema for usersInfo's key and value. It uses the users_data-key subject to generate the key's Avro schema, and the users_data-value subject to generate the value's Avro schema for the records written into usersInfo.
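A sketch of this statement. The store and subject names come from the description; the quoted changelog name, the columns, the key.type, and the topic are assumptions:

```sql
CREATE CHANGELOG "usersInfo" (
    registertime BIGINT,   -- assumed column
    userid VARCHAR,        -- assumed column and primary key
    regionid VARCHAR,      -- assumed column
    PRIMARY KEY (userid)
) WITH (
    'topic' = 'users_info',                         -- hypothetical topic
    'key.format' = 'AVRO',
    'key.type' = 'STRUCT<userid VARCHAR>',          -- assumed key type
    'value.format' = 'AVRO',
    'avro.base.schema.store' = 'sr_store',
    'avro.base.subject.key' = 'users_data-key',
    'avro.base.subject.value' = 'users_data-value'
);
```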