CREATE STREAM

Syntax

CREATE STREAM stream_name (
    column_name data_type [NOT NULL] [, ...]
) WITH (stream_parameter = value [, ...]);

Description

Arguments

stream_name

Specifies the name of the new Stream. For a case-sensitive name, wrap the name in double quotes; otherwise, the lowercased name is used.

column_name

The name of a column to be created in the new Stream. For a case-sensitive name, wrap the name in double quotes; otherwise, the lowercased name is used.
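
The quoting rule for Stream and column names can be sketched as follows. This is a minimal illustration, not output from a real session: the Stream name Clicks_Raw, its columns, and the topic clicks are hypothetical, and the `--` lines assume standard SQL comment syntax.

```sql
-- Unquoted identifiers are lowercased: this Stream is created as clicks_raw,
-- with columns eventtime and userid.
CREATE STREAM Clicks_Raw (
   EventTime BIGINT,
   UserId VARCHAR
) WITH ('topic'='clicks', 'value.format'='json');

-- Double-quoted identifiers keep their case: this Stream is created as
-- Clicks_Raw, with columns EventTime and UserId.
CREATE STREAM "Clicks_Raw" (
   "EventTime" BIGINT,
   "UserId" VARCHAR
) WITH ('topic'='clicks', 'value.format'='json');
```

Under this rule the two statements would define two differently named Streams, so a name created with quotes has to be quoted the same way wherever it is referenced later.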

data_type

The data type of the column. This can include array specifiers. For more information on the data types supported by DeltaStream, refer to the Data Types reference.

NOT NULL

Defines a constraint on the column, ensuring it cannot contain NULL values.

WITH (stream_parameter = value [, ...])

Optionally, this clause specifies the Stream Parameters described in the next section.

Stream Parameters

Kafka Specific Parameters

Parameters to use if the associated Store is of type KAFKA:

Kinesis Specific Parameters

Parameters to use if the associated Store is of type KINESIS:

Examples

Create a new Stream with timestamp column and key/value formats

The following creates a new Stream named pageviews_json. This Stream reads from an existing topic named pageviews in the default store demostore and has a value.format of JSON. The WITH clause also specifies that the Stream has a key of type VARCHAR and uses the viewtime column as its timestamp:

demodb.public/demostore# LIST TOPICS;
      Topic name       
-----------------------
  pageviews            

demodb.public/demostore# CREATE STREAM pageviews_json (
   viewtime BIGINT,
   userid VARCHAR,
   pageid VARCHAR
) WITH (
   'topic'='pageviews',
   'value.format'='json',
   'key.format'='primitive',
   'key.type'='VARCHAR',
   'timestamp'='viewtime');

Create a new Stream in a specific Store

The following creates a new Stream pv_kinesis. This Stream reads from an existing topic named pageviews in the store kinesis_store:

demodb.public/demostore# CREATE STREAM pv_kinesis (
   viewtime BIGINT,
   userid VARCHAR,
   pageid VARCHAR
) WITH ( 'topic'='pageviews', 'store'='kinesis_store', 'value.format'='json' );

demodb.public/demostore# LIST STREAMS;
     Name     |  Type  |  Owner   |      Created at      |      Updated at       
--------------+--------+----------+----------------------+-----------------------
  pv_kinesis  | Stream | sysadmin | 2023-02-10T21:48:21Z | 2023-02-10T21:48:21Z  

Create a new Stream without an existing Topic

The following creates a new Stream visit_count. Because its corresponding topic, pv_count, does not yet exist in the store kinesis_store, the statement requires an additional topic parameter, topic.shards, to create the new Kinesis Data Stream pv_count in the store:

demodb.public/kinesis_store# LIST TOPICS;
       Topic name         
--------------------------
  pageviews                          

demodb.public/kinesis_store# CREATE STREAM visit_count (userid VARCHAR, pgcount BIGINT) WITH ('topic'='pv_count', 'value.format'='json', 'topic.shards' = '1');
demodb.public/kinesis_store# LIST TOPICS;
       Topic name         
--------------------------
  pageviews                          
  pv_count
  
demodb.public/kinesis_store# DESCRIBE TOPIC pv_count;
     Name     | Shards | Descriptor  
--------------+--------+-------------
   pv_count   |      1 |             

demodb.public/kinesis_store# LIST STREAMS;
     Name     |  Type  |  Owner   |      Created at      |      Updated at       
--------------+--------+----------+----------------------+-----------------------
  visit_count | Stream | sysadmin | 2023-02-10T21:48:21Z | 2023-02-10T21:48:21Z  

Create a new Stream for an existing Topic

CREATE STREAM "users" (
    registertime BIGINT,
    userid VARCHAR,
    regionid VARCHAR,
    gender VARCHAR,
    interests ARRAY<VARCHAR>,
    contactinfo STRUCT<
        phone VARCHAR,
        city VARCHAR,
        "state" VARCHAR,
        zipcode VARCHAR>
) WITH ( 'value.format'='json' );

Create a new Stream with case-sensitive columns

The following creates a new Stream, CaseSensitivePV, in the Database DataBase and Schema Schema2. This Stream reads from a topic named case_sensitive_pageviews in the store OtherStore and has a value.format of AVRO and a key.format of PROTOBUF. Because key.format is specified, key.type must also be provided; in this example it is STRUCT<pageid VARCHAR>. Many of the columns are in double quotes, indicating that they are case-sensitive. The unquoted column CaseInsensitiveCol is lowercased to caseinsensitivecol when the Relation is created. The parameters also specify the timestamp for this Relation, so queries that use this Relation as a source treat the ViewTime column as the event's timestamp.

CREATE STREAM "DataBase"."Schema2"."CaseSensitivePV" (
   "ViewTime" BIGINT,
   "userID" VARCHAR,
   "PageId" VARCHAR,
   "CaseSensitiveCol" BIGINT,
   CaseInsensitiveCol BIGINT
)
WITH (
   'topic'='case_sensitive_pageviews',
   'store'='OtherStore',
   'value.format'='avro',
   'key.format'='protobuf',
   'key.type'='STRUCT<pageid VARCHAR>',
   'timestamp'='ViewTime'
);

Create a new Stream with `NOT NULL` column

The following creates a new Stream, users. Two columns in this Stream are defined with the NOT NULL constraint: registertime and contactinfo. This means that in any valid record from this Stream, these two columns cannot contain null values.

CREATE STREAM users (
    registertime BIGINT NOT NULL,
    userid VARCHAR,
    interests ARRAY<VARCHAR>,
    contactinfo STRUCT<phone VARCHAR, city VARCHAR, "state" VARCHAR, zipcode VARCHAR> NOT NULL
)
WITH (
    'topic'='users',
    'key.format'='json',
    'key.type'='STRUCT<userid VARCHAR>',
    'value.format'='json'
);
