SELECT
The SELECT statement is used to retrieve data from one or more Relations: Streams and/or Changelogs. See SELECT (FROM MATERIALIZED VIEW) for retrieving data from a Materialized View.
See WITH (Common Table Expression) for more information about defining CTEs.
Each expression in this clause can refer to a column from one of the statement's sources, a primitive value, or the result of a Function call. Each expression can also be supplied an alias that sets the column name for the sink Relation. For case-sensitive column names, the name must be wrapped in double quotes; otherwise, the lowercase name will be attempted, and the statement may not find the column.
If SELECT is specified with DISTINCT, duplicate rows are removed from the results of the query.
See FROM for more information.
Optionally, a source Relation's records can be filtered according to supplied predicates in the WHERE clause. Predicates are specified as a Boolean expression, which can use columns from source Relations.
Optionally, this clause specifies aggregate filter conditions. The HAVING clause can only be used with GROUP BY to filter out groups of records that do not satisfy a condition. Any given column reference in the HAVING clause must either refer to a column from the GROUP BY columns list or appear within an aggregate function.
Optionally, this clause specifies Query Properties.
See Query Name and Version and Resume Query on how to rebuild a new Query from a running one.
parallelism.max
This property provides a hint for the maximum units of compute resources to use in parallel to process data read from the source(s) and written into the sink within the scope of a query. All other operations between source and sink will use the available resources to maximize their processing power.
Required: No
Default value: 1
Type: String
Valid values: [1, ..., 10]
state.ttl.millis
This property sets the minimum amount of time before a query's state is considered stale and can be expired. By default, this value is infinite, meaning that state is never considered stale and won't be cleaned up. This property is particularly relevant for stateful queries, such as windowed joins/aggregations, because the amount of state in these queries can grow unboundedly, eventually causing the query to fail due to resource exhaustion. See the example below for more details.
Required: No
Default value: Infinite
Type: String
Valid values: [1, ...]
SELECT DISTINCT removes duplicate rows from the query's results. A result row consists of value columns and row metadata (see Row Metadata Functions). When using SELECT DISTINCT, rows with duplicate value columns are removed from the results, and the processing time is used as the result row's timestamp. Note that adding DISTINCT to SELECT makes the query stateful, and the state size for the query could grow infinitely, depending on the number of distinct rows in the result. The state.ttl.millis query property can be used to set an expiration time for entries in the query's state so they can be cleaned up (see Query Properties).
For more complex use cases of the GROUP BY clause, the following extensions can be used to simplify these types of statements.
GROUPING SETS
This extension can be used to simplify the union of multiple statements, each with its own GROUP BY clause. The following grouping statements:
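As a sketch, consider a hypothetical relation `t` with columns `col1`, `col2`, and `col3` (names here are illustrative, not from the original page). Each SELECT aggregates over a different grouping of columns, and the results are unioned together:

```sql
-- Hypothetical relation `t`; each branch groups by a different column set.
SELECT col1, col2, SUM(col3) AS total FROM t GROUP BY col1, col2
UNION ALL
SELECT col1, NULL, SUM(col3) AS total FROM t GROUP BY col1
UNION ALL
SELECT NULL, NULL, SUM(col3) AS total FROM t;
```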
can be simplified using the GROUPING SETS extension to the GROUP BY clause into a single statement as follows:
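A sketch of the collapsed form, again using the hypothetical relation `t` and columns `col1`, `col2`, `col3`:

```sql
-- One grouping set per original GROUP BY; the empty set ()
-- produces a single global aggregate over all input rows.
SELECT col1, col2, SUM(col3) AS total
FROM t
GROUP BY GROUPING SETS ((col1, col2), (col1), ());
```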
An empty subgroup means that all input rows are aggregated into a single group based on the aggregate_function(col1). This aggregate group returns a result even if no input rows were present; e.g., using COUNT(*) AS cnt, a resulting row may be {"cnt":XXX, NULL, NULL}.
ROLLUP
This extension is a shorthand for the GROUPING SETS extension:
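A sketch of a ROLLUP over two columns of a hypothetical relation `t` (illustrative names):

```sql
-- ROLLUP (col1, col2) stands for the grouping sets
-- (col1, col2), (col1), and ().
SELECT col1, col2, SUM(col3) AS total
FROM t
GROUP BY ROLLUP (col1, col2);
```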
This can be rewritten using the GROUPING SETS extension as follows:
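Using the same hypothetical relation `t`, the equivalent GROUPING SETS form would be:

```sql
-- The same rollup expressed explicitly as grouping sets.
SELECT col1, col2, SUM(col3) AS total
FROM t
GROUP BY GROUPING SETS ((col1, col2), (col1), ());
```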
CUBE
This extension is an alias for the GROUPING SETS extension, as follows:
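A sketch of a CUBE over two columns of a hypothetical relation `t` (illustrative names):

```sql
-- CUBE (col1, col2) aggregates over every subset of the listed columns.
SELECT col1, col2, SUM(col3) AS total
FROM t
GROUP BY CUBE (col1, col2);
```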
is rewritten as:
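For the hypothetical two-column case, CUBE expands to all four possible grouping sets:

```sql
-- Equivalent GROUPING SETS expansion of CUBE (col1, col2).
SELECT col1, col2, SUM(col3) AS total
FROM t
GROUP BY GROUPING SETS ((col1, col2), (col1), (col2), ());
```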
In the SELECT clause, OVER aggregation is used to compute an aggregated value (via calling an aggregation function) for each record over a range of records, ordered according to the Relation's timestamp.
If PARTITION BY is defined for an OVER aggregation, the aggregate value is computed per record over the records in its partition.
The range_definition is used to determine how many previous records should be used to compute the aggregate value for each given record. You can think of a range as an aggregation window, with lower and upper boundaries. Source records are ordered according to the ORDER BY clause, and for each record, the aggregate window defines the range of records to be used for aggregation. The upper boundary is always the current record, and it is inclusive. Two types of range can be defined in OVER aggregations:
A RANGE interval uses a time interval over the expression used in the ORDER BY clause (i.e. the timestamp column or rowtime()) to collect records for computing the aggregation.
The SIZE is a number that defines the length of the aggregation window according to the time_unit value. The aggregate value is computed over all records that are within this window, according to the current record's timestamp.
A ROWS interval uses a count-based interval over the records, ordered according to the ORDER BY clause, to collect records for computing the aggregation.
OVER aggregation is used in the SELECT clause. A query can have multiple OVER aggregation clauses in its SELECT; however, the range definition for all of them must be the same. This means that there cannot be a mix of range definitions, or of partitioned and non-partitioned OVER aggregations, in a query. Moreover, the ORDER BY clause for all of them must be the same.
For example, assume orders is a Stream defined as:
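A minimal sketch of such a DDL; aside from the category and total columns referenced below, the column names, types, and properties are assumptions:

```sql
CREATE STREAM orders (
  order_time BIGINT,
  orderid VARCHAR,
  category VARCHAR,
  total DOUBLE
) WITH ('topic' = 'orders', 'value.format' = 'JSON');
```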
The query below uses a range interval to define an OVER aggregation. For each order record, it computes the sum() of the orders' total over all order records with the same category as the current record that were placed at most 30 seconds before the current order.
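A sketch of such a query, based on the description above; the exact range syntax and the use of rowtime() for ordering are assumptions:

```sql
-- Sum of `total` per order, over orders in the same category
-- placed within the preceding 30 seconds (upper bound: current row).
SELECT
  orderid,
  category,
  SUM(total) OVER (
    PARTITION BY category
    ORDER BY rowtime()
    RANGE BETWEEN INTERVAL '30' SECOND PRECEDING AND CURRENT ROW
  ) AS category_total
FROM orders;
```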
The query below uses a rows interval to define an OVER aggregation. This rows interval specifies that the current record, and all records with the same category as the current record among the 10 preceding records, should be included in the aggregate computation. Remember that the upper boundary of the aggregate window is inclusive for an OVER aggregation; hence, this query considers 11 records, including the current record and the 10 preceding ones, when applying PARTITION BY and computing the aggregate value.
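A sketch of the rows-interval variant, under the same assumptions about column names and rowtime() ordering:

```sql
-- Current row plus the 10 preceding rows in the same category
-- are included in the aggregation (11 records in total).
SELECT
  orderid,
  category,
  SUM(total) OVER (
    PARTITION BY category
    ORDER BY rowtime()
    ROWS BETWEEN 10 PRECEDING AND CURRENT ROW
  ) AS category_total
FROM orders;
```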
The following query selects all the columns from the Users Changelog.
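A minimal sketch of such a query:

```sql
SELECT * FROM "Users";
```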
The following query selects DISTINCT rows from the "Users" Changelog according to the values for userid, interests[1], and contactinfo->city in each input row. This means there are no duplicate rows in the results of the query.
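A sketch of such a query; the column aliases are assumptions added for readability:

```sql
SELECT DISTINCT
  userid,
  interests[1] AS top_interest,
  contactinfo->city AS city
FROM "Users";
```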
The following is a simple projection query showing how to access data from more complex types. For arrays, you can access a particular field by indexing into the field with brackets syntax. For structs, you can use a similar bracket syntax or use arrows to access the inner data.
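A sketch of such a projection, reusing the "Users" columns from the previous example (aliases are assumptions):

```sql
-- Array element via brackets; struct field via brackets or the arrow syntax.
SELECT
  interests[1] AS top_interest,
  contactinfo['city'] AS city_via_brackets,
  contactinfo->city AS city_via_arrow
FROM "Users";
```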
The following selects all the columns from the pageviews Stream. It also sets source properties to treat the viewtime column as the value for the timestamp, to start reading from the beginning of the source's underlying Kafka topic, and to skip any records that fail deserialization.
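A sketch of such a query, assuming source properties are supplied in a WITH clause attached to the source; the exact property names are assumptions:

```sql
SELECT *
FROM pageviews
WITH (
  'timestamp' = 'viewtime',
  'starting.position' = 'earliest',
  'source.deserialization.error.handling' = 'IGNORE'
);
```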
The following selects all the columns from the pageviews Stream. It also sets source properties to treat the viewtime column as the value for the timestamp and to start reading from specific offsets in the source's underlying Kafka topic. The offsets to start reading from are 1 in partition 0 and 3 in partition 1.
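A sketch of such a query; both the property names and the offsets value format here are assumptions:

```sql
SELECT *
FROM pageviews
WITH (
  'timestamp' = 'viewtime',
  'starting.position' = 'specific-offsets',
  -- offset 1 in partition 0, offset 3 in partition 1 (format assumed)
  'starting.position.offsets' = 'partition:0,offset:1;partition:1,offset:3'
);
```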
The following aggregates the number of visits a page receives from a particular userid, emitting only results having a count greater than 1.
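A sketch of such a query; grouping on both userid and pageid is an assumption based on the description:

```sql
SELECT userid, pageid, COUNT(*) AS pgcnt
FROM pageviews
GROUP BY userid, pageid
HAVING COUNT(*) > 1;
```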
The following aggregates the number of visits a page receives from a particular userid. The query property state.ttl.millis is also configured to 10000, or 10 seconds. This means that state that has not been updated in the last 10 seconds is considered stale and can be cleaned up. Note that state is not necessarily cleaned up immediately after it is considered stale.
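A sketch of such a query; the grouping columns and the placement of the source and query property clauses are assumptions:

```sql
SELECT userid, pageid, COUNT(*) AS pgcnt
FROM pageviews
WITH ('timestamp' = 'viewtime')  -- viewtime as event time (assumed source property)
GROUP BY userid, pageid
WITH ('state.ttl.millis' = '10000');  -- state stale after 10 seconds
```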
Given this input for pageviews:
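A hypothetical input, constructed so the viewtime gap between the two User_2 records exceeds 10000 milliseconds, matching the behavior discussed below (all values are illustrative):

```json
{"viewtime": 1000,  "userid": "User_1", "pageid": "Page_10"}
{"viewtime": 2000,  "userid": "User_2", "pageid": "Page_20"}
{"viewtime": 6000,  "userid": "User_1", "pageid": "Page_10"}
{"viewtime": 15000, "userid": "User_2", "pageid": "Page_20"}
```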
We can get the following output:
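A hypothetical output consistent with the explanation below: User_2's state expires between its two records (viewtime gap over 10000 ms), so both of its counts are 1, while User_1's counts keep increasing:

```json
{"userid": "User_1", "pageid": "Page_10", "pgcnt": 1}
{"userid": "User_2", "pageid": "Page_20", "pgcnt": 1}
{"userid": "User_1", "pageid": "Page_10", "pgcnt": 2}
{"userid": "User_2", "pageid": "Page_20", "pgcnt": 1}
```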
The pgcnt values associated with the User_2 outputs are both 1 in this case because of the state.ttl.millis configuration of 10000. Since there is a gap of more than 10000 milliseconds between the first and second User_2 input records according to the viewtime, which is marked as the event time for the records, the state for User_2 will be marked stale by the time the query consumes the second User_2 record. Thus, the state for User_2 may be cleaned up. It's also possible that the second User_2 output will have a pgcnt of 2, as the state may be stale but not yet cleaned up before the second User_2 record is consumed by the query. The pgcnt values for User_1 outputs are strictly increasing because its state is never marked stale.
Optionally, this clause allows a grouping of records according to their values for a given set of columns to compute one or more aggregations. The GROUP BY clause in the statement defines a list of one or more columns (separated by commas) as grouping columns. Aggregate expressions are specified among the SELECT expressions. Currently, GROUP BY and aggregation are supported on Streams. The result of a group aggregation is a Changelog whose PRIMARY KEY is the list of GROUP BY columns.
For the following examples, assume a pageviews Stream and a users Changelog have been created by the following DDLs:
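A minimal sketch of such DDLs, with column names and types inferred from the examples above; the topic and format properties, and the exact struct fields, are assumptions:

```sql
CREATE STREAM pageviews (
  viewtime BIGINT,
  userid VARCHAR,
  pageid VARCHAR
) WITH ('topic' = 'pageviews', 'value.format' = 'JSON');

CREATE CHANGELOG users (
  registertime BIGINT,
  userid VARCHAR,
  interests ARRAY<VARCHAR>,
  contactinfo STRUCT<phone VARCHAR, city VARCHAR, zipcode VARCHAR>,
  PRIMARY KEY (userid)
) WITH ('topic' = 'users', 'value.format' = 'JSON');
```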