CREATE STREAM AS SELECT

Syntax

CREATE STREAM stream_name
[WITH (stream_parameter = value [, ... ])] 
AS select_statement;

Description

CREATE STREAM AS is essentially a combination of two statements:

  • A DDL statement that creates a new stream.

  • An INSERT INTO statement that runs the SELECT statement and writes its results into the newly created stream (see the sketch below).
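
For example, the following statement from the Examples section can be read as two conceptual steps; this is an illustrative sketch of the decomposition, not literal syntax to run.

-- The single CREATE STREAM AS SELECT statement:
CREATE STREAM pageviews_copy AS SELECT * FROM pageviews;

-- behaves like a DDL step that defines the pageviews_copy stream and its
-- backing entity, followed by a population step along the lines of:
--   INSERT INTO pageviews_copy SELECT * FROM pageviews;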

Arguments

stream_name

This specifies the name of the new stream. Optionally, use <database_name>.<schema_name> as a prefix to the name to create the relation in that scope. If the name is case sensitive, you must wrap it in double quotes; otherwise, the system uses the lowercase name.
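
For instance, using a hypothetical PageViews_Copy name, quoting controls whether the case is preserved:

-- Without quotes, the stream is created with the lowercase name pageviews_copy:
CREATE STREAM PageViews_Copy AS SELECT * FROM pageviews;

-- With quotes, the case-sensitive name PageViews_Copy is preserved:
CREATE STREAM "PageViews_Copy" AS SELECT * FROM pageviews;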

WITH (<stream_parameter> = <value> [, ...])

Optionally, this clause specifies the stream parameters for the new stream; see Stream Parameters below.

select_statement

This specifies the SELECT statement whose results populate the new stream.

Stream Parameters

topic

The name of the entity that has the data for a newly-created sink stream. If the entity doesn’t exist in the store, an entity with the stream_name is created in the corresponding store.

Required: No
Default value: Lowercase stream_name
Type: String
Valid values: See LIST ENTITIES

store

The name of the store that hosts the entity for this stream.

Required: No
Default value: User's default Data Store
Type: String
Valid values: See LIST STORES

value.format

Format of the message value in the Data Store. For more information regarding serialization formats see Data Formats (Serialization).

Required: No
Default value: Value format from the leftmost source relation
Type: VALUE_FORMAT
Valid values: JSON, AVRO, PROTOBUF, PRIMITIVE

timestamp

Name of the column in the stream to use as the timestamp. If not set, the timestamp of the message is used for time-based operations such as window aggregations and joins.

Required: No
Default value: Record's timestamp
Type: String
Valid values: Must be of type BIGINT, TIMESTAMP, or TIMESTAMP_LTZ. See Data Types.

timestamp.format

The format to use for TIMESTAMP typed fields. See Data Types.

Required: No
Default value: sql
Type: String
Valid values: sql, iso8601
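
A minimal sketch combining the timestamp and timestamp.format parameters, assuming a hypothetical source stream whose viewtime column is of type TIMESTAMP and should be handled in ISO-8601 form:

CREATE STREAM pagestats_iso
  WITH ('timestamp' = 'viewtime', 'timestamp.format' = 'iso8601') AS
SELECT viewtime, pageid
FROM pageviews;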

Kafka-Specific Parameters

topic.partitions

The number of partitions to use when creating the Kafka topic, if applicable. Note that the number of partitions must be compatible with the Kafka cluster's configuration. If there's any mismatch, a query may time out when communicating with the cluster at record creation time.

Required: Yes, unless the entity already exists
Default value: Leftmost source relation entity's partition count
Type: Integer
Valid values: [1, ...]

topic.replicas

The number of replicas to use when creating the Kafka topic, if applicable. Note that the number of replicas must be compatible with the Kafka cluster's configuration. If there's any mismatch, a query may time out when communicating with the cluster at record creation time.

Required: Yes, unless the entity already exists
Default value: Leftmost source relation entity's replica count
Type: Integer
Valid values: [1, ...]

kafka.topic.*

A configuration specific to the topic being created (for example, Kafka Entity Configuration for Confluent Platform).

Required: No
Default value: None
Type: String
Valid values: Kafka topic configuration specific to the underlying Data Store type

key.format

The format of a message key in the Data Store. For more information regarding serialization formats see Data Formats (Serialization).

Required: No
Default value: None
Type: String
Valid values: JSON, AVRO, PROTOBUF, PRIMITIVE

key.columns

Specifies the name(s) of the value columns, separated by commas, that are used to construct the record key.

For a non-primitive key.format, the record key is created as a STRUCT whose fields are the columns listed in this property.

For a primitive key.format, this property must contain exactly one column with a primitive data type (see the sketch after this parameter list).

Required: No
Default value: None
Type: String
Valid values: One or more valid column names from the sink object's column list, separated by commas

value.columns.exclude

Specifies the name(s) of the columns, separated by commas, that should be excluded from the record’s value and included only in its key.

You can set this property only if key.columns is already defined.

The excluded columns must appear at the end of the relation’s column list and must also be listed in key.columns.

Required: No
Default value: None
Type: String
Valid values: One or more valid column names from the relation's column list, separated by commas

delivery.guarantee

The fault tolerance guarantees applied when producing to this stream.

Required: No
Default value: at_least_once
Type: String
Valid values:

  • exactly_once: Produces to the stream using Kafka transactions. These transactions commit when the query takes a checkpoint. On the consumer side, when the Kafka consumer isolation.level configuration is set to read_committed, only committed records are visible. Since records aren't committed until the query takes a checkpoint, there is some additional delay when using this setting.

  • at_least_once: Ensures that records are output to the stream at least once. During query checkpointing, the query waits to receive a confirmation of successful writes from the Kafka broker. If there are issues with the query then duplicate records are possible, as the query attempts to reprocess old data.

  • none: There is no fault tolerance guarantee when producing to the stream. If there are issues on the Kafka broker then records may be lost; if there are issues with the query then output records may be duplicated.

sink.timestamp.column

Specifies the name of the value column whose value sets the Kafka record's timestamp when writing to the Kafka sink's entity.

If you do not specify a timestamp column, the system creates a Kafka producer record without an explicit timestamp. This allows the sink’s store to assign a timestamp according to its configured policy.

Required: No
Default value: None
Type: String
Valid values: One of the column names from the sink relation's column list. Must be of type BIGINT, TIMESTAMP, or TIMESTAMP_LTZ. See Data Types.
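
As referenced under key.columns, the following sketch (names are illustrative) keys the sink records by a single primitive-typed column; with a primitive key.format, exactly one key column is allowed:

CREATE STREAM pageviews_keyed_by_user
  WITH ('key.format' = 'primitive', 'key.columns' = 'userid') AS
SELECT * FROM pageviews;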

Kinesis-Specific Parameters

topic.shards

The number of shards to use when creating the Kinesis stream, if applicable.

Required: Yes, unless the entity already exists
Default value: Leftmost source relation entity's shard count
Type: Integer
Valid values: [1, ...]
Alias: kinesis.shards

Kinesis stores provide a delivery guarantee of at_least_once when producing events into a sink Data Store.

Format-Specific Properties

All format-specific properties that are applicable to a stream can be provided as a stream_parameter. See Format-Specific Parameters for more details.

Examples

Create a copy stream

The following creates a replica of the source stream.

CREATE STREAM pageviews_copy AS SELECT * FROM pageviews;

Create a stream with passthrough configuration for retention

CREATE STREAM us_customers WITH (
  'store' = 'kafka_store',
  'topic.partitions' = 1, 
  'topic.replicas' = 2, 
  'kafka.topic.retention.ms' = '172800000') 
AS SELECT * FROM customers WHERE region = 'US';

Create a stream in a specific schema within default database

The following creates a replica of the source stream, but the new relation belongs to the schema named schema2 in the session’s current database.

CREATE STREAM schema2.pageviews_copy AS SELECT * FROM pageviews;

Create stream in specific schema and database

The following creates a replica of the source stream, but the new relation belongs to the schema named schema2 in the database named db.

CREATE STREAM db.schema2.pageviews_copy AS SELECT * FROM pageviews;

Create a case-sensitive stream

The following creates a replica of the source stream. The new sink relation has a case-sensitive name.

CREATE STREAM "PageViews" AS SELECT * FROM pageviews;

Create a case-sensitive stream in a case-sensitive schema and database

The following creates a replica of the source stream. The new sink relation has a case-sensitive name and is in a case-sensitive database and schema.

CREATE STREAM "DataBase"."Schema"."PageViews :)" AS SELECT * FROM pageviews;

Create a new stream that is backed by a specific entity

The following creates a replica of the source stream, but this new stream is associated with the specified entity called pageviewstwo.

CREATE STREAM
  pageviews2
  WITH ('topic' = 'pageviewstwo')
AS SELECT * FROM pageviews;

Copy data from one store to another

The following copies data from a Kafka store to a Kinesis store. The query creates a replica of the source stream, but this new stream is associated with the specified store called kinesis_store.

CREATE STREAM pageviews_kinesis
  WITH ('store' = 'kinesis_store') AS 
SELECT * FROM pageviews_kafka;

Convert data from JSON to Avro with a Kafka store

The following creates a replica of the source stream that has a data format of JSON, but the new sink stream has a data format of Avro for its value and key.

CREATE STREAM
  pageviews_avro
  WITH ('value.format' = 'avro', 'key.format' = 'avro')
AS SELECT * FROM pageviews_json;

Convert data from JSON to Avro with a Kinesis store

The following creates a replica of the source stream that has a data format of JSON, but the new sink stream has a data format of Avro for its value. Since the sink is a Kinesis stream, there is no key associated with the record, and so the value.format property is the only one that is necessary.

CREATE STREAM
  pageviews_avro
  WITH ('store' = 'kinesis_store', 'value.format' = 'avro')
AS SELECT * FROM pageviews_json;

Simple projection to a Kafka topic with a specific number of partitions and replicas

The following is a simple projection query where the sink Kafka topic has a specific number of partitions and replicas set.

CREATE STREAM
  pageviews2
  WITH ('topic.partitions' = 5, 'topic.replicas' = 3)
AS SELECT
  viewtime AS vtime,
  pageid AS pid
FROM pageviews;

Simple projection to a Kinesis stream with a specific number of shards

The following is a simple projection query where the sink Kinesis stream has a specific number of shards set.

CREATE STREAM pageviews2
  WITH ('topic.shards' = 4) AS 
SELECT
  viewtime AS vtime,
  pageid AS pid
FROM pageviews;

Create a stream using an interval join

Interval joins between two streams result in a STREAM sink relation type.

CREATE STREAM pageviews_enriched AS
SELECT 
  p.userid AS pvid, 
  u.userid AS uid, 
  u.gender, 
  p.pageid, 
  u.interests[1] AS top_interest 
FROM 
  pageviews p JOIN users u WITHIN 5 minutes 
  ON u.userid = p.userid;

Create a stream using a temporal join

A temporal join of two relations, where the left join side is a stream and the right join side is a changelog, results in a STREAM output relation type. In the example below, a new stream called users_visits is created by performing a temporal join between the pageviews stream and the users_log changelog.

CREATE STREAM users_visits AS 
SELECT 
  p.userid AS pvid, 
  u.userid AS uid, 
  u.gender, 
  p.pageid, 
  u.interests[1] AS top_interest 
FROM 
  pageviews p JOIN users_log u 
  ON u.userid = p.userid;

Create a stream, specifying the timestamp column

The below statement creates a new stream, called pagestats, from the already existing stream pageviews. The timestamp stream parameter, specified in the WITH clause, marks the viewtime column in pagestats as the timestamp column. Therefore, any subsequent query that refers to pagestats in its FROM clause uses this column for time-based operations.

CREATE STREAM pagestats 
  WITH ('timestamp'='viewtime') AS
SELECT viewtime, pageid 
FROM pageviews;

Create a stream, specifying the Kafka delivery guarantee

The below statement creates a new stream, called pageviews_exactly_once, from the already existing stream pageviews. The delivery.guarantee stream parameter, specified in the WITH clause, overrides the default delivery.guarantee of at_least_once to exactly_once. You may wish to use this configuration if your use case can tolerate higher latencies but cannot tolerate duplicate outputs.

CREATE STREAM pageviews_exactly_once 
  WITH ('delivery.guarantee'='exactly_once') AS
SELECT *
FROM pageviews;

Create a stream with a sink timestamp column

The below statement selects all columns to create a new stream, called pageviews_copy, from the pre-existing stream pageviews. The value of the viewtime column sets the sink timestamp for each record.

-- Assuming the sink Store is Kafka
CREATE STREAM pageviews_copy 
  WITH ('sink.timestamp.column' = 'viewtime') AS
SELECT *
FROM pageviews;

Create a stream with format-specific properties for Avro

The following statement creates a new stream, usersInfo, by selecting the records' key and value from another stream, users_avro. Assuming the users_avro key and value are in Avro, two subjects are provided to generate the Avro schemas for usersInfo's key and value. The system stores these subjects in the schema registry of a store called sr_store. The users_data-key subject generates the key's Avro schema, and the users_data-value subject generates the value's Avro schema for the records written into usersInfo.

CREATE STREAM "usersInfo" 
WITH ('topic'='usersInfo', 
      'avro.base.store.name' = 'sr_store',
      'avro.base.subject.key' = 'users_data-key',
      'avro.base.subject.value' = 'users_data-value') AS
SELECT * FROM users_avro;  

Create a stream with key columns and value exclude columns for the sink

The following creates a new stream, keyed_pageviews, by selecting all columns from the pageviews records: viewtime, userid, and pageid. Using the built-in function SUBSTR, the ID of each page is also extracted as pid in the query. Since userid and pid are chosen as the key columns, each record in the sink has a key of STRUCT type with these two fields. Further, since pid is picked as a column to exclude, the records in the sink have three value columns: viewtime, userid, and pageid. The value of pid is used only for generating the records' key.

CREATE STREAM keyed_pageviews
WITH ('key.format'='json', 
      'key.columns' = 'userid,pid',
      'value.columns.exclude' = 'pid') AS
SELECT *, SUBSTR(pageid, 6) AS pid  
FROM pageviews;
