# FROM

## Syntax

```sql
FROM relation_reference [[AS] alias]
    [JOIN relation_reference [[AS] alias]]
    [WITH (source_property = value [, ...])]
```

## Description

The `FROM` clause specifies the source to use in a [SELECT](/reference/sql-syntax/query/select.md) statement. It can refer to a [Relation](/overview/core-concepts/databases.md#relation), the result of a window function, or a `JOIN` of two Relations (Stream/Stream or Stream/Changelog) that combines records based on the given join criteria. This clause can also be another [SELECT](/reference/sql-syntax/query/select.md), which applies additional processing logic to the original source events before the rest of the operators/clauses operate on the processed events. See the [#sub-queries](#sub-queries "mention") examples below.

See [#window-function](#window-function "mention") below for more information about window functions.

See [JOIN](/reference/sql-syntax/query/select/join.md) for more information about joining two Relations as sources in a statement.
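
For example, a minimal sketch (using the `pageviews` and `users` Relations and the `contactinfo` column that appear in the examples later on this page) of a `FROM` clause with an aliased source and a joined Relation:

```sql
SELECT
  p.userid,
  p.pageid,
  u.contactinfo->city AS city
FROM pageviews p
JOIN "users" u
WITHIN 10 SECONDS ON p.userid = u.userid;
```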

### Arguments

#### relation\_reference

The reference to a source in the following syntax:

```sql
relation_name
  [MATCH_RECOGNIZE]
| window_function
| ( select_statement )
```

where `relation_name` can be a fully or partially qualified name, formed by including the `database_name` and/or `schema_name` in the format `[<database_name>.<schema_name>.]<relation_name>`, e.g. `db1.public.pageviews`. Otherwise, the current Database and Schema are used to identify the Relation. Case-sensitive names must be wrapped in double quotes; otherwise, the lowercase version of the name is used.
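
For example (the Database `db1`, Schema `public`, and case-sensitive Relation `"PageViews"` below are illustrative names, not part of the reference):

```sql
-- Fully qualified Relation name
SELECT * FROM db1.public.pageviews;

-- Unqualified name resolved against the current Database and Schema
SELECT * FROM pageviews;

-- Case-sensitive name wrapped in double quotes
SELECT * FROM db1.public."PageViews";
```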

See also [MATCH\_RECOGNIZE](/reference/sql-syntax/query/select/match_recognize.md).

#### WITH (source\_property = value \[, …​])

This optional clause specifies the [#source\_properties](#source_properties "mention") along with any [Data Store](/overview/core-concepts/store.md) specific ones documented below.

### Source Properties <a href="#source_properties" id="source_properties"></a>

| Property Name                            | Description                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                    |
| ---------------------------------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------ |
| `timestamp`                              | <p>This property specifies a column name in the source Relation that will be used for event-time processing. If this property is not provided, the <code>timestamp</code> associated with the Relation’s records will be used. The Relation’s <code>timestamp</code> column can be specified when the Relation is created; otherwise, the record’s timestamp is used by default. If the type of this timestamp field is <code>BIGINT</code>, the values are expected to be epoch milliseconds in UTC.<br></p><p><strong>Required:</strong> No<br><strong>Default value:</strong> Records’ timestamp<br><strong>Type:</strong> String<br><strong>Valid values:</strong> Must be of type <code>BIGINT</code> or <code>TIMESTAMP</code>. See the data types reference.</p> |
| `timestamp.format`                       | <p>The format to use for <code>TIMESTAMP</code> typed fields. See the data types reference.<br></p><p><strong>Required:</strong> No<br><strong>Default value:</strong> <code>sql</code><br><strong>Type:</strong> String<br><strong>Valid values:</strong> <code>sql</code>, <code>iso8601</code></p> |
| `source.allow.latency.millis`            | <p>This property sets an upper bound, as a long value, on how late a record can be in milliseconds. Lateness is calculated based on the greatest chronological timestamp that has been read so far. A record that arrives with a timestamp (<code>t</code>) that is less than the difference between (<code>a</code>) the greatest chronological timestamp seen thus far and (<code>b</code>) the number of milliseconds specified by this property will be discarded (i.e. if <code>t < a - b</code>, the record with timestamp <code>t</code> is ignored because it is deemed too late). This property only applies to stateful queries that use windows.<br></p><p><strong>Required:</strong> No<br><strong>Default value:</strong> 10000<br><strong>Type:</strong> Long<br><strong>Valid values:</strong> \[1, ...]</p> |
| `source.idle.timeout.millis`             | <p>This property sets an upper bound, as a long value, on how long a source can go without new events before it is marked as idle. If one or more sources carry no events while another source still has events, the query can stall because windows created by stateful queries cannot be closed. A source marked as idle is not considered when the query determines whether a window can be closed, so it does not stall the query.<br></p><p><strong>Required:</strong> No<br><strong>Default:</strong> None<br><strong>Type:</strong> Long<br><strong>Valid values:</strong> \[1, ...]</p> |
| `source.deserialization.error.handling`  | <p>This property sets the strategy for how the query should handle invalid data when reading records from the Entity of the source Relation. For a detailed explanation with examples, see the deserialization error handling documentation.<br></p><p><strong>Required:</strong> No<br><strong>Default value:</strong> <code>TERMINATE</code><br><strong>Type:</strong> String<br><strong>Valid values:</strong></p><ul><li><code>TERMINATE</code>: When an error occurs while reading records from the source, fail the query and return an error message to the user.</li><li><code>IGNORE</code>: When an error occurs while reading records from the source, skip any records that fail deserialization and continue executing the query.</li><li><code>IGNORE\_AND\_LOG</code>: When an error occurs while reading records from the source, skip and log any records that fail deserialization, then continue executing the query. When this value is used, the property <code>source.deserialization.error.log.topic</code> must also be provided.</li></ul> |
| `source.deserialization.error.log.topic` | <p>This property specifies the name of the topic to use for logging records that a query's source has failed to deserialize when that query's source has the property <code>source.deserialization.error.handling</code> set to <code>IGNORE\_AND\_LOG</code>. The format for the records in this error log will depend on the underlying store type of the source Relation. If the topic already exists then it will be used. If it does not exist, it will be created with whichever additional properties are relevant for creation, depending on the error log topic's store.<br></p><p><strong>Required:</strong> No, unless <code>source.deserialization.error.handling</code> is <code>IGNORE\_AND\_LOG</code><br><strong>Default value:</strong> None<br><strong>Type:</strong> String</p>                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                             |
| `source.deserialization.error.log.store` | <p>This property specifies the name of the store associated with the <code>source.deserialization.error.log.topic</code> property.<br></p><p><strong>Required:</strong> No<br><strong>Default value:</strong> Store name of the source Relation<br><strong>Type:</strong> String<br><strong>Valid values:</strong> Store name belonging to a Kafka or Kinesis typed Store.</p>                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                 |
| `flink.sql.state.ttl`                    | <p>This property specifies the idle state retention time for a source relation used in a stateful query such as a regular join or a group aggregation. When the state associated with a source's key has not been updated for the specified duration, it is considered idle and will be cleaned up. This is useful for controlling memory usage in long-running stateful queries where keys may become inactive over time. Each source in a query can specify a different TTL value by including this property in its respective <code>WITH</code> clause. The value must be a valid duration string.<br><br><strong>Required</strong>: No<br><strong>Default value</strong>: None (falls back to the pipeline-level state TTL configuration)<br><strong>Type:</strong> String<br><strong>Valid values</strong>: A duration string consisting of a numeric value followed by a time unit. Supported units are <code>ms</code> (milliseconds), <code>s</code> (seconds), <code>min</code> (minutes), <code>h</code> (hours), <code>d</code> (days). Examples: <code>'300s'</code>, <code>'5min'</code>, <code>'1h'</code>, <code>'1d'</code>.</p>                                                                                                                                                                               |
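
As an illustrative sketch (the column, error log topic name, and property values below are assumptions chosen for demonstration, and the error log topic is assumed to already exist), several of these properties can be combined in a single `WITH` clause on a source:

```sql
SELECT window_start, window_end, userid, COUNT(pageid) AS pgcnt
FROM TUMBLE (pageviews, SIZE 1 minute)
WITH (
  'timestamp' = 'viewtime',
  'source.allow.latency.millis' = '5000',
  'source.deserialization.error.handling' = 'IGNORE_AND_LOG',
  'source.deserialization.error.log.topic' = 'pageviews_deser_errors',
  'flink.sql.state.ttl' = '1h'
)
GROUP BY window_start, window_end, userid;
```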

### **Kafka Specific Source Properties**

| Property Name                                       | Description                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                            |
| --------------------------------------------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------ |
| `starting.position`                                 | <p>This property sets the strategy for how the query should start reading from the Entity of the source Relation.<br></p><p><strong>Required:</strong> No<br><strong>Default value:</strong></p><ul><li><code>latest</code> for a Stream</li><li><code>earliest</code> for a Changelog</li></ul><p><strong>Type:</strong> String<br><strong>Valid values:</strong></p><ul><li><code>earliest</code>: Start reading from the earliest available offsets in each partition of the Kafka topic.</li><li><code>latest</code>: Start reading newly arriving data for each partition of the Kafka topic.</li><li><code>offsets</code>: Start reading from the partition offsets specified by the <code>starting.position.offsets</code> property. Note that if this mode is being used, the <code>starting.position.offsets</code> property must be provided.</li><li><code>timestamp</code>: Start reading from the timestamp specified by the <code>starting.position.timestamp.ms</code> property for each partition of the Kafka topic. The timestamp corresponds with the Kafka records’ timestamp. Note that if this mode is being used, the <code>starting.position.timestamp.ms</code> property must be provided.</li></ul> |
| `starting.position.offsets`                         | <p>This property specifies the partition-offset pairs from which the corresponding Kafka topic for the Relation will be read for this statement. This property must be provided when the <code>starting.position</code> property is set to <code>offsets</code>. The format for passing partition-offset values is a <code>;</code> separated list of <code>partition:\<partition\_num>,offset:\<offset\_num></code>. For example, to start reading from the first offset of partition 0 and the third offset of partition 1, it’d be set to <code>partition:0,offset:1;partition:1,offset:3</code>.<br></p><p><strong>Required:</strong> No, unless <code>starting.position</code> is <code>offsets</code></p><p><strong>Default value:</strong> None</p><p><strong>Type:</strong> String<br><strong>Valid values:</strong> <code>;</code> separated list of <code>partition:\<partition\_num>,offset:\<offset\_num></code>. Both <code>partition\_num</code> and <code>offset\_num</code> must be in the range of \[1, ...].</p> |
| `starting.position.timestamp.ms`                    | <p>This property specifies the timestamp from which the Kafka topic for the Relation will be read for this statement. The timestamp value is in epoch-milliseconds, which is measured as the number of milliseconds since <code>January 1, 1970 00:00:00.000 GMT</code>.<br></p><p><strong>Required:</strong> No, unless <code>starting.position</code> is <code>timestamp</code><br><strong>Default value:</strong> None<br><strong>Type:</strong> Long<br><strong>Valid values:</strong> \[1, …]</p> |
| `source.deserialization.error.log.topic.partitions` | <p>This property specifies the number of partitions the topic associated with the <code>source.deserialization.error.log.topic</code> property should have.<br></p><p><strong>Required:</strong> No, unless <code>source.deserialization.error.log.topic</code> doesn't exist<br><strong>Default value:</strong> None<br><strong>Type:</strong> Integer<br><strong>Valid values:</strong> \[1, ...]</p>                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                |
| `source.deserialization.error.log.topic.replicas`   | <p>This property specifies the number of replicas the topic associated with the <code>source.deserialization.error.log.topic</code> property should have.<br></p><p><strong>Required:</strong> No, unless <code>source.deserialization.error.log.topic</code> doesn't exist<br><strong>Default value:</strong> None<br><strong>Type:</strong> Integer<br><strong>Valid values:</strong> \[1, ...]</p>                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                  |
| `source.kafka.partition.discovery.millis`           | <p>This property specifies the milliseconds interval to discover new partitions in the Kafka topic. By default this property is set to 10000, meaning every 10 seconds the query will look for new partitions to consume from. If disabled, the query will not consume from partitions added while the query is running.<br></p><p><strong>Required:</strong> No<br><strong>Default value:</strong> 10000<br><strong>Type:</strong> Long<br><strong>Valid values:</strong> \[1, ...] or -1 to disable</p>                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                              |
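
For instance, the starting-position properties might be combined as follows (a sketch only; the partition offsets and epoch timestamp are placeholder values):

```sql
-- Start reading from explicit partition offsets
SELECT userid, pageid
FROM pageviews
WITH (
  'starting.position' = 'offsets',
  'starting.position.offsets' = 'partition:0,offset:1;partition:1,offset:3'
);

-- Start reading from a specific record timestamp (epoch milliseconds)
SELECT userid, pageid
FROM pageviews
WITH (
  'starting.position' = 'timestamp',
  'starting.position.timestamp.ms' = '1700000000000'
);
```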

### Kinesis Specific Source Properties

| Property Name                                   | Description                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                      |
| ----------------------------------------------- | ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| `starting.position`                             | <p>This property sets the strategy for how the query should start reading from the Entity of the source Relation.<br></p><p><strong>Required:</strong> No<br><strong>Default value:</strong></p><ul><li><code>latest</code> for a Stream</li><li><code>earliest</code> for a Changelog</li></ul><p><strong>Type:</strong> String<br><strong>Valid values:</strong></p><ul><li><code>earliest</code>: Start reading from the earliest available sequence in each shard of the Kinesis data stream.</li><li><code>latest</code>: Start reading newly arriving data for each shard of the Kinesis data stream.</li><li><code>timestamp</code>: Start reading from the timestamp specified by the <code>starting.position.timestamp.ms</code> property for each shard of the Kinesis data stream. The timestamp corresponds with the Kinesis records’ timestamp. Note that if this mode is being used, the <code>starting.position.timestamp.ms</code> property must be provided.</li></ul> |
| `starting.position.timestamp.ms`                | <p>This property specifies the timestamp from which the Kinesis data stream for the Relation will be read for this statement. The timestamp value is in epoch-milliseconds, which is measured as the number of milliseconds since <code>January 1, 1970 00:00:00.000 GMT</code>.<br></p><p><strong>Required:</strong> No, unless <code>starting.position</code> is <code>timestamp</code><br><strong>Default value:</strong> None<br><strong>Type:</strong> Long<br><strong>Valid values:</strong> \[1, …]</p> |
| `source.deserialization.error.log.topic.shards` | <p>This property specifies the number of shards the stream associated with the <code>source.deserialization.error.log.topic</code> property should have.<br></p><p><strong>Required:</strong> No, unless <code>source.deserialization.error.log.topic</code> doesn't exist<br><strong>Default value:</strong> None<br><strong>Type:</strong> Integer<br><strong>Valid values:</strong> \[1, ...]</p>                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                             |
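
A similar sketch for a Kinesis-backed source (the error log stream name and shard count are illustrative assumptions):

```sql
SELECT userid, pageid
FROM pageviews
WITH (
  'starting.position' = 'earliest',
  'source.deserialization.error.handling' = 'IGNORE_AND_LOG',
  'source.deserialization.error.log.topic' = 'pageviews_deser_errors',
  'source.deserialization.error.log.topic.shards' = '1'
);
```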

## Window Function

Using a window function, records are split into finite-size batches. Each record can belong to one or multiple windows, depending on the window type. A window defines a time interval and has a start time and an end time. A record belongs to a window instance if its timestamp falls between the start time and end time of that window. By applying a window on a Relation, a new Relation is logically created that includes all columns of the original Relation as well as two additional columns named `window_start` and `window_end`. These two columns can be used in the `SELECT` and/or `WHERE` clauses, similar to the original Relation's columns. Moreover, they can be added to the `GROUP BY` clause to group records according to a time-based window. When defining a `HOP`, `TUMBLE`, or `CUMULATE` window, `OFFSET` is an optional parameter. If added, `OFFSET` specifies the time offset by which the start of a window should be shifted. Currently, four types of windows are supported.
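
For example (a sketch using the `pageviews` Stream and `viewtime` timestamp column from the examples below), the generated `window_start` and `window_end` columns can appear in the projection and the grouping like any other column:

```sql
SELECT window_start, window_end, COUNT(*) AS cnt
FROM TUMBLE (pageviews, SIZE 30 seconds)
WITH ('timestamp' = 'viewtime')
WHERE userid != 'User_6'
GROUP BY window_start, window_end;
```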

### HOP Window

#### Syntax

```sql
FROM HOP (relation_name, SIZE time_unit, ADVANCE BY time_unit)
   [WITH (source_property = value [, ...])]
```

A hopping window models a fixed-size time interval and is defined by two parameters: `SIZE`, which defines a window’s time duration, and the `ADVANCE BY` interval, which defines how much a window moves forward in time relative to the previous window instance. Depending on the `ADVANCE BY` value, `HOP` window instances could overlap with each other. Therefore, a given record could belong to one or more such window instances according to its timestamp. When using a `HOP` window, the `SIZE` must be an integral multiple of `ADVANCE BY`.

The optional `WITH (source_property = value [, ...])` clause specifies [#source\_properties](#source_properties "mention").

#### Example

#### Simple HOP with SIZE and ADVANCE

Let’s use the following input records for this example:

```
+----------+--------+---------+
| viewtime | userid | pageid  |
+----------+--------+---------+
|     1000 | User_2 | Page_72 |
|     2000 | User_5 | Page_12 |
|     3000 | User_2 | Page_34 |
|    91000 | User_2 | Page_10 |
|    92000 | User_2 | Page_44 |
|   121000 | User_5 | Page_67 |
|   122000 | User_2 | Page_93 |
|   160000 | User_5 | Page_3  |
+----------+--------+---------+
```

When the following `SELECT` statement is run:

```sql
SELECT 
  window_start, 
  window_end,
  userid, 
  COUNT(pageid) AS pgcnt 
FROM HOP (pageviews, SIZE 1 minute, ADVANCE BY 30 second)
WITH ('timestamp'='viewtime')
GROUP BY window_start, window_end, userid;
```

We get the following output:

```
+---------------------+---------------------+--------+-------+
|    window_start     |     window_end      | userid | pgcnt |
+---------------------+---------------------+--------+-------+
| 1970-01-01T00:00:00 | 1970-01-01T00:01:00 | User_2 |     2 |
| 1970-01-01T00:00:00 | 1970-01-01T00:01:00 | User_5 |     1 |
| 1970-01-01T00:01:00 | 1970-01-01T00:02:00 | User_2 |     2 |
| 1970-01-01T00:01:30 | 1970-01-01T00:02:30 | User_2 |     3 |
| 1970-01-01T00:01:30 | 1970-01-01T00:02:30 | User_5 |     1 |
+---------------------+---------------------+--------+-------+
```

**NOTE:** The output results in this example are not fully deterministic as factors such as late arriving data can alter the results.

The above example shows a sample of outputs for this aggregation query. Since this is a streaming query, more records will be outputted as data arrives in the source. Note that the `viewtime` column, which is being used as the timestamp for the `pageviews` source, represents milliseconds since epoch. Looking at the output records, we can see the `window_start` and `window_end` times match the `HOP` window’s 1 minute size, and the `pgcnt` value for each `userid` corresponds with the number of input records with the same `userid` whose `viewtime` falls within that window. Notice there are no output records with a `window_start=1970-01-01T00:00:30` and `window_end=1970-01-01T00:01:30`. This is because there are no records from the source whose `viewtime` value falls within that window and thus no data to output.

#### HOP window with GROUP BY and HAVING clauses <a href="#group_by_clause" id="group_by_clause"></a>

The following aggregates the number of pages a particular `userid` visits in a 2 minute window, then emits only results having a count greater than 1. With a `HOP` window, since the `ADVANCE BY` parameter is 30 seconds, a new 2 minute window is created every 30 seconds.

```sql
SELECT 
  window_start, 
  window_end,
  userid, 
  COUNT(pageid) AS pgcnt 
FROM HOP(pageviews, size 2 minutes, advance by 30 seconds) 
GROUP BY window_start, window_end, userid 
HAVING COUNT(pageid) > 1;
```

### TUMBLE Window

#### Syntax

```sql
FROM TUMBLE (relation_name, SIZE time_unit)
   [WITH (source_property = value [, ...])]
```

A `TUMBLE` window models a fixed-size time interval with non-overlapping, gap-free window instances. A `TUMBLE` window is defined by a single parameter: `SIZE`, which defines a window’s time duration. You can think of a `TUMBLE` window function as a special case of a `HOP` window function in which the `SIZE` and `ADVANCE BY` values are equal. Each record belongs to exactly one instance of a `TUMBLE` window according to its timestamp.

#### Example

#### Simple TUMBLE window with SIZE

Assume a `pageviews` [Stream](/overview/core-concepts/databases.md#stream) has been created by the following DDL:

```sql
CREATE STREAM pageviews (
  viewtime BIGINT,
  userid VARCHAR,
  pageid VARCHAR
) WITH (
  'topic' = 'pageviews'
);
```

Let’s use the following input records for this example:

```
+----------+--------+---------+
| viewtime | userid | pageid  |
+----------+--------+---------+
|     1000 | User_2 | Page_72 |
|     2000 | User_5 | Page_12 |
|     3000 | User_2 | Page_34 |
|    61000 | User_5 | Page_67 |
|    62000 | User_2 | Page_67 |
|    91000 | User_2 | Page_10 |
+----------+--------+---------+
```

When the following `SELECT` statement is run:

```sql
SELECT 
  window_start,
  window_end,
  userid,
  COUNT(*) AS cnt
FROM TUMBLE (pageviews, SIZE 30 seconds)
WITH ('timestamp'='viewtime')
GROUP BY window_start, window_end, userid;
```

We get the following output:

```
+---------------------+---------------------+--------+-----+
|    window_start     |     window_end      | userid | cnt |
+---------------------+---------------------+--------+-----+
| 1970-01-01T00:00:00 | 1970-01-01T00:00:30 | User_2 |   2 |
| 1970-01-01T00:00:00 | 1970-01-01T00:00:30 | User_5 |   1 |
| 1970-01-01T00:01:00 | 1970-01-01T00:01:30 | User_2 |   1 |
| 1970-01-01T00:01:00 | 1970-01-01T00:01:30 | User_5 |   1 |
+---------------------+---------------------+--------+-----+
```

**NOTE:** The output results in this example are not fully deterministic as factors such as late arriving data can alter results.

The above example shows a sample of outputs for this aggregation query. Since this is a streaming query, more records will be outputted as data arrives in the source. Note that the `viewtime` column, which is being used as the timestamp for the `pageviews` source, represents milliseconds since epoch. Looking at the output records, we can see the `window_start` and `window_end` times match the `TUMBLE` window’s 30 second size, and the `cnt` value for each `userid` corresponds with the number of input records with the same `userid` whose `viewtime` falls within that window. Notice there are no output records with a `window_start=1970-01-01T00:00:30` and `window_end=1970-01-01T00:01:00`. This is because there are no records from the source whose `viewtime` value falls within that window and thus no data to output.

#### TUMBLE window with WHERE and GROUP BY clauses <a href="#window_clause" id="window_clause"></a>

The following aggregates the number of pages a particular `userid` visits in a 10 second window. With a `TUMBLE` window, a new 10 second window opens at the end of the previous one, so no windows overlap.

```sql
SELECT 
  window_start,
  window_end,
  userid,
  COUNT(pageid) AS cnt
FROM TUMBLE (pageviews, SIZE 10 seconds)
WHERE userid != 'User_6' 
GROUP BY window_start, window_end, userid;
```

#### TUMBLE window with shorter allow latency

The following aggregates the number of pages a particular `userid` visits in a 10 second window. It also sets the `source.allow.latency.millis` property to shorten the allowed latency, telling the query to discard data arriving more than 1 second late. In this example, if the greatest event timestamp ingested by the query thus far is `10000ms`, then any events with a timestamp of less than `9000ms` will not be processed.

```sql
SELECT 
  userid, 
  COUNT(pageid) AS pgcnt, 
  window_start,
  window_end
FROM TUMBLE(pageviews, size 10 seconds)
WITH (
  'source.allow.latency.millis' = '1000'
)
GROUP BY userid, window_start, window_end;
```

### CUMULATE Window

#### Syntax

```sql
FROM CUMULATE (relation_name, SIZE time_unit, STEP time_unit)
   [WITH (source_property = value [, ...])]
```

A `CUMULATE` window defines a fixed-size time interval that is itself split into windows with the same `window_start` but increasing lengths. A `CUMULATE` window is defined by two parameters: `SIZE`, which defines a window’s maximum time duration, and `STEP`, which defines by how much the window end advances between sequential windows that share the same start. You can think of a `CUMULATE` window function as a `TUMBLE` window function of width `SIZE` that is split into smaller windows of increasing size, all of which share the same `window_start`. When defining a `CUMULATE` window, `SIZE` must be an integral multiple of `STEP`.

#### Example

#### Simple CUMULATE window with SIZE and STEP

Let’s use the `pageviews` [Stream](/overview/core-concepts/databases.md#stream) that's created using this DDL:

```sql
CREATE STREAM pageviews (
  viewtime BIGINT,
  userid VARCHAR,
  pageid VARCHAR
) WITH (
  'topic' = 'pageviews'
);
```

with the following input records for this example:

```
+----------+--------+---------+
| viewtime | userid | pageid  |
+----------+--------+---------+
|     1000 | User_2 | Page_72 |
|     2000 | User_5 | Page_12 |
|     3000 | User_2 | Page_34 |
|    51000 | User_5 | Page_67 |
|    52000 | User_2 | Page_67 |
|    91000 | User_2 | Page_10 |
|   101000 | User_2 | Page_88 |
+----------+--------+---------+
```

When the following `SELECT` statement is run:

```sql
SELECT 
  window_start,
  window_end,
  userid,
  COUNT(*) AS cnt
FROM CUMULATE (pageviews, SIZE 1 minutes, STEP 20 seconds)
WITH ('timestamp'='viewtime')
GROUP BY window_start, window_end, userid;
```

We get the following output:

```
+---------------------+---------------------+--------+-----+
|    window_start     |     window_end      | userid | cnt |
+---------------------+---------------------+--------+-----+
| 1970-01-01T00:00:00 | 1970-01-01T00:00:20 | User_2 |   2 |
| 1970-01-01T00:00:00 | 1970-01-01T00:00:20 | User_5 |   1 |
| 1970-01-01T00:00:00 | 1970-01-01T00:00:40 | User_2 |   2 |
| 1970-01-01T00:00:00 | 1970-01-01T00:00:40 | User_5 |   1 |
| 1970-01-01T00:00:00 | 1970-01-01T00:01:00 | User_2 |   3 |
| 1970-01-01T00:00:00 | 1970-01-01T00:01:00 | User_5 |   2 |
| 1970-01-01T00:01:00 | 1970-01-01T00:01:40 | User_2 |   1 |
+---------------------+---------------------+--------+-----+
```

**NOTE:** The output results in this example are not fully deterministic as factors such as late arriving data can alter results.

The above example shows a sample of outputs for this aggregation query. Since this is a streaming query, more records will be outputted as data arrives in the source. Note that the `viewtime` column, which is being used as the timestamp for the `pageviews` source, represents milliseconds since epoch. Looking at the output records, we can see the `window_start` and `window_end` times match the `CUMULATE` window’s expected size, a max size of 1 minute with intermediate windows of 20 seconds and 40 seconds long. The `cnt` value for each `userid` corresponds with the number of input records with the same `userid` whose `viewtime` falls within their respective windows. Notice there are no output records with a `window_start=1970-01-01T00:01:00` and `window_end=1970-01-01T00:01:20`. This is because there are no records from the source whose `viewtime` value falls within that window and thus no data to output.

#### CUMULATE window with a GROUP BY and HAVING clause

The following aggregates the number of pages a particular `userid` visits in several cascading windows, with the longest window being 10 minutes. With a `CUMULATE` window, since the `STEP` parameter is 5 seconds, a window closes every 5 seconds. The query then emits only results having a page count greater than 5.

```sql
SELECT 
  window_start, 
  window_end, 
  userid, 
  COUNT(pageid) AS cnt
FROM 
  CUMULATE(PAGEVIEWS, size 10 minutes, step 5 SECOND)
GROUP BY window_start, window_end, userid
HAVING COUNT(pageid) > 5;
```

### Session Window

#### Syntax

```sql
FROM SESSION (relation_name, [PARTITION BY column_name,] GAP time_unit)
   [WITH (source_property = value [, ...])]
```

A `SESSION` window groups records into dynamic, variable-length windows based on periods of activity. Unlike `HOP`, `TUMBLE`, or `CUMULATE` windows which have fixed sizes, a `SESSION` window closes when no new records arrive within the specified `GAP` duration. A `SESSION` window is defined by a single required parameter: `GAP`, which specifies the maximum inactivity period between consecutive records before the window closes. The optional `PARTITION BY` clause specifies one or more columns to partition the `SESSION` window by. When provided, `SESSION` windows are tracked independently for each distinct combination of the partition columns. For example, partitioning by `userid` means each user gets their own independent set of `SESSION` windows. A `SESSION` window does not support the `OFFSET` parameter. `SESSION` windows cannot be used on Changelog source relations. Additionally, JOIN is not supported on relations with `SESSION` windowing applied.

#### Example

#### SESSION window with GAP and PARTITION BY

Let’s use the `pageviews` [Stream](/overview/core-concepts/databases.md#stream) that's created using this DDL:

```sql
CREATE STREAM pageviews (
  viewtime BIGINT,
  userid VARCHAR,
  pageid VARCHAR
) WITH (
  'topic' = 'pageviews'
);
```

with the following input records for this example:

```
+----------+--------+---------+
| viewtime | userid | pageid  |
+----------+--------+---------+
|     1000 | User_1 | Page_1  |
|     2000 | User_1 | Page_2  |
|     3000 | User_1 | Page_3  |
|     2000 | User_2 | Page_1  |
|     3000 | User_2 | Page_2  |
|    10000 | User_1 | Page_4  |
+----------+--------+---------+
```

When the following `SESSION` window query runs:

```sql
SELECT 
  userid,
  COUNT(pageid) AS pgcnt,
  window_start,
  window_end
FROM SESSION (pageviews, PARTITION BY userid, GAP 5 SECONDS)
WITH ('timestamp'='viewtime')
GROUP BY userid, window_start, window_end;
```

We get the following output:

```
+--------+-------+---------------------+---------------------+
| userid | pgcnt |    window_start     |     window_end      |
+--------+-------+---------------------+---------------------+
| User_1 |     3 | 1970-01-01T00:00:01 | 1970-01-01T00:00:08 |
| User_2 |     2 | 1970-01-01T00:00:02 | 1970-01-01T00:00:08 |
| User_1 |     1 | 1970-01-01T00:00:10 | 1970-01-01T00:00:15 |
+--------+-------+---------------------+---------------------+
```

**NOTE:** The output results in this example are not fully deterministic as factors such as late arriving data can alter results.

The above example shows a sample of outputs for this aggregation query. Since this is a streaming query, more records will be outputted as data arrives in the source. Note that the `viewtime` column, which is being used as the timestamp for the `pageviews` source, represents milliseconds since epoch. Looking at the output records, we can see that `User_1` has two separate sessions: the first session includes three page views (at timestamps 1000, 2000, and 3000) with a `window_start` of 00:00:01 and `window_end` of 00:00:08 (last event at 3s + 5s gap = 8s), while the second session contains a single page view at timestamp 10000 (which is more than 5 seconds after 3000, so it starts a new session). `User_2`'s two page views (at timestamps 2000 and 3000) fall within the same 5-second gap, resulting in a single session. Because `PARTITION BY userid` is specified, session windows are tracked independently per user. Without the `PARTITION BY` clause, all records would be grouped into sessions regardless of user.

#### SESSION window without PARTITION BY

The following query groups all page view records into sessions with a 10-second inactivity gap, regardless of which user generated the event:

```sql
SELECT 
  COUNT(pageid) AS pgcnt,
  window_start,
  window_end
FROM SESSION (pageviews, GAP 10 SECONDS)
WITH ('timestamp'='viewtime')
GROUP BY window_start, window_end;
```

## Sub-Queries

Sub-queries are a way to add an additional layer of processing to an original schema without storing it in a [Data Store](/overview/core-concepts/store.md), before providing it to an outer query within the same SQL query.

Just like any other [Relation](/overview/core-concepts/databases.md#relation) within the `FROM` clause, a sub-query may or may not have an alias, but its columns can only be referred to by a qualified name if an alias was provided for it. Additionally, the column names of a sub-query must be unique, and the columns become available to the outer query as they were named or aliased, just like any other Relation with a corresponding schema.

Since sub-queries are internal to a SQL query, they are neither persisted as a Relation nor is their data persisted in the Store on which the SQL was run.

A sub-query has the following syntax and it can be used in a `FROM` clause within a `SELECT` statement:

```
SELECT ...
FROM ( sub-query );
```
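
For example (a minimal sketch; the alias `pv` and the filter are arbitrary), aliasing a sub-query lets the outer query refer to its columns by qualified name:

```sql
SELECT pv.userid, pv.pageid
FROM (
  SELECT userid, pageid
  FROM pageviews
  WHERE userid != 'User_6'
) pv;
```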

The rest of this section shows various use cases in which a sub-query can be useful.

### Transformation

One of the basic use cases for a sub-query is intermediary transformation or sanitization of data that is not needed outside the scope of the given SQL. This reduces cost and overhead for an organization by removing the need for additional persisted Relations.

Here we use the `pageviews` stream and convert its `userid` and `pageid` to `INTEGER` to build a simple point system based on the page numbers visited by each user:

```sql
SELECT uid, SUM(pid)*10 AS points
FROM (
    SELECT
        viewtime,
        CAST(SUBSTRING(userid FROM 6) AS INTEGER) AS uid,
        CAST(SUBSTRING(pageid FROM 6) AS INTEGER) AS pid
    FROM pageviews
) GROUP BY uid;
```

### Structuring

Sub-queries are also useful when events need to be structured within the scope of a SQL query. In the following example, a self-`JOIN` on top of an aggregation that counts the number of pages visited removes the need for a separate stream just for the grouping sub-query:

```sql
SELECT
    pg.userid AS uid,
    p.pageid AS pid,
    pg.pages AS num_pages
FROM
  pageviews p
JOIN
  (SELECT userid, COUNT(pageid) AS pages FROM pageviews GROUP BY userid) pg
ON p.userid = pg.userid
WHERE pg.pages > 1;
```

### Denormalization

Using a sub-query, events can also be denormalized for the purpose of a single SQL query. In the following query, a sub-query `JOIN` expands the necessary user information so the outer `JOIN` can create shipment metrics describing the user involved in each shipment:

```sql
CREATE STREAM csas_mr_puo AS SELECT
    p_user.*,
    o.orderid AS orderid,
    o.shipmenttime AS shiptime
FROM (
    SELECT
        p.pageid,
        p.userid AS uid,
        u.contactinfo->city AS city,
        u.contactinfo->zipcode AS zipcode,
        p.viewtime AS viewtime
    FROM pageviews p
    JOIN "users" u
    WITHIN 10 SECONDS ON p.userid = u.userid
) p_user
JOIN
  orders o
WITHIN 10 SECONDS
ON p_user.city = o.address->city;
```

