
CREATE CHANGELOG AS SELECT


Syntax

CREATE CHANGELOG changelog_name
[WITH (changelog_parameter = value [, ... ])]
AS select_statement;

Description

CREATE CHANGELOG AS is essentially a combination of two statements:

  • A DDL statement that creates a new changelog.

  • An INSERT INTO statement that runs a SELECT statement and adds the results into the newly created changelog, as sketched below.
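
The sketch below illustrates that equivalent two-statement form. It is illustrative only: it assumes a hypothetical source changelog users_log keyed by userid, and the column list, key, and parameters in the DDL are placeholders.

-- 1. A DDL statement that creates the new changelog
--    (columns and primary key here are hypothetical).
CREATE CHANGELOG users_clog (
  userid VARCHAR,
  registertime BIGINT,
  PRIMARY KEY (userid))
  WITH ('value.format' = 'json');

-- 2. A continuous INSERT INTO statement that runs the SELECT
--    and writes its results into the newly created changelog.
INSERT INTO users_clog SELECT * FROM users_log;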

Arguments

changelog_name

Specifies the name of the new changelog. Optionally, use <database_name>.<schema_name> as the prefix to the name to create the relation in that scope. If the name is case-sensitive, you must wrap it in double quotes; otherwise, the system uses the lowercase name.

WITH (changelog_parameter = value [, ... ])

Optionally, this clause specifies Changelog Parameters; see the list below.

select_statement

This statement specifies the SELECT statement to run.

Changelog Parameters

topic

The name of the entity that has the data for the newly created sink changelog. If the entity doesn't exist in the store, an entity with this name is created in the corresponding store.

Required: No Default value: Lowercase changelog_name Type: String Valid values: See LIST ENTITIES

store

The name of the store that hosts the entity for this changelog.

Required: No Default value: User's default store Type: String Valid values: See LIST STORES

value.format

Format of the message value in the entity. For more information regarding serialization formats, see Data Formats (Serialization).

Required: No Default value: Value format from the leftmost source relation Type: VALUE_FORMAT Valid values: JSON, AVRO, PROTOBUF, PRIMITIVE

timestamp

Name of the column in the changelog to use as the timestamp. If not set, the timestamp of the message is used for time-based operations such as window aggregations and joins.

Required: No Default value: Record's timestamp Type: String Valid values: Must be of type BIGINT or TIMESTAMP. See Data Types.

timestamp.format

The format to use for TIMESTAMP-typed fields. See Data Types.

Required: No Default value: sql Type: String Valid values: sql, iso8601
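
The timestamp and timestamp.format parameters are typically set together. As a minimal illustrative sketch (not one of the reference examples below), assume a hypothetical source changelog orders_log whose ordertime column is a TIMESTAMP serialized in ISO 8601 format:

CREATE CHANGELOG orders_by_time
  WITH ('timestamp' = 'ordertime', 'timestamp.format' = 'iso8601') AS
SELECT * FROM orders_log;

Subsequent queries over orders_by_time then use ordertime for time-based operations such as windows and joins.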

Kafka-Specific Parameters

topic.partitions

The number of partitions to use when creating the Kafka topic, if applicable.

Required: Yes, unless entity already exists Default value: Leftmost source relation entity's partition count Type: Integer Valid values: [1, ...]

topic.replicas

The number of replicas to use when creating the Kafka topic, if applicable.

Required: Yes, unless entity already exists Default value: Leftmost source relation entity's replica count Type: Integer Valid values: [1, ...]

kafka.topic.*

A configuration specific to the topic being created; for example, see Kafka Entity Configuration for Confluent Platform.

Required: No Default value: None Type: String Valid values: Kafka topic configuration specific to the underlying type

key.format

The format of a message key in the entity. For more information regarding serialization formats, see Data Formats (Serialization).

Required: No Default value: Key format from the leftmost source relation's key (if any) or the same as value.format Type: KEY_FORMAT Valid values: JSON, AVRO, PROTOBUF, PRIMITIVE

key.type

Declares the names and data types of key columns. The type is a STRUCT when key.format is a non-primitive value, for example 'key.type'='STRUCT<id BIGINT, name VARCHAR>'. For primitive values, the type is one of the Primitive Data Types, for example 'key.type'='VARCHAR'.

Required: No, unless key.format is set and there is no default value Default value: For certain query semantics (such as queries using JOIN or GROUP BY), a generated key type is used by default. For queries that do not generate a key type, the key type from the leftmost source relation's key (if any) is used by default. See Row Key Definition. Type: String Valid values: See STRUCT in Data Types

delivery.guarantee

The fault tolerance guarantees applied when producing to this changelog.

Required: No Default value: at_least_once Type: String Valid values:

  • exactly_once: Produces to the changelog using Kafka transactions. These transactions commit when the query takes a checkpoint. On the consumer side, when the Kafka consumer isolation.level configuration is set to read_committed, only the committed records are visible. Since records aren't committed until the query takes a checkpoint, there is some additional delay when you use this setting.

  • at_least_once: Ensures that records are output to the changelog at least once. During query checkpointing, the query waits to receive a confirmation of successful writes from the Kafka broker. If there are issues with the query, then duplicate records are possible as the query attempts to reprocess old data.

  • none: There is no fault tolerance guarantee when producing to the changelog. If there are issues on the Kafka broker, then records may be lost; if there are issues with the query, then output records may be duplicated.

sink.timestamp.strategy

Determines how the timestamp values for records written to a Kafka sink's entity are set.

Required: No Default value: proc.time Type: String Valid values:

  • event.time: Uses the timestamp of the source records coming from the source topic.

  • proc.time: Uses the current time of the Kafka producer when writing into the sink's entity. Note that the final timestamp used by Kafka depends on the timestamp type configured for the Kafka topic.
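
The snippet below is a minimal sketch of key.format and key.type used together; it is illustrative only and assumes a hypothetical source changelog users_log whose records are keyed by a userid column:

CREATE CHANGELOG users_keyed
  WITH (
    'key.format' = 'JSON',
    'key.type' = 'STRUCT<userid VARCHAR>') AS
SELECT * FROM users_log;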

Kinesis-Specific Parameters

Kinesis stores provide a delivery guarantee of at_least_once when producing events into a sink entity.

topic.shards

The number of shards to use when creating the Kinesis stream, if applicable.

Required: Yes, unless entity already exists Default value: Leftmost source relation entity's shard count Type: Integer Valid values: [1, ...] Alias: kinesis.shards

Format-Specific Properties

All format-specific properties applicable to a changelog can be provided as a changelog parameter. See Format-Specific Parameters for more details.

Examples

Create a copy changelog

The following creates a replica of the source changelog.

CREATE CHANGELOG users_clog AS SELECT * FROM users_log;

Create a changelog with passthrough configuration for retention

The following creates a changelog of US customers whose sink Kafka topic is created with 1 partition, 2 replicas, and a 48-hour retention (172800000 ms), passed through via the kafka.topic.* prefix.

CREATE CHANGELOG us_customers_log WITH (
  'store' = 'kafka_store',
  'topic.partitions' = 1, 
  'topic.replicas' = 2, 
  'kafka.topic.retention.ms' = '172800000') 
AS SELECT * FROM customers_log WHERE region = 'US';

Create a changelog in a specific schema within a default database

The following creates a replica of the source changelog, but the new relation belongs to the schema named schema2 in the session’s current database.

CREATE CHANGELOG schema2.users_log_copy AS SELECT * FROM users_log;

Create a changelog in a specific schema and database

The following creates a replica of the source changelog, but the new relation belongs to the schema named schema2 in the database named db.

CREATE CHANGELOG db.schema2.users_log_copy AS SELECT * FROM users_log;

Create a case-sensitive changelog

The following creates a replica of the source changelog, and the new sink relation has a case-sensitive name.

CREATE CHANGELOG "Users" AS SELECT * FROM users_log;

Create a case-sensitive changelog in a case-sensitive schema and database

The following creates a replica of the source changelog. The new sink relation has a case-sensitive name and is in a case-sensitive database and schema.

CREATE CHANGELOG "DataBase"."Schema"."Users" AS SELECT * FROM users_log;

Create a new changelog backed by a specific entity

The following creates a replica of the source changelog, but this new changelog is associated with the specified entity called userstwo.

CREATE CHANGELOG
  users2
  WITH ('topic' = 'userstwo')
AS SELECT * FROM users_log;

Copy data from one store to another

The following moves data from a Kafka store to a Kinesis store. The query creates a replica of the source changelog, but this new changelog is associated with the specified store called kinesis_store.

CREATE CHANGELOG
  users_kinesis
  WITH ('store' = 'kinesis_store')
AS SELECT * FROM users_kafka;

Convert data from JSON to Avro

The following creates a replica of the source changelog that has a data format of JSON, but the new sink changelog has a data format of Avro for its value and key.

CREATE CHANGELOG users_avro
  WITH ('value.format' = 'avro', 'key.format' = 'AVRO') AS 
SELECT * FROM users_json;

Simple projection to a Kafka topic with a specific number of partitions and replicas

The following is a simple projection query where the sink Kafka topic has a specific number of partitions and replicas set.

CREATE CHANGELOG users2
  WITH ('topic.partitions' = '5', 'topic.replicas' = '3') AS 
SELECT
  registertime,
  userid AS uid,
  interests[1] AS top_interest
FROM users_log;

Simple projection to a Kinesis stream with a specific number of shards

The following is a simple projection query where the sink Kinesis stream has a specific number of shards set.

CREATE CHANGELOG
  users2
  WITH ('topic.shards' = '4')
AS SELECT
  registertime,
  userid AS uid,
  interests[1] AS top_interest
FROM users_log;

Create a changelog from aggregation

Aggregations of data on streams result in a CHANGELOG output relation type. The PRIMARY KEY for the following would be (userid).

CREATE CHANGELOG
  visitlogs
  WITH ('topic' = 'pglogs')
AS SELECT
  userid,
  count(pageid) AS pgcount
FROM pageviews
GROUP BY userid;

Create a changelog from HOP window aggregation

Aggregations of data on streams result in a CHANGELOG output relation type. The PRIMARY KEY for the following would be (window_start, window_end, userid, pageid).

CREATE CHANGELOG
  averagetime
AS SELECT 
  window_start, 
  window_end, 
  userid, 
  pageid, 
  avg(viewtime) AS avgtime 
FROM HOP(pageviews, size 8 second, advance by 4 seconds)
GROUP BY
  window_start, 
  window_end, 
  userid,
  pageid;

Create a changelog specifying the timestamp column

The statement below creates a new changelog, called userslogs2, from the already existing changelog userslog. The timestamp changelog parameter, specified in the WITH clause, marks the registertime column in userslogs2 as the timestamp column. Any subsequent query that refers to userslogs2 in its FROM clause uses this column for time-based operations.

CREATE CHANGELOG userslogs2
  WITH ('timestamp' = 'registertime') AS
SELECT userid, registertime, contactInfo['city'] AS city 
FROM userslog;

Create a changelog specifying the Kafka delivery guarantee

The statement below creates a new changelog, called users_exactly_once, from the already existing changelog userslog. The delivery.guarantee changelog parameter, specified in the WITH clause, overrides the default delivery.guarantee of at_least_once with exactly_once. You may wish to use this configuration if your application can tolerate higher latencies but cannot tolerate duplicate outputs.

CREATE CHANGELOG users_exactly_once 
  WITH ('delivery.guarantee'='exactly_once') AS
SELECT *
FROM userslog;

Create a changelog with `event.time` sink timestamp strategy

The statement below creates a new changelog, called users_contact, from the already existing changelog userslog, selecting each user's primary key (for example, userid) and phone number from the contactinfo struct. The sink timestamp strategy is set to event.time; this way, when writing to the sink's Kafka topic, the timestamp for each record in users_contact is set to its source record's timestamp, coming from the entity backing userslog.

-- Assuming the sink Store is Kafka
CREATE CHANGELOG users_contact 
  WITH ('sink.timestamp.strategy' = 'event.time') AS
SELECT userid, contactinfo->phone
FROM userslog;

Create a changelog with format-specific properties for Avro

The following creates a new changelog, usersInfo, by selecting records' key and value from another given changelog, users_avro. Assuming the users_avro key and value are in Avro, two subjects are provided to generate the Avro schemas for the usersInfo key and value. These subjects are stored in the schema registry of a store called sr_store. The users_data-key subject is used to generate the key's Avro schema, and the users_data-value subject is used to generate the value's Avro schema for the records written into usersInfo.

CREATE CHANGELOG "usersInfo" 
WITH ('topic'='usersInfo', 
      'avro.base.store.name' = sr_store,
      'avro.base.subject.key' = 'users_data-key',
      'avro.base.subject.value' = 'users_data-value') AS
SELECT * FROM users_avro;  
