CREATE STREAM



Syntax

CREATE STREAM stream_name (
    column_name data_type [NOT NULL] [, ...]
) WITH (stream_parameter = value [, ...]);

Description

A stream is a sequence of immutable, partitioned, and partially-ordered events.

Note: In DeltaStream, the terms events and records are synonymous.

A stream is a relational representation of data in a streaming Store, such as the data in a Kafka topic or a Kinesis stream. The records in a stream are independent of each other; this means there is no correlation between two records in a stream.

A stream declares the schema of the records, which includes each column's name and data type along with optional constraints. A stream is a type of Relation. Each relation belongs to a Schema in a Database, so the fully-qualified name of the relation is <database>.<schema>.<relation>.
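For instance, the following sketch (the database, schema, and stream names here are illustrative) creates a stream under an explicit database and schema, so it can later be referenced as demodb.public.orders:

CREATE STREAM demodb.public.orders (
    order_id VARCHAR,
    amount BIGINT
) WITH ('value.format' = 'json');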

Arguments

stream_name

Specifies the name of the new stream. If the name is case sensitive you must wrap it in double quotes; otherwise the system uses the lower case name.

column_name

The name of a column to be created in the new stream. If the name is case sensitive you must wrap it in double quotes; otherwise the system uses the lower case name.

data_type

The data type of the column. This can include array specifiers. For more information on the data types supported by DeltaStream, see the Data Types reference.

NOT NULL

Defines a constraint on the column, ensuring it cannot contain NULL values.

WITH (stream_parameter = value [, …​ ])

Optionally, this clause specifies Stream Parameters.

Stream Parameters

topic

Name of the entity that has the data for this stream. If the entity doesn't exist, an entity with this name is created in the corresponding store.

Required: No Default value: Lowercase stream_name Type: String

store

Name of the store that hosts the entity for this stream.

Required: No Default value: Current session's store name Type: String Valid values: See LIST STORES.

value.format

Format of the message value in the entity. See Data Formats (Serialization) for more information regarding serialization formats.

Required: Yes Type: String Valid values: JSON, AVRO, PROTOBUF, PRIMITIVE

timestamp

Name of the column in the stream to use as the timestamp. If not set, the timestamp of the message is used for time-based operations such as window aggregations and joins. If the type of this timestamp column is BIGINT, the values are expected to be epoch milliseconds UTC.

Required: No Default value: Record's timestamp Type: String Valid values: Must be of type BIGINT or TIMESTAMP. See Data Types.

timestamp.format

The format to use for TIMESTAMP typed fields. See Data Types.

Required: No Default value: sql Type: String Valid values: sql, iso8601
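For example, here is a minimal sketch of a stream that uses an ISO-8601 formatted TIMESTAMP column as its event timestamp; the orders topic and its columns are illustrative, and the topic is assumed to already exist in the current store:

-- Use the order_time column, parsed as ISO 8601, as the event timestamp
CREATE STREAM orders (
    order_time TIMESTAMP,
    order_id VARCHAR
) WITH (
    'topic' = 'orders',
    'value.format' = 'json',
    'timestamp' = 'order_time',
    'timestamp.format' = 'iso8601');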

Kafka-Specific Parameters

Parameters to be used if the associated Store is type KAFKA:

topic.partitions

The number of partitions to use when creating the entity, if applicable. If the topic already exists, then this value must be equal to the number of partitions in the existing Kafka entity.

Required: Yes, unless topic already exists Default value: Leftmost source relation topic's partition count Type: Integer Valid values: [1, ...]

topic.replicas

The number of replicas to use when creating the topic, if applicable. If the topic already exists, then this value must be equal to the number of replicas in the existing Kafka entity.

Required: Yes, unless topic already exists Default value: Leftmost source relation topic's replica count Type: Integer Valid values: [1, ...]

kafka.topic.*

A configuration specific to the topic being created; for example, see Kafka Entity Configuration for Confluent Platform.

Required: No Default value: None Type: String Valid values: Kafka topic configuration specific to the underlying type.

key.format

Format of the message key in the entity. This value can be the same as or different from the value.format. See Data Formats (Serialization) for more information regarding serialization formats.

Required: No, unless key.type is set Default value: None Type: String Valid values: JSON, AVRO, PROTOBUF, PRIMITIVE

key.type

Declares the names and data types of the key columns. The type is a STRUCT when key.format is a non-primitive value, e.g. 'key.type'='STRUCT<id BIGINT, name VARCHAR>'. For primitive values, the type is one of the Primitive Data Types, e.g. 'key.type'='VARCHAR'.

Required: No, unless key.format is set Default value: None Type: String Valid values: See STRUCT in Data Types.

delivery.guarantee

The fault tolerance guarantees applied when producing to this stream.

Required: No Default value: at_least_once Type: String Valid values:

  • at_least_once: Ensures that records are output to the stream at least once. During query checkpointing, the query waits to receive a confirmation of successful writes from the Kafka broker. If there are issues with the query, then duplicate records are possible as the query attempts to reprocess old data.

  • exactly_once: Produces to the stream using Kafka transactions. These transactions are committed when the query takes a checkpoint. On the consumer side, when setting the Kafka consumer isolation.level configuration to read_committed, only the committed records display. Since records aren't committed until the query takes a checkpoint, there is some additional delay when using this setting.

  • none: There is no fault tolerance guarantee when producing to the stream. If there are issues on the Kafka broker, then records may be lost; if there are issues with the query, then output records may be duplicated.
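As an illustration (the stream, store, and topic names below are hypothetical), overriding the default guarantee is just another WITH parameter:

-- Produce with Kafka transactions, committed at query checkpoints
CREATE STREAM payments (
    ts BIGINT,
    payment_id VARCHAR
) WITH (
    'store' = 'kafka_store',
    'topic' = 'payments',
    'value.format' = 'json',
    'delivery.guarantee' = 'exactly_once');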

Kinesis-Specific Parameters

Parameters to be used if the associated Store is type KINESIS:

topic.shards

The number of shards to use when creating the topic, if applicable. If the topic already exists, then this value must be equal to the number of shards in the existing Kinesis stream.

Required: Yes, unless topic already exists Default value: Leftmost source relation topic's shard count Type: Integer Valid values: [1, ...] Alias: kinesis.shards

Note: Kinesis stores provide a delivery guarantee of at_least_once when producing events into a sink entity.
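For instance, a sketch using the kinesis.shards alias (names are illustrative; the topic is assumed not to exist, so it is created with two shards):

-- Creates the Kinesis data stream 'clicks' with 2 shards
CREATE STREAM clicks (
    ts BIGINT,
    url VARCHAR
) WITH (
    'store' = 'kinesis_store',
    'topic' = 'clicks',
    'value.format' = 'json',
    'kinesis.shards' = 2);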

Format-Specific Parameters

Avro

Parameters to be used when writing records into a stream, if the associated key.format or value.format is avro and the default Avro schema generation must be changed by providing a base schema for the key and/or value.

When generating an Avro schema for a column using a base schema:

  • if the base schema has a field with the same name and data type as that of the column, then the field's definition from the base schema is used in the generated schema. This includes retaining the base schema's doc and logicalType for the field.

  • if the base schema has a field with the same name as that of the column but a different data type, then an Avro schema type definition is generated from the column's data type, with the field's doc taken from its corresponding field in the base schema.

Notes

  • Currently supported schema registries are Confluent Cloud and Confluent Platform.

  • Known limitation: Confluent Schema Registry must use the default TopicNameStrategy for creating subject names. See CREATE SCHEMA_REGISTRY for more details.

avro.base.schema.store

Name of the store whose schema registry contains the Avro schema subject(s) to be used as the base schema for generating the Avro schema for the stream's key and/or value.

Required: No Default value: Current session's store name Type: Identifier Valid values: See LIST STORES.

avro.base.subject.key

Name of the subject in the schema registry from which to obtain the base schema for generating the Avro schema for the stream's key.

Required: No, unless key.format is set to avro and key.type is defined. Type: String

avro.base.subject.value

Name of the subject in the schema registry from which to obtain the base schema for generating the Avro schema for the stream's value columns.

Required: No, unless value.format is set to avro. Type: String

Examples

Create a new stream with passthrough configuration for retention

CREATE STREAM customers (
  ts BIGINT,
  customer_id VARCHAR,
  full_name VARCHAR,
  region VARCHAR
) WITH (
  'store' = 'kafka_store',
  'topic.partitions' = 1,
  'topic.replicas' = 2,
  'kafka.topic.retention.ms' = '172800000');

Create a new stream with timestamp column and key/value formats

The following creates a new stream with the name pageviews_json. This stream reads from an existing topic named pageviews in the default store demostore and has a value.format of JSON. Additionally, in the WITH clause we specify that this stream has a key of type VARCHAR and uses the viewtime column as its timestamp:

demodb.public/demostore# LIST ENTITIES;
+------------------------------------------+------------+
|  Name                                    |  Is Leaf   |
+==========================================+============+
| pageviews                                | true       |
+------------------------------------------+------------+
demodb.public/demostore# CREATE STREAM pageviews_json (
   viewtime BIGINT,
   userid VARCHAR,
   pageid VARCHAR
) WITH (
   'topic'='pageviews',
   'value.format'='json',
   'key.format'='primitive',
   'key.type'='VARCHAR',
   'timestamp'='viewtime');
+------------+------------------------------+------------+------------------------------------------+
|  Type      |  Name                        |  Command   |  Summary                                 |
+============+==============================+============+==========================================+
| stream     | demodb.public.pageviews_json | CREATE     | stream "pageviews_json" was              |
|            |                              |            | successfully created                     |
+------------+------------------------------+------------+------------------------------------------+

Create a new stream in a specific store

The following creates a new stream pv_kinesis. This stream reads from an existing topic named pageviews in the store kinesis_store:

demodb.public/demostore# CREATE STREAM pv_kinesis (
   viewtime BIGINT,
   userid VARCHAR,
   pageid VARCHAR
) WITH ( 'topic'='pageviews', 'store'='kinesis_store', 'value.format'='json' );
+------------+--------------------------+------------+------------------------------------------+
|  Type      |  Name                    |  Command   |  Summary                                 |
+============+==========================+============+==========================================+
| stream     | demodb.public.pv_kinesis | CREATE     | stream "pv_kinesis" was successfully     |
|            |                          |            | created                                  |
+------------+--------------------------+------------+------------------------------------------+
demodb.public/demostore# LIST STREAMS;
+------------+------------+------------+------------+------------------------------------------+-------------------------------+-------------------------------+
|  Name      |  Type      |  Owner     |  State     |  Properties                              |  Created At                   |  Updated At                   |
+============+============+============+============+==========================================+===============================+===============================+
| pv_kinesis | stream     | sysadmin   | created    | { "value.format": "json", "topic":       | 2024-07-02 21:28:35 +0000 UTC | 2024-07-02 21:28:36 +0000 UTC |
|            |            |            |            | "pageviews", "store":                    |                               |                               |
|            |            |            |            | "kinesis_store" }                        |                               |                               |
+------------+------------+------------+------------+------------------------------------------+-------------------------------+-------------------------------+

Create a new stream without an existing entity

The following creates a new stream visit_count. As its corresponding topic doesn't exist in the store kinesis_store, an additional topic creation parameter (here, topic.shards) is required to create the new Kinesis data stream pv_count in the store:

demodb.public/kinesis_store# LIST ENTITIES;
+------------------------------------------+------------+
|  Name                                    |  Is Leaf   |
+==========================================+============+
| pageviews                                | true       |
+------------------------------------------+------------+                  

demodb.public/kinesis_store# CREATE STREAM visit_count (userid VARCHAR, pgcount BIGINT) WITH ('topic'='pv_count', 'value.format'='json', 'topic.shards' = 1);
+------------+---------------------------+------------+------------------------------------------+
|  Type      |  Name                     |  Command   |  Summary                                 |
+============+===========================+============+==========================================+
| stream     | demodb.public.visit_count | CREATE     | stream "visit_count" was successfully    |
|            |                           |            | created                                  |
+------------+---------------------------+------------+------------------------------------------+
demodb.public/kinesis_store# LIST ENTITIES;
+------------------------------------------+------------+
|  Name                                    |  Is Leaf   |
+==========================================+============+
| pageviews                                | true       |
| pv_count                                 | true       |
+------------------------------------------+------------+
  
demodb.public/kinesis_store# DESCRIBE ENTITY pv_count;
+------------------+------------+-------------+
|  Name            |  Shards    |  Descriptor |
+==================+============+=============+
| pv_count         | 1          | <null>      |
+------------------+------------+-------------+

demodb.public/kinesis_store# LIST STREAMS;
+-------------+------------+------------+------------+------------------------------------------+-------------------------------+-------------------------------+
|  Name       |  Type      |  Owner     |  State     |  Properties                              |  Created At                   |  Updated At                   |
+=============+============+============+============+==========================================+===============================+===============================+
| visit_count | stream     | sysadmin   | created    | { "topic.shards": 1, "value.format":     | 2024-07-02 21:32:20 +0000 UTC | 2024-07-02 21:32:32 +0000 UTC |
|             |            |            |            | "json", "topic": "pv_count", "store":    |                               |                               |
|             |            |            |            | "kinesis_store" }                        |                               |                               |
+-------------+------------+------------+------------+------------------------------------------+-------------------------------+-------------------------------+

Create a new stream for an existing entity

The following creates a new users stream for the existing users entity in the current Store. This DDL implies that the name of the stream should be used as the name of the entity that hosts the records. This DDL also implies the original structure for the users entity:

CREATE STREAM "users" (
    registertime BIGINT,
    userid VARCHAR,
    regionid VARCHAR,
    gender VARCHAR,
    interests ARRAY<VARCHAR>,
    contactinfo STRUCT<
        phone VARCHAR,
        city VARCHAR,
        "state" VARCHAR,
        zipcode VARCHAR>
) WITH ( 'value.format'='json' );

Create a new stream with case-sensitive columns

The following creates a new stream CaseSensitivePV in the database DataBase and schema Schema2. This stream reads from a topic named case_sensitive_pageviews in the store OtherStore and has a value.format of AVRO and a key.format of PROTOBUF. Because key.format is included, key.type must also be provided; in this example it is STRUCT<pageid VARCHAR>.

Note that many of the columns are wrapped in quotes, indicating they are case sensitive. The case-insensitive column CaseInsensitiveCol is lowercased to caseinsensitivecol when the relation is created. The parameters also specify the timestamp for this relation; queries that process data using this relation as their source refer to the ViewTime column as the event's timestamp.

CREATE STREAM "DataBase"."Schema2"."CaseSensitivePV" (
   "ViewTime" BIGINT,
   "userID" VARCHAR,
   "PageId" VARCHAR,
   "CaseSensitiveCol" BIGINT,
   CaseInsensitiveCol BIGINT
)
WITH (
   'topic'='case_sensitive_pageviews',
   'store'='OtherStore',
   'value.format'='avro',
   'key.format'='protobuf',
   'key.type'='STRUCT<pageid VARCHAR>',
   'timestamp'='ViewTime'
);

Create a new stream with `NOT NULL` columns

The following creates a new stream, users. Two columns in this stream are defined with the NOT NULL constraint:

  1. registertime

  2. contactinfo

This means in any valid record from this stream, these two columns are not allowed to contain null values.

CREATE STREAM "users" (
    registertime BIGINT NOT NULL,
    userid VARCHAR, 
    interests ARRAY<VARCHAR>,
    contactinfo STRUCT<phone VARCHAR, city VARCHAR, "state" VARCHAR, zipcode VARCHAR> NOT NULL
)
WITH (
   'topic'='users', 
    'key.format'='json', 
    'key.type'='STRUCT<userid VARCHAR>', 
    'value.format'='json'
);

Create a new stream with format-specific properties for Avro

The following creates a new stream, usersInfo. The key and value of the records in this stream are in avro format. The stream uses subjects from a store called sr_store as the base Avro schemas to generate the Avro schemas for usersInfo's key and value: the users_data-key subject is used to generate the key's Avro schema, and the users_data-value subject is used to generate the value's Avro schema for records written into usersInfo.

CREATE STREAM "usersInfo" (
    registertime BIGINT NOT NULL,
    userid VARCHAR, 
    interests ARRAY<VARCHAR>,
    contactinfo STRUCT<phone VARCHAR, city VARCHAR, "state" VARCHAR, zipcode VARCHAR> NOT NULL
)
WITH (
    'topic'='usersInfo', 
    'key.format'='avro',
    'key.type'='STRUCT<userid VARCHAR>', 
    'value.format'='avro',
    'avro.base.schema.store' = sr_store,
    'avro.base.subject.key' = 'users_data-key',
    'avro.base.subject.value' = 'users_data-value'
);
