CREATE STREAM AS SELECT


Syntax

CREATE STREAM stream_name
[WITH (stream_parameter = value [, ... ])] 
AS select_statement
[PARTITION BY partition_by_clause];

Description

CREATE STREAM AS is essentially a combination of two statements:

  • A DDL statement that creates a new stream.

  • An INSERT INTO statement that runs a SELECT statement and adds the results into the newly-created stream (see the sketch below).
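
For instance, a statement such as CREATE STREAM pageviews_copy AS SELECT * FROM pageviews; behaves roughly like the two-step sketch below. The column list and WITH parameters here are illustrative assumptions based on the pageviews examples later on this page; see the CREATE STREAM and INSERT INTO reference pages for the exact DDL options.

-- 1. DDL: declare the new stream (columns and parameters shown here are assumed)
CREATE STREAM pageviews_copy (
  viewtime BIGINT,
  userid VARCHAR,
  pageid VARCHAR)
WITH ('topic' = 'pageviews_copy', 'value.format' = 'json');

-- 2. Continuously insert the SELECT results into the newly-created stream
INSERT INTO pageviews_copy SELECT * FROM pageviews;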

Arguments

stream_name

This specifies the name of the new stream. Optionally, use <database_name>.<schema_name> as a prefix to the name to create the relation in that scope. If the name is case sensitive, you must wrap it in double quotes; otherwise the system uses the lowercase name.
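
For example, assuming a database db1 and a schema schema1 already exist (hypothetical names; fuller variations appear in the Examples section below):

-- Created in the current database and schema; the name is stored in lowercase
CREATE STREAM pageviews_copy AS SELECT * FROM pageviews;

-- Created in schema1 of db1, with a case-sensitive name
CREATE STREAM db1.schema1."PageViews" AS SELECT * FROM pageviews;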

WITH (<stream_parameter> = <value> [, …​ ])

Optionally, this clause specifies Stream Parameters (see below).

select_statement

This statement specifies the SELECT statement to run.

PARTITION BY partition_by_clause

Optionally, this clause sets the partition key of records according to their values in a given set of columns. The PARTITION BY clause defines a comma-separated list of one or more partitioning columns. By default, the key for the sink's records has the same data format as the sink's value; to use a different key format, set the key.format stream parameter. PARTITION BY is supported for CREATE STREAM AS SELECT and INSERT INTO queries whose sink is a stream, and currently applies only to queries whose sink stream is backed by a Kafka store.
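
For instance, the minimal sketch below (adapted from the worked PARTITION BY examples later on this page) keys the sink stream by userid and sets a PRIMITIVE key format; pageviews_by_user is a hypothetical sink name.

CREATE STREAM pageviews_by_user
  WITH ('key.format' = 'PRIMITIVE') AS
SELECT viewtime, userid, pageid
FROM pageviews
PARTITION BY userid;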

Stream Parameters

Parameter Name
Description

topic

The name of the entity that has the data for a newly-created sink stream. If the entity doesn't exist in the store, an entity with the stream_name is created in the corresponding store.

Required: No Default value: Lowercase stream_name. Type: String Valid values: See LIST ENTITIES

store

The name of the store that hosts the entity for this stream.

Required: No Default value: User's default store. Type: String Valid values: See LIST STORES

value.format

Format of the message value in the entity. For more information regarding serialization formats, see Data Formats (Serialization).

Required: No Default value: Value format from the leftmost source relation. Type: VALUE_FORMAT Valid values: JSON, AVRO, PROTOBUF, PRIMITIVE

timestamp

Name of the column in the stream to use as the timestamp. If not set, the timestamp of the message is used for time-based operations such as window aggregations and joins.

Required: No Default value: Record's timestamp. Type: String Valid values: Must be of type BIGINT or TIMESTAMP. See Data Types

timestamp.format

The format to use for TIMESTAMP typed fields. See Data Types.

Required: No Default value: sql Type: String Valid values: sql, iso8601
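
As an illustrative sketch, the WITH clause below combines several of these parameters; kafka_store and the pageviews columns are borrowed from the examples later on this page, and pageviews_ts is a hypothetical sink name.

CREATE STREAM pageviews_ts
  WITH (
    'store' = 'kafka_store',
    'topic' = 'pageviews_ts',
    'value.format' = 'json',
    'timestamp' = 'viewtime') AS
SELECT viewtime, userid, pageid
FROM pageviews;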

Kafka-Specific Parameters

Parameter Name
Description

topic.partitions

The number of partitions to use when creating the Kafka topic, if applicable.

Required: Yes, unless entity already exists. Default value: Leftmost source relation entity's partition count. Type: Integer Valid values: [1, ...]

topic.replicas

The number of replicas to use when creating the Kafka topic, if applicable.

Required: Yes, unless entity already exists. Default value: Leftmost source relation entity's replica count. Type: Integer Valid values: [1, ...]

kafka.topic.*

A configuration specific to the topic being created, such as Kafka Entity Configuration for Confluent Platform.

Required: No Default value: None Type: String Valid values: Kafka topic configuration specific to the underlying type.

key.format

The format of a message key in the entity. For more information regarding serialization formats, see Data Formats (Serialization).

Required: No Default value: Key format from the leftmost source relation's key (if any) or the same as value.format. Type: KEY_FORMAT Valid values: JSON, AVRO, PROTOBUF, PRIMITIVE

key.type

Declares the names and data types of key columns. The type is a STRUCT when key.format is a non-primitive value, e.g. 'key.type'='STRUCT<id BIGINT, name VARCHAR>'. For primitive values, the type is one of the Primitive Data Types, such as 'key.type'='VARCHAR'.

Required: No, unless key.format is set and there is no default value. Default value: For certain query semantics (that is, queries using JOIN or GROUP BY), a generated key type is used by default. For queries that do not generate a key type, the key type from the leftmost source relation's key is used by default (if any). See Row Key Definition. Type: String Valid values: See STRUCT in Data Types

delivery.guarantee

The fault tolerance guarantees applied when producing to this stream.

Required: No Default value: at_least_once Type: String Valid values:

  • at_least_once: Ensures that records are output to the stream at least once. During query checkpointing, the query waits to receive a confirmation of successful writes from the Kafka broker. If there are issues with the query, duplicate records are possible as the query attempts to reprocess old data.

  • none: There is no fault tolerance guarantee when producing to the stream. If there are issues on the Kafka broker, records may be lost; if there are issues with the query, output records may be duplicated.

  • exactly_once: Produces to the stream using Kafka transactions. These transactions commit when the query takes a checkpoint. On the consumer side, when the Kafka consumer isolation.level configuration is set to read_committed, only committed records are visible. Since records aren't committed until the query takes a checkpoint, there is some additional delay when using this setting.

sink.timestamp.strategy

Determines how timestamp values are set for records written to a Kafka sink's entity.

Required: No Default value: proc.time Type: String Valid values:

  • event.time: Uses the timestamp of each source record, coming from the source topic.

  • proc.time: Uses the current time of the Kafka producer when writing into the sink's entity. Note that the final timestamp used by Kafka depends on the timestamp type configured for the Kafka topic.

Kinesis-Specific Parameters

Parameter Name
Description

topic.shards

The number of shards to use when creating the Kinesis stream, if applicable.

Required: Yes, unless entity already exists. Default value: Leftmost source relation entity's shard count. Type: Integer Valid values: [1, ...] Alias: kinesis.shards

Note that Kinesis stores provide a delivery guarantee of at_least_once when producing events into a sink entity.

Format-Specific Properties

All format-specific properties that are applicable to a stream can be provided as a stream_parameter. See Format-Specific Parameters for more details.

Examples

Create a copy stream

The following creates a replica of the source stream.

CREATE STREAM pageviews_copy AS SELECT * FROM pageviews;

Create a stream with passthrough configuration for retention

CREATE STREAM us_customers WITH (
  'store' = 'kafka_store',
  'topic.partitions' = 1, 
  'topic.replicas' = 2, 
  'kafka.topic.retention.ms' = '172800000') 
AS SELECT * FROM customers WHERE region = 'US';

Create a stream in a specific schema within default database

The following creates a replica of the source stream, but the new relation belongs to the schema named schema2 in the session’s current database.

CREATE STREAM schema2.pageviews_copy AS SELECT * FROM pageviews;

Create stream in specific schema and database

The following creates a replica of the source stream, but the new relation belongs to the schema named schema2 in the database named db.

CREATE STREAM db.schema2.pageviews_copy AS SELECT * FROM pageviews;

Create a case-sensitive stream

The following creates a replica of the source stream. The new sink relation has a case-sensitive name.

CREATE STREAM "PageViews" AS SELECT * FROM pageviews;

Create a case-sensitive stream in a case-sensitive schema and database

The following creates a replica of the source stream. The new sink relation has a case-sensitive name and is in a case-sensitive database and schema.

CREATE STREAM "DataBase"."Schema"."PageViews :)" AS SELECT * FROM pageviews;

Create a new stream that is backed by a specific entity

The following creates a replica of the source stream, but this new stream is associated with the specified entity called pageviewstwo.

CREATE STREAM
  pageviews2
  WITH ('topic' = 'pageviewstwo')
AS SELECT * FROM pageviews;

Copy data from one store to another

The following moves data from a Kafka store to a Kinesis store. The query creates a replica of the source stream, but this new stream is associated with the specified store called kinesis_store.

CREATE STREAM pageviews_kinesis
  WITH ('store' = 'kinesis_store') AS 
SELECT * FROM pageviews_kafka;

Convert data from JSON to Avro with a Kafka store

The following creates a replica of the source stream that has a data format of JSON, but the new sink stream has a data format of Avro for its value and key.

CREATE STREAM
  pageviews_avro
  WITH ('value.format' = 'avro', 'key.format' = 'AVRO')
AS SELECT * FROM pageviews_json;

Convert data from JSON to Avro with a Kinesis store

The following creates a replica of the source stream that has a data format of JSON, but the new sink stream has a data format of Avro for its value. Since the sink is a Kinesis stream, there is no key associated with the record, and so the value.format property is the only one that is necessary.

CREATE STREAM
  pageviews_avro
  WITH ('store' = 'kinesis_store', 'value.format' = 'avro')
AS SELECT * FROM pageviews_json;

Simple projection to a Kafka topic with a specific number of partitions and replicas

The following is a simple projection query where the sink Kafka topic has a specific number of partitions and replicas set.

CREATE STREAM
  pageviews2
  WITH ('topic.partitions' = '5', 'topic.replicas' = '3')
AS SELECT
  viewtime AS vtime,
  pageid AS pid
FROM pageviews;

Simple projection to a Kinesis stream with a specific number of shards

The following is a simple projection query where the sink Kinesis stream has a specific number of shards set.

CREATE STREAM pageviews2
  WITH ('topic.shards' = '4') AS 
SELECT
  viewtime AS vtime,
  pageid AS pid
FROM pageviews;

Create a stream using an interval join

Interval joins between two streams result in a STREAM sink relation type.

CREATE STREAM pageviews_enriched AS
SELECT 
  p.userid AS pvid, 
  u.userid AS uid, 
  u.gender, 
  p.pageid, 
  u.interests[1] AS top_interest 
FROM 
  pageviews p JOIN users u WITHIN 5 minutes 
  ON u.userid = p.userid;

Create a stream using a temporal join

In a temporal join of two relations, the left join side source is a stream and the right join side source is a changelog; the result is a STREAM output relation type. In the example below, a new stream called users_visits is created by performing a temporal join between the pageviews stream and the users_log changelog.

CREATE STREAM users_visits AS 
SELECT 
  p.userid AS pvid, 
  u.userid AS uid, 
  u.gender, 
  p.pageid, 
  u.interests[1] AS top_interest 
FROM 
  pageviews p JOIN users_log u 
  ON u.userid = p.userid;

Create a stream specifying the timestamp column

The following statement creates a new stream, called pagestats, from the already existing stream pageviews. The timestamp stream parameter, specified in the WITH clause, marks the viewtime column in pagestats as the timestamp column. Therefore, any subsequent query that refers to pagestats in its FROM clause uses this column for time-based operations.

CREATE STREAM pagestats 
  WITH ('timestamp'='viewtime') AS
SELECT viewtime, pageid 
FROM pageviews;

Create a stream specifying the Kafka delivery guarantee

The following statement creates a new stream, called pageviews_exactly_once, from the already existing stream pageviews. The delivery.guarantee stream parameter, specified in the WITH clause, overrides the default delivery.guarantee of at_least_once to exactly_once. You may wish to use this configuration if your use case can tolerate higher latencies but cannot tolerate duplicate outputs.

CREATE STREAM pageviews_exactly_once 
  WITH ('delivery.guarantee'='exactly_once') AS
SELECT *
FROM pageviews;

Create a stream with the PARTITION BY clause

The following statement creates a new stream, called pageviews_partition_by, from the already existing stream pageviews. The PARTITION BY clause sets the key type for the output pageviews_partition_by stream. Notice that in this example the source stream's records have no key value, while the sink stream's records use the PARTITION BY columns as their key. The sink stream's key data format is JSON in this example because it inherits the sink's value data format by default.

CREATE STREAM pageviews_partition_by AS
SELECT viewtime, userid AS `UID`, pageid 
FROM pageviews
PARTITION BY "UID", pageid;

Given this input for pageviews:

KEY     VALUE
{}	{"viewtime":1690327704650, "userid":"User_9", "pageid":"Page_11"}
{}	{"viewtime":1690327705651, "userid":"User_6", "pageid":"Page_94"}

You can expect the following output in pageviews_partition_by:

KEY                                     VALUE
{"UID":"User_9", "pageid":"Page_11"}	{"viewtime":1690327704650, "UID":"User_9", "pageid":"Page_11"}
{"UID":"User_6", "pageid":"Page_94"}	{"viewtime":1690327705651, "UID":"User_6", "pageid":"Page_94"}

Create a stream with the PARTITION BY clause to override existing key

The following statement creates a new stream, called pageviews_partition_by, from the already existing stream pageviews. The PARTITION BY clause sets the key type for the output pageviews_partition_by stream, and this query also sets the key.format property for the sink stream to PRIMITIVE. Notice that in this example the source stream's records have the pageid column value as their key in JSON format, while the output stream's records use the PARTITION BY value as their key in the PRIMITIVE format.

CREATE STREAM pageviews_partition_by
WITH ('key.format'='PRIMITIVE') AS
SELECT viewtime, userid AS `UID`, pageid 
FROM pageviews
PARTITION BY "UID";

Given this input for pageviews:

KEY                     VALUE
{"pageid":"Page_11"}	{"viewtime":1690327704650, "userid":"User_9", "pageid":"Page_11"}
{"pageid":"Page_94"}	{"viewtime":1690327705651, "userid":"User_6", "pageid":"Page_94"}

You can expect the following output in pageviews_partition_by:

KEY             VALUE
"User_9"	{"viewtime":1690327704650, "UID":"User_9", "pageid":"Page_11"}
"User_6"	{"viewtime":1690327705651, "UID":"User_6", "pageid":"Page_94"}

Create a stream with `event.time` sink timestamp strategy

The following statement creates a new stream, called pageviews_copy, from the already existing stream pageviews by selecting all columns. The sink timestamp strategy is set to event.time; this way, when writing to the sink's Kafka topic, the timestamp for each record in pageviews_copy is set to the timestamp of its source record, coming from the entity that backs pageviews.

-- Assuming the sink Store is Kafka
CREATE STREAM pageviews_copy 
  WITH ('sink.timestamp.strategy' = 'event.time') AS
SELECT *
FROM pageviews;

Create a stream with format-specific properties for Avro

The following creates a new stream, usersInfo, by selecting records' key and value from another given stream, users_avro. Assuming the users_avro key and value are in Avro, two subjects are provided to generate the Avro schemas for the key and value of usersInfo. These subjects are stored in the schema registry of a store called sr_store. The users_data-key subject generates the key's Avro schema, and the users_data-value subject generates the value's Avro schema for records written into usersInfo.

CREATE STREAM "usersInfo" 
WITH ('topic'='usersInfo', 
      'avro.base.store.name' = 'sr_store',
      'avro.base.subject.key' = 'users_data-key',
      'avro.base.subject.value' = 'users_data-value') AS
SELECT * FROM users_avro;  

