# CREATE STREAM AS SELECT

```sql
CREATE STREAM stream_name
  [WITH (stream_parameter = value [, ... ])]
AS select_statement
[PARTITION BY partition_by_clause];
```
`CREATE STREAM AS` is essentially a combination of two statements:

- A DDL statement that creates the new Stream.
- An `INSERT INTO` statement that runs the `SELECT` statement and adds the results into the newly created Stream.
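For instance, the single statement below behaves, conceptually, like the two-step form that follows it. This is a sketch for illustration only; the explicit column list (and any `WITH` parameters the DDL form may require) are assumptions, not part of this page:

```sql
-- Single CREATE STREAM AS SELECT statement:
CREATE STREAM pageviews_copy AS SELECT * FROM pageviews;

-- Roughly equivalent two-step form (column definitions assumed for illustration):
CREATE STREAM pageviews_copy (viewtime BIGINT, userid VARCHAR, pageid VARCHAR);
INSERT INTO pageviews_copy SELECT * FROM pageviews;
```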
`stream_name`

This specifies the name of the new Stream. Optionally, use `<database_name>.<schema_name>` as the prefix to the name to create the Relation in that scope. For case-sensitive names, the name must be wrapped in double quotes; otherwise, the lowercase name is used.

`PARTITION BY partition_by_clause`

Optionally, this clause sets the partition key of records according to their values in a given set of columns. The `PARTITION BY` clause defines a list of one or more columns, separated by commas, as the partitioning columns. By default, the key for the sink's records has the same data format as the sink's value data format; to use a different key format, set the `key.format` Stream parameter. `PARTITION BY` is supported for CREATE STREAM AS SELECT and INSERT INTO queries whose sink is a Stream. Currently, `PARTITION BY` applies only to queries whose sink Stream is backed by a Kafka Store.

`WITH (stream_parameter = value [, ... ])`

Optionally, this clause specifies Stream parameters:

| Parameter Name | Description |
| --- | --- |
| `topic` | The name of the Topic that has the data for a newly created sink Stream. If the Topic doesn't exist in the Store, a Topic with the `stream_name` is created in the corresponding Store. |
| `store` | The name of the Store that hosts the Topic for this Stream. |
| `value.format` | Format of the message value in the Topic. See Data Formats (Serialization) for more information regarding serialization formats.<br>Required: No<br>Default value: Value format from the leftmost source Relation.<br>Type: `VALUE_FORMAT`<br>Valid values: `JSON`, `AVRO`, `PROTOBUF`, `PRIMITIVE` |
| `timestamp` | Name of the column in the Stream to use as the timestamp. If not set, the timestamp of the message is used for time-based operations such as window aggregations and joins.<br>Required: No<br>Default value: Record's timestamp.<br>Type: String<br>Valid values: Must be of type `BIGINT` or `TIMESTAMP`. See Data Types. |
| `timestamp.format` | The format of the timestamp column's value.<br>Required: No<br>Default value: `sql`<br>Type: String<br>Valid values: `sql`, `iso8601` |
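As a hedged illustration of the `timestamp` and `timestamp.format` parameters used together (the `orders` Stream and its ISO-8601 formatted `ordertime` column are assumptions for this sketch, not part of this page):

```sql
-- Mark the assumed "ordertime" column as the timestamp column and
-- declare its format as ISO-8601:
CREATE STREAM orders_by_ordertime
  WITH ('timestamp' = 'ordertime', 'timestamp.format' = 'iso8601') AS
SELECT ordertime, orderid
FROM orders;
```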
The following Stream parameters are specific to a sink Stream whose Topic is in a Kafka Store:

| Parameter Name | Description |
| --- | --- |
| `topic.partitions` | The number of partitions to use when creating the Kafka topic, if applicable.<br>Required: Yes, unless the Topic already exists.<br>Default value: Leftmost source Relation Topic's partition count.<br>Type: Integer<br>Valid values: [1, ...] |
| `topic.replicas` | The number of replicas to use when creating the Kafka topic, if applicable.<br>Required: Yes, unless the Topic already exists.<br>Default value: Leftmost source Relation Topic's replica count.<br>Type: Integer<br>Valid values: [1, ...] |
| `key.format` | The format of a message key in the Topic. See Data Formats (Serialization) for more information regarding serialization formats.<br>Required: No<br>Default value: Key format from the leftmost source Relation's key (if any) or the same as `value.format`.<br>Type: `KEY_FORMAT`<br>Valid values: `JSON`, `AVRO`, `PROTOBUF`, `PRIMITIVE` |
| `key.type` | Declares the names and data types of key columns. The type is a `STRUCT` when `key.format` is a non-primitive value, e.g. `'key.type'='STRUCT<id BIGINT, name VARCHAR>'`. For primitive values, the type is one of the Primitive Data Types, e.g. `'key.type'='VARCHAR'`.<br>Required: No, unless `key.format` is set and there is no default value.<br>Default value: For certain query semantics (i.e. queries using JOIN or GROUP BY), a generated key type is used by default. For queries that do not generate a key type, the key type from the leftmost source Relation's key is used by default (if any). See Row Key Definition.<br>Type: String<br>Valid values: See `STRUCT` in Data Types |
| `delivery.guarantee` | The fault tolerance guarantee applied when producing to this Stream.<br>Required: No<br>Default value: `at_least_once`<br>Type: String<br>Valid values: `at_least_once`, `exactly_once` |
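As a minimal sketch of `key.format` and `key.type` used together, assuming a case where the sink Stream's key should be declared explicitly rather than generated (the column name follows the `pageviews` examples below; whether an explicit key declaration suits a given query is an assumption here):

```sql
-- Declare a structured JSON key for the sink Stream (assumed scenario):
CREATE STREAM pageviews_keyed
  WITH (
    'key.format' = 'JSON',
    'key.type' = 'STRUCT<userid VARCHAR>'
  ) AS
SELECT * FROM pageviews;
```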
The following Stream parameter is specific to a sink Stream whose Topic is in a Kinesis Store:

| Parameter Name | Description |
| --- | --- |
| `topic.shards` | The number of shards to use when creating the Kinesis stream, if applicable.<br>Required: Yes, unless the Topic already exists.<br>Default value: Leftmost source Relation Topic's shard count.<br>Type: Integer<br>Valid values: [1, ...]<br>Alias: `kinesis.shards` |
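Since `topic.shards` accepts the alias `kinesis.shards`, a statement such as the following sketch (with an assumed existing Kinesis Store named `kinesis_store`) is equivalent to setting `topic.shards` directly:

```sql
-- "kinesis.shards" is an alias for "topic.shards":
CREATE STREAM pageviews_kinesis_copy
  WITH ('store' = 'kinesis_store', 'kinesis.shards' = '2') AS
SELECT * FROM pageviews;
```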
Kinesis Stores provide a delivery guarantee of `at_least_once` when producing events into a sink Topic.

The following creates a replica of the source Stream:
```sql
CREATE STREAM pageviews_copy AS SELECT * FROM pageviews;
```
The following creates a replica of the source Stream, but the new Relation belongs to the Schema named `schema2` in the session's current Database:

```sql
CREATE STREAM schema2.pageviews_copy AS SELECT * FROM pageviews;
```
The following creates a replica of the source Stream, but the new Relation belongs to the Schema named `schema2` in the Database named `db`:

```sql
CREATE STREAM db.schema2.pageviews_copy AS SELECT * FROM pageviews;
```
The following creates a replica of the source Stream. The new sink Relation has a case-sensitive name:

```sql
CREATE STREAM "PageViews" AS SELECT * FROM pageviews;
```
The following creates a replica of the source Stream. The new sink Relation has a case-sensitive name and is in a case-sensitive Database and Schema:

```sql
CREATE STREAM "DataBase"."Schema"."PageViews :)" AS SELECT * FROM pageviews;
```
The following creates a replica of the source Stream, but this new Stream is associated with the specified Topic called `pageviewstwo`:

```sql
CREATE STREAM pageviews2
  WITH ('topic' = 'pageviewstwo')
AS SELECT * FROM pageviews;
```
The following moves data from a Kafka Store to a Kinesis Store. The query creates a replica of the source Stream, but this new Stream is associated with the specified Store called `kinesis_store`:

```sql
CREATE STREAM pageviews_kinesis
  WITH ('store' = 'kinesis_store') AS
SELECT * FROM pageviews_kafka;
```
The following creates a replica of the source Stream that has a data format of JSON, but the new sink Stream has a data format of Avro for its value and key:

```sql
CREATE STREAM pageviews_avro
  WITH ('value.format' = 'avro', 'key.format' = 'AVRO')
AS SELECT * FROM pageviews_json;
```
The following creates a replica of the source Stream that has a data format of JSON, but the new sink Stream has a data format of Avro for its value. Since the sink is a Kinesis stream, there is no key associated with the record, so the `value.format` property is the only one necessary:

```sql
CREATE STREAM pageviews_avro
  WITH ('store' = 'kinesis_store', 'value.format' = 'avro')
AS SELECT * FROM pageviews_json;
```
The following is a simple projection query where the sink Kafka topic has a specific number of partitions and replicas set:

```sql
CREATE STREAM pageviews2
  WITH ('topic.partitions' = '5', 'topic.replicas' = '3')
AS SELECT
  viewtime AS vtime,
  pageid AS pid
FROM pageviews;
```
The following is a simple projection query where the sink Kinesis stream has a specific number of shards set:

```sql
CREATE STREAM pageviews2
  WITH ('topic.shards' = '4') AS
SELECT
  viewtime AS vtime,
  pageid AS pid
FROM pageviews;
```
Interval joins between two Streams result in a `STREAM` sink Relation type:

```sql
CREATE STREAM pageviews_enriched AS
SELECT
  p.userid AS pvid,
  u.userid AS uid,
  u.gender,
  p.pageid,
  u.interests[1] AS top_interest
FROM
  pageviews p JOIN users u WITHIN 5 minutes
  ON u.userid = p.userid;
```
A temporal join of two Relations where the left join side source is a Stream and the right join side source is a Changelog results in a `STREAM` output Relation type. In the example below, a new Stream called `users_visits` is created by performing a temporal join between the `pageviews` Stream and the `users_log` Changelog:

```sql
CREATE STREAM users_visits AS
SELECT
  p.userid AS pvid,
  u.userid AS uid,
  u.gender,
  p.pageid,
  u.interests[1] AS top_interest
FROM
  pageviews p JOIN users_log u
  ON u.userid = p.userid;
```
The statement below creates a new Stream, called `pagestats`, from the existing Stream `pageviews`. The `timestamp` Stream parameter, specified in the `WITH` clause, marks the `viewtime` column in `pagestats` as the timestamp column. Therefore, any subsequent query that refers to `pagestats` in its `FROM` clause will use this column for time-based operations:

```sql
CREATE STREAM pagestats
  WITH ('timestamp'='viewtime') AS
SELECT viewtime, pageid
FROM pageviews;
```
The statement below creates a new Stream, called `pageviews_exactly_once`, from the existing Stream `pageviews`. The `delivery.guarantee` Stream parameter, specified in the `WITH` clause, overrides the default `delivery.guarantee` of `at_least_once` with `exactly_once`. A user may want this configuration if their use case can tolerate higher latencies but cannot tolerate duplicate outputs:

```sql
CREATE STREAM pageviews_exactly_once
  WITH ('delivery.guarantee'='exactly_once') AS
SELECT *
FROM pageviews;
```
The statement below creates a new Stream, called `pageviews_partition_by`, from the existing Stream `pageviews`. The `PARTITION BY` clause sets the key type for the output `pageviews_partition_by` Stream. Notice that in this example the source Stream's records don't set a key value, and the sink Stream has the `PARTITION BY` values as its key. The sink Stream's key data format is `JSON` in this example because it inherits the sink's value data format by default:

```sql
CREATE STREAM pageviews_partition_by AS
SELECT viewtime, userid AS `UID`, pageid
FROM pageviews
PARTITION BY "UID", pageID;
```
Given this input for `pageviews`:

```
KEY  VALUE
{}   {"viewtime":1690327704650, "userid":"User_9", "pageid":"Page_11"}
{}   {"viewtime":1690327705651, "userid":"User_6", "pageid":"Page_94"}
```

We can expect the following output in `pageviews_partition_by`:

```
KEY                                   VALUE
{"UID":"User_9", "pageid":"Page_11"}  {"viewtime":1690327704650, "UID":"User_9", "pageid":"Page_11"}
{"UID":"User_6", "pageid":"Page_94"}  {"viewtime":1690327705651, "UID":"User_6", "pageid":"Page_94"}
```
The statement below creates a new Stream, called `pageviews_partition_by`, from the existing Stream `pageviews`. The `PARTITION BY` clause sets the key type for the output `pageviews_partition_by` Stream. Further, this query also sets the `key.format` property for the sink Stream to `PRIMITIVE`. Notice that in this example the source Stream's records have the `pageid` column value set as the key in `JSON` format, and the output Stream has the `PARTITION BY` value as the key in `PRIMITIVE` format:

```sql
CREATE STREAM pageviews_partition_by
  WITH ('key.format'='PRIMITIVE') AS
SELECT viewtime, userid AS `UID`, pageid
FROM pageviews
PARTITION BY "UID";
```
Given this input for `pageviews`:

```
KEY                   VALUE
{"pageid":"Page_11"}  {"viewtime":1690327704650, "userid":"User_9", "pageid":"Page_11"}
{"pageid":"Page_94"}  {"viewtime":1690327705651, "userid":"User_6", "pageid":"Page_94"}
```

We can expect the following output in `pageviews_partition_by`:

```
KEY       VALUE
"User_9"  {"viewtime":1690327704650, "UID":"User_9", "pageid":"Page_11"}
"User_6"  {"viewtime":1690327705651, "UID":"User_6", "pageid":"Page_94"}
```