CREATE STREAM
```sql
CREATE STREAM stream_name (
  column_name data_type [, ...]
) WITH (stream_parameter = value [, ...]);
```
A Stream is a sequence of immutable, partitioned, and partially ordered events (we use events and records synonymously). A Stream is a relational representation of data in a streaming Store, such as the data in a Kafka topic or a Kinesis stream. The records in a Stream are independent of each other, meaning there is no correlation between two records in a Stream. A Stream declares the schema of the records, which includes the column names along with the column types and optional constraints. A Stream is a type of Relation. Each Relation belongs to a Schema in a Database, so the fully qualified name of the Relation is `<database>.<schema>.<relation>`.

- `stream_name`: Specifies the name of the new Stream. For case-sensitive names, the name must be wrapped in double quotes; otherwise, the lowercased name is used.
- `column_name`: The name of a column to be created in the new Stream. For case-sensitive names, the name must be wrapped in double quotes; otherwise, the lowercased name is used.
- `data_type`: The data type of the column. This can include array specifiers. For more information on the data types supported by DeltaStream, refer to the Data Types reference.
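As a quick, hedged sketch of these naming rules (the Database `demodb` and Schema `public` come from the examples later on this page; the Stream and column names here are illustrative), a quoted column keeps its casing while an unquoted one is lowercased:

```sql
-- Hypothetical sketch: fully qualified Stream name with one case-sensitive column.
-- "OrderId" keeps its casing because it is double-quoted; amount is stored as "amount".
-- The backing topic (defaulting to the lowercased Stream name, 'orders') is assumed
-- to already exist in the current session's Store.
CREATE STREAM demodb.public.orders (
  "OrderId" VARCHAR,
  amount BIGINT
) WITH ('value.format'='json');
```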
The following Stream parameters can be provided in the `WITH` clause:

| Parameter Name | Description |
| --- | --- |
| `topic` | Name of the Topic that has the data for this Stream. If the Topic doesn't exist, a Topic with this name is created in the corresponding Store.<br><br>Required: No<br>Default value: Lowercase `stream_name`<br>Type: String |
| `store` | Name of the Store that hosts the Topic for this Stream.<br><br>Required: No<br>Default value: Current session's Store name<br>Type: String<br>Valid values: See LIST STORES. |
| `value.format` | Format of the message value in the Topic. See Data Formats (Serialization) for more information regarding serialization formats.<br><br>Required: Yes<br>Type: String<br>Valid values: `JSON`, `AVRO`, `PROTOBUF`, `PRIMITIVE` |
| `timestamp` | Name of the column in the Stream to use as the timestamp. If not set, the timestamp of the message is used for time-based operations such as window aggregations and joins. If this column is of type `BIGINT`, its values are expected to be epoch milliseconds in UTC.<br><br>Required: No<br>Default value: Record's timestamp<br>Type: String<br>Valid values: Must be of type `BIGINT` or `TIMESTAMP`. See Data Types. |
| `timestamp.format` | Format of the values in the timestamp column.<br><br>Required: No<br>Default value: `sql`<br>Type: String<br>Valid values: `sql`, `iso8601` |
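For instance, a minimal sketch (the Stream name `clicks`, its columns, and its topic are assumptions for illustration, and the topic is assumed to already exist in the current session's Store) that designates an ISO-8601 formatted `TIMESTAMP` column as the event timestamp might look like this:

```sql
-- Hypothetical sketch: use an ISO-8601 formatted TIMESTAMP column as the event timestamp.
-- The topic 'clicks' is assumed to exist in the current session's Store.
CREATE STREAM clicks (
  event_time TIMESTAMP,
  userid VARCHAR,
  url VARCHAR
) WITH (
  'topic'='clicks',
  'value.format'='json',
  'timestamp'='event_time',
  'timestamp.format'='iso8601'
);
```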
The following parameters are specific to Streams whose Store is Kafka:

| Parameter Name | Description |
| --- | --- |
| `topic.partitions` | The number of partitions to use when creating the Topic, if applicable. If the topic already exists, this value must equal the number of partitions in the existing Kafka Topic.<br><br>Required: Yes, unless the topic already exists<br>Default value: Leftmost source Relation topic's partition count<br>Type: Integer<br>Valid values: [1, ...] |
| `topic.replicas` | The number of replicas to use when creating the Topic, if applicable. If the topic already exists, this value must equal the number of replicas in the existing Kafka Topic.<br><br>Required: Yes, unless the topic already exists<br>Default value: Leftmost source Relation topic's replica count<br>Type: Integer<br>Valid values: [1, ...] |
| `key.format` | Format of the message key in the Topic. This value can be the same as or different from the `value.format`. See Data Formats (Serialization) for more information regarding serialization formats.<br><br>Required: No, unless `key.type` is set<br>Default value: None<br>Type: String<br>Valid values: `JSON`, `AVRO`, `PROTOBUF`, `PRIMITIVE` |
| `key.type` | Declares the names and data types of the key columns. The type is a `STRUCT` when `key.format` is a non-primitive value, e.g. `'key.type'='STRUCT<id BIGINT, name VARCHAR>'`. For primitive values, the type is one of the Primitive Data Types, e.g. `'key.type'='VARCHAR'`.<br><br>Required: No, unless `key.format` is set<br>Default value: None<br>Type: String<br>Valid values: See STRUCT in Data Types. |
| `delivery.guarantee` | The fault tolerance guarantee applied when producing to this Stream.<br><br>Required: No<br>Default value: `at_least_once`<br>Type: String<br>Valid values: |
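As a hedged sketch of how these Kafka-specific parameters fit together (the Store name `kafka_store` and the Stream, topic, and column names are assumptions), a Stream backed by a not-yet-existing topic could be declared as:

```sql
-- Hypothetical sketch: the backing Kafka topic does not exist yet, so partition and
-- replica counts are supplied; delivery.guarantee is shown with its default value.
CREATE STREAM orders_kafka (
  orderid VARCHAR,
  amount BIGINT
) WITH (
  'topic'='orders_kafka',
  'store'='kafka_store',
  'value.format'='json',
  'topic.partitions'='3',
  'topic.replicas'='2',
  'delivery.guarantee'='at_least_once'
);
```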
The following parameter is specific to Streams whose Store is Kinesis:

| Parameter Name | Description |
| --- | --- |
| `topic.shards` | The number of shards to use when creating the Topic, if applicable. If the topic already exists, this value must equal the number of shards in the existing Kinesis Stream.<br><br>Required: Yes, unless the topic already exists<br>Default value: Leftmost source Relation topic's shard count<br>Type: Integer<br>Valid values: [1, ...]<br>Alias: `kinesis.shards` |
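A minimal sketch along the same lines as the `visit_count` example further below (the Stream and column names here are illustrative; the Store `kinesis_store` comes from that example), creating its backing Kinesis Data Stream with two shards:

```sql
-- Hypothetical sketch: the backing Kinesis Data Stream does not exist yet,
-- so topic.shards (alias: kinesis.shards) is required.
CREATE STREAM clicks_kinesis (
  userid VARCHAR,
  url VARCHAR
) WITH (
  'topic'='clicks_kinesis',
  'store'='kinesis_store',
  'value.format'='json',
  'topic.shards'='2'
);
```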
Kinesis Stores provide a delivery guarantee of `at_least_once` when producing events into a sink Topic.

The following creates a new Stream named `pageviews_json`. This Stream reads from an existing topic named `pageviews` in the default Store `demostore` and has a `value.format` of `JSON`. Additionally, in the `WITH` clause we specify that this Stream has a key of type `VARCHAR` and uses the `viewtime` column as its `timestamp`:

```
demodb.public/demostore# LIST TOPICS;
Topic name
-----------------------
pageviews
demodb.public/demostore# CREATE STREAM pageviews_json (
viewtime BIGINT,
userid VARCHAR,
pageid VARCHAR
) WITH (
'topic'='pageviews',
'value.format'='json',
'key.format'='primitive',
'key.type'='VARCHAR',
'timestamp'='viewtime');
```
The following creates a new Stream `pv_kinesis`. This Stream reads from an existing topic named `pageviews` in the Store `kinesis_store`:

```
demodb.public/demostore# CREATE STREAM pv_kinesis (
viewtime BIGINT,
userid VARCHAR,
pageid VARCHAR
) WITH ( 'topic'='pageviews', 'store'='kinesis_store', 'value.format'='json' );
demodb.public/demostore# LIST STREAMS;
Name | Type | Owner | Created at | Updated at
--------------+--------+----------+----------------------+-----------------------
pv_kinesis | Stream | sysadmin | 2023-02-10T21:48:21Z | 2023-02-10T21:48:21Z
```
The following creates a new Stream `visit_count`. Since its corresponding topic doesn't exist in the Store `kinesis_store`, an additional topic parameter, `topic.shards`, is required in order to create the new Kinesis Data Stream `pv_count` in the Store:

```
demodb.public/kinesis_store# LIST TOPICS;
Topic name
--------------------------
pageviews
demodb.public/kinesis_store# CREATE STREAM visit_count (userid VARCHAR, pgcount BIGINT) WITH ('topic'='pv_count', 'value.format'='json', 'topic.shards' = '1');
demodb.public/kinesis_store# LIST TOPICS;
Topic name
--------------------------
pageviews
pv_count
demodb.public/kinesis_store# DESCRIBE TOPIC pv_count;
Name | Shards | Descriptor
--------------+--------+-------------
pv_count | 1 |
demodb.public/kinesis_store# LIST STREAMS;
Name | Type | Owner | Created at | Updated at
--------------+--------+----------+----------------------+-----------------------
visit_count | Stream | sysadmin | 2023-02-10T21:48:21Z | 2023-02-10T21:48:21Z
```
CREATE STREAM "users" (
registertime BIGINT,
userid VARCHAR,
regionid VARCHAR,
gender VARCHAR,
interests ARRAY<VARCHAR>,
contactinfo STRUCT<
phone VARCHAR,
city VARCHAR,
"state" VARCHAR,
zipcode VARCHAR>
) WITH ( 'value.format'='json' );
```
The following creates a new Stream, `CaseSensitivePV`, in the Database `DataBase` and Schema `Schema2`. This Stream reads from a topic named `case_sensitive_pageviews` in the Store `OtherStore` and has a `value.format` of `AVRO` and a `key.format` of `PROTOBUF`. Since `key.format` is included, `key.type` must also be provided; in this example its value is `STRUCT<pageid VARCHAR>`. Note that many of the columns are in quotes, indicating they are case-sensitive. The case-insensitive column named `CaseInsensitiveCol` will be lowercased to `caseinsensitivecol` when the Relation is created. In the parameters, the `timestamp` for this Relation is also specified, so queries that process data using this Relation as the source will treat the column `ViewTime` as the event's timestamp.

```sql
CREATE STREAM "DataBase"."Schema2"."CaseSensitivePV" (
"ViewTime" BIGINT,
"userID" VARCHAR,
"PageId" VARCHAR,
"CaseSensitiveCol" BIGINT,
CaseInsensitiveCol BIGINT
)
WITH (
'topic'='case_sensitive_pageviews',
'store'='OtherStore',
'value.format'='avro',
'key.format'='protobuf',
'key.type'='STRUCT<pageid VARCHAR>',
'timestamp'='ViewTime'
);
```
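As a hedged follow-up sketch (assuming a query is later run against this Relation), case-sensitive identifiers must be double-quoted when referenced, while the lowercased column needs no quotes:

```sql
-- Hypothetical query: quoted identifiers preserve case; caseinsensitivecol was
-- lowercased when the Relation was created, so it is referenced without quotes.
SELECT "ViewTime", "userID", caseinsensitivecol
FROM "DataBase"."Schema2"."CaseSensitivePV";
```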