CREATE STREAM AS SELECT
Syntax
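The general form below is a sketch assembled from the arguments described on this page. Square brackets mark optional clauses, and the exact grammar may differ in detail.

```sql
-- General form (optional parts in square brackets)
CREATE STREAM stream_name
[WITH (stream_parameter = value [, ...])]
AS select_statement
[PARTITION BY partition_by_clause];
```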
Description
CREATE STREAM AS is essentially a combination of two statements:
A DDL statement that creates a new stream.
An INSERT INTO statement that runs a SELECT statement and adds the results to the newly created stream.
Arguments
stream_name
This specifies the name of the new stream. Optionally, use <database_name>.<schema_name> as the prefix to the name to create the relation in that scope. If the name is case-sensitive, you must wrap it in double quotes; otherwise, the system uses the lowercase name.
WITH (<stream_parameter> = <value> [, … ])
Optionally, this clause specifies stream parameters. See Stream Parameters.
select_statement
This statement specifies the SELECT statement to run.
PARTITION BY partition_by_clause
Optionally, this clause sets the partition key of records according to their values for a given set of columns. The PARTITION BY clause defines a list of one or more comma-separated columns as partitioning columns. By default, the key of the sink's records has the same data format as the sink's value. To use a different key format, set the key.format stream parameter. PARTITION BY is supported for CREATE STREAM AS SELECT and INSERT INTO queries where the sink is a stream. Currently, PARTITION BY applies only to queries whose sink stream is backed by a Kafka store.
Stream Parameters
topic
The name of the entity that holds the data for a newly created sink stream. If the entity doesn't exist in the store, an entity named after stream_name is created in the corresponding store.
Required: No
Default value: Lowercase stream_name.
Type: String
Valid values: See LIST ENTITIES
store
The name of the store that hosts the entity for this stream.
Required: No
Default value: User's default store.
Type: String
Valid values: See LIST STORES
value.format
Format of the message value in the entity. For more information regarding serialization formats, see Data Formats (Serialization).
Required: No
Default value: Value format from the leftmost source relation.
Type: VALUE_FORMAT
Valid values: JSON, AVRO, PROTOBUF, PRIMITIVE
timestamp
Name of the column in the stream to use as the timestamp. If not set, the timestamp of the message is used for time-based operations such as window aggregations and joins.
Required: No
Default value: Record’s timestamp.
Type: String
Valid values: Must be of type BIGINT or TIMESTAMP. See Data Types.
timestamp.format
The format to use for TIMESTAMP typed fields. See Data Types.
Required: No
Default value: sql
Type: String
Valid values: sql, iso8601
Kafka Specific Parameters
topic.partitions
The number of partitions to use when creating the Kafka topic, if applicable.
Required: Yes, unless entity already exists.
Default value: Leftmost source relation entity's partition count.
Type: Integer
Valid values: [1, ...]
topic.replicas
The number of replicas to use when creating the Kafka topic, if applicable.
Required: Yes, unless entity already exists.
Default value: Leftmost source relation entity's replica count.
Type: Integer
Valid values: [1, ...]
key.format
The format of a message key in the entity. For more information regarding serialization formats, see Data Formats (Serialization).
Required: No
Default value: Key format from the leftmost source relation's key (if any) or the same as value.format.
Type: KEY_FORMAT
Valid values: JSON, AVRO, PROTOBUF, PRIMITIVE
key.type
The data type of the message key in the entity.
Required: No, unless key.format is set and there is no default value.
Default value: For certain query semantics (that is, queries using JOIN or GROUP BY), a generated key type is used by default. For queries that do not generate a key type, the key type from the leftmost source relation’s key is used by default (if any). See Row Key Definition.
Type: String
Valid values: See STRUCT in Data Types.
delivery.guarantee
The fault tolerance guarantees applied when producing to this stream.
Required: No
Default value: at_least_once
Type: String
Valid values:
exactly_once: Produces to the stream using Kafka transactions. These transactions commit when the query takes a checkpoint. On the consumer side, when the Kafka consumer isolation.level configuration is set to read_committed, only committed records are visible. Since records aren't committed until the query takes a checkpoint, this setting adds some delay.
at_least_once: Ensures that records are output to the stream at least once. During query checkpointing, the query waits to receive a confirmation of successful writes from the Kafka broker. If there are issues with the query, duplicate records are possible, as the query attempts to reprocess old data.
none: There is no fault tolerance guarantee when producing to the stream. If there are issues on the Kafka broker, records may be lost; if there are issues with the query, output records may be duplicated.
sink.timestamp.strategy
Determines how timestamp values are set for records written to a Kafka sink's entity.
Required: No
Default value: proc.time
Type: String
Valid values:
event.time: Uses the timestamp of the records coming from the source topic.
proc.time: Uses the current time of the Kafka producer when writing into the sink's entity. Note that the final timestamp used by Kafka depends on the timestamp type configured for the Kafka topic.
Kinesis Specific Parameters
topic.shards
The number of shards to use when creating the Kinesis stream, if applicable.
Required: Yes, unless entity already exists.
Default value: Leftmost source relation entity's shard count.
Type: Integer
Valid values: [1, ...]
Alias: kinesis.shards
Kinesis stores provide a delivery guarantee of at_least_once when producing events into a sink entity.
Format-Specific Properties
All format-specific properties that are applicable to a stream can be provided as a stream_parameter. See Format-Specific Parameters for more details.
Examples
Create a copy stream
The following creates a replica of the source stream.
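A minimal sketch of such a statement, assuming a source stream named pageviews; pageviews_copy is a hypothetical name for the new stream.

```sql
-- Create pageviews_copy with the same columns and records as pageviews
CREATE STREAM pageviews_copy AS
SELECT * FROM pageviews;
```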
Create a stream in a specific schema within default database
The following creates a replica of the source stream, but the new relation belongs to the schema named schema2 in the session's current database.
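A sketch of the schema-qualified form, assuming a source stream named pageviews and a hypothetical sink name pageviews_copy:

```sql
-- The schema2. prefix places the new stream in schema2 of the current database
CREATE STREAM schema2.pageviews_copy AS
SELECT * FROM pageviews;
```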
Create stream in specific schema and database
The following creates a replica of the source stream, but the new relation belongs to the schema named schema2 in the database named db.
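A sketch using the fully qualified <database_name>.<schema_name>.<stream_name> form; pageviews_copy is a hypothetical sink name.

```sql
-- Create the stream in schema2 of database db
CREATE STREAM db.schema2.pageviews_copy AS
SELECT * FROM pageviews;
```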
Create a case-sensitive stream
The following creates a replica of the source stream. The new sink relation has a case-sensitive name.
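A sketch assuming a hypothetical mixed-case name "PageViews_Copy". Per the stream_name rules above, double quotes keep the exact case.

```sql
-- Quoted identifier: the name keeps its case instead of being lowercased
CREATE STREAM "PageViews_Copy" AS
SELECT * FROM pageviews;
```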
Create a case-sensitive stream in a case-sensitive schema and database
The following creates a replica of the source stream. The new sink relation has a case-sensitive name and is in a case-sensitive database and schema.
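A sketch in which the database, schema, and stream names are all hypothetical mixed-case identifiers; each part of the name is quoted separately.

```sql
-- Every quoted part of the qualified name keeps its exact case
CREATE STREAM "DataBase"."Schema2"."PageViews_Copy" AS
SELECT * FROM pageviews;
```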
Create a new stream that is backed by a specific entity
The following creates a replica of the source stream, but this new stream is associated with the specified entity called pageviewstwo.
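A sketch using the topic stream parameter, assuming the WITH clause takes quoted property names and values; pageviews_copy is a hypothetical sink name.

```sql
-- Back the new stream with the entity pageviewstwo instead of the default
-- (the lowercase stream name); the entity is created if it does not exist
CREATE STREAM pageviews_copy WITH ('topic' = 'pageviewstwo') AS
SELECT * FROM pageviews;
```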
Copy data from one store to another
The following moves data from a Kafka store to a Kinesis store. The query creates a replica of the source stream, but this new stream is associated with the specified store called kinesis_store.
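A sketch assuming pageviews lives in a Kafka store and pageviews_kinesis is a hypothetical sink name. The shard count is an assumed value, included because topic.shards is required when the Kinesis entity does not already exist.

```sql
-- Write the results to a new entity in kinesis_store
-- ('topic.shards' = 1 is an assumed value for the new Kinesis stream)
CREATE STREAM pageviews_kinesis WITH ('store' = 'kinesis_store', 'topic.shards' = 1) AS
SELECT * FROM pageviews;
```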
Convert data from JSON to Avro with a Kafka store
The following creates a replica of the source stream that has a data format of JSON, but the new sink stream has a data format of Avro for its value and key.
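A sketch assuming a hypothetical JSON source stream pageviews_json in a Kafka store and a hypothetical sink name pageviews_avro.

```sql
-- Re-serialize both key and value as Avro in the sink topic
CREATE STREAM pageviews_avro WITH ('value.format' = 'AVRO', 'key.format' = 'AVRO') AS
SELECT * FROM pageviews_json;
```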
Convert data from JSON to Avro with a Kinesis store
The following creates a replica of the source stream that has a data format of JSON, but the new sink stream has a data format of Avro for its value. Since the sink is a Kinesis stream, there is no key associated with the record, so the value.format property is the only one that is necessary.
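A sketch assuming a hypothetical JSON source stream pageviews_json that lives in a Kinesis store which is also the default store for the sink; pageviews_avro is a hypothetical sink name.

```sql
-- Kinesis records carry no key, so only value.format is set
CREATE STREAM pageviews_avro WITH ('value.format' = 'AVRO') AS
SELECT * FROM pageviews_json;
```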
Simple projection to a Kafka topic with a specific number of partitions and replicas
The following is a simple projection query where the sink Kafka topic has a specific number of partitions and replicas set.
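A sketch with arbitrary partition and replica counts. The userid column and the sink name pageviews_proj are assumptions; viewtime and pageid are columns this page refers to elsewhere.

```sql
-- Create the sink Kafka topic with 5 partitions and 3 replicas
CREATE STREAM pageviews_proj WITH ('topic.partitions' = 5, 'topic.replicas' = 3) AS
SELECT viewtime, userid, pageid FROM pageviews;
```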
Simple projection to a Kinesis stream with a specific number of shards
The following is a simple projection query where the sink Kinesis stream has a specific number of shards set.
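A sketch with an arbitrary shard count, assuming pageviews is in a Kinesis store; the userid column and the sink name pageviews_proj are assumptions.

```sql
-- Create the sink Kinesis stream with 4 shards
CREATE STREAM pageviews_proj WITH ('topic.shards' = 4) AS
SELECT viewtime, userid, pageid FROM pageviews;
```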
Create a stream using an interval join
Interval joins between two streams result in a STREAM sink relation type.
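A sketch of an interval join between two streams. The users stream, its regionid column, the userid join column, and the sink name pageviews_enriched are hypothetical. The WITHIN ... MINUTES form of the interval clause is an assumption to check against the JOIN reference.

```sql
-- Match each pageview with users records for the same userid
-- that fall within a 5-minute interval (interval syntax assumed)
CREATE STREAM pageviews_enriched AS
SELECT p.userid, p.pageid, u.regionid
FROM pageviews p
JOIN users u WITHIN 5 MINUTES
ON p.userid = u.userid;
```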
Create a stream using a temporal join
A temporal join between two relations, where the left side source is a stream and the right side source is a changelog, results in a STREAM output relation type. In this example, a new stream called users_visits is created by performing a temporal join between the pageviews stream and the users_log changelog.
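A sketch of this temporal join. The userid join column and the regionid column of users_log are assumptions.

```sql
-- Stream (left) joined with changelog (right): each pageview is matched
-- with the users_log record for the same userid
CREATE STREAM users_visits AS
SELECT p.userid, p.pageid, u.regionid
FROM pageviews p
JOIN users_log u
ON p.userid = u.userid;
```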
Create a stream specifying the timestamp column
The following statement creates a new stream, called pagestats, from the existing stream pageviews. The timestamp stream parameter, specified in the WITH clause, marks the viewtime column in pagestats as the timestamp column. Therefore, any subsequent query that refers to pagestats in its FROM clause uses this column for time-based operations.
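A sketch of this statement. The projected columns are an assumption, and viewtime must be of type BIGINT or TIMESTAMP.

```sql
-- Use viewtime as the timestamp column of pagestats
CREATE STREAM pagestats WITH ('timestamp' = 'viewtime') AS
SELECT viewtime, pageid FROM pageviews;
```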
Create a stream specifying the Kafka delivery guarantee
The following statement creates a new stream, called pageviews_exactly_once, from the existing stream pageviews. The delivery.guarantee stream parameter, specified in the WITH clause, overrides the default delivery.guarantee of at_least_once with exactly_once. You may want this configuration if your use case can tolerate higher latency but cannot tolerate duplicate outputs.
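A sketch of this statement:

```sql
-- Produce with Kafka transactions; records commit at each query checkpoint
CREATE STREAM pageviews_exactly_once WITH ('delivery.guarantee' = 'exactly_once') AS
SELECT * FROM pageviews;
```

Consumers only benefit from this setting if they read with isolation.level set to read_committed.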
Create a stream with the PARTITION BY clause
The following statement creates a new stream, called pageviews_partition_by, from the existing stream pageviews. The PARTITION BY clause sets the key type for the output pageviews_partition_by stream. In this example, the source stream's records have no key value, and the sink stream uses the PARTITION BY values as its key. The sink stream's key data format is JSON because, by default, it inherits the sink's value data format.
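A sketch of this statement, assuming pageviews has a userid column to partition by.

```sql
-- userid becomes the record key; the key format defaults to the
-- sink's value format (JSON here)
CREATE STREAM pageviews_partition_by AS
SELECT viewtime, userid, pageid FROM pageviews
PARTITION BY userid;
```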
Given this input for pageviews:
You can expect the following output in pageviews_partition_by:
Create a stream with the PARTITION BY clause to override existing key
The following statement creates a new stream, called pageviews_partition_by, from the existing stream pageviews. The PARTITION BY clause sets the key type for the output pageviews_partition_by stream. This query also sets the key.format property of the sink stream to PRIMITIVE. In this example, the source stream's records have the pageid column value set as the key in JSON format, and the output stream has the PARTITION BY value as its key in PRIMITIVE format.
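A sketch of this statement, assuming pageviews has a userid column; a single partitioning column is used so the key can be a primitive value.

```sql
-- Replace the source key (pageid, JSON) with userid, serialized as PRIMITIVE
CREATE STREAM pageviews_partition_by WITH ('key.format' = 'PRIMITIVE') AS
SELECT viewtime, userid, pageid FROM pageviews
PARTITION BY userid;
```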
Given this input for pageviews:
You can expect the following output in pageviews_partition_by:
Create a stream with `event.time` sink timestamp strategy
The following statement creates a new stream, called pageviews_copy, from the existing stream pageviews by selecting all columns. The sink timestamp strategy is set to event.time, so when writing to the sink's Kafka topic, each record in pageviews_copy gets the timestamp of its source record from the entity that backs pageviews.
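A sketch of this statement:

```sql
-- Keep each source record's timestamp instead of the producer's current time
CREATE STREAM pageviews_copy WITH ('sink.timestamp.strategy' = 'event.time') AS
SELECT * FROM pageviews;
```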
Create a stream with format-specific properties for Avro
The following creates a new stream, usersInfo, by selecting records' key and value from another stream, users_avro. Assuming the key and value of users_avro are in Avro format, two subjects are provided to generate the Avro schemas for the key and value of usersInfo. These subjects are stored in the schema registry of a store called sr_store. The users_data-key subject generates the key's Avro schema, and the users_data-value subject generates the value's Avro schema for the records written into usersInfo.
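A sketch of this statement. The property names avro.base.store.name, avro.base.subject.key, and avro.base.subject.value are assumptions; verify them against Format-Specific Parameters before use. The name is quoted to keep its camel case, per the stream_name rules above.

```sql
-- Property names below are assumed; check Format-Specific Parameters
CREATE STREAM "usersInfo" WITH (
  'key.format' = 'AVRO',
  'value.format' = 'AVRO',
  'avro.base.store.name' = 'sr_store',           -- store whose schema registry holds the subjects
  'avro.base.subject.key' = 'users_data-key',    -- subject for the key's Avro schema
  'avro.base.subject.value' = 'users_data-value' -- subject for the value's Avro schema
) AS
SELECT * FROM users_avro;
```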