FROM
Syntax
Description
The FROM
clause specifies the source to use in a SELECT statement. It can refer to a Relation, the result of a window function, or a JOIN
of two Relations (Stream/Stream or Stream/Changelog) that combines records based on the given join criteria. This clause can also be another SELECT that runs additional processing logic on the original source events before the rest of the operators and clauses are applied to the processed events. See the Sub-Queries examples below.
See Window Function below for more information about window functions.
See JOIN for more information about joining two Relations as sources in a statement.
Arguments
relation_reference
The reference to a source in the following syntax:
where relation_name
can be specified as a fully or partially qualified name by specifying the database_name
and/or schema_name
in the format [<database_name>.<schema_name>.]<relation_name>
, e.g. db1.public.pageviews
. Otherwise, the current Database and Schema will be used to identify the Relation. For case-sensitive names, the name must be wrapped in double quotes; otherwise, the lowercase version of the name will be used.
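The name resolution rules above can be sketched in Python. This is only an illustration, not the engine's implementation: the function names `split_reference` and `resolve_relation` are hypothetical, and the sketch assumes that a two-part name means `<schema_name>.<relation_name>`.

```python
import re

# One name part: either a double-quoted identifier or a bare identifier.
_PART = re.compile(r'"([^"]+)"|([^".]+)')


def split_reference(reference):
    """Split a relation reference on dots that are outside double quotes.

    Unquoted parts are lowercased; quoted parts keep their case.
    """
    parts = []
    pos = 0
    while True:
        match = _PART.match(reference, pos)
        if match is None:
            raise ValueError(f"malformed relation reference: {reference!r}")
        quoted, bare = match.groups()
        parts.append(quoted if quoted is not None else bare.lower())
        pos = match.end()
        if pos == len(reference):
            return parts
        if reference[pos] != ".":
            raise ValueError(f"malformed relation reference: {reference!r}")
        pos += 1  # skip the dot separator


def resolve_relation(reference, current_database, current_schema):
    """Return (database, schema, relation), filling gaps from the current context."""
    parts = split_reference(reference)
    if len(parts) > 3:
        raise ValueError("expected at most database.schema.relation")
    defaults = [current_database, current_schema]
    # Assumption: a two-part name is schema.relation.
    return tuple(defaults[: 3 - len(parts)] + parts)
```

For example, `resolve_relation("PageViews", "db1", "public")` yields `("db1", "public", "pageviews")`, while `resolve_relation('"PageViews"', "db1", "public")` keeps the original case of the relation name.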
See also MATCH_RECOGNIZE.
WITH (source_property = value [, …])
This optional clause specifies the Source Properties along with any Store specific ones documented below.
Source Properties
Property Name | Description |
---|---|
| This property specifies a column name in the source Relation that will be used for event-time processing. If this property is not provided, the Required: No
Default value: Records’ timestamp
Type: String
Valid values: Must be of type |
| The format to use for Required: No
Default value: |
| This property sets an upper bound, as a long value, on how late a record can be in milliseconds. Lateness is calculated based on the greatest chronological timestamp that has been read so far. A record that arrives with a timestamp ( Required: No Default value: 10000 Type: Long Valid values: [1, ...] |
| This property sets an upper bound, as a long value, on the length of time a source can remain without new events before marking this source as idle. In the case that one or more sources do not carry events while another source still has events, this can cause the query to stall as windows potentially created by stateful queries cannot be closed. A source marked as idle is not considered for the query determining if a window can be closed, and thus it won’t stall the query. Required: No Default: None Type: Long Valid values: [1, ...] |
| Required: No
Default value:
|
| This property specifies the name of the topic to use for logging records that a query's source has failed to deserialize when that query's source has the property Required: No, unless |
| This property specifies the name of the store associated with the Required: No Default value: Store name of the source Relation Type: String Valid values: Store name belonging to a Kafka or Kinesis typed Store. |
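To make the idle-source behavior described in the table above concrete, here is a minimal Python sketch. It assumes a simplified model in which a window can close once every non-idle source has seen an event timestamp at or past the window end; the `Source` class, its fields, and `can_close_window` are hypothetical names for illustration and do not reflect the engine's internals.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class Source:
    max_event_ts_ms: int                    # greatest event timestamp read so far
    last_event_seen_ms: int                 # wall-clock time of the latest event
    idle_timeout_ms: Optional[int] = None   # None: never marked as idle

    def is_idle(self, now_ms: int) -> bool:
        if self.idle_timeout_ms is None:
            return False
        return now_ms - self.last_event_seen_ms > self.idle_timeout_ms


def can_close_window(window_end_ms, sources, now_ms):
    """Idle sources are ignored when deciding whether a window can close."""
    active = [s for s in sources if not s.is_idle(now_ms)]
    if not active:
        return False
    return min(s.max_event_ts_ms for s in active) >= window_end_ms
```

In this model, a quiet source without an idle timeout holds every window open indefinitely, which is the stall the property is meant to avoid.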
Kafka Specific Source Properties
Property Name | Description |
---|---|
| Required: No Default value: Type: String Valid values:
|
| This property specifies the partition-offset pairs from which the corresponding Kafka topic for the Relation will be read for this statement. This property must be provided when the Required: No, unless Default value: None Type: String
Valid values: |
| This property specifies the timestamp from which the Kafka topic for the Relation will be read for this statement. The timestamp value is in epoch-milliseconds, which is measured as the number of milliseconds since Required: No, unless |
| This property specifies the number of partitions the topic associated with the Required: No, unless |
| This property specifies the number of replicas the topic associated with the Required: No, unless |
| This property specifies the milliseconds interval to discover new partitions in the Kafka topic. By default this property is set to 10000, meaning every 10 seconds the query will look for new partitions to consume from. If disabled, the query will not consume from partitions added while the query is running. Required: No Default value: 10000 Type: Long Valid values: [1, ...] or -1 to disable |
Kinesis Specific Source Properties
Property Name | Description |
---|---|
| Required: No Default value: Type: String Valid values:
|
| This property specifies the timestamp from which the Kinesis data stream for the Relation will be read for this statement. The timestamp value is in epoch-milliseconds, which is measured as the number of milliseconds since Required: No, unless |
| This property specifies the number of shards the stream associated with the Required: No, unless |
Window Function
Using a window function, records are split into finite size batches. Each record can belong to one or multiple windows, depending on the window type. A window defines a time interval and has a start time and an end time. A record belongs to a window instance if its timestamp is in between the start time and end time of the window. By applying a window on a Relation, logically a new Relation is created that includes all columns of the original Relation as well as two additional columns named window_start
and window_end
. These two columns can be used in the SELECT
and/or WHERE
clauses, similar to original Relation columns. Moreover, they can be added to the GROUP BY
clause to group records according to a time-based window. When defining any of the window types, OFFSET
is an optional parameter. If added, OFFSET
specifies the time offset by which the start of a window is shifted. Currently, three types of windows are supported: HOP, TUMBLE, and CUMULATE.
HOP Window
Syntax
A hopping window models a fixed-size time interval and is defined by two parameters: SIZE
, which defines a window’s time duration, and the ADVANCE BY
interval, which defines how much a window moves forward in time relative to the previous window instance. Depending on the ADVANCE BY
value, HOP
window instances could overlap with each other. Therefore, a given record could belong to one or more such window instances according to its timestamp. When using a HOP
window, the SIZE
must be an integral multiple of ADVANCE BY
.
The optional WITH (source_property = value [, ...])
clause specifies Source Properties.
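The way a hopping window assigns one record to several overlapping window instances can be sketched in Python as follows. This is an illustration under stated assumptions rather than the engine's implementation: windows are assumed to be aligned to the epoch (plus the optional offset) and to be half-open intervals `[window_start, window_end)`, and the function name `hop_windows` is hypothetical.

```python
def hop_windows(ts_ms, size_ms, advance_ms, offset_ms=0):
    """Return every (window_start, window_end) pair that contains ts_ms."""
    if size_ms % advance_ms != 0:
        raise ValueError("SIZE must be an integral multiple of ADVANCE BY")
    # Latest window start that is at or before the record timestamp.
    last_start = ts_ms - ((ts_ms - offset_ms) % advance_ms)
    windows = []
    start = last_start
    # Walk backwards while the window still covers the timestamp.
    while start > ts_ms - size_ms:
        windows.append((start, start + size_ms))
        start -= advance_ms
    return sorted(windows)


# A record at 45 s with SIZE 1 minute and ADVANCE BY 30 seconds
# falls into two overlapping windows.
print(hop_windows(45_000, 60_000, 30_000))
# [(0, 60000), (30000, 90000)]
```

Under these assumptions each record belongs to exactly `SIZE / ADVANCE BY` window instances.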
Example
Simple HOP with SIZE and ADVANCE
Let’s use the following input records for this example:
When the following SELECT
statement is run:
We get the following output:
NOTE: The output results in this example are not fully deterministic as factors such as late arriving data can alter the results.
The above example shows a sample of outputs for this aggregation query. Since this is a streaming query, more records will be emitted as data arrives in the source. Note that the viewtime
column, which is being used as the timestamp for the pageviews
source, represents milliseconds since epoch. Looking at the output records, we can see the window_start
and window_end
times match the HOP
window’s 1 minute size, and the pgcnt
value for each userid
corresponds with the number of input records with the same userid
whose viewtime
falls within that window. Notice there are no output records with a window_start=1970-01-01T00:00:30
and window_end=1970-01-01T00:01:30
. This is because there are no records from the source whose viewtime
value falls within that window and thus no data to output.
HOP window with GROUP BY and HAVING clauses
The following aggregates the number of unique pages a particular userid
visits in a 2 minute window, then emits only results having a count greater than 1. With a HOP
window, since the ADVANCE BY
parameter is 30 seconds, a new 2 minute window is created every 30 seconds.
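As a rough illustration of what this aggregation computes, the following Python sketch counts distinct pages per user in each 2 minute hopping window and keeps only the counts greater than 1. It is a batch approximation of a streaming query, assumes epoch-aligned half-open windows, and uses hypothetical names (`hop_starts`, `unique_pages_per_window`) and made-up sample records.

```python
from collections import defaultdict


def hop_starts(ts_ms, size_ms, advance_ms):
    """Start times of all hopping windows that contain ts_ms."""
    last_start = ts_ms - (ts_ms % advance_ms)
    return list(range(last_start, ts_ms - size_ms, -advance_ms))


def unique_pages_per_window(records, size_ms=120_000, advance_ms=30_000):
    """records: iterable of (viewtime_ms, userid, pageid) tuples."""
    pages = defaultdict(set)
    for ts_ms, userid, pageid in records:
        for start in hop_starts(ts_ms, size_ms, advance_ms):
            # GROUP BY window_start, window_end, userid
            pages[(start, start + size_ms, userid)].add(pageid)
    # HAVING: keep only groups with more than one distinct page
    return {key: len(ids) for key, ids in pages.items() if len(ids) > 1}


# Made-up sample records, for illustration only.
sample = [(10_000, "u1", "p1"), (40_000, "u1", "p2"), (50_000, "u2", "p1")]
result = unique_pages_per_window(sample)
```

Here `result` contains only groups for `u1`, because `u2` visited a single page.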
TUMBLE Window
Syntax
A TUMBLE
window models a fixed-size time interval. Its window instances do not overlap and leave no gaps between them. A TUMBLE
window is defined by a single parameter: SIZE
, which defines a window’s time duration. You can think of a TUMBLE
window function as a special case of a HOP
window function in which SIZE
and ADVANCE BY
values are equal. Each record belongs to only one instance of a TUMBLE
window according to its timestamp.
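A minimal Python sketch of tumbling window assignment follows. It assumes epoch-aligned, half-open windows `[window_start, window_end)` and treats the optional offset as a shift of the window start; the function name `tumble_window` is hypothetical.

```python
def tumble_window(ts_ms, size_ms, offset_ms=0):
    """Return the single (window_start, window_end) pair that contains ts_ms."""
    start = ts_ms - ((ts_ms - offset_ms) % size_ms)
    return (start, start + size_ms)


# With a 30 second SIZE, a record at 45 s belongs to the window 30 s..60 s.
print(tumble_window(45_000, 30_000))          # (30000, 60000)
# An offset of 5 seconds shifts every window start by 5 seconds.
print(tumble_window(45_000, 30_000, 5_000))   # (35000, 65000)
```

This is the hopping window calculation with `ADVANCE BY` equal to `SIZE`, which is why each record lands in exactly one window.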
Example
Simple TUMBLE window with SIZE
Assume a pageviews
Stream has been created by the following DDL:
Let’s use the following input records for this example:
When the following SELECT
statement is run:
We get the following output:
NOTE: The output results in this example are not fully deterministic as factors such as late arriving data can alter results.
The above example shows a sample of outputs for this aggregation query. Since this is a streaming query, more records will be emitted as data arrives in the source. Note that the viewtime
column, which is being used as the timestamp for the pageviews
source, represents milliseconds since epoch. Looking at the output records, we can see the window_start
and window_end
times match the TUMBLE
window’s 30 second size, and the cnt
value for each userid
corresponds with the number of input records with the same userid
whose viewtime
falls within that window. Notice there are no output records with a window_start=1970-01-01T00:00:30
and window_end=1970-01-01T00:01:00
. This is because there are no records from the source whose viewtime
value falls within that window and thus no data to output.
TUMBLE window with WHERE and GROUP BY clauses
The following aggregates the number of unique pages a particular userid visits in a 10 second window. With a TUMBLE
window, a new 10 second window is opened at the end of the previous one, so no windows overlap.
TUMBLE window with shorter allowed lateness
The following aggregates the number of unique pages a particular userid visits in a 10 second window. It also sets source properties to shorten the allowed lateness, telling the query to discard data that arrives more than 1 second late. In this example, if the greatest event timestamp ingested by the query thus far is 10000ms
, then any events with a timestamp of less than 9000ms
will not be processed.
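The lateness rule in this example can be sketched in Python as shown below. This is a simplified model, not the engine's implementation: the class name `LatenessFilter` and its parameter are hypothetical, and the default of 10000 mirrors the default value listed for the lateness property in the Source Properties table.

```python
class LatenessFilter:
    """Drops records that are too late relative to the greatest timestamp seen."""

    def __init__(self, allowed_lateness_ms=10_000):
        self.allowed_lateness_ms = allowed_lateness_ms
        self.max_ts_ms = None  # greatest event timestamp read so far

    def accept(self, ts_ms):
        if self.max_ts_ms is None or ts_ms > self.max_ts_ms:
            self.max_ts_ms = ts_ms
        # Records older than (greatest timestamp - allowed lateness) are discarded.
        return ts_ms >= self.max_ts_ms - self.allowed_lateness_ms


late_filter = LatenessFilter(allowed_lateness_ms=1_000)
print(late_filter.accept(10_000))  # True, and sets the greatest timestamp
print(late_filter.accept(9_000))   # True, exactly at the lateness bound
print(late_filter.accept(8_999))   # False, less than 9000 ms
```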
CUMULATE Window
Syntax
A CUMULATE
window defines a fixed-size time interval where each window itself is split into fixed-size windows all with the same window_start
. A CUMULATE
window is defined by two parameters: SIZE
, which defines a window’s max time duration, and STEP
, which defines the increase in window size between the ends of sequential CUMULATE
windows within a given window. You can think of a CUMULATE
window function as a TUMBLE
window function, of width SIZE
, which itself is split into smaller windows with increasing size that all have the same start as window_start
. When defining a CUMULATE
window, SIZE
must be an integral multiple of STEP
.
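The cascading structure of a cumulate window can be sketched in Python as follows. The sketch assumes epoch-aligned, half-open windows and that a record belongs to every cumulating window, within its enclosing `SIZE`-wide window, whose end is after the record's timestamp; the function name `cumulate_windows` is hypothetical.

```python
def cumulate_windows(ts_ms, size_ms, step_ms):
    """Return every (window_start, window_end) pair that contains ts_ms."""
    if size_ms % step_ms != 0:
        raise ValueError("SIZE must be an integral multiple of STEP")
    # All cumulating windows share the start of the enclosing SIZE-wide window.
    start = ts_ms - (ts_ms % size_ms)
    windows = []
    end = start + step_ms
    while end <= start + size_ms:
        if ts_ms < end:
            windows.append((start, end))
        end += step_ms
    return windows


# SIZE 1 minute, STEP 20 seconds: a record at 25 s misses the 0 s..20 s window
# but falls into the 0 s..40 s and 0 s..60 s windows.
print(cumulate_windows(25_000, 60_000, 20_000))
# [(0, 40000), (0, 60000)]
```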
Example
Simple CUMULATE window with SIZE and STEP
Let’s use the pageviews
Stream that's created using this DDL:
with the following input records for this example:
When the following SELECT
statement is run:
We get the following output:
NOTE: The output results in this example are not fully deterministic as factors such as late arriving data can alter results.
The above example shows a sample of outputs for this aggregation query. Since this is a streaming query, more records will be emitted as data arrives in the source. Note that the viewtime
column, which is being used as the timestamp for the pageviews
source, represents milliseconds since epoch. Looking at the output records, we can see the window_start
and window_end
times match the CUMULATE
window’s expected size, a max size of 1 minute with intermediate windows of 20 seconds and 40 seconds long. The cnt
value for each userid
corresponds with the number of input records with the same userid
whose viewtime
falls within their respective windows. Notice there are no output records with a window_start=1970-01-01T00:01:00
and window_end=1970-01-01T00:01:20
. This is because there are no records from the source whose viewtime
value falls within that window and thus no data to output.
CUMULATE window with a GROUP BY and HAVING clause
The following aggregates the number of unique pages a particular userid
visits in several cascading windows with the longest window being 10 minutes. With a CUMULATE
window, since the STEP
parameter is 5 seconds, a window is closed every 5 seconds. Then the query only emits results having a count of unique pageid
values greater than 5.
Sub-Queries
Sub-queries are a way to add an additional layer of processing to an original schema, without storing the intermediate result in a Store, before providing it to an outer query within the same SQL query.
Just like any other Relation within the FROM
clause, a sub-query may or may not have an alias, but its columns can only be referred to by a qualified name if an alias was provided for it. Additionally, the columns list of a sub-query must be unique, and the columns become available to the outer query as they were named or aliased just like any other Relation with a corresponding schema.
Since sub-queries are internal to a SQL query, they are not persisted as a Relation, nor is their data persisted in the Store on which the SQL was run.
A sub-query has the following syntax and it can be used in a FROM
clause within a SELECT
statement:
The rest of this section shows various use cases in which a sub-query is useful.
Transformation
One of the basic use cases of a sub-query is intermediary transformation or sanitization of data where this data is not needed outside the scope of the given SQL. This reduces cost and overhead for an organization by removing the need for additional persisted Relations.
Here we use the pageviews
stream to convert its userid
and pageid
to INTEGER
for building a simple point system based on the page numbers that were visited by each user:
Structuring
Sub-queries are also useful when events need to be structured within the scope of a SQL query. The following example uses a self-JOIN on top of an aggregation that counts the number of pages visited, which removes the need for a separate stream dedicated to the grouping sub-query:
Denormalization
Using a sub-query, events can also be denormalized within a SQL query. The following query shows how a sub-query JOIN
expands the necessary user information for the outer JOIN
to create shipment metrics that describe the user involved in that shipment: