# CREATE STREAM AS SELECT

## Syntax <a href="#synopsis" id="synopsis"></a>

```sql
CREATE STREAM stream_name
[WITH (stream_parameter = value [, ... ])] 
AS select_statement;
```

## Description <a href="#description" id="description"></a>

`CREATE STREAM AS` is essentially a combination of two statements:

* A DDL statement that creates a new [stream](https://docs.deltastream.io/overview/core-concepts/databases#_stream "mention").
* An [insert-into](https://docs.deltastream.io/reference/sql-syntax/query/insert-into "mention") statement that runs a [select](https://docs.deltastream.io/reference/sql-syntax/query/select "mention") statement and adds the results into the newly-created stream.
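
Conceptually, a single `CREATE STREAM AS SELECT` behaves like the two statements below run back to back. This is a sketch for illustration only; the `pageviews_copy` name and column list are hypothetical:

```sql
-- 1. DDL: define the new stream and its columns
--    (the column list is inferred from the SELECT in the combined form).
CREATE STREAM pageviews_copy (
  viewtime BIGINT, userid VARCHAR, pageid VARCHAR
);

-- 2. DML: run the query and write its results into the new stream.
INSERT INTO pageviews_copy SELECT * FROM pageviews;
```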

### Arguments <a href="#parameters" id="parameters"></a>

#### stream\_name

This specifies the name of the new stream. Optionally, use `<database_name>.<schema_name>` as a prefix to create the relation in that scope. If the name is case-sensitive, you must wrap it in double quotes; otherwise, the system uses the lowercase name.

#### WITH (\<stream\_parameter> = \<value> \[, …​ ])

Optionally, this clause specifies [stream parameters](#stream_parameters "mention").

#### select\_statement

This specifies the [select](https://docs.deltastream.io/reference/sql-syntax/query/select "mention") statement whose results populate the new stream.

### Stream Parameters <a href="#stream_parameters" id="stream_parameters"></a>

| Parameter Name     | Description                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                     |
| ------------------ | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| `topic`            | <p>The name of the entity that has the data for a newly-created sink stream. If the entity doesn’t exist in the store, an entity with the <code>stream\_name</code> is created in the corresponding store.<br></p><p><strong>Required:</strong> No<br><strong>Default value:</strong> Lowercase <code>stream\_name</code>.<br><strong>Type:</strong> String<br><strong>Valid values:</strong> See <a data-mention href="../command/list-entities">list-entities</a></p>                                                                                                         |
| `store`            | <p>The name of the store that hosts the entity for this stream.<br></p><p><strong>Required:</strong> No<br><strong>Default value:</strong> User’s default <a data-mention href="../../../overview/core-concepts/store">store</a>.</p><p><strong>Type:</strong> String<br><strong>Valid values:</strong> See <a data-mention href="../command/list-stores">list-stores</a></p>                                                                                                                                                                                                   |
| `value.format`     | <p>Format of the message value in the <a data-mention href="../../../../overview/core-concepts/store#entity">#entity</a>. For more information regarding serialization formats see <a data-mention href="../data-format-serialization">data-format-serialization</a>.<br></p><p><strong>Required:</strong> No<br><strong>Default value:</strong> Value format from the leftmost source relation.<br><strong>Type:</strong> <code>VALUE\_FORMAT</code><br><strong>Valid values:</strong> <code>JSON</code>, <code>AVRO</code>, <code>PROTOBUF</code>, <code>PRIMITIVE</code></p> |
| `timestamp`        | <p>Name of the column in the stream to use as the timestamp. If not set, the timestamp of the message is used for time-based operations such as window aggregations and joins.<br></p><p><strong>Required:</strong> No<br><strong>Default value:</strong> Record’s timestamp.<br><strong>Type:</strong> String<br><strong>Valid values:</strong> Must be of type <code>BIGINT</code> or <code>TIMESTAMP</code> or <code>TIMESTAMP\_LTZ</code>. See <a data-mention href="../data-types">data-types</a>.</p>                                                                     |
| `timestamp.format` | <p>The format to use for <code>TIMESTAMP</code> typed fields. See <a data-mention href="../data-types">data-types</a>.<br></p><p><strong>Required:</strong> No<br><strong>Default value:</strong> <code>sql</code><br><strong>Type:</strong> String<br><strong>Valid values:</strong> <code>sql</code>, <code>iso8601</code></p>                                                                                                                                                                                                                                                |
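
For instance, `timestamp` and `timestamp.format` can be combined. The sketch below assumes a source stream `orders` with an ISO 8601-formatted `order_time` column; the stream and column names are hypothetical:

```sql
-- Mark order_time as the event timestamp for downstream time-based
-- operations, and declare that TIMESTAMP fields use ISO 8601 formatting.
CREATE STREAM orders_ts
  WITH ('timestamp' = 'order_time', 'timestamp.format' = 'iso8601') AS
SELECT order_time, orderid, amount
FROM orders;
```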

### **Kafka-Specific Parameters**

| Parameter Name          | Description                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                    |
| ----------------------- | ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| `topic.partitions`      | <p>The number of partitions to use when creating the Kafka topic, if applicable. Note that the number of partitions must be compatible with the Kafka cluster's configuration. If there's any mismatch, a query may time out when communicating with the cluster at record creation time.<br></p><p><strong>Required:</strong> Yes, unless entity already exists.<br><strong>Default value:</strong> Leftmost source relation entity’s partition count.<br><strong>Type:</strong> Integer<br><strong>Valid values:</strong> \[1, ...]</p>                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                      |
| `topic.replicas`        | <p>The number of replicas to use when creating the Kafka topic, if applicable. Note that the number of replicas must be compatible with the Kafka cluster's configuration. If there's any mismatch, a query may time out when communicating with the cluster at record creation time.<br></p><p><strong>Required:</strong> Yes, unless entity already exists.<br><strong>Default values:</strong> Leftmost source relation entity's replica count.<br><strong>Type:</strong> Integer<br><strong>Valid values:</strong> \[1, ...]</p>                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                           |
| `kafka.topic.*`         | <p>A configuration specific to the topic being created — for example, the <a href="https://kafka.apache.org/documentation/#topicconfigs">Apache Kafka topic-level configurations</a>.<br><br><strong>Required:</strong> No<br><strong>Default value:</strong> None<br><strong>Type:</strong> String<br><strong>Valid values:</strong> Kafka topic configuration specific to the underlying <a data-mention href="../../../overview/core-concepts/store">store</a> type.</p>                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                          |
| `key.format`            | <p>The format of a message key in the <a data-mention href="../../../../overview/core-concepts/store#entity">#entity</a>. For more information regarding serialization formats see <a data-mention href="../data-format-serialization">data-format-serialization</a>.<br></p><p><strong>Required:</strong> No<br><strong>Default value:</strong> None<br><strong>Type:</strong> String<br><strong>Valid values:</strong> <code>JSON</code>, <code>AVRO</code>, <code>PROTOBUF</code>, <code>PRIMITIVE</code></p>                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                               |
| `key.columns`           | <p>Specifies the name(s) of the value columns, separated by commas, that are used to construct the record key.</p><p><br>For a non-primitive key.format, the record key is created as a STRUCT whose fields are the columns listed in this property.</p><p><br>For a primitive <code>key.format</code>, this property must contain exactly one column with a primitive data type.<br></p><p><strong>Required</strong>: No<br><strong>Default</strong>: None<br><strong>Type</strong>: String<br><strong>Valid values</strong>: One or more valid column names from the sink object’s column list, separated by commas.</p>                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                     |
| `value.columns.exclude` | <p>Specifies the name(s) of the columns, separated by commas, that should be excluded from the record’s value and included only in its key.</p><p><br>You can set this property only if key.columns is already defined.</p><p><br>The excluded columns must appear at the end of the relation’s column list and must also be listed in key.columns.<br></p><p><strong>Required</strong>: No<br><strong>Default</strong>: None<br><strong>Type</strong>: String<br><strong>Valid values</strong>: One or more valid column names from the relation’s column list, separated by commas.</p>                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                      |
| `delivery.guarantee`    | <p>The fault tolerance guarantees applied when producing to this stream.<br></p><p><strong>Required:</strong> No<br><strong>Default value:</strong> <code>at\_least\_once</code><br><strong>Type:</strong> String<br><strong>Valid values:</strong></p><ul><li><code>exactly\_once</code>: Produces to the stream using Kafka transactions. These transactions commit when the query takes a checkpoint. On the consumer side, when setting the <a href="https://kafka.apache.org/documentation/#consumerconfigs_isolation.level">Kafka consumer <code>isolation.level</code> configuration</a> to <code>read\_committed</code>, only the committed records display. Since records aren't committed until the query takes a checkpoint, there is some additional delay when using this setting.</li><li><code>at\_least\_once</code>: Ensures that records are output to the stream at least once. During query checkpointing, the query waits to receive a confirmation of successful writes from the Kafka broker. If there are issues with the query then duplicate records are possible, as the query attempts to reprocess old data.</li><li><code>none</code>: There is no fault tolerance guarantee when producing to the stream. If there are issues on the Kafka broker then records may be lost; if there are issues with the query then output records may be duplicated.</li></ul> |
| `sink.timestamp.column` | <p>Specifies the name of the value column. Use this to set the Kafka record’s timestamp when you write to the Kafka sink’s entity.</p><p>If you do not specify a timestamp column, the system creates a Kafka producer record without an explicit timestamp. This allows the sink’s store to assign a timestamp according to its configured policy.</p><p><br><strong>Required:</strong> No<br><strong>Default value:</strong> None<br><strong>Type:</strong> String<br><strong>Valid values:</strong> One of the column names from the sink relation’s column list. Must be of type <code>BIGINT</code> or <code>TIMESTAMP</code> or <code>TIMESTAMP\_LTZ</code>. See <a data-mention href="../data-types">data-types</a>.</p>                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                |

### **Kinesis-Specific Parameters**

| Parameter Name | Description                                                                                                                                                                                                                                                                                                                                                                         |
| -------------- | ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| `topic.shards` | <p>The number of shards to use when creating the Kinesis stream, if applicable.<br></p><p><strong>Required:</strong> Yes, unless entity already exists.<br><strong>Default values:</strong> Leftmost source relation entity’s shard count.<br><strong>Type:</strong> Integer<br><strong>Valid values:</strong> \[1, ...]<br><strong>Alias:</strong> <code>kinesis.shards</code></p> |

Kinesis stores provide a delivery guarantee of `at_least_once` when producing events into a sink [#entity](https://docs.deltastream.io/overview/core-concepts/store#entity "mention").

### Format-Specific Properties

All format-specific properties that are applicable to a stream can be provided as a `stream_parameter`. See [#format-specific-parameters](https://docs.deltastream.io/reference/ddl/create-stream#format-specific-parameters "mention") for more details.

## Examples

#### Create a copy stream

The following creates a replica of the source stream.

```sql
CREATE STREAM pageviews_copy AS SELECT * FROM pageviews;
```

#### Create a stream with passthrough configuration for retention

```sql
CREATE STREAM us_customers WITH (
  'store' = 'kafka_store',
  'topic.partitions' = 1, 
  'topic.replicas' = 2, 
  'kafka.topic.retention.ms' = '172800000') 
AS SELECT * FROM customers WHERE region = 'US';
```

#### Create a stream in a specific schema within default database

The following creates a replica of the source stream, but the new relation belongs to the schema named `schema2` in the session’s current database.

```sql
CREATE STREAM schema2.pageviews_copy AS SELECT * FROM pageviews;
```

#### Create stream in specific schema and database

The following creates a replica of the source stream, but the new relation belongs to the schema named `schema2` in the database named `db`.

```sql
CREATE STREAM db.schema2.pageviews_copy AS SELECT * FROM pageviews;
```

#### Create a case-sensitive stream

The following creates a replica of the source stream. The new sink relation has a case-sensitive name.

```sql
CREATE STREAM "PageViews" AS SELECT * FROM pageviews;
```

#### Create a case-sensitive stream in a case-sensitive schema and database

The following creates a replica of the source stream. The new sink relation has a case-sensitive name and is in a case-sensitive database and schema.

```sql
CREATE STREAM "DataBase"."Schema"."PageViews :)" AS SELECT * FROM pageviews;
```

#### Create a new stream that is backed by a specific entity

The following creates a replica of the source stream, but this new stream is associated with the specified entity called `pageviewstwo`.

```sql
CREATE STREAM
  pageviews2
  WITH ('topic' = 'pageviewstwo')
AS SELECT * FROM pageviews;
```

#### Copy data from one store to another

The following moves data from a Kafka store to a Kinesis store. The query creates a replica of the source stream, but this new stream is associated with the specified store called `kinesis_store`.

```sql
CREATE STREAM pageviews_kinesis
  WITH ('store' = 'kinesis_store') AS 
SELECT * FROM pageviews_kafka;
```

#### Convert data from JSON to Avro with a Kafka store

The following creates a replica of the source stream that has a data format of JSON, but the new sink stream has a data format of Avro for its value and key.

```sql
CREATE STREAM
  pageviews_avro
  WITH ('value.format' = 'avro', 'key.format' = 'avro')
AS SELECT * FROM pageviews_json;
```

#### Convert data from JSON to Avro with a Kinesis store

The following creates a replica of the source stream that has a data format of JSON, but the new sink stream has a data format of Avro for its value. Since the sink is a Kinesis stream, there is no key associated with the record, and so the `value.format` property is the only one that is necessary.

```sql
CREATE STREAM
  pageviews_avro
  WITH ('store' = 'kinesis_store', 'value.format' = 'avro')
AS SELECT * FROM pageviews_json;
```

#### Simple projection to a Kafka topic with a specific number of partitions and replicas

The following is a simple projection query where the sink Kafka topic has a specific number of partitions and replicas set.

```sql
CREATE STREAM
  pageviews2
  WITH ('topic.partitions' = 5, 'topic.replicas' = 3)
AS SELECT
  viewtime AS vtime,
  pageid AS pid
FROM pageviews;
```

#### Simple projection to a Kinesis stream with a specific number of shards

The following is a simple projection query where the sink Kinesis stream has a specific number of shards set.

```sql
CREATE STREAM pageviews2
  WITH ('topic.shards' = 4) AS 
SELECT
  viewtime AS vtime,
  pageid AS pid
FROM pageviews;
```

#### Create a stream using an interval join

Interval joins between two streams result in a `STREAM` sink relation type.

```sql
CREATE STREAM pageviews_enriched AS
SELECT 
  p.userid AS pvid, 
  u.userid AS uid, 
  u.gender, 
  p.pageid, 
  u.interests[1] AS top_interest 
FROM 
  pageviews p JOIN users u WITHIN 5 minutes 
  ON u.userid = p.userid;
```

#### Create a stream using a temporal join

A temporal join of two relations where the left join side source is a stream and the right join side source is a changelog. This results in a `STREAM` output relation type. In the example below, a new stream called `users_visits` is created by performing a temporal join between the `pageviews` stream and the `users_log` changelog.

```sql
CREATE STREAM users_visits AS 
SELECT 
  p.userid AS pvid, 
  u.userid AS uid, 
  u.gender, 
  p.pageid, 
  u.interests[1] AS top_interest 
FROM 
  pageviews p JOIN users_log u 
  ON u.userid = p.userid;
```

#### Create a stream specifying the timestamp column

The below statement creates a new stream, called `pagestats`, from the already existing stream `pageviews`. The `timestamp` stream parameter, specified in the `WITH` clause, marks the `viewtime` column in `pagestats` as the timestamp column. Therefore, any subsequent query that refers to `pagestats` in its `FROM` clause uses this column for time-based operations.

```sql
CREATE STREAM pagestats 
  WITH ('timestamp'='viewtime') AS
SELECT viewtime, pageid 
FROM pageviews;
```

#### Create a stream specifying the Kafka delivery guarantee

The below statement creates a new stream, called `pageviews_exactly_once`, from the already existing stream `pageviews`. The `delivery.guarantee` stream parameter, specified in the `WITH` clause, overrides the default `delivery.guarantee` of `at_least_once` to `exactly_once`. You may wish to use this configuration if your use case can tolerate higher latencies but cannot tolerate duplicate outputs.

```sql
CREATE STREAM pageviews_exactly_once 
  WITH ('delivery.guarantee'='exactly_once') AS
SELECT *
FROM pageviews;
```

#### Create a stream with sink timestamp column

The below statement selects all columns to create a new stream, called `pageviews_copy`, from the pre-existing stream `pageviews`. The value of the `viewtime` column sets the sink record's timestamp.

```sql
-- Assuming the sink Store is Kafka
CREATE STREAM pageviews_copy 
  WITH ('sink.timestamp.column' = 'viewtime') AS
SELECT *
FROM pageviews;
```

#### Create a stream with format-specific properties for Avro

The following statement creates a new stream, `usersInfo`, by selecting the records' key and value from another stream, `users_avro`. Assuming the `users_avro` key and value are in Avro, two subjects are provided to generate the Avro schemas for the `usersInfo` key and value. The system stores these subjects in the schema registry of a store called `sr_store`. The `users_data-key` subject generates the key's Avro schema, and the `users_data-value` subject generates the value's Avro schema for the records written into `usersInfo`.

```sql
CREATE STREAM "usersInfo" 
WITH ('topic'='usersInfo', 
      'avro.base.store.name' = 'sr_store',
      'avro.base.subject.key' = 'users_data-key',
      'avro.base.subject.value' = 'users_data-value') AS
SELECT * FROM users_avro;  
```

#### Create a stream with key columns and value exclude columns for sink

The following creates a new stream, `keyed_pageviews`, by selecting all columns from the `pageviews` records: `viewtime`, `userid`, and `pageid`. Using the built-in function `SUBSTR`, the ID of each page is also extracted as `pid`. Since `userid` and `pid` are chosen as the key columns, each record in the sink has a key of `STRUCT` type with these two fields. Further, since `pid` is picked as a column to exclude, the sink records have three value columns: `viewtime`, `userid`, and `pageid`. The value of `pid` is used only to generate the records' key.

```sql
CREATE STREAM keyed_pageviews
WITH ('key.format'='json', 
      'key.columns' = 'userid,pid',
      'value.columns.exclude' = 'pid') AS
SELECT *, SUBSTR(pageid, 6) AS pid
FROM pageviews;
```
