# SELECT

## Syntax <a href="#synopsis" id="synopsis"></a>

```sql
[with_clause]
SELECT
    [DISTINCT] expression [AS alias] [, ...]
FROM from_clause
[WHERE where_clause]
[GROUP BY [GROUPING SETS | ROLLUP | CUBE] group_by_clause]
[HAVING having_clause]
[QUERY WITH (query_property = value [, ...])];
```

## Description

The `SELECT` statement is used to retrieve data from one or more Relations: [#stream](https://docs.deltastream.io/overview/core-concepts/databases#stream "mention") and/or [#changelog](https://docs.deltastream.io/overview/core-concepts/databases#changelog "mention"). See [select-from-materialized-view](https://docs.deltastream.io/reference/sql-syntax/query/materialized-view/select-from-materialized-view "mention") for retrieving data from a [#materialized\_view](https://docs.deltastream.io/overview/core-concepts/databases#materialized_view "mention").

### Arguments <a href="#parameters" id="parameters"></a>

#### with\_clause

See [with-common-table-expression](https://docs.deltastream.io/reference/sql-syntax/query/select/with-common-table-expression "mention") for more information about defining CTEs.

#### SELECT expression AS alias \[, ...]

Each expression in this clause can refer to a column from one of the statement’s sources, a primitive value, or the result of a [functions](https://docs.deltastream.io/reference/sql-syntax/query/functions "mention") call. Each expression can also be given an alias that sets the column name in the sink Relation. Case-sensitive column names must be wrapped in double quotes; otherwise, the lowercase name is used, and the statement may not find the column.
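For case-sensitive names, the quoting rule can be sketched against the `"Users"` Changelog defined in the Examples section below (the alias `"SignupTime"` is illustrative):

```sql
-- "RegisterTime" is case sensitive and must be double quoted;
-- an unquoted registertime would be lowercased and would not match.
SELECT "RegisterTime" AS "SignupTime", userid
FROM "Users";
```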

#### DISTINCT

If `SELECT` is specified with `DISTINCT`, duplicate rows are removed from the results of the query.

#### FROM relation\_reference

See [from](https://docs.deltastream.io/reference/sql-syntax/query/select/from "mention") for more information.

#### WHERE where\_clause

Optionally, a source Relation’s records can be filtered according to supplied predicates in the `WHERE` clause. Predicates are specified as a Boolean expression, which can use columns from source Relations.

#### GROUP BY group\_by\_clause

Optionally, this clause groups records according to their values in a given set of columns, to compute one or more aggregations. The `GROUP BY` clause in the statement defines a list of one or more comma-separated grouping columns. Aggregate expressions are specified among the `SELECT` expressions. Currently, `GROUP BY` and aggregation are supported on Streams. The result of a group aggregation is a [#changelog](https://docs.deltastream.io/overview/core-concepts/databases#changelog "mention") whose `PRIMARY KEY` is the list of `GROUP BY` columns.

#### HAVING having\_clause

Optionally, this clause specifies aggregate filter conditions. The `HAVING` clause can only be used with `GROUP BY`, to filter out groups of records that do not satisfy a condition. Any column referenced in the `HAVING` clause must either appear in the `GROUP BY` columns list or be referenced within an aggregate function.
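For example, using the `pageviews` Stream from the Examples section below: a bare column in `HAVING` must be a grouping column, while any other column must appear inside an aggregate function (the predicate values here are illustrative):

```sql
SELECT
  userid,
  COUNT(pageid) AS pgcnt
FROM pageviews
GROUP BY userid
-- userid is a grouping column; pageid may only appear inside an aggregate.
HAVING userid <> 'User_0' AND COUNT(pageid) > 1;
```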

#### QUERY WITH (query\_property = value \[, …​])

Optionally, this clause specifies [#query\_properties](#query_properties "mention").

See [named-queries](https://docs.deltastream.io/reference/sql-syntax/query/named-queries "mention") and [resume-query](https://docs.deltastream.io/reference/sql-syntax/query/resume-query "mention") on how to rebuild a new Query from a running one.

### Query Properties <a href="#query_properties" id="query_properties"></a>

| Property Name      | Description                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                       |
| ------------------ | --------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| `parallelism.max`  | <p>This property provides a hint for the maximum units of compute resources to use in parallel to process data read from the source(s) and written into the sink within the scope of a query. All other operations between source and sink use the available resources to maximize their processing power.<br></p><p><strong>Required:</strong> No<br><strong>Default value:</strong> 1<br><strong>Type:</strong> Integer<br><strong>Valid values:</strong> \[1, ..., 10]</p> |
| `state.ttl.millis` | <p>This property sets the minimum amount of time before a query’s state is considered stale and can be expired. By default, this value is infinite, meaning that state is never considered stale and won’t be cleaned up. This property is particularly relevant for queries that include windowed joins or aggregations, because the amount of state in such queries can grow unboundedly and eventually cause the query to fail due to resource exhaustion. See <a href="#_having_clause-1">the example below</a> for more details.<br></p><p><strong>Required:</strong> No<br><strong>Default value:</strong> Infinite<br><strong>Type:</strong> Long<br><strong>Valid values:</strong> \[1, ...]</p> |
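
As a sketch, one or more query properties can be attached to a statement with the `QUERY WITH` clause; this hypothetical example uses the `pageviews` Stream from the Examples section below, and the property values are illustrative:

```sql
SELECT userid, COUNT(pageid) AS pgcnt
FROM pageviews
GROUP BY userid
-- Hint at most 2 parallel compute units; allow state untouched for 60 seconds to expire.
QUERY WITH ('parallelism.max' = '2', 'state.ttl.millis' = '60000');
```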

## DISTINCT

`SELECT DISTINCT` removes duplicate rows from the query's results. A result row consists of value columns and row metadata (see [row-functions](https://docs.deltastream.io/reference/sql-syntax/query/functions/row-functions "mention")). When `SELECT DISTINCT` is used, rows with duplicate value columns are removed from the results, and processing time is used as each result row's timestamp. Note that adding `DISTINCT` to `SELECT` makes the query stateful, and the state size for the query can grow indefinitely, depending on the number of distinct rows in the result. The `state.ttl.millis` query property can be used to set an expiration time for the entries in the query's state so they can be cleaned up (see [#query\_properties](#query_properties "mention")).
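For instance, a sketch that bounds the state of a `DISTINCT` query with a TTL (the relation is the `pageviews` Stream from the Examples section below; the TTL value is illustrative):

```sql
SELECT DISTINCT userid, pageid
FROM pageviews
-- Entries in the distinct-row state may be cleaned up after one hour.
QUERY WITH ('state.ttl.millis' = '3600000');
```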

## Group Aggregation

For more complex use cases of the `GROUP BY` clause, the following extensions can be used to simplify these types of statements.

### `GROUPING SETS`

This extension can be used to simplify the union of multiple statements with their own `GROUP BY` clause. The following grouping statements:

```sql
SELECT aggregate_function(col1), col2, col3
FROM relation
GROUP BY col2, col3;

SELECT aggregate_function(col1), col2, NULL
FROM relation
GROUP BY col2;

SELECT aggregate_function(col1), NULL, NULL
FROM relation;
```

can be simplified using the `GROUPING SETS` extension to the `GROUP BY` into a single statement, as follows:

```sql
SELECT aggregate_function(col1) AS agg, col2, col3
FROM relation
GROUP BY GROUPING SETS ((col2, col3), col2, ());
```

An empty subgroup means that all input rows are aggregated into a single group based on `aggregate_function(col1)`. This aggregate group is returned even if no input rows were present; e.g. using `COUNT(*) AS cnt`, a resulting row may be `{"cnt":XXX, NULL, NULL}`.
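As an illustrative sketch over the `pageviews` Stream from the Examples section below, the following computes counts per `(userid, pageid)` pair, per `userid`, and over all rows in a single statement:

```sql
SELECT COUNT(*) AS cnt, userid, pageid
FROM pageviews
GROUP BY GROUPING SETS ((userid, pageid), userid, ());
```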

### `ROLLUP`

This extension is a shorthand for the `GROUPING SETS` extension:

```sql
GROUP BY ROLLUP (c1, c2, c3)
```

This can be rewritten as such using the `GROUPING SETS` extension:

```sql
GROUP BY GROUPING SETS (
  (c1, c2, c3),
  (c1, c2),
  (c1),
  ()
)
```
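
For example, a hypothetical rollup over the `pageviews` Stream from the Examples section below, equivalent to grouping by `(userid, pageid)`, `(userid)`, and `()`:

```sql
SELECT COUNT(*) AS cnt, userid, pageid
FROM pageviews
GROUP BY ROLLUP (userid, pageid);
```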

### `CUBE`

This extension is another shorthand for the `GROUPING SETS` extension, expanding to all possible subgroups. For example:

```sql
GROUP BY CUBE (c1, c2, c3)
```

is rewritten as:

```sql
GROUP BY GROUPING SETS (
    ( c1, c2, c3 ),
    ( c1, c2     ),
    ( c1,     c3 ),
    ( c1         ),
    (     c2, c3 ),
    (     c2     ),
    (         c3 ),
    (            )
)
```

## OVER Aggregation

In the `SELECT` clause, `OVER` aggregation is used to compute an aggregated value (via calling an aggregation function) for each record over a range of records, ordered according to the Relation’s timestamp.

```sql
SELECT ...,
  agg_function(col) OVER (
    [PARTITION BY col1 [, col2, ...]]
    ORDER BY [rowtime() | timestamp_col]
    range_definition
  ) AS alias,
  ...
FROM ...
```

If `PARTITION BY` is defined for an `OVER` aggregation, the aggregate value is computed per record over the records in its partition.

The `range_definition` is used to determine how many previous records should be used to compute the aggregate value for each given record. You can think of a range as an aggregation window, with lower and upper boundaries. Source records are ordered according to the `ORDER BY` clause, and for each record, the aggregate window defines the range of records to be used for aggregation. The upper boundary is always the current record, and it is inclusive. Two types of range can be defined in `OVER` aggregations:

### RANGE interval

A `RANGE` interval uses a time interval over the expression used in the `ORDER BY` clause (i.e. the timestamp column or `rowtime()`) to collect records for computing the aggregation.

```sql
RANGE BETWEEN SIZE time_unit PRECEDING AND CURRENT ROW
```

The `SIZE` is a number that defines the length of the aggregation window according to the `time_unit` value. The aggregate value is computed over all records that are within this window, according to the current record’s timestamp.

### ROWS interval

A `ROWS` interval uses a count-based interval over the records ordered according to the `ORDER BY` clause to collect records for computing aggregation.

```sql
ROWS BETWEEN number PRECEDING AND CURRENT ROW
```

### OVER Aggregation in a Query

`OVER` aggregation is used in the `SELECT` clause. A query can have multiple `OVER` aggregation clauses in its `SELECT`; however, the range definition for all of them must be the same. This means there cannot be a mix of range definitions, or of partitioned and non-partitioned `OVER` aggregations, in a query. Moreover, the `ORDER BY` clause for all of them must be the same.

For example, assume `orders` is a Stream defined as:

```sql
CREATE STREAM orders (
    orderid BIGINT,
    category VARCHAR,
    total INTEGER)
WITH ('topic'='orders', 'value.format'='JSON');
```

The query below uses a range interval to define an `OVER` aggregation. For each order record, it computes the `sum()` of `total` over all order records with the same category as the current record that were placed at most 30 seconds before the current order.

```sql
SELECT
 orderid,
 category, 
 sum(total) OVER(
  PARTITION BY category
  ORDER BY rowtime()
  RANGE BETWEEN 30 SECONDS PRECEDING AND CURRENT ROW) AS sum_total
FROM orders;
```

The query below uses a rows interval to define an `OVER` aggregation. This rows interval specifies that the current record, plus any of the 10 preceding records with the same category as the current record's, should be included in the aggregate computation. Remember that the upper boundary of the aggregate window is inclusive for an `OVER` aggregation; hence, this query considers 11 records, the current record and the 10 preceding ones, when applying `PARTITION BY` and computing the aggregate value.

```sql
SELECT
 orderid,
 category,
 sum(total) OVER(
   PARTITION BY category
   ORDER BY rowtime()
   ROWS BETWEEN 10 PRECEDING AND CURRENT ROW) AS sum_total
FROM orders;
```
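
As noted above, a single query may contain several `OVER` aggregations, provided they all share the same partitioning, ordering, and range definition. A sketch over the same `orders` Stream (the additional `count(*)` aggregate is illustrative):

```sql
SELECT
  orderid,
  category,
  sum(total) OVER (
    PARTITION BY category
    ORDER BY rowtime()
    ROWS BETWEEN 10 PRECEDING AND CURRENT ROW) AS sum_total,
  -- Same PARTITION BY, ORDER BY, and range as the aggregation above.
  count(*) OVER (
    PARTITION BY category
    ORDER BY rowtime()
    ROWS BETWEEN 10 PRECEDING AND CURRENT ROW) AS order_cnt
FROM orders;
```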

## Examples

For the following examples, assume a [#stream](https://docs.deltastream.io/overview/core-concepts/databases#stream "mention") `pageviews` and a [#changelog](https://docs.deltastream.io/overview/core-concepts/databases#changelog "mention") `"Users"` have been created by the following DDLs:

```sql
CREATE STREAM pageviews (
  viewtime BIGINT,
  userid VARCHAR,
  pageid VARCHAR
) WITH (
  'topic' = 'pageviews'
);
```

```sql
CREATE CHANGELOG "Users" (
  "RegisterTime" BIGINT, 
  userid VARCHAR, 
  regionid VARCHAR, 
  interests ARRAY < VARCHAR >,
  contactinfo STRUCT<phone VARCHAR, city VARCHAR, "State" VARCHAR, zipcode VARCHAR>,
  PRIMARY KEY(userid)
) WITH (
  'topic' = 'users',
  'timestamp' = 'RegisterTime'
);
```

#### Select all

The following query selects all the columns from the `"Users"` Changelog.

```sql
SELECT * FROM "Users";
```

#### Select DISTINCT

The following query selects DISTINCT rows from the `"Users"` Changelog according to the values of `userid`, `interests[1]`, and `contactinfo->city` in each input row. This means there are no duplicate rows in the results of the query.

```sql
SELECT DISTINCT 
  userid,
  interests[1] AS hobby,
  contactinfo->city AS location
FROM "Users";
```

#### Select from complex types

The following is a simple projection query showing how to access data from more complex types. For arrays, you can access a particular element by indexing into the field with bracket syntax. For structs, you can use a similar bracket syntax or use arrows to access the inner data.

```sql
SELECT
  "RegisterTime",
  userid,
  interests,
  interests[1] AS top_interest,
  contactinfo,
  contactinfo->city AS location,
  contactinfo['city'] AS location2,
  contactinfo->"State" AS "state"
FROM "Users";
```

#### Select all with source properties

The following selects all the columns from the `pageviews` Stream. It also sets source properties to treat the `viewtime` column as the value for the timestamp, to start reading from the beginning of the source’s underlying Kafka topic, and to skip any records that fail deserialization.

```sql
SELECT
  * 
FROM pageviews
WITH (
  'timestamp' = 'viewtime',
  'starting.position' = 'earliest',
  'source.deserialization.error.handling' = 'IGNORE'
);
```

#### Select all from specific Kafka offsets

The following selects all the columns from the `pageviews` Stream. It also sets source properties to treat the `viewtime` column as the value for the timestamp and to start reading from specific offsets in the source’s underlying Kafka topic. The offsets to start reading from are 1 in partition 0 and 3 in partition 1.

```sql
SELECT
  * 
FROM pageviews 
WITH (
    'timestamp' = 'viewtime',
    'starting.position' = 'specific-offsets',
    'starting.position.offsets' = 'partition:0,offset:1;partition:1,offset:3'
);
```

#### Aggregation with GROUP BY and HAVING clauses <a href="#having_clause" id="having_clause"></a>

The following counts the number of page visits by each `userid`, then emits only those results having a count greater than 1.

```sql
SELECT 
  userid, 
  COUNT(pageid) AS pgcnt 
FROM pageviews 
GROUP BY userid 
HAVING COUNT(pageid) > 1;
```

#### Aggregation with state TTL query property <a href="#having_clause" id="having_clause"></a>

The following counts the number of page visits by each `userid`. The query property `state.ttl.millis` is also configured to `10000`, or 10 seconds. This means that state that has not been updated in the last 10 seconds is considered stale and can be cleaned up. Note that state is not necessarily cleaned up immediately after it is considered stale.

```sql
SELECT 
  userid, 
  COUNT(pageid) AS pgcnt 
FROM pageviews 
WITH ('timestamp'='viewtime')
GROUP BY userid 
QUERY WITH ('state.ttl.millis' = '10000');
```

Given this input for pageviews:

```
+----------+--------+---------+
| viewtime | userid | pageid  |
+----------+--------+---------+
|     1000 | User_1 | Page_21 |
|     2000 | User_2 | Page_10 |
|     3000 | User_1 | Page_10 |
|    10000 | User_1 | Page_91 |
|    20000 | User_1 | Page_10 |
|    80000 | User_2 | Page_55 |
+----------+--------+---------+
```

We can get the following output:

```
+--------+-------+
| userid | pgcnt |
+--------+-------+
| User_1 |   1   |
| User_2 |   1   |
| User_1 |   2   |
| User_1 |   3   |
| User_1 |   4   |
| User_2 |   1   | // pgcnt=2 is also possible for this output
+--------+-------+
```

The `pgcnt` associated with the `User_2` outputs are both `1` in this case because of the `state.ttl.millis` configuration of `10000`. Since there is a gap of more than `10000` milliseconds between the first and second `User_2` input records according to `viewtime`, which is marked as the event time for the records, the state for `User_2` will be marked stale by the time the query consumes the second `User_2` record. Thus, the state for `User_2` may be cleaned up. It's also possible that the second `User_2` output has a `pgcnt` of 2, since the state may be stale but not yet cleaned up before the second `User_2` record is consumed by the query. The `pgcnt` values for the `User_1` outputs are strictly increasing because its state is never marked stale.
