# SELECT

```sql
SELECT
expression AS alias [, ...]
FROM from_clause
[WHERE where_clause]
[GROUP BY [GROUPING SETS | ROLLUP | CUBE] group_by_clause]
[HAVING having_clause]
[QUERY WITH (query_property = value [, ...])];
```
The `SELECT` statement is used to retrieve data from one or more Relations: Stream and/or Changelog. See SELECT (FROM MATERIALIZED VIEW) for retrieving data from a Materialized View.

Each expression in this clause can refer to a column from one of the statement's sources, a primitive value, or the result of a Function call. Each expression can also be given an alias that sets the column name for the sink Relation. For case-sensitive column names, the name must be wrapped in double quotes; otherwise, the lowercase name is attempted, and the statement may not find the column.
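For illustration, here is a minimal sketch of a projection with aliases and a quoted, case-sensitive column, assuming a hypothetical `sensors` Stream with columns `id` and `"Temp"`:

```sql
-- Hypothetical 'sensors' Stream; "Temp" is a case-sensitive column name.
SELECT
  id AS sensor_id,        -- alias sets the column name in the sink Relation
  "Temp" AS temperature   -- double quotes preserve the case-sensitive name
FROM sensors;
```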
Optionally, a source Relation's records can be filtered according to predicates supplied in the `WHERE` clause. Predicates are specified as a Boolean expression, which can use columns from the source Relations.
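For example, using the `pageviews` Stream defined in the examples below, a `WHERE` filter might look like:

```sql
-- Keep only the views of Page_10; the predicate is a Boolean expression
-- over the source's columns.
SELECT userid, pageid
FROM pageviews
WHERE pageid = 'Page_10';
```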
Optionally, records can be grouped according to their values for a given set of columns to compute one or more aggregations. The `GROUP BY` clause in the statement defines a list of one or more columns (separated by commas) as grouping columns. Aggregate expressions are specified among the `SELECT` expressions. Currently, `GROUP BY` and aggregation are supported on Streams. The result of a group aggregation is a Changelog whose `PRIMARY KEY` is the list of `GROUP BY` columns.
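For instance, using the `pageviews` Stream defined in the examples below, the following aggregation produces a Changelog whose `PRIMARY KEY` is `userid`:

```sql
-- Counts page views per user; the result is a Changelog keyed by userid.
SELECT userid, COUNT(pageid) AS pgcnt
FROM pageviews
GROUP BY userid;
```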
Optionally, the `HAVING` clause specifies aggregate filter conditions. `HAVING` can only be used with `GROUP BY` to filter out groups of records that do not satisfy a condition. Any column referenced in the `HAVING` clause has to refer to a column from the `GROUP BY` columns list, or it should be referenced within an aggregate function.

## Query Properties

| Property Name | Description |
| --- | --- |
| `parallelism.max` | This property provides a hint for the maximum units of compute resources to use in parallel to process data read from the source(s) and written into the sink within the scope of a query. All other operations between source and sink will use the available resources to maximize their processing power.<br/><br/>Required: No<br/>Default value: `1`<br/>Type: String<br/>Valid values: `[1, ..., 10]` |
| `state.ttl.millis` | This property sets the minimum amount of time before a query's state is considered stale and can be expired. By default, this value is infinite, meaning that state is never considered stale and won't be cleaned up. This property is particularly relevant for queries, including windowed joins/aggregations, whose state can grow unboundedly and eventually cause the query to fail due to resource exhaustion. See the example below for more details.<br/><br/>Required: No<br/>Default value: Infinite<br/>Type: String<br/>Valid values: `[1, ...]` |
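As a minimal sketch of how a query property is supplied, the following reuses the `pageviews` Stream from the examples below and hints at most 2 units of parallel compute:

```sql
SELECT userid, pageid
FROM pageviews
QUERY WITH ('parallelism.max' = '2');
```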
For more complex use cases of the `GROUP BY` clause, the following extensions can be used to simplify these types of statements.

### GROUPING SETS

This extension can be used to simplify the union of multiple statements, each with its own `GROUP BY` clause. The following grouping statements:

```sql
SELECT aggregate_function(col1), col2, col3
FROM relation
GROUP BY col2, col3;

SELECT aggregate_function(col1), col2, NULL
FROM relation
GROUP BY col2;

SELECT aggregate_function(col1), NULL, NULL
FROM relation;
```
can be simplified into a single statement using the `GROUPING SETS` extension to `GROUP BY` as follows:

```sql
SELECT aggregate_function(col1) AS agg, col2, col3
FROM relation
GROUP BY GROUPING SETS ((col2, col3), col2, ());
```
An empty subgroup means that all input rows are aggregated into a single group based on `aggregate_function(col1)`. This aggregate group is returned even if no input rows were present; e.g., using `COUNT(*) AS cnt`, a resulting row may be `{"cnt":XXX, NULL, NULL}`.
### ROLLUP

This extension is a shorthand for the `GROUPING SETS` extension:

```sql
GROUP BY ROLLUP (c1, c2, c3)
```
This can be rewritten using the `GROUPING SETS` extension as follows:

```sql
GROUP BY GROUPING SETS (
  (c1, c2, c3),
  (c1, c2),
  (c1),
  ()
)
```
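For instance, sticking with the placeholder names above, a rollup over two grouping columns can be written as:

```sql
-- Produces per-(c1, c2) aggregates, per-c1 subtotals, and a grand total.
SELECT aggregate_function(col1), c1, c2
FROM relation
GROUP BY ROLLUP (c1, c2);
```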
### CUBE

This extension is an alias for the `GROUPING SETS` extension. For example,

```sql
GROUP BY CUBE (c1, c2, c3)
```

is rewritten as:

```sql
GROUP BY GROUPING SETS (
  (c1, c2, c3),
  (c1, c2),
  (c1, c3),
  (c1),
  (c2, c3),
  (c2),
  (c3),
  ()
)
```
## OVER Aggregation

In the `SELECT` clause, `OVER` aggregation is used to compute an aggregated value (by calling an aggregation function) for each record over a range of records, ordered according to the Relation's timestamp.

```sql
SELECT ...,
  agg_function(col) OVER (
    [PARTITION BY col1 [, col2, ...]]
    ORDER BY [rowtime() | timestamp_col]
    range_definition
  ) AS alias,
  ...
FROM ...
```
If `PARTITION BY` is defined for an `OVER` aggregation, the aggregate value is computed per record over the records in its partition. The `range_definition` determines how many previous records should be used to compute the aggregate value for each given record. You can think of a range as an aggregation window with lower and upper boundaries. Source records are ordered according to the `ORDER BY` clause, and for each record, the aggregate window defines the range of records to be used for aggregation. The upper boundary is always the current record, and it is inclusive. Two types of range can be defined in `OVER` aggregations:
A `RANGE` interval uses a time interval over the expression used in the `ORDER BY` clause (i.e., the timestamp column or `rowtime()`) to collect records for computing the aggregation.

```sql
RANGE BETWEEN SIZE time_unit PRECEDING AND CURRENT ROW
```
The `SIZE` is a number that defines the length of the aggregation window according to the `time_unit` value. The aggregate value is computed over all records that are within this window, according to the current record's timestamp.

A `ROWS` interval uses a count-based interval over the records ordered according to the `ORDER BY` clause to collect records for computing the aggregation.

```sql
ROWS BETWEEN number PRECEDING AND CURRENT ROW
```
`OVER` aggregation is used in the `SELECT` clause. A query can have multiple `OVER` aggregation clauses in its `SELECT`; however, the range definition for all of them must be the same. This means that there cannot be a mix of range definitions, or of partitioned and non-partitioned `OVER` aggregations, in a query. Moreover, the `ORDER BY` clause for all of them must be the same.

For example, assume `orders` is a Stream defined as:

```sql
CREATE STREAM orders (
  orderid BIGINT,
  category VARCHAR,
  total INTEGER)
WITH ('topic'='orders', 'value.format'='JSON');
```
The query below uses a `RANGE` interval to define an `OVER` aggregation. For each order record, it computes the `sum()` of `total` over all orders with the same `category` as the current record that were placed at most 30 seconds before the current order.

```sql
SELECT
  orderid,
  category,
  sum(total) OVER(
    PARTITION BY category
    ORDER BY rowtime()
    RANGE BETWEEN 30 SECONDS PRECEDING AND CURRENT ROW) AS sum_total
FROM orders;
```
The query below uses a `ROWS` interval to define an `OVER` aggregation. This interval specifies that the current record and, among the 10 preceding records, all records with the same `category` as the current one should be included in the aggregate computation. Remember that the upper boundary of the aggregate window is inclusive for an `OVER` aggregation; hence, this query considers up to 11 records, including the current record and the 10 preceding ones, when applying `PARTITION BY` and computing the aggregate value.

```sql
SELECT
  orderid,
  category,
  sum(total) OVER(
    PARTITION BY category
    ORDER BY rowtime()
    ROWS BETWEEN 10 PRECEDING AND CURRENT ROW) AS sum_total
FROM orders;
```
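As a sketch of the rule above that all `OVER` aggregations in a query must share the same partitioning, ordering, and range definition, the query below computes two aggregates over identical windows (assuming `min` and `max` aggregate functions are available):

```sql
SELECT
  orderid,
  category,
  -- Both OVER clauses use the same PARTITION BY, ORDER BY, and range.
  min(total) OVER(
    PARTITION BY category
    ORDER BY rowtime()
    ROWS BETWEEN 10 PRECEDING AND CURRENT ROW) AS min_total,
  max(total) OVER(
    PARTITION BY category
    ORDER BY rowtime()
    ROWS BETWEEN 10 PRECEDING AND CURRENT ROW) AS max_total
FROM orders;
```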
## Examples

The examples below use a `pageviews` Stream and a `Users` Changelog defined as:

```sql
CREATE STREAM pageviews (
  viewtime BIGINT,
  userid VARCHAR,
  pageid VARCHAR
) WITH (
  'topic' = 'pageviews'
);

CREATE CHANGELOG "Users" (
  "RegisterTime" BIGINT,
  userid VARCHAR,
  regionid VARCHAR,
  interests ARRAY < VARCHAR >,
  contactinfo STRUCT<phone VARCHAR, city VARCHAR, "State" VARCHAR, zipcode VARCHAR>,
  PRIMARY KEY(userid)
) WITH (
  'topic' = 'users',
  'timestamp' = 'RegisterTime'
);
```
The following query selects all the columns from the `Users` Changelog.

```sql
SELECT * FROM "Users";
```
The following is a simple projection query showing how to access data from more complex types. For arrays, you can access a particular element by indexing into the array with bracket syntax. For structs, you can use a similar bracket syntax or use arrows to access the inner data.

```sql
SELECT
  "RegisterTime",
  userid,
  interests,
  interests[1] AS top_interest,
  contactinfo,
  contactinfo->city AS location,
  contactinfo['city'] AS location2,
  contactinfo->"State" AS "state"
FROM "Users";
```
The following selects all the columns from the `pageviews` Stream. It also sets source properties to treat the `viewtime` column as the value for the timestamp, to start reading from the beginning of the source's underlying Kafka topic, and to skip any records that fail deserialization.

```sql
SELECT
  *
FROM pageviews
WITH (
  'timestamp' = 'viewtime',
  'starting.position' = 'earliest',
  'source.deserialization.error.handling' = 'IGNORE'
);
```
The following selects all the columns from the `pageviews` Stream. It also sets source properties to treat the `viewtime` column as the value for the timestamp and to start reading from specific offsets in the source's underlying Kafka topic. The offsets to start reading from are 1 in partition 0 and 3 in partition 1.

```sql
SELECT
  *
FROM pageviews
WITH (
  'timestamp' = 'viewtime',
  'starting.position' = 'specific-offsets',
  'starting.position.offsets' = 'partition:0,offset:1;partition:1,offset:3'
);
```
The following aggregates the number of visits a page receives from a particular `userid`, emitting only results having a count greater than 1.

```sql
SELECT
  userid,
  COUNT(pageid) AS pgcnt
FROM pageviews
GROUP BY userid
HAVING COUNT(pageid) > 1;
```
The following aggregates the number of visits a page receives from a particular `userid`. The query property `state.ttl.millis` is also configured to `10000`, or 10 seconds. This means that state that has not been updated in the last 10 seconds is considered stale and can be cleaned up. Note that state is not necessarily cleaned up immediately after it is considered stale.

```sql
SELECT
  userid,
  COUNT(pageid) AS pgcnt
FROM pageviews
WITH ('timestamp'='viewtime')
GROUP BY userid
QUERY WITH ('state.ttl.millis' = '10000');
```
Given this input for `pageviews`:

```
+----------+--------+---------+
| viewtime | userid | pageid  |
+----------+--------+---------+
| 1000     | User_1 | Page_21 |
| 2000     | User_2 | Page_10 |
| 3000     | User_1 | Page_10 |
| 10000    | User_1 | Page_91 |
| 20000    | User_1 | Page_10 |
| 80000    | User_2 | Page_55 |
+----------+--------+---------+
```
We can get the following output:

```
+--------+-------+
| userid | pgcnt |
+--------+-------+
| User_1 | 1     |
| User_2 | 1     |
| User_1 | 2     |
| User_1 | 3     |
| User_1 | 4     |
| User_2 | 1     |  // pgcnt=2 is also possible for this output
+--------+-------+
```
The `pgcnt` values associated with the `User_2` outputs are both `1` in this case because of the `state.ttl.millis` configuration of `10000`. Since there is a gap of more than `10000` milliseconds between the first and second `User_2` input records according to the `viewtime`, which is marked as the event time for the records, the state for `User_2` will be marked stale by the time the query consumes the second `User_2` record. Thus, the state for `User_2` may be cleaned up. It's also possible that the second `User_2` output will have a `pgcnt` of 2, as the state may be stale but not yet cleaned up before the second `User_2` record is consumed by the query. The `pgcnt` values for the `User_1` outputs are strictly increasing because its state is never marked stale.