FROM
FROM relation_reference [[AS] alias]
[JOIN relation_reference [[AS] alias]]
[WITH (source_property = value [, ...])]
The FROM clause specifies the source to use in a SELECT statement. It can refer to a Relation, the result of a window function, or a JOIN of two Relations (Stream/Stream or Stream/Changelog) that combines records based on the given join criteria. The source can also be another SELECT statement (a sub-query) that applies additional processing logic to the original source events before the remaining operators and clauses are applied to the processed events; see the Sub-Queries examples below.
A reference to a source (relation_reference) has the following syntax:
relation_name
[MATCH_RECOGNIZE]
| window_function
| ( select_statement )
where relation_name can be specified as a fully or partially qualified name by including the database_name and/or schema_name in the format [<database_name>.<schema_name>.]<relation_name>, e.g. db1.public.pageviews. Otherwise, the current Database and Schema are used to identify the Relation. A case-sensitive name must be wrapped in double quotes; otherwise, the lowercase version of the name is used.
The optional WITH clause specifies the Source Properties, along with any Store-specific ones, documented below.
Property Name | Description |
---|---|
timestamp | This property specifies the name of a column in the source Relation that will be used for event-time processing. If this property is not provided, the timestamp associated with the Relation’s records will be used. The Relation’s timestamp column can be specified when the Relation is created; otherwise, the record’s timestamp will be used by default. If the type of this timestamp column is BIGINT, its values are expected to be epoch milliseconds in UTC.
Required: No
Default value: Records’ timestamp
Type: String
Valid values: Name of a column of type BIGINT or TIMESTAMP. See Data Types. |
timestamp.format | Required: No
Default value: sql
Type: String
Valid values: sql, iso8601 |
source.allow.latency.millis | This property sets an upper bound, as a long value in milliseconds, on how late a record can be. Lateness is measured against the greatest timestamp that has been read so far. A record with timestamp t is discarded if t is less than the difference between (a) the greatest timestamp seen thus far and (b) the number of milliseconds specified by this property (i.e. if t < a - b, the record is ignored because it is deemed too late). This property only applies to stateful queries that use windows.
Required: No
Default value: 10000
Type: String
Valid values: [1, ...] |
source.idle.timeout.millis | This property sets an upper bound, as a long value in milliseconds, on how long a source can go without new events before it is marked as idle. When one or more sources carry no events while another source still has events, the query can stall because windows created by stateful queries cannot be closed. A source marked as idle is not considered when the query determines whether a window can be closed, and thus it won’t stall the query.
Required: No
Default value: None
Type: String
Valid values: [1, ...]ms |
source.deserialization.error.handling | This property sets the strategy for how the query should handle invalid data when reading records from the Topic of the source Relation. For a detailed explanation with examples, see Configuring Deserialization Error Handling.
Required: No
Default value: TERMINATE
Type: String
Valid values:
|
source.deserialization.error.log.topic | This property specifies the name of the topic used for logging records that a query's source has failed to deserialize, when that source has the property source.deserialization.error.handling set to IGNORE_AND_LOG. The format of the records in this error log depends on the underlying store type of the source Relation. If the topic already exists, it will be used. If it does not exist, it will be created with whichever additional properties are relevant for creation, depending on the error log topic's store.
Required: No, unless source.deserialization.error.handling is IGNORE_AND_LOG
Default value: None
Type: String |
source.deserialization.error.log.store | This property specifies the name of the store associated with the source.deserialization.error.log.topic property.
Required: No
Default value: Store name of the source Relation
Type: String
Valid values: Store name belonging to a Kafka or Kinesis typed Store. |
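The lateness rule described for source.allow.latency.millis can be illustrated with a small Python sketch. It only models the rule as stated in the table above (a record is dropped when t < a - b) and is not DeltaStream's implementation; the function name and the sample timestamps are hypothetical.

```python
def filter_late_records(timestamps_ms, allow_latency_ms=10000):
    """Model of the lateness rule: drop a record when t < a - b.

    a is the greatest timestamp read so far and b is allow_latency_ms.
    Hypothetical helper for illustration only.
    """
    greatest_seen = None
    accepted, discarded = [], []
    for t in timestamps_ms:
        if greatest_seen is not None and t < greatest_seen - allow_latency_ms:
            discarded.append(t)  # deemed too late
            continue
        accepted.append(t)
        greatest_seen = t if greatest_seen is None else max(greatest_seen, t)
    return accepted, discarded


# Hypothetical arrival order, with the property set to 1000 ms.
accepted, discarded = filter_late_records(
    [5000, 10000, 9000, 8999, 12000], allow_latency_ms=1000
)
print(accepted)   # [5000, 10000, 9000, 12000]
print(discarded)  # [8999]
```

With a greatest timestamp of 10000 and a bound of 1000, the record at 9000 is still accepted because 9000 is not less than 10000 - 1000, while 8999 is discarded.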
Kafka-specific Source Properties:
Property Name | Description |
---|---|
starting.position | Required: No
Default value: Type: String
Valid values:
|
starting.position.offsets | This property specifies the partition-offset pairs from which the corresponding Kafka topic for the Relation will be read for this statement. This property must be provided when the starting.position property is set to offsets. The value is a ;-separated list of partition:<partition_num>,offset:<offset_num> pairs. For example, to start reading from offset 1 of partition 0 and offset 3 of partition 1, set it to partition:0,offset:1;partition:1,offset:3.
Required: No, unless starting.position is offsets
Default value: None
Type: String
Valid values: ;-separated list of partition:<partition_num>,offset:<offset_num>. Both partition_num and offset_num must be in the range of [1, ...]. |
starting.position.timestamp.ms | This property specifies the timestamp from which the Kafka topic for the Relation will be read for this statement. The timestamp value is in epoch-milliseconds, which is measured as the number of milliseconds since January 1, 1970 00:00:00.000 GMT .
Required: No, unless starting.position is timestamp
Default value: None
Type: String
Valid values: [1, …] |
source.deserialization.error.log.topic.partitions | This property specifies the number of partitions the topic associated with the source.deserialization.error.log.topic property should have.
Required: No, unless source.deserialization.error.log.topic doesn't exist
Default value: None
Type: Integer
Valid values: [1, ...] |
source.deserialization.error.log.topic.replicas | This property specifies the number of replicas the topic associated with the source.deserialization.error.log.topic property should have.
Required: No, unless source.deserialization.error.log.topic doesn't exist
Default value: None
Type: Integer
Valid values: [1, ...] |
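As a rough aid for building or checking a starting.position.offsets value, the following Python sketch parses and formats the ;-separated partition:<partition_num>,offset:<offset_num> format described above. The helper names are hypothetical and the validation is an assumption about what a well-formed value looks like; it does not reproduce DeltaStream's own parser.

```python
def parse_starting_offsets(value):
    """Parse 'partition:<n>,offset:<n>;...' into a {partition: offset} dict.

    Hypothetical helper; raises ValueError on a malformed pair.
    """
    offsets = {}
    for pair in value.split(";"):
        partition_part, offset_part = pair.split(",")
        p_key, p_val = partition_part.split(":")
        o_key, o_val = offset_part.split(":")
        if p_key.strip() != "partition" or o_key.strip() != "offset":
            raise ValueError(f"malformed pair: {pair!r}")
        offsets[int(p_val)] = int(o_val)
    return offsets


def format_starting_offsets(offsets):
    """Build the property value from a {partition: offset} dict."""
    return ";".join(
        f"partition:{p},offset:{o}" for p, o in sorted(offsets.items())
    )


parsed = parse_starting_offsets("partition:0,offset:1;partition:1,offset:3")
print(parsed)                           # {0: 1, 1: 3}
print(format_starting_offsets(parsed))  # partition:0,offset:1;partition:1,offset:3
```

A pair that uses : where , is expected, such as partition:1:offset:3, fails to parse in this sketch and raises ValueError.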
Kinesis-specific Source Properties:
Property Name | Description |
---|---|
starting.position | Required: No
Default value: Type: String
Valid values:
|
starting.position.timestamp.ms | This property specifies the timestamp from which the Kinesis data stream for the Relation will be read for this statement. The timestamp value is in epoch-milliseconds, which is measured as the number of milliseconds since January 1, 1970 00:00:00.000 GMT .
Required: No, unless starting.position is timestamp
Default value: None
Type: String
Valid values: [1, …] |
source.deserialization.error.log.topic.shards | This property specifies the number of shards the stream associated with the source.deserialization.error.log.topic property should have.
Required: No, unless source.deserialization.error.log.topic doesn't exist
Default value: None
Type: Integer
Valid values: [1, ...] |
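The starting.position.timestamp.ms property expects epoch milliseconds. The following Python sketch shows one way to compute such a value from a calendar date; the helper names and the sample date are hypothetical, and since the property type is String the resulting number would be passed as a quoted string.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def to_epoch_millis(dt):
    """Milliseconds since 1970-01-01 00:00:00.000 UTC for an aware datetime."""
    if dt.tzinfo is None:
        raise ValueError("expected a timezone-aware datetime")
    # Integer division of timedeltas avoids floating-point rounding.
    return (dt - EPOCH) // timedelta(milliseconds=1)


def from_epoch_millis(ms):
    """Inverse conversion, useful for checking a value by eye."""
    return EPOCH + timedelta(milliseconds=ms)


start = datetime(2024, 1, 1, tzinfo=timezone.utc)  # hypothetical starting point
millis = to_epoch_millis(start)
print(millis)                     # 1704067200000
print(from_epoch_millis(millis))  # 2024-01-01 00:00:00+00:00
```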
Using a window function, records are split into finite-size batches. Each record can belong to one or multiple windows, depending on the window type. A window defines a time interval and has a start time and an end time. A record belongs to a window instance if its timestamp falls between the start time and the end time of the window. Applying a window to a Relation logically creates a new Relation that includes all columns of the original Relation as well as two additional columns named window_start and window_end. These two columns can be used in the SELECT and/or WHERE clauses, just like the original Relation’s columns. Moreover, they can be added to the GROUP BY clause to group records according to a time-based window. When defining any of the window types, OFFSET is an optional parameter. If added, OFFSET specifies the amount of time by which the start of a window should be shifted. Currently, three types of windows are supported: HOP, TUMBLE, and CUMULATE.
FROM HOP (relation_name, SIZE time_unit, ADVANCE BY time_unit)
[WITH (source_property = value [, ...])]
A hopping window models a fixed-size time interval and is defined by two parameters: SIZE, which defines a window’s time duration, and ADVANCE BY, which defines how far a window moves forward in time relative to the previous window instance. Depending on the ADVANCE BY value, HOP window instances can overlap with each other. Therefore, a given record can belong to one or more window instances according to its timestamp. When using a HOP window, SIZE must be an integral multiple of ADVANCE BY.
Let’s use the following input records for this example:
+----------+--------+---------+
| viewtime | userid | pageid |
+----------+--------+---------+
| 1000 | User_2 | Page_72 |
| 2000 | User_5 | Page_12 |
| 3000 | User_2 | Page_34 |
| 91000 | User_2 | Page_10 |
| 92000 | User_2 | Page_44 |
| 121000 | User_5 | Page_67 |
| 122000 | User_2 | Page_93 |
| 160000 | User_5 | Page_3 |
+----------+--------+---------+
When the following SELECT statement is run:
SELECT
window_start,
window_end,
userid,
COUNT(pageid) AS pgcnt
FROM HOP (pageviews, SIZE 1 minute, ADVANCE BY 30 second)
WITH ('timestamp'='viewtime')
GROUP BY window_start, window_end, userid;
We get the following output:
+---------------------+---------------------+--------+-------+
| window_start | window_end | userid | pgcnt |
+---------------------+---------------------+--------+-------+
| 1970-01-01T00:00:00 | 1970-01-01T00:01:00 | User_2 | 2 |
| 1970-01-01T00:00:00 | 1970-01-01T00:01:00 | User_5 | 1 |
| 1970-01-01T00:01:00 | 1970-01-01T00:02:00 | User_2 | 2 |
| 1970-01-01T00:01:30 | 1970-01-01T00:02:30 | User_2 | 3 |
| 1970-01-01T00:01:30 | 1970-01-01T00:02:30 | User_5 | 1 |
+---------------------+---------------------+--------+-------+
NOTE: The output results in this example are not fully deterministic as factors such as late arriving data can alter the results.
The above example shows a sample of the output for this aggregation query. Since this is a streaming query, more records will be output as data arrives in the source. Note that the viewtime column, which is used as the timestamp for the pageviews source, represents milliseconds since epoch. Looking at the output records, we can see that the window_start and window_end times match the HOP window’s 1 minute size, and that the pgcnt value for each userid corresponds to the number of input records with the same userid whose viewtime falls within that window. Notice there are no output records with window_start=1970-01-01T00:00:30 and window_end=1970-01-01T00:01:30. This is because there are no records from the source whose viewtime value falls within that window, and thus there is no data to output.
The following query counts the pages that a particular userid visits in a 2 minute window, and then only emits results with a count greater than 1. With a HOP window, since the ADVANCE BY parameter is 30 seconds, a new 2 minute window is created every 30 seconds.
SELECT
window_start,
window_end,
userid,
COUNT(pageid) AS pgcnt
FROM HOP(pageviews, size 2 minutes, advance by 30 seconds)
GROUP BY window_start, window_end, userid
HAVING COUNT(pageid) > 1;
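As a rough model of how a HOP window assigns records to window instances, the following Python sketch recomputes the counts of the first HOP example (SIZE 1 minute, ADVANCE BY 30 second). It is a batch approximation under stated assumptions, not the engine's implementation: windows are treated as start-inclusive and end-exclusive, OFFSET is modeled as a plain shift of the window starts, and the function name is hypothetical.

```python
from collections import Counter


def hop_windows(ts_ms, size_ms, advance_ms, offset_ms=0):
    """Return every (start, end) window that contains ts_ms.

    Assumes start-inclusive, end-exclusive windows (an assumption).
    """
    if size_ms % advance_ms != 0:
        raise ValueError("SIZE must be an integral multiple of ADVANCE BY")
    # Latest window start that is at or before the timestamp.
    start = ts_ms - ((ts_ms - offset_ms) % advance_ms)
    windows = []
    while start > ts_ms - size_ms:
        windows.append((start, start + size_ms))
        start -= advance_ms
    return sorted(windows)


# Input records from the HOP example: (viewtime, userid, pageid).
pageviews = [
    (1000, "User_2", "Page_72"),
    (2000, "User_5", "Page_12"),
    (3000, "User_2", "Page_34"),
    (91000, "User_2", "Page_10"),
    (92000, "User_2", "Page_44"),
    (121000, "User_5", "Page_67"),
    (122000, "User_2", "Page_93"),
    (160000, "User_5", "Page_3"),
]

counts = Counter()
for viewtime, userid, _pageid in pageviews:
    for start, end in hop_windows(viewtime, size_ms=60_000, advance_ms=30_000):
        counts[(start, end, userid)] += 1

for (start, end, userid), pgcnt in sorted(counts.items()):
    print(start, end, userid, pgcnt)
```

Each record falls into SIZE / ADVANCE BY = 2 window instances here. The model also yields windows that the sample output does not list, such as the one starting 30 seconds before the epoch, which is consistent with that output being only a sample.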
FROM TUMBLE (relation_name, SIZE time_unit)
[WITH (source_property = value [, ...])]
A TUMBLE window models a fixed-size time interval whose instances do not overlap and leave no gaps. A TUMBLE window is defined by a single parameter: SIZE, which defines a window’s time duration. You can think of a TUMBLE window function as a special case of a HOP window function in which the SIZE and ADVANCE BY values are equal. Each record belongs to exactly one instance of a TUMBLE window according to its timestamp.
The example below uses the following pageviews Stream:
CREATE STREAM pageviews (
viewtime BIGINT,
userid VARCHAR,
pageid VARCHAR
) WITH (
'topic' = 'pageviews'
);
Let’s use the following input records for this example:
+----------+--------+---------+
| viewtime | userid | pageid |
+----------+--------+---------+
| 1000 | User_2 | Page_72 |
| 2000 | User_5 | Page_12 |
| 3000 | User_2 | Page_34 |
| 61000 | User_5 | Page_67 |
| 62000 | User_2 | Page_67 |
| 91000 | User_2 | Page_10 |
+----------+--------+---------+
When the following SELECT statement is run:
SELECT
window_start,
window_end,
userid,
COUNT(*) AS cnt
FROM TUMBLE (pageviews, SIZE 30 seconds)
WITH ('timestamp'='viewtime')
GROUP BY window_start, window_end, userid;
We get the following output:
+---------------------+---------------------+--------+-----+
| window_start | window_end | userid | cnt |
+---------------------+---------------------+--------+-----+
| 1970-01-01T00:00:00 | 1970-01-01T00:00:30 | User_2 | 2 |
| 1970-01-01T00:00:00 | 1970-01-01T00:00:30 | User_5 | 1 |
| 1970-01-01T00:01:00 | 1970-01-01T00:01:30 | User_2 | 1 |
| 1970-01-01T00:01:00 | 1970-01-01T00:01:30 | User_5 | 1 |
+---------------------+---------------------+--------+-----+
NOTE: The output results in this example are not fully deterministic as factors such as late arriving data can alter results.
The above example shows a sample of the output for this aggregation query. Since this is a streaming query, more records will be output as data arrives in the source. Note that the viewtime column, which is used as the timestamp for the pageviews source, represents milliseconds since epoch. Looking at the output records, we can see that the window_start and window_end times match the TUMBLE window’s 30 second size, and that the cnt value for each userid corresponds to the number of input records with the same userid whose viewtime falls within that window. Notice there are no output records with window_start=1970-01-01T00:00:30 and window_end=1970-01-01T00:01:00. This is because there are no records from the source whose viewtime value falls within that window, and thus there is no data to output.
The following query counts the pages that a particular userid visits in a 10 second window, excluding User_6. With a TUMBLE window, a new 10 second window is opened at the end of the previous one, so no windows overlap.
SELECT
window_start,
window_end,
userid,
COUNT(pageid) AS cnt
FROM TUMBLE (pageviews, SIZE 10 seconds)
WHERE userid != 'User_6'
GROUP BY window_start, window_end, userid;
The following query also counts the pages that a particular userid visits in a 10 second window. In addition, it sets the source.allow.latency.millis source property to 1000, which tells the query to discard data that arrives more than 1 second late. In this example, if the greatest event timestamp ingested by the query thus far is 10000ms, then any event with a timestamp of less than 9000ms will not be processed.
SELECT
userid,
COUNT(pageid) AS pgcnt,
window_start,
window_end
FROM TUMBLE(pageviews, size 10 seconds)
WITH (
'source.allow.latency.millis' = '1000'
)
GROUP BY userid, window_start, window_end;
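The TUMBLE assignment rule can be sketched in Python in the same spirit: each timestamp maps to exactly one window. The sketch below recomputes the counts of the 30 second TUMBLE example; it assumes start-inclusive, end-exclusive windows and models OFFSET as a plain shift, and the function name is hypothetical.

```python
from collections import Counter


def tumble_window(ts_ms, size_ms, offset_ms=0):
    """Return the single (start, end) window that contains ts_ms.

    Assumes start-inclusive, end-exclusive windows (an assumption).
    """
    start = ts_ms - ((ts_ms - offset_ms) % size_ms)
    return (start, start + size_ms)


# Input records from the TUMBLE example: (viewtime, userid, pageid).
pageviews = [
    (1000, "User_2", "Page_72"),
    (2000, "User_5", "Page_12"),
    (3000, "User_2", "Page_34"),
    (61000, "User_5", "Page_67"),
    (62000, "User_2", "Page_67"),
    (91000, "User_2", "Page_10"),
]

counts = Counter()
for viewtime, userid, _pageid in pageviews:
    start, end = tumble_window(viewtime, size_ms=30_000)
    counts[(start, end, userid)] += 1

for (start, end, userid), cnt in sorted(counts.items()):
    print(start, end, userid, cnt)
```

No key with a window start of 30000 appears, which matches the missing 00:00:30 to 00:01:00 window in the sample output.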
FROM CUMULATE (relation_name, SIZE time_unit, STEP time_unit)
[WITH (source_property = value [, ...])]
A CUMULATE window defines a fixed-size time interval that is itself split into windows of increasing size, all with the same window_start. A CUMULATE window is defined by two parameters: SIZE, which defines a window’s maximum time duration, and STEP, which defines how much the window size grows between the ends of consecutive CUMULATE windows within a given interval. You can think of a CUMULATE window function as a TUMBLE window function of width SIZE that is itself split into smaller windows of increasing size, all of which share the same window_start. When defining a CUMULATE window, SIZE must be an integral multiple of STEP.
The example below uses the following pageviews Stream:
CREATE STREAM pageviews (
viewtime BIGINT,
userid VARCHAR,
pageid VARCHAR
) WITH (
'topic' = 'pageviews'
);
Let’s use the following input records for this example:
+----------+--------+---------+
| viewtime | userid | pageid |
+----------+--------+---------+
| 1000 | User_2 | Page_72 |
| 2000 | User_5 | Page_12 |
| 3000 | User_2 | Page_34 |
| 51000 | User_5 | Page_67 |
| 52000 | User_2 | Page_67 |
| 91000 | User_2 | Page_10 |
| 101000 | User_2 | Page_88 |
+----------+--------+---------+
When the following SELECT statement is run:
SELECT
window_start,
window_end,
userid,
COUNT(*) AS cnt
FROM CUMULATE (pageviews, SIZE 1 minutes, STEP 20 seconds)
WITH ('timestamp'='viewtime')
GROUP BY window_start, window_end, userid;
We get the following output:
+---------------------+---------------------+--------+-----+
| window_start | window_end | userid | cnt |
+---------------------+---------------------+--------+-----+
| 1970-01-01T00:00:00 | 1970-01-01T00:00:20 | User_2 | 2 |
| 1970-01-01T00:00:00 | 1970-01-01T00:00:20 | User_5 | 1 |
| 1970-01-01T00:00:00 | 1970-01-01T00:00:40 | User_2 | 2 |
| 1970-01-01T00:00:00 | 1970-01-01T00:00:40 | User_5 | 1 |
| 1970-01-01T00:00:00 | 1970-01-01T00:01:00 | User_2 | 3 |
| 1970-01-01T00:00:00 | 1970-01-01T00:01:00 | User_5 | 2 |
| 1970-01-01T00:01:00 | 1970-01-01T00:01:40 | User_2 | 1 |
+---------------------+---------------------+--------+-----+
NOTE: The output results in this example are not fully deterministic as factors such as late arriving data can alter results.
The above example shows a sample of the output for this aggregation query. Since this is a streaming query, more records will be output as data arrives in the source. Note that the viewtime column, which is used as the timestamp for the pageviews source, represents milliseconds since epoch. Looking at the output records, we can see that the window_start and window_end times match the CUMULATE window’s expected sizes: a maximum size of 1 minute, with intermediate windows that are 20 seconds and 40 seconds long. The cnt value for each userid corresponds to the number of input records with the same userid whose viewtime falls within the respective window. Notice there are no output records with window_start=1970-01-01T00:01:00 and window_end=1970-01-01T00:01:20. This is because there are no records from the source whose viewtime value falls within that window, and thus there is no data to output.
The following query counts the pages that a particular userid visits in several cascading windows, the longest of which is 10 minutes. With a CUMULATE window, since the STEP parameter is 5 seconds, a window is closed every 5 seconds. The query then only emits results with a count of pageid values greater than 5.
SELECT
window_start,
window_end,
userid,
COUNT(pageid) AS cnt
FROM
CUMULATE(PAGEVIEWS, size 10 minutes, step 5 SECOND)
GROUP BY window_start, window_end, userid
HAVING COUNT(pageid) > 5;
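The way a CUMULATE window places a record into several windows that share one window_start can be modeled with the following Python sketch, which recomputes the counts of the SIZE 1 minutes, STEP 20 seconds example. As before, this is a batch approximation that assumes start-inclusive, end-exclusive windows and no OFFSET; the function name is hypothetical.

```python
from collections import Counter


def cumulate_windows(ts_ms, size_ms, step_ms):
    """Return every (start, end) cumulative window that contains ts_ms.

    All returned windows share the start of the enclosing SIZE-wide interval.
    """
    if size_ms % step_ms != 0:
        raise ValueError("SIZE must be an integral multiple of STEP")
    base = ts_ms - (ts_ms % size_ms)
    # Smallest window end strictly greater than the timestamp.
    first_end = base + ((ts_ms - base) // step_ms + 1) * step_ms
    return [(base, end) for end in range(first_end, base + size_ms + 1, step_ms)]


# Input records from the CUMULATE example: (viewtime, userid, pageid).
pageviews = [
    (1000, "User_2", "Page_72"),
    (2000, "User_5", "Page_12"),
    (3000, "User_2", "Page_34"),
    (51000, "User_5", "Page_67"),
    (52000, "User_2", "Page_67"),
    (91000, "User_2", "Page_10"),
    (101000, "User_2", "Page_88"),
]

counts = Counter()
for viewtime, userid, _pageid in pageviews:
    for start, end in cumulate_windows(viewtime, size_ms=60_000, step_ms=20_000):
        counts[(start, end, userid)] += 1

for (start, end, userid), cnt in sorted(counts.items()):
    print(start, end, userid, cnt)
```

The record at 91000 lands in the windows ending at 00:01:40 and 00:02:00 but not in the one ending at 00:01:20, which is why that window is absent from the sample output.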
Sub-queries provide a way to add an additional layer of processing on top of an original source, without storing the intermediate result in a Store, before providing it to an outer query within the same SQL query.
Just like any other Relation within the FROM clause, a sub-query may or may not have an alias, but its columns can only be referred to by a qualified name if an alias was provided for it. Additionally, the column names of a sub-query must be unique, and the columns become available to the outer query as they were named or aliased, just like any other Relation with a corresponding schema.
Since sub-queries are internal to a SQL query, they are not persisted as a Relation, nor is their data persisted in the Store in which the SQL query was run.
A sub-query has the following syntax and can be used in a FROM clause within a SELECT statement:
SELECT ...
FROM ( sub-query );
The rest of this section shows various use cases in which a sub-query can be useful.
One of the basic use cases of a sub-query is intermediary transformation or sanitization of data that is not needed outside the scope of the given SQL query. This reduces cost and overhead for an organization by omitting the need for additional persisted Relations.
Here we use the pageviews stream and convert its userid and pageid to INTEGER to build a simple point system based on the page numbers visited by each user:
SELECT uid, SUM(pid)*10 AS points
FROM (
SELECT
viewtime,
CAST(SUBSTRING(userid FROM 6) AS INTEGER) AS uid,
CAST(SUBSTRING(pageid FROM 6) AS INTEGER) AS pid
FROM pageviews
) GROUP BY uid;
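To make the two stages of the point-system query concrete, here is a small Python model of it: the inner step mirrors the sub-query's SUBSTRING and CAST, and the outer step mirrors SUM(pid)*10 grouped by uid. It is a batch approximation only, since a streaming query would emit updated results as records arrive. The function name is hypothetical and the records are borrowed from the TUMBLE example above.

```python
from collections import defaultdict


def points_per_user(pageviews):
    """Model of SELECT uid, SUM(pid)*10 ... GROUP BY uid over the sub-query."""
    totals = defaultdict(int)
    for _viewtime, userid, pageid in pageviews:
        # SUBSTRING(x FROM 6) is 1-based, so it corresponds to x[5:] in Python.
        uid = int(userid[5:])  # 'User_2'  -> 2
        pid = int(pageid[5:])  # 'Page_72' -> 72
        totals[uid] += pid
    return {uid: total * 10 for uid, total in totals.items()}


pageviews = [
    (1000, "User_2", "Page_72"),
    (2000, "User_5", "Page_12"),
    (3000, "User_2", "Page_34"),
    (61000, "User_5", "Page_67"),
    (62000, "User_2", "Page_67"),
    (91000, "User_2", "Page_10"),
]

points = points_per_user(pageviews)
print(points)  # {2: 1830, 5: 790}
```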
Sub-queries are also useful when events need to be structured within the scope of a SQL query. The following example performs a self-JOIN on top of an aggregation that counts the number of pages visited by each user; the sub-query removes the need for a separate stream that exists only to hold the grouped result:
SELECT
pg.userid AS uid,
p.pageid AS pid,
pg.pages AS num_pages
FROM
pageviews p
JOIN
(SELECT userid, COUNT(pageid) AS pages FROM pageviews GROUP BY userid) pg
ON p.userid = pg.userid
WHERE pg.pages > 1;
Using a sub-query, events can also be denormalized for the purposes of a SQL query. In the following query, the sub-query JOIN expands the user information that the outer JOIN needs in order to create shipment metrics describing the user involved in each shipment:
CREATE STREAM csas_mr_puo AS SELECT
p_user.*,
o.orderid AS orderid,
o.shipmenttime AS shiptime
FROM (
SELECT
p.pageid,
p.userid AS uid,
u.contactinfo->city AS city,
u.contactinfo->zipcode AS zipcode,
p.viewtime AS viewtime
FROM pageviews p
JOIN "users" u
WITHIN 10 SECONDS ON p.userid = u.userid
) p_user
JOIN
orders o
WITHIN 10 SECONDS
ON p_user.city = o.address->city;
Last modified 13d ago