# CREATE STREAM

## Syntax <a href="#synopsis" id="synopsis"></a>

```sql
CREATE STREAM stream_name (
    column_name data_type [NOT NULL] [, ...]
) WITH (stream_parameter = value [, ...]);
```

{% code overflow="wrap" %}

```sql
CREATE STREAM stream_name WITH (stream_parameter = value [, ...]);
```

{% endcode %}

## Description <a href="#description" id="description"></a>

A stream is a sequence of immutable, partitioned, and partially-ordered events.

{% hint style="info" %}
**Note** In DeltaStream, the terms **events** and **records** are synonymous.
{% endhint %}

A stream is a relational representation of data in a streaming [store](https://docs.deltastream.io/overview/core-concepts/store "mention"), such as the data in a Kafka topic or a Kinesis stream. The records in a stream are independent of one another: there is no correlation between any two records in a stream.

A stream declares the schema of the records, which includes the column name along with the column type and optional constraints. A stream is a type of [#relation](https://docs.deltastream.io/overview/core-concepts/databases#relation "mention"). Each relation belongs to a [#schema](https://docs.deltastream.io/overview/core-concepts/databases#schema "mention") within a database (see [databases](https://docs.deltastream.io/overview/core-concepts/databases "mention")), so the fully-qualified name of the relation is `<database>.<schema>.<relation>`.

### Alternative syntax

If data is in Protobuf or Avro format, you can use the alternative syntax, provided the source topic has a [Schema](https://docs.deltastream.io/reference/sql-syntax/ddl/create-schema_registry) or a [Descriptor](https://docs.deltastream.io/reference/sql-syntax/ddl/create-descriptor_source) associated with it. In this case, DeltaStream creates a stream definition that includes ALL columns inferred from the associated schema. You must still provide the `value.format`.
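For instance, assuming a topic named `pageviews` already has an Avro schema associated with it in the store's schema registry (the topic and stream names here are illustrative), the stream can be declared without listing any columns:

```sql
CREATE STREAM pageviews_avro
  WITH ('topic' = 'pageviews', 'value.format' = 'avro');
```

The resulting stream definition contains every column inferred from the associated schema.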

### Arguments

#### stream\_name

Specifies the name of the new stream. If the name is case-sensitive, you must wrap it in double quotes; otherwise, the system uses the lower case name.

#### column\_name

The name of a column to be created in the new stream. If the name is case-sensitive, you must wrap it in double quotes; otherwise, the system uses the lower case name.

#### data\_type

The data type of the column. This can include array specifiers. For more information on the data types supported by DeltaStream, see the [data-types](https://docs.deltastream.io/reference/sql-syntax/data-types "mention") reference.

#### NOT NULL

Defines a constraint on the column, ensuring it cannot contain `NULL` values.

#### WITH (stream\_parameter = value \[, …​ ])

Optionally, this clause specifies [#stream\_parameters](#stream_parameters "mention").

### Stream Parameters <a href="#stream_parameters" id="stream_parameters"></a>

| Parameter Name     | Description                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                     |
| ------------------ | ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| `topic`            | <p>Name of the <a data-mention href="../../../../overview/core-concepts/store#entity">#entity</a> that has the data for this stream. If the entity doesn't exist, an entity with this name is created in the corresponding <code>store</code>.<br><br><strong>Required:</strong> No<br><strong>Default value:</strong> Lowercase <code>stream\_name</code><br><strong>Type:</strong> String</p>                                                                                                                                                                                                 |
| `store`            | <p>Name of the store that hosts the <a data-mention href="../../../../overview/core-concepts/store#entity">#entity</a> for this stream.<br><br><strong>Required:</strong> No<br><strong>Default value:</strong> Current session's store name<br><strong>Type:</strong> String<br><strong>Valid values:</strong> See <a data-mention href="../command/list-stores">list-stores</a>.</p>                                                                                                                                                                                                          |
| `value.format`     | <p>Format of message value in the <a data-mention href="../../../../overview/core-concepts/store#entity">#entity</a>. See <a data-mention href="../data-format-serialization">data-format-serialization</a> for more information regarding serialization formats.<br><br><strong>Required:</strong> Yes<br><strong>Type:</strong> String<br><strong>Valid values:</strong> <code>JSON</code>, <code>AVRO</code>, <code>PROTOBUF</code>, <code>PRIMITIVE</code></p><p><strong>Valid values without column definition:</strong> <code>AVRO</code>, <code>PROTOBUF</code></p>                      |
| `timestamp`        | <p>Name of the column in the stream to use as the timestamp. If not set, the timestamp of the message is used for time based operations such as window aggregations and joins. If the type of this timestamp field is <code>BIGINT</code>, it is expected that the values are epoch milliseconds UTC.<br><br><strong>Required:</strong> No<br><strong>Default value:</strong> Record's timestamp<br><strong>Type:</strong> String<br><strong>Valid values:</strong> Must be of type <code>BIGINT</code> or <code>TIMESTAMP</code>. See <a data-mention href="../data-types">data-types</a>.</p> |
| `timestamp.format` | <p>The format to use for <code>TIMESTAMP</code> typed fields. See <a data-mention href="../data-types">data-types</a>.<br></p><p><strong>Required:</strong> No<br><strong>Default value:</strong> <code>sql</code><br><strong>Type:</strong> String<br><strong>Valid values:</strong> <code>sql</code>, <code>iso8601</code></p>                                                                                                                                                                                                                                                                |
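As a sketch of how these parameters combine, the following declares a JSON stream whose event time comes from an ISO 8601-formatted `TIMESTAMP` column (the topic and column names are illustrative):

```sql
CREATE STREAM orders (
  order_time TIMESTAMP,
  order_id VARCHAR,
  amount DOUBLE
) WITH (
  'topic' = 'orders',
  'value.format' = 'json',
  'timestamp' = 'order_time',
  'timestamp.format' = 'iso8601');
```

Time-based operations such as windowed aggregations and joins over this stream then use `order_time` rather than the record's broker-assigned timestamp.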

### Kafka-Specific Parameters

Parameters to be used if the associated [store](https://docs.deltastream.io/overview/core-concepts/store "mention") is type `KAFKA`:

| Parameter Name          | Description                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                           |
| ----------------------- | ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| `topic.partitions`      | <p>The number of partitions to use when creating the entity, if applicable. If the topic already exists, then this value must be equal to the number of partitions in the existing Kafka entity.<br></p><p><strong>Required:</strong> Yes, unless topic already exists<br><strong>Default value:</strong> Leftmost source relation topic's partition count<br><strong>Type:</strong> Integer<br><strong>Valid values:</strong> \[1, ...]</p>                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                           |
| `topic.replicas`        | <p>The number of replicas to use when creating the topic, if applicable. If the topic already exists, then this value must be equal to the number of replicas in the existing Kafka entity.<br></p><p><strong>Required:</strong> Yes, unless topic already exists<br><strong>Default value:</strong> Leftmost source relation topic's replica count<br><strong>Type:</strong> Integer<br><strong>Valid values:</strong> \[1, ...]</p>                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                |
| `kafka.topic.*`         | <p>A configuration specific for the topic being created — for example, <a href="https://kafka.apache.org/documentation/">Kafka Entity Configuration for Confluent Platform</a>.<br><br><strong>Required:</strong> No<br><strong>Default value:</strong> None<br><strong>Type:</strong> String<br><strong>Valid values:</strong> Kafka topic configuration specific to the underlying <a data-mention href="../../../overview/core-concepts/store">store</a> type.</p>                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                 |
| `key.format`            | <p>Format of message key in the <a data-mention href="../../../../overview/core-concepts/store#entity">#entity</a>. This value can be the same as or different from the <code>value.format</code>. See <a data-mention href="../data-format-serialization">data-format-serialization</a> for more information regarding serialization formats.<br><br><strong>Required:</strong> No, unless <code>key.type</code> is set<br><strong>Default value:</strong> None<br><strong>Type:</strong> String<br><strong>Valid values:</strong> <code>JSON</code>, <code>AVRO</code>, <code>PROTOBUF</code>, <code>PRIMITIVE</code></p><p><strong>Valid values without column definition:</strong> <code>AVRO</code>, <code>PROTOBUF</code></p>                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                   |
| `key.type`              | <p>Declares the names and data types of key columns. The type is a <code>STRUCT</code> when <code>key.format</code> is a non-primitive value, e.g. <code>'key.type'='STRUCT\<id BIGINT, name VARCHAR>'</code>. For primitive values, the type is one of the <a data-mention href="../../data-types#primitive-data-types">#primitive-data-types</a>, e.g. <code>'key.type'='VARCHAR'</code>.</p><p><strong>Note</strong>: <code>key.type</code> cannot be set if <code>key.columns</code> is already set.<br></p><p><strong>Required:</strong> No<br><strong>Default value:</strong> None<br><strong>Type:</strong> String<br><strong>Valid values:</strong> See <code>STRUCT</code> in <a data-mention href="../data-types">data-types</a>.</p>                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                      |
| `key.columns`           | <p>Specifies the name(s) of the value columns, separated by commas, that will be used to construct the record key.</p><ul><li>For a non-primitive <code>key.format</code>, the record key will be created as a <code>STRUCT</code> whose fields are the columns listed in this property.</li><li>For a primitive <code>key.format</code>, this property must contain exactly one column with a primitive data type.</li><li><strong>Note:</strong> <code>key.columns</code> cannot be set if <code>key.type</code> is already defined.<br></li></ul><p><strong>Required:</strong> No<br><strong>Default:</strong> None<br><strong>Type:</strong> String<br><strong>Valid values:</strong> One or more valid column names from the relation’s column list, separated by commas.</p>                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                    |
| `value.columns.exclude` | <p>Specifies the name(s) of the columns, separated by commas, that should be excluded from the record’s value and included only in its key.</p><ul><li>This property can only be set if <code>key.columns</code> is already defined.</li><li>The excluded columns must appear at the end of the relation’s column list and must also be listed in <code>key.columns</code>.</li></ul><p><br><strong>Required:</strong> No<br><strong>Default:</strong> None<br><strong>Type:</strong> String<br><strong>Valid values:</strong> One or more valid column names from the relation’s column list, separated by commas.</p>                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                               |
| `delivery.guarantee`    | <p>The fault tolerance guarantees applied when producing to this stream.<br></p><p><strong>Required:</strong> No<br><strong>Default value:</strong> <code>at\_least\_once</code><br><strong>Type:</strong> String<br><strong>Valid values:</strong></p><ul><li><code>exactly\_once</code>: Produces to the stream using Kafka transactions. These transactions are committed when the query takes a checkpoint. On the consumer side, when setting the <a href="https://kafka.apache.org/documentation/#consumerconfigs_isolation.level">Kafka consumer <code>isolation.level</code> configuration</a> to <code>read\_committed</code>, only the committed records display. Since records aren't committed until the query takes a checkpoint, there is some additional delay when using this setting.</li><li><code>at\_least\_once</code>: Ensures that records are output to the stream at least once. During query checkpointing, the query waits to receive a confirmation of successful writes from the Kafka broker. If there are issues with the query then duplicate records are possible, as the query attempts to reprocess old data.</li><li><code>none</code>: There is no fault tolerance guarantee when producing to the stream. If there are issues on the Kafka broker then records may be lost; if there are issues with the query then output records may be duplicated.</li></ul> |
| `sink.timestamp.column` | <p>Specifies the name of the value column to be used to set the Kafka record’s timestamp when writing to the Kafka sink’s entity. If no timestamp column is specified, the Kafka producer record is created without an explicit timestamp, allowing the sink’s store to assign a timestamp according to its configured policy.</p><p><br><strong>Required:</strong> No<br><strong>Default value:</strong> None<br><strong>Type:</strong> String<br><strong>Valid values:</strong> One of the column names from the sink relation’s column list. Must be of type <code>BIGINT</code> or <code>TIMESTAMP</code> or <code>TIMESTAMP\_LTZ</code>. See <a data-mention href="../data-types">data-types</a>.</p>                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                            |
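To illustrate the key-related parameters above, the following sketch builds the record key from a value column and requests transactional writes (the topic and column names are illustrative):

```sql
CREATE STREAM users (
  registertime BIGINT,
  userid VARCHAR,
  gender VARCHAR
) WITH (
  'topic' = 'users',
  'value.format' = 'json',
  'key.format' = 'json',
  'key.columns' = 'userid',
  'delivery.guarantee' = 'exactly_once');
```

Because `key.format` is non-primitive here, the record key is written as a `STRUCT` whose single field is the listed `userid` column.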

### Kinesis-Specific Parameters

Parameters to be used if the associated [store](https://docs.deltastream.io/overview/core-concepts/store "mention") is type `KINESIS`:

| Parameter Name | Description                                                                                                                                                                                                                                                                                                                                                                                                                                                                               |
| -------------- | ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| `topic.shards` | <p>The number of shards to use when creating the topic, if applicable. If the topic already exists, then this value must be equal to the number of shards in the existing Kinesis stream.<br></p><p><strong>Required:</strong> Yes, unless topic already exists<br><strong>Default value:</strong> Leftmost source relation topic's shard count<br><strong>Type:</strong> Integer<br><strong>Valid values:</strong> \[1, ...]<br><strong>Alias:</strong> <code>kinesis.shards</code></p> |

Kinesis stores provide a delivery guarantee of `at_least_once` when producing events into a sink [#entity](https://docs.deltastream.io/overview/core-concepts/store#entity "mention").
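A minimal sketch for a Kinesis-backed stream, assuming a Kinesis store named `kinesis_store` exists and the entity `clicks` should be created with two shards (all names are illustrative):

```sql
CREATE STREAM clicks (
  ts BIGINT,
  url VARCHAR
) WITH (
  'store' = 'kinesis_store',
  'topic' = 'clicks',
  'topic.shards' = 2,
  'value.format' = 'json');
```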

### S3-Specific Parameters

The S3 source periodically discovers file splits under the configured paths and emits them for processing. The discovery interval controls how often the connector looks for new S3 objects. On each run it lists keys in the service's server-side order (for general-purpose buckets, that's **lexicographic by key**) and emits files to the job, enforcing a configurable **maximum splits per interval** cap. The connector tracks an internal watermark: the last accepted key for each configured prefix. Therefore, any key **less than or equal to** that watermark is considered already processed and is skipped on later runs (and after restarts). To ensure new data is picked up reliably in long-running jobs, write files under key prefixes that **increase lexicographically over time** (e.g., `YYYY/MM/DD/HH/`) and use zero-padded sequence parts (e.g., `part-000001.jsonl`). Avoid backfilling into older, lexicographically smaller prefixes; if needed, place backfills under a **new, higher-sorting** path so they sort after the currently existing content.

Parameters to be used if the associated [store](https://docs.deltastream.io/overview/core-concepts/store "mention") is type `S3`:

| Parameter Name                  | Description                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                          |
| ------------------------------- | ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| `s3.discovery.interval.seconds` | <p>Specifies how often (in seconds) the S3 source runs discovery to look for new S3 objects under the configured S3 paths. Note that the discovery lists S3 keys in <strong>service order</strong> (which for general-purpose buckets is <strong>lexicographic</strong> by key) and emits a bounded batch of new splits to the job. Discovery progresses based on the <strong>full S3 key order</strong>. New files added “behind” the current content (lexicographically smaller keys, e.g., backfills into older date folders) are not picked up.</p><p><strong>Required:</strong> No<br><strong>Default value:</strong> 10<br><strong>Type:</strong> Long<br><strong>Valid values:</strong> \[1, ...]</p> |
| `s3.max.splits.per.interval`    | <p>An upper bound on how many file splits the connector will emit per discovery interval run.<br><br><strong>Required:</strong> No<br><strong>Default value:</strong> 20000<br><strong>Type:</strong> Integer<br><strong>Valid values:</strong> \[1, ...]</p>                                                                                                                                                                                                                                                                                                                                                                                                                                         |
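A sketch combining the two parameters, assuming an S3 store named `s3_store` exists and its objects are laid out under lexicographically increasing key prefixes (all names are illustrative):

```sql
CREATE STREAM s3_events (
  ts BIGINT,
  payload VARCHAR
) WITH (
  'store' = 's3_store',
  'value.format' = 'json',
  's3.discovery.interval.seconds' = 30,
  's3.max.splits.per.interval' = 5000);
```

This configuration polls for new objects every 30 seconds and emits at most 5000 file splits per discovery run.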

### Format-Specific Parameters

#### Avro

Use the following parameters when writing records into a stream if the associated `key.format` or `value.format` is `avro` and the default Avro schema generation must be adjusted using a base schema for the key and/or value.

When generating an Avro schema for a column using a base schema:

* If the base schema has a field with the same name and data type as the column, the field's definition from the base schema is used in the generated schema. This includes retaining the base schema's `doc` and `logicalType` for the field.
* If the base schema has a field with the same name as the column but a different data type, an Avro schema type definition is generated from the column's data type, with the field's `doc` taken from its corresponding field in the base schema.

{% hint style="info" %}
**Notes**

* Currently supported schema registries are Confluent Cloud and Confluent Platform.
* Known limitation: Confluent Schema Registry must use the default [TopicNameStrategy](https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#subject-name-strategy) for creating subject names. See [create-schema\_registry](https://docs.deltastream.io/reference/sql-syntax/ddl/create-schema_registry "mention") for more details.
{% endhint %}

| Parameter Name            | Description                                                                                                                                                                                                                                                                                                                                                                                                                                                                                          |
| ------------------------- | ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| `avro.base.schema.store`  | <p>Name of the store whose <a href="../../../../overview/core-concepts/store#schema-registry">schema registry</a> contains the Avro schema subject(s) to be used as the base schema for generating Avro schema for stream's key and/or value.<br></p><p><strong>Required:</strong> No<br><strong>Default values:</strong> Current session's store name<br><strong>Type:</strong> Identifier<br><strong>Valid values:</strong> See <a data-mention href="../command/list-stores">list-stores</a>.</p> |
| `avro.base.subject.key`   | <p>Name of the subject in the <a href="../../../../overview/core-concepts/store#schema-registry">schema registry</a> to obtain the base schema for generating Avro schema for stream's key.</p><p><strong>Required:</strong> No, unless <code>key.format</code> is set to <code>avro</code> and <code>key.type</code> is defined.<br><strong>Type:</strong> String</p>                                                                                                                               |
| `avro.base.subject.value` | <p>Name of the subject in the <a href="../../../../overview/core-concepts/store#schema-registry">schema registry</a> to obtain the base schema for generating Avro schema for stream's value columns.</p><p><strong>Required:</strong> No, unless <code>value.format</code> is set to <code>avro</code> .<br><strong>Type:</strong> String</p>                                                                                                                                                       |
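Putting these together, the following sketch generates the stream's value schema from a base subject registered in a store's schema registry (the store, topic, subject, and column names are illustrative):

```sql
CREATE STREAM payments_avro (
  paytime BIGINT,
  payment_id VARCHAR,
  amount DOUBLE
) WITH (
  'topic' = 'payments',
  'value.format' = 'avro',
  'avro.base.schema.store' = sr_store,
  'avro.base.subject.value' = 'payments-value');
```

Fields in the generated value schema that match a field in the `payments-value` base subject by name and type retain that field's `doc` and `logicalType`.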

## Examples <a href="#example" id="example"></a>

#### Create a new stream with passthrough configuration for retention

```sql
CREATE STREAM customers (
  ts BIGINT, customer_id VARCHAR, full_name VARCHAR, region VARCHAR
) WITH (
  'store' = 'kafka_store',
  'topic.partitions' = 1, 
  'topic.replicas' = 2, 
  'kafka.topic.retention.ms' = '172800000');
```

#### Create a new stream with timestamp column and key/value formats

The following creates a new stream with name `pageviews_json`. This stream reads from an existing topic named `pageviews` in the default store `demostore`, and has a `value.format` of `JSON`. Additionally in the `WITH` clause, we specify that this stream has a key of type `VARCHAR` and uses the `viewtime` column as its `timestamp`:

```sh
demodb.public/demostore# LIST ENTITIES;
+------------------------------------------+------------+
|  Name                                    |  Is Leaf   |
+==========================================+============+
| pageviews                                | true       |
+------------------------------------------+------------+
demodb.public/demostore# CREATE STREAM pageviews_json (
   viewtime BIGINT,
   userid VARCHAR,
   pageid VARCHAR
) WITH (
   'topic'='pageviews',
   'value.format'='json',
   'key.format'='primitive',
   'key.type'='VARCHAR',
   'timestamp'='viewtime');
+------------+------------------------------+------------+------------------------------------------+
|  Type      |  Name                        |  Command   |  Summary                                 |
+============+==============================+============+==========================================+
| stream     | demodb.public.pageviews_json | CREATE     | stream "pageviews_json" was              |
|            |                              |            | successfully created                     |
+------------+------------------------------+------------+------------------------------------------+
```

#### Create a new stream in a specific store

The following creates a new stream `pv_kinesis`. This stream reads from an existing topic named `pageviews` in the store `kinesis_store`:

```sh
demodb.public/demostore# CREATE STREAM pv_kinesis (
   viewtime BIGINT,
   userid VARCHAR,
   pageid VARCHAR
) WITH ( 'topic'='pageviews', 'store'='kinesis_store', 'value.format'='json' );
+------------+--------------------------+------------+------------------------------------------+
|  Type      |  Name                    |  Command   |  Summary                                 |
+============+==========================+============+==========================================+
| stream     | demodb.public.pv_kinesis | CREATE     | stream "pv_kinesis" was successfully     |
|            |                          |            | created                                  |
+------------+--------------------------+------------+------------------------------------------+
demodb.public/demostore# LIST STREAMS;
+------------+------------+------------+------------+------------------------------------------+-------------------------------+-------------------------------+
|  Name      |  Type      |  Owner     |  State     |  Properties                              |  Created At                   |  Updated At                   |
+============+============+============+============+==========================================+===============================+===============================+
| pv_kinesis | stream     | sysadmin   | created    | { "value.format": "json", "topic":       | 2024-07-02 21:28:35 +0000 UTC | 2024-07-02 21:28:36 +0000 UTC |
|            |            |            |            | "pageviews", "store":                    |                               |                               |
|            |            |            |            | "kinesis_store" }                        |                               |                               |
+------------+------------+------------+------------+------------------------------------------+-------------------------------+-------------------------------+
```

#### Create a new stream without an existing entity

The following creates a new stream `visit_count`. As its corresponding topic doesn't exist in the store `kinesis_store`, it requires an additional topic parameter — for example, `topic.shards` — to create the new Kinesis data stream `pv_count` in the store:

```sh
demodb.public/kinesis_store# LIST ENTITIES;
+------------------------------------------+------------+
|  Name                                    |  Is Leaf   |
+==========================================+============+
| pageviews                                | true       |
+------------------------------------------+------------+                  

demodb.public/kinesis_store# CREATE STREAM visit_count (userid VARCHAR, pgcount BIGINT) WITH ('topic'='pv_count', 'value.format'='json', 'topic.shards' = 1);
+------------+---------------------------+------------+------------------------------------------+
|  Type      |  Name                     |  Command   |  Summary                                 |
+============+===========================+============+==========================================+
| stream     | demodb.public.visit_count | CREATE     | stream "visit_count" was successfully    |
|            |                           |            | created                                  |
+------------+---------------------------+------------+------------------------------------------+
demodb.public/kinesis_store# LIST ENTITIES;
+------------------------------------------+------------+
|  Name                                    |  Is Leaf   |
+==========================================+============+
| pageviews                                | true       |
| pv_count                                 | true       |
+------------------------------------------+------------+
  
demodb.public/kinesis_store# DESCRIBE ENTITY pv_count;
+------------------+------------+-------------+
|  Name            |  Shards    |  Descriptor |
+==================+============+=============+
| pv_count         | 1          | <null>      |
+------------------+------------+-------------+

demodb.public/kinesis_store# LIST STREAMS;
+-------------+------------+------------+------------+------------------------------------------+-------------------------------+-------------------------------+
|  Name       |  Type      |  Owner     |  State     |  Properties                              |  Created At                   |  Updated At                   |
+=============+============+============+============+==========================================+===============================+===============================+
| visit_count | stream     | sysadmin   | created    | { "topic.shards": 1, "value.format":     | 2024-07-02 21:32:20 +0000 UTC | 2024-07-02 21:32:32 +0000 UTC |
|             |            |            |            | "json", "topic": "pv_count", "store":    |                               |                               |
|             |            |            |            | "kinesis_store" }                        |                               |                               |
+-------------+------------+------------+------------+------------------------------------------+-------------------------------+-------------------------------+
```

#### Create a new stream for an existing entity

The following creates a new `users` stream for the existing `users` [#entity](https://docs.deltastream.io/overview/core-concepts/store#entity "mention") in the current [store](https://docs.deltastream.io/overview/core-concepts/store "mention"). This DDL implies that the name of the stream should be used as the name of the entity that hosts the records. This DDL also implies the original structure for the `users` entity:

```sql
CREATE STREAM "users" (
    registertime BIGINT,
    userid VARCHAR,
    regionid VARCHAR,
    gender VARCHAR,
    interests ARRAY<VARCHAR>,
    contactinfo STRUCT<
        phone VARCHAR,
        city VARCHAR,
        "state" VARCHAR,
        zipcode VARCHAR>
) WITH ( 'value.format'='json' );
```

#### Create a new stream with case-sensitive columns

The following creates a new stream `CaseSensitivePV` in the database `DataBase` and schema `Schema2`. This stream reads from a topic named `case_sensitive_pageviews` in store `OtherStore` and has a `value.format` of AVRO and `key.format` of PROTOBUF. As the `key.format` is included, `key.type` must be provided and the value in this example is `STRUCT<pageid VARCHAR>`.

Note that many of the columns are wrapped in quotes, indicating they are case-sensitive. The case-insensitive column `CaseInsensitiveCol` is lowercased to `caseinsensitivecol` when the relation is created. The parameters also specify the `timestamp` for this relation; queries that process data using this relation as a source treat the `ViewTime` column as the event's timestamp.

```sql
CREATE STREAM "DataBase"."Schema2"."CaseSensitivePV" (
   "ViewTime" BIGINT,
   "userID" VARCHAR,
   "PageId" VARCHAR,
   "CaseSensitiveCol" BIGINT,
   CaseInsensitiveCol BIGINT
)
WITH (
   'topic'='case_sensitive_pageviews',
   'store'='OtherStore',
   'value.format'='avro',
   'key.format'='protobuf',
   'key.type'='STRUCT<pageid VARCHAR>',
   'timestamp'='ViewTime'
);
```

#### Create a new stream with \`NOT NULL\` column

The following creates a new stream, `users`. Two columns in this stream are defined with the `NOT NULL` constraint:

1. `registertime`
2. `contactinfo`

This means in any valid record from this stream, these two columns are not allowed to contain null values.

```sql
CREATE STREAM "users" (
    registertime BIGINT NOT NULL,
    userid VARCHAR, 
    interests ARRAY<VARCHAR>,
    contactinfo STRUCT<phone VARCHAR, city VARCHAR, "state" VARCHAR, zipcode VARCHAR> NOT NULL
)
WITH (
   'topic'='users', 
    'key.format'='json', 
    'key.type'='STRUCT<userid VARCHAR>', 
    'value.format'='json'
);
```

#### Create a new stream with format-specific properties for Avro

The following creates a new stream, `usersInfo`. The key and value of the records in this stream are in `avro` format. The stream uses subjects from a store called `sr_store` as the base Avro schemas: the `users_data-key` subject is used to generate the key's Avro schema, and the `users_data-value` subject is used to generate the value's Avro schema for the records written into `usersInfo`.

```sql
CREATE STREAM "usersInfo" (
    registertime BIGINT NOT NULL,
    userid VARCHAR, 
    interests ARRAY<VARCHAR>,
    contactinfo STRUCT<phone VARCHAR, city VARCHAR, "state" VARCHAR, zipcode VARCHAR> NOT NULL
)
WITH (
    'topic'='usersInfo', 
    'key.format'='avro',
    'value.format'='avro',
    'avro.base.schema.store' = 'sr_store',
    'avro.base.subject.key' = 'users_data-key',
    'avro.base.subject.value' = 'users_data-value'
);
```

#### Create a new stream with data in S3

The following creates a `pageviews_s3` stream with data backed by an S3 store. The connector runs discovery to find new S3 objects every 15 seconds, and within each interval it processes up to 25,000 files.

```sql
CREATE STREAM pageviews_s3 (
    viewtime BIGINT, 
    userid VARCHAR, 
    pageid VARCHAR
) WITH (
    'store' = 's3_store', 
    's3.uri'='s3://ctan-playground-data/jsonl/', 
    's3.discovery.interval.seconds'=15,
    's3.max.splits.per.interval'=25000,
    'value.format'='jsonl'
);
```

**Notes:**

* `s3.uri` is required.
* `value.format`: valid options are `jsonl` and `json`.
* `s3.discovery.interval.seconds`: optional. Default = 10.
* `s3.max.splits.per.interval`: optional. Default = 20,000.
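
Given the defaults listed above, the two optional parameters can be omitted entirely. The sketch below assumes the same illustrative store name and S3 URI as the previous example; discovery then runs every 10 seconds with up to 20,000 splits per interval:

```sql
-- Minimal sketch relying on the default discovery settings.
-- Store name and S3 URI are illustrative assumptions.
CREATE STREAM pageviews_s3_defaults (
    viewtime BIGINT,
    userid VARCHAR,
    pageid VARCHAR
) WITH (
    'store' = 's3_store',
    's3.uri' = 's3://ctan-playground-data/jsonl/',
    'value.format' = 'jsonl'
);
```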

#### Create a new stream with key columns

The following creates a `pageviews` stream. The key and value of records in this stream are both in `json` format. The value consists of three columns: `viewtime`, `userid`, and `pageid`. The key is a `STRUCT` with two fields, `userid` and `pageid`, whose values come from the corresponding columns.

```sql
CREATE STREAM pageviews (
    viewtime BIGINT, 
    userid VARCHAR, 
    pageid VARCHAR
) WITH (
    'store' = 'kafka_store',
    'topic' = 'pageviews',
    'value.format' = 'json',
    'key.format' = 'json', 
    'key.columns'='userid,pageid'
);
```

#### Create a new stream with key columns and value exclude columns

The following creates a `pageviews` stream. The key and value of records in this stream are both in `json` format. The key is a `STRUCT` with two fields, `userid` and `pageid`, whose values come from the corresponding columns. Because `pageid` is excluded from the value via `value.columns.exclude`, the value in each record consists of two columns: `viewtime` and `userid`.

```sql
CREATE STREAM pageviews (
    viewtime BIGINT, 
    userid VARCHAR, 
    pageid VARCHAR
) WITH (
    'store' = 'kafka_store',
    'topic' = 'pageviews',
    'value.format' = 'json',
    'key.format' = 'json', 
    'key.columns'='userid,pageid',
    'value.columns.exclude'='pageid'
);
```

#### Create a new stream without column definitions for Protobuf data

The following creates a `pageviewsProto` stream. The value format of records in this stream is Protobuf. The `pageviewsProto` topic must already exist as an entity in DeltaStream and have a Protobuf value [descriptor](https://docs.deltastream.io/reference/sql-syntax/ddl/create-descriptor_source) associated with it.

```sql
CREATE STREAM pageviewsProto  WITH (
    'store' = 'kafka_store',
    'topic' = 'pageviewsProto',
    'value.format' = 'protobuf'
);
```

