CREATE STREAM
Syntax
Description
A stream is a sequence of immutable, partitioned, and partially-ordered events.
Note In DeltaStream, the terms events and records are synonymous.
A stream is a relational representation of data in a streaming Store, such as the data in a Kafka topic or a Kinesis stream. The records in a stream are independent of each other; this means there is no correlation between two records in a stream.
A stream declares the schema of the records, which includes the column name along with the column type and optional constraints. A stream is a type of Relation. Each relation belongs to a Schema in a Database, so the fully-qualified name of the relation would be <database>.<schema>.<relation>
.
Arguments
stream_name
Specifies the name of the new stream. If the name is case sensitive you must wrap it in double quotes; otherwise the system uses the lower case name.
column_name
The name of a column to be created in the new stream. If the name is case sensitive you must wrap it in double quotes; otherwise the system uses the lower case name.
data_type
The data type of the column. This can include array specifiers. For more information on the data types supported by DeltaStream, see the Data Types reference.
NOT NULL
Defines a constraint on the column, ensuring it cannot contain NULL
values.
WITH (stream_parameter = value [, … ])
Optionally, this clause specifies Stream Parameters.
Stream Parameters
topic
Name of the #entity that has the data for this stream. If the entity doesn't exist, an entity with this name is created in the corresponding store
.
Required: No
Default value: Lowercase stream_name
Type: String
store
Name of the store that hosts the #entity for this stream. Required: No Default value: Current session's store name Type: String Valid values: See LIST STORES.
value.format
Format of message value in the #entity. See Data Formats (Serialization) for more information regarding serialization formats.
Required: Yes
Type: String
Valid values: JSON
, AVRO
, PROTOBUF
, PRIMITIVE
timestamp
Name of the column in the stream to use as the timestamp. If not set, the timestamp of the message is used for time based operations such as window aggregations and joins. If the type of this timestamp field is BIGINT
, it is expected that the values are epoch milliseconds UTC.
Required: No
Default value: Record's timestamp
Type: String
Valid values: Must be of type BIGINT
or TIMESTAMP
. See Data Types.
timestamp.format
The format to use for TIMESTAMP
typed fields. See Data Types.
Required: No
Default value: sql
Type: String
Valid values: sql
, iso8601
Kafka Specific Parameters
Parameters to be used if the associated Store is type KAFKA
:
topic.partitions
The number of partitions to use when creating the entity, if applicable. If the topic already exist, then this value must be equal to the number of partitions in the existing Kafka entity.
Required: Yes, unless topic already exists Default value: Leftmost source relation topic's partition count Type: Integer Valid values: [1, ...]
topic.replicas
The number of replicas to use when creating the topic, if applicable. If the topic already exists, then this value must be equal to the number of replicas in the existing Kafka entity.
Required: Yes, unless topic already exists Default values: Leftmost source relation topic's replica count Type: Integer Valid values: [1, ...]
key.format
Format of message key in the #entity. This value can be the same as or different from the value.format
. See Data Formats (Serialization) for more information regarding serialization formats.
Required: No, unless key.type
is set
Default value: None
Type: String
Valid values: JSON
, AVRO
, PROTOBUF
, PRIMITIVE
key.type
Declares the names and data types of key columns. The type is a STRUCT
when key.format
is a non-primitive value, e.g. 'key.type'='STRUCT<id BIGINT, name VARCHAR>'
. For primitive values, the type is one of the Primitive Data Types, e.g. 'key.type'='VARCHAR'
.
Required: No, unless key.format
is set
Default value: None
Type: String
Valid values: See STRUCT
in Data Types.
delivery.guarantee
The fault tolerance guarantees applied when producing to this stream.
Required: No
Default value: at_least_once
Type: String
Valid values:
exactly_once
: Produces to the stream using Kafka transactions. These transactions are committed when the query takes a checkpoint. On the consumer side, when setting the Kafka consumerisolation.level
configuration toread_committed
, only the committed records display. Since records aren't committed until the query takes a checkpoint, there is some additional delay when using this setting.at_least_once
: Ensures that records are output to the stream at least once. During query checkpointing, the query waits to receive a confirmation of successful writes from the Kafka broker. If there are issues with the query then duplicate records are possible, as the query attempts to reprocess old data.none
: There is no fault tolerance guarantee when producing to the stream. If there are issues on the Kafka broker then records may be lost; if there are issues with the query then output records may be duplicated.
Kinesis Specific Parameters
Parameters to be used if the associated Store is type KINESIS
:
topic.shards
The number of shards to use when creating the topic, if applicable. If the topic already exists, then this value must be equal to the number of shards in the existing Kinesis stream.
Required: Yes, unless topic already exists
Default values: Leftmost source relation topic's shard count
Type: Integer
Valid values: [1, ...]
Alias: kinesis.shards
Kinesis stores provide a delivery guarantee of at_least_once
when producing events into a sink #entity.
Format-Specific Parameters
Avro
Parameters to be used when writing records into a stream if associated key.format
or value.format
is avro
and the default Avro schema generation must be changed using a base schema for the key and/or value.
When generating an Avro schema for a column using a base schema:
if the base schema has a field with the same name and data type as that of the column, then the field's definition from the base is used in the generated schema. This includes retaining the base schema's
doc
andlogicalType
for the field.if the base schema has a field with the same name as that of the column, but has a different data type, then an Avro schema type definition is generated from the column's data type with the field's
doc
taken from the its corresponding field in the base schema.
Notes
Currently supported schema registries are Confluent Cloud and Confluent Platform.
Known limitation: Confluent Schema Registry must use the default TopicNameStrategy for creating subject names. See CREATE SCHEMA_REGISTRY for more details.
avro.base.schema.store
Name of the store whose schema registry contains the Avro schema subject(s) to be used as the base schema for generating Avro schema for stream's key and/or value.
Required: No Default values: Current session's store name Type: Identifier Valid values: See LIST STORES.
avro.base.subject.key
Name of the subject in the schema registry to obtain the base schema for generating Avro schema for stream's key.
Required: No, unless key.format
is set to avro
and key.type
is defined.
Type: String
avro.base.subject.value
Name of the subject in the schema registry to obtain the base schema for generating Avro schema for stream's value columns.
Required: No, unless value.format
is set to avro
.
Type: String
Examples
Create a new stream with timestamp column and key/value formats
The following creates a new stream with name pageviews_json
. This stream reads from an existing topic named pageviews
in the default store demostore
, and has a value.format
of JSON
. Additionally in the WITH
clause, we specify that this stream has a key of type VARCHAR
and uses the viewtime
column as its timestamp
:
Create a new stream in a specific store
The following creates a new stream pv_kinesis
. This stream reads from an existing topic named pageviews
in the store kinesis_store
:
Create a new stream without an existing entity
The following creates a new stream visit_count
. As its corresponding topic doesn't exist in the store kinesis_store
, it requires an additional topic parameter — for example, topic.shards
— to create the new Kinesis data stream pv_count
in the store:
Create a new stream for an existing entity
The following creates a new users
stream for the existing users
#entity in the current Store. This DDL implies that the name of the stream should be used as the name of the entity that hosts the records. This DDL also implies the original structure for the users
entity:
Create a new stream with case-sensitive columns
The following creates a new stream CaseSensitivePV
in the database DataBase
and schema Schema2
. This stream reads from a topic named case_sensitive_pageviews
in store OtherStore
and has a value.format
of AVRO and key.format
of PROTOBUF. As the key.format
is included, key.type
must be provided and the value in this example is STRUCT<pageid VARCHAR>
.
Note that many of the columns are in quotes, indicating they are case-sensitive. The case insensitive column named CaseInsensitiveCol
is lowercase as caseinsensitivecol
when the relation is created. In the parameters, the timestamp
for this relation is also specified; queries processing data using this relation as the source refer to the timestamp
column ViewTime
as the event's timestamp.
Create a new stream with `NOT NULL` column
The following creates a new stream, users
. Two columns in this stream are defined with the NOT NULL
constraint:
registertime
contactinfo
This means in any valid record from this stream, these two columns are not allowed to contain null values.
Create a new stream with format-specific properties for Avro
The following creates a new stream, usersInfo,
. The key and value of the records in this stream are in avro
format. The stream uses subjects from a store called sr_store
as the base Avro schema to generate Avro schema for usersInfo
's key and value. users_data-key
subject is used to generate the key's Avro schema; the users_data-value
subject is used to generate the value's Avro schema for the records written into usersInfo.
Last updated