SELECT [DISTINCT] expression AS alias [, ...]
FROM from_clause
[WHERE where_clause]
[GROUP BY [GROUPING SETS | ROLLUP | CUBE] group_by_clause]
[HAVING having_clause]
[QUERY WITH (query_property = value [, ...])];


The SELECT statement is used to retrieve data from one or more Relations: a Stream and/or a Changelog. See SELECT (FROM MATERIALIZED VIEW) for retrieving data from a Materialized View.



See WITH (Common Table Expression) for more information about defining CTEs.

SELECT expression AS alias [, ...]

Each expression in this clause can refer to a column from one of the statement's sources, a primitive value, or the result of a Function call. Each expression can also be given an alias that sets the column name in the sink Relation. Case-sensitive column names must be wrapped in double quotes; otherwise, the lowercase name is used, and the statement may not find the column.
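As an illustrative sketch (it assumes a source with the lowercase column userid and the case-sensitive column "RegisterTime", as in the "Users" Changelog used in the examples below), a projection with aliases might look like:

```sql
SELECT
  userid AS user_id,           -- lowercase name, no quoting needed
  "RegisterTime" AS reg_time   -- case-sensitive name must be double-quoted
FROM "Users";
```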


If SELECT is specified with DISTINCT, duplicate rows are removed from the results of the query.

FROM relation_reference

See FROM for more information.

WHERE where_clause

Optionally, a source Relation’s records can be filtered according to supplied predicates in the WHERE clause. Predicates are specified as a Boolean expression, which can use columns from source Relations.
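As a sketch (using the pageviews Stream and columns from the examples below), a WHERE clause combining two predicates in a Boolean expression might look like:

```sql
-- Keep only User_1's records that are not views of Page_10
SELECT viewtime, userid, pageid
FROM pageviews
WHERE userid = 'User_1' AND pageid <> 'Page_10';
```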

GROUP BY group_by_clause

Optionally, this clause groups records according to their values for a given set of columns in order to compute one or more aggregations. The GROUP BY clause in the statement defines a list of one or more comma-separated grouping columns. Aggregate expressions are specified among the SELECT expressions. Currently, GROUP BY and aggregation are supported on Streams. The result of a group aggregation is a Changelog whose PRIMARY KEY is the list of GROUP BY columns.

HAVING having_clause

Optionally, this clause specifies aggregate filter conditions. The HAVING clause can only be used with GROUP BY, to filter out groups of records that do not satisfy a condition. Any column referenced in the HAVING clause must either appear in the GROUP BY columns list or be referenced within an aggregate function.

QUERY WITH (query_property = value [, …​])

Optionally, this clause specifies Query Properties.

Query Properties

Property Name | Description


This property provides a hint for the maximum units of compute resources to use in parallel to process data read from the source(s) and written into the sink within the scope of a query. All other operations between source and sink use the available resources to maximize their processing power.

Required: No
Default value: 1
Type: String
Valid values: [1, ..., 10]


state.ttl.millis

This property sets the minimum amount of time before a query's state is considered stale and can be expired. By default, this value is infinite, meaning that state is never considered stale and is never cleaned up. This property is particularly relevant for stateful queries, including windowed joins and aggregations, because the amount of state in these queries can grow without bound and eventually cause failures due to resource exhaustion. See the example below for more details.

Required: No
Default value: Infinite
Type: String
Valid values: [1, ...]


SELECT DISTINCT

SELECT DISTINCT removes duplicate rows from the query's results. A result row consists of value columns and row metadata (see Row Metadata Functions). When SELECT DISTINCT is used, rows with duplicate value columns are removed from the results, and the processing time is used as the result row's timestamp. Note that adding DISTINCT to SELECT makes the query stateful, and the state size for the query can grow indefinitely, depending on the number of distinct rows in the result. The state.ttl.millis query property can be used to set an expiration time for entries in the query's state so they can be cleaned up (see Query Properties).

Group Aggregation

For more complex use cases of the GROUP BY clause, the following extensions can be used to simplify these types of statements.


GROUPING SETS

This extension can be used to simplify the union of multiple statements, each with its own GROUP BY clause. The following grouping statements:

SELECT aggregate_function(col1), col2, col3
FROM relation
GROUP BY col2, col3;

SELECT aggregate_function(col1), col2, NULL
FROM relation
GROUP BY col2;

SELECT aggregate_function(col1), NULL, NULL
FROM relation;

can be simplified using the GROUPING SETS extension to GROUP BY into a single statement, as follows:

SELECT aggregate_function(col1) AS agg, col2, col3
FROM relation
GROUP BY GROUPING SETS ((col2, col3), col2, ());

An empty subgroup means that all input rows are aggregated into a single group based on aggregate_function(col1). This aggregate group is returned even if no input rows were present; e.g., using COUNT(*) AS cnt, a resulting row may be {"cnt":XXX, NULL, NULL}.


ROLLUP

This extension is a shorthand for the GROUPING SETS extension:

GROUP BY ROLLUP (c1, c2, c3)

This can be rewritten as such using the GROUPING SETS extension:

GROUP BY GROUPING SETS (
  (c1, c2, c3),
  (c1, c2),
  (c1),
  ()
)


CUBE

This extension is an alias for the GROUPING SETS extension, as follows:

GROUP BY CUBE (c1, c2, c3)

is rewritten as:

GROUP BY GROUPING SETS (
    ( c1, c2, c3 ),
    ( c1, c2     ),
    ( c1,     c3 ),
    ( c1         ),
    (     c2, c3 ),
    (     c2     ),
    (         c3 ),
    (            )
)

OVER Aggregation

In the SELECT clause, OVER aggregation is used to compute an aggregated value (via calling an aggregation function) for each record over a range of records, ordered according to the Relation’s timestamp.

SELECT
  agg_function(col) OVER (
    [PARTITION BY col1 [, col2, ...]]
    ORDER BY [rowtime() | timestamp_col]
    range_definition
  ) AS alias [, ...]
FROM ...

If PARTITION BY is defined for an OVER aggregation, the aggregate value is computed per record over the records in its partition.

The range_definition is used to determine how many previous records should be used to compute the aggregate value for each given record. You can think of a range as an aggregation window, with lower and upper boundaries. Source records are ordered according to the ORDER BY clause, and for each record, the aggregate window defines the range of records to be used for aggregation. The upper boundary is always the current record, and it is inclusive. Two types of range can be defined in OVER aggregations:

RANGE interval

A RANGE interval uses a time interval over the expression used in the ORDER BY clause (i.e., the timestamp column or rowtime()) to collect records for computing the aggregation.


The SIZE is a number that defines the length of the aggregation window according to the time_unit value. The aggregate value is computed over all records that are within this window, according to the current record’s timestamp.
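As a sketch of the shape such a range definition can take (this assumes an ANSI-style window frame syntax; check your deployment's reference for the exact form), a 30-second window could be written as:

```sql
RANGE BETWEEN INTERVAL '30' SECOND PRECEDING AND CURRENT ROW
```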

ROWS interval

A ROWS interval uses a count-based interval over the records, ordered according to the ORDER BY clause, to collect records for computing the aggregation.
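As a sketch (again assuming ANSI-style frame syntax), a count-based interval covering the current record and its 10 predecessors could be written as:

```sql
ROWS BETWEEN 10 PRECEDING AND CURRENT ROW
```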


OVER Aggregation in a Query

OVER aggregation is used in the SELECT clause. A query can have multiple OVER aggregation clauses in its SELECT; however, the range definition must be the same for all of them. This means a query cannot mix range definitions, or partitioned and non-partitioned OVER aggregations. Moreover, the ORDER BY clause must be the same for all of them.

For example, assume orders is a Stream defined as:

CREATE STREAM orders (
    orderid BIGINT,
    category VARCHAR,
    total INTEGER)
WITH ('topic'='orders', 'value.format'='JSON');

The query below uses a RANGE interval to define an OVER aggregation that, for each order record, computes the sum() of the orders' total over all order records with the same category as the current record that were placed at most 30 seconds before the current order.

SELECT
  sum(total) OVER(
    PARTITION BY category
    ORDER BY rowtime()
    RANGE BETWEEN INTERVAL '30' SECOND PRECEDING AND CURRENT ROW
  ) AS category_total
FROM orders;

The query below uses a ROWS interval to define an OVER aggregation. This ROWS interval specifies that the current record plus any of its 10 preceding records with the same category as the current record's should be included in the aggregate computation. Remember that the upper boundary of the aggregate window is inclusive for an OVER aggregation; hence, this query considers 11 records, the current record and its 10 preceding ones, when applying PARTITION BY and computing the aggregate value.

SELECT
  sum(total) OVER(
    PARTITION BY category
    ORDER BY rowtime()
    ROWS BETWEEN 10 PRECEDING AND CURRENT ROW
  ) AS category_total
FROM orders;


Examples

For the following examples, assume a Stream pageviews and a Changelog "Users" have been created by the following DDLs:

CREATE STREAM pageviews (
  viewtime BIGINT,
  userid VARCHAR,
  pageid VARCHAR
) WITH (
  'topic' = 'pageviews'
);

CREATE CHANGELOG "Users" (
  "RegisterTime" BIGINT, 
  userid VARCHAR, 
  regionid VARCHAR, 
  interests ARRAY < VARCHAR >,
  contactinfo STRUCT<phone VARCHAR, city VARCHAR, "State" VARCHAR, zipcode VARCHAR>,
  PRIMARY KEY(userid)
) WITH (
  'topic' = 'users',
  'timestamp' = 'RegisterTime'
);

Select all

The following query selects all the columns from the Users Changelog.

SELECT * FROM "Users";


Select DISTINCT

The following query selects DISTINCT rows from the "Users" Changelog according to the values for userid, interests[1], and contactinfo->city in each input row. This means there are no duplicate rows in the results of the query.

SELECT DISTINCT
  userid,
  interests[1] AS hobby,
  contactinfo->city AS location
FROM "Users";

Select from complex types

The following is a simple projection query showing how to access data from more complex types. For arrays, you can access a particular element by indexing into the field with bracket syntax. For structs, you can use a similar bracket syntax or use arrows to access the inner data.

SELECT
  interests[1] AS top_interest,
  contactinfo->city AS location,
  contactinfo['city'] AS location2,
  contactinfo->"State" AS "state"
FROM "Users";

Select all with source properties

The following selects all the columns from the pageviews Stream. It also sets source properties to treat the viewtime column as the value for the timestamp, to start reading from the beginning of the source’s underlying Kafka topic, and to skip any records that fail deserialization.

SELECT *
FROM pageviews
WITH (
  'timestamp' = 'viewtime',
  'starting.position' = 'earliest',
  'source.deserialization.error.handling' = 'IGNORE'
);

Select all from specific Kafka offsets

The following selects all the columns from the pageviews Stream. It also sets source properties to treat the viewtime column as the value for the timestamp and to start reading from specific offsets in the source’s underlying Kafka topic. The offsets to start reading from are 1 in partition 0 and 3 in partition 1.

SELECT *
FROM pageviews
WITH (
    'timestamp' = 'viewtime',
    'starting.position' = 'specific-offsets',
    'starting.position.offsets' = 'partition:0,offset:1;partition:1,offset:3'
);

Aggregation with GROUP BY and HAVING clauses

The following aggregates the number of page visits from each userid, then emits only those results having a count greater than 1.

SELECT
  userid,
  COUNT(pageid) AS pgcnt 
FROM pageviews 
GROUP BY userid 
HAVING COUNT(pageid) > 1;

Aggregation with state TTL query property

The following aggregates the number of page visits from each userid. The query property state.ttl.millis is also configured to 10000, or 10 seconds. This means that state that has not been updated in the last 10 seconds is considered stale and can be cleaned up. Note that state is not necessarily cleaned up immediately after it is considered stale.

SELECT
  userid,
  COUNT(pageid) AS pgcnt 
FROM pageviews 
WITH ('timestamp'='viewtime')
GROUP BY userid 
QUERY WITH ('state.ttl.millis' = '10000');

Given this input for pageviews:

| viewtime | userid | pageid  |
|     1000 | User_1 | Page_21 |
|     2000 | User_2 | Page_10 |
|     3000 | User_1 | Page_10 |
|    10000 | User_1 | Page_91 |
|    20000 | User_1 | Page_10 |
|    80000 | User_2 | Page_55 |

We can get the following output:

| userid | pgcnt |
| User_1 |   1   |
| User_2 |   1   |
| User_1 |   2   |
| User_1 |   3   |
| User_1 |   4   |
| User_2 |   1   | // pgcnt=2 is also possible for this output

The pgcnt associated with both User_2 outputs is 1 in this case because of the state.ttl.millis configuration of 10000. Since there is a gap of more than 10000 milliseconds between the first and second User_2 input records according to viewtime, which is marked as the event time for the records, the state for User_2 will be marked stale by the time the query consumes the second User_2 record. Thus, the state for User_2 may be cleaned up. It's also possible that the second User_2 output will have a pgcnt of 2, since the state may be stale but not yet cleaned up before the second User_2 record is consumed by the query. The pgcnt values for the User_1 outputs are strictly increasing because User_1's state is never marked stale.
