FROM
The FROM clause specifies the source to use in a SELECT statement. It can refer to a Relation, the result of a window function, or a JOIN of two Relations (Stream/Stream or Stream/Changelog) that combines records based on given join criteria. The source can also be another SELECT (a sub-query), which applies additional processing logic to the original source events before the rest of the operators and clauses are applied to the processed events. See the Sub-Queries examples below.
See Window Function below for more information about window functions.
See JOIN for more information about joining two Relations as sources in a statement.
A source is referenced using the following syntax:
where relation_name can be a fully or partially qualified name that specifies the database_name and/or schema_name in the format [<database_name>.<schema_name>.]<relation_name>, e.g. db1.public.pageviews. Otherwise, the current Database and Schema are used to identify the Relation. For case-sensitive names, the name must be wrapped in double quotes; otherwise, the lowercase version of the name is used.
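The name-resolution rules above can be illustrated with a small Python sketch. It is not DeltaStream's parser: the function name resolve_relation_name is hypothetical, it assumes a two-part name is read as <schema_name>.<relation_name>, and it assumes quoted identifiers contain no embedded double quotes.

```python
import re

# Matches either a double-quoted identifier or a run of characters up to the next dot.
_IDENT = re.compile(r'"([^"]*)"|([^."]+)')

def resolve_relation_name(name, current_db, current_schema):
    """Hypothetical helper: return (database, schema, relation) for a relation reference."""
    parts = []
    for quoted, unquoted in _IDENT.findall(name):
        # Quoted names keep their case; unquoted names are lowercased.
        parts.append(quoted if quoted else unquoted.lower())
    if len(parts) == 1:
        return (current_db, current_schema, parts[0])
    if len(parts) == 2:
        # Assumption: a two-part name is <schema_name>.<relation_name>.
        return (current_db, parts[0], parts[1])
    if len(parts) == 3:
        return tuple(parts)
    raise ValueError(f"invalid relation name: {name!r}")

print(resolve_relation_name('db1."MySchema".PageViews', "mydb", "public"))  # ('db1', 'MySchema', 'pageviews')
```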
See also MATCH_RECOGNIZE.
The optional WITH clause specifies the Source Properties, along with any Store-specific ones documented below.
timestamp
Required: No
Default value: Records’ timestamp
Type: String
Valid values: A column of type BIGINT or TIMESTAMP. See Data Types.
timestamp.format
The format to use for TIMESTAMP typed fields. See Data Types.
Required: No
Default value: sql
Type: String
Valid values: sql, iso8601
source.allow.latency.millis
This property sets an upper bound, as a long value in milliseconds, on how late a record can arrive. Lateness is calculated relative to the greatest timestamp read so far. Let t be a record's timestamp, a the greatest timestamp seen so far, and b the value of this property: if t < a - b, the record is deemed too late and is discarded. This property only applies to stateful queries that use windows.
Required: No
Default value: 10000
Type: Long
Valid values: [1, ...]
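To make the discard rule concrete, here is a minimal Python sketch that applies t < a - b to records in arrival order. It assumes a single ordered input and ignores partitions and how an engine actually tracks event-time progress; the function name filter_late_records is hypothetical.

```python
def filter_late_records(timestamps, allow_latency_ms=10000):
    """Keep records whose timestamp t satisfies t >= a - b, where a is the
    greatest timestamp seen so far and b is allow_latency_ms."""
    greatest = None
    kept = []
    for t in timestamps:
        if greatest is not None and t < greatest - allow_latency_ms:
            continue  # too late: discarded
        kept.append(t)
        greatest = t if greatest is None else max(greatest, t)
    return kept

# With b = 1000 and a = 10000, anything below 9000 is dropped.
print(filter_late_records([10000, 9500, 8999, 9000, 12000, 10500], allow_latency_ms=1000))
# [10000, 9500, 9000, 12000]
```

Note that 10500 is also dropped: by the time it arrives, a has grown to 12000, so the cutoff is 11000.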
source.idle.timeout.millis
This property sets an upper bound, as a long value in milliseconds, on how long a source can go without new events before it is marked as idle. If one or more sources stop carrying events while another source still has events, the query can stall, because windows created by stateful queries cannot be closed. A source marked as idle is not considered when the query determines whether a window can be closed, so it does not stall the query.
Required: No
Default value: None
Type: Long
Valid values: [1, ...]
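The following Python sketch shows the idea under stated assumptions: a window is assumed to be closable once every non-idle source has read an event at or past the window's end, and an unset timeout is assumed to mean no source is ever marked idle. The class and function names are hypothetical, and DeltaStream's actual bookkeeping may differ.

```python
from dataclasses import dataclass
from typing import List, Optional

@dataclass
class SourceState:
    max_event_ts: int        # greatest event timestamp read from this source (ms)
    last_event_wall_ms: int  # wall-clock time at which its last event arrived (ms)

def is_idle(src: SourceState, now_ms: int, idle_timeout_ms: Optional[int]) -> bool:
    # Assumption: with no timeout configured, a source is never marked idle.
    if idle_timeout_ms is None:
        return False
    return now_ms - src.last_event_wall_ms > idle_timeout_ms

def can_close_window(window_end: int, sources: List[SourceState], now_ms: int,
                     idle_timeout_ms: Optional[int] = None) -> bool:
    # Idle sources are excluded from the decision, so they cannot hold windows open.
    active = [s for s in sources if not is_idle(s, now_ms, idle_timeout_ms)]
    if not active:
        return False  # assumption: nothing has progressed, keep the window open
    return min(s.max_event_ts for s in active) >= window_end

busy = SourceState(max_event_ts=70000, last_event_wall_ms=95000)
quiet = SourceState(max_event_ts=20000, last_event_wall_ms=0)
print(can_close_window(60000, [busy, quiet], now_ms=100000))                         # False
print(can_close_window(60000, [busy, quiet], now_ms=100000, idle_timeout_ms=30000))  # True
```

Without a timeout, the quiet source holds the window open indefinitely; with a 30-second timeout it is marked idle and the window can close.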
source.deserialization.error.handling
Required: No
Default value: TERMINATE
Type: String
Valid values:
TERMINATE: If an error occurs while reading records from the source, fail the query and return an error message to the user.
IGNORE: If an error occurs while reading records from the source, skip any records that fail deserialization and continue executing the query.
IGNORE_AND_LOG: If an error occurs while reading records from the source, skip and log any records that fail deserialization, then continue executing the query. When this value is used, the property source.deserialization.error.log.topic must also be provided.
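The difference between the three modes can be sketched in Python, using JSON decoding as a stand-in for the source's value format. The function read_records and the list standing in for the error log topic are hypothetical; this illustrates the documented behavior and is not DeltaStream code.

```python
import json

VALID_MODES = {"TERMINATE", "IGNORE", "IGNORE_AND_LOG"}

class DeserializationError(Exception):
    pass

def read_records(raw_records, mode="TERMINATE", error_log=None):
    if mode not in VALID_MODES:
        raise ValueError(f"unknown mode: {mode}")
    if mode == "IGNORE_AND_LOG" and error_log is None:
        # Mirrors the rule that an error log topic must be provided.
        raise ValueError("IGNORE_AND_LOG requires an error log")
    decoded = []
    for raw in raw_records:
        try:
            decoded.append(json.loads(raw))
        except json.JSONDecodeError as exc:
            if mode == "TERMINATE":
                raise DeserializationError(f"cannot deserialize {raw!r}") from exc
            if mode == "IGNORE_AND_LOG":
                error_log.append(raw)  # stand-in for writing to the error log topic
            # IGNORE and IGNORE_AND_LOG both skip the bad record and keep going.
    return decoded

log = []
print(read_records(['{"id": 1}', "not-json", '{"id": 2}'], "IGNORE_AND_LOG", log))  # [{'id': 1}, {'id': 2}]
print(log)  # ['not-json']
```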
source.deserialization.error.log.topic
This property specifies the name of the topic used to log records that a query's source failed to deserialize, when that source has the property source.deserialization.error.handling set to IGNORE_AND_LOG. The format of the records in this error log depends on the underlying store type of the source Relation. If the topic already exists, it is used. If it does not exist, it is created with whichever additional properties are relevant for creation, depending on the error log topic's store.
Required: No, unless source.deserialization.error.handling is IGNORE_AND_LOG
Default value: None
Type: String
source.deserialization.error.log.store
This property specifies the name of the store associated with the source.deserialization.error.log.topic property.
Required: No
Default value: Store name of the source Relation
Type: String
Valid values: Name of a Kafka or Kinesis typed Store.
starting.position
Required: No
Default value:
Type: String
Valid values:
earliest: Start reading from the earliest available offsets in each partition of the Kafka topic.
latest: Start reading newly arriving data for each partition of the Kafka topic.
offsets: Start reading from the partition offsets specified by the starting.position.offsets property. If this mode is used, the starting.position.offsets property must be provided.
timestamp: Start reading each partition of the Kafka topic from the timestamp specified by the starting.position.timestamp.ms property. The timestamp corresponds to the Kafka records’ timestamp. If this mode is used, the starting.position.timestamp.ms property must be provided.
starting.position.offsets
Required: No, unless starting.position is offsets
Default value: None
Type: String
Valid values: A ;-separated list of partition:<partition_num>,offset:<offset_num>. partition_num must be in the range [0, ...] and offset_num in the range [1, ...].
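To illustrate the expected format, the following Python sketch parses a starting.position.offsets value into a partition-to-offset map and rejects malformed entries. The function name parse_starting_offsets is hypothetical. It assumes no whitespace around entries, and it only checks the shape of each entry, not the numeric ranges.

```python
import re

# One entry: partition:<partition_num>,offset:<offset_num>
_ENTRY = re.compile(r"partition:(\d+),offset:(\d+)")

def parse_starting_offsets(value):
    """Parse a ;-separated list of partition/offset pairs into {partition: offset}."""
    offsets = {}
    for entry in value.split(";"):
        match = _ENTRY.fullmatch(entry)
        if match is None:
            raise ValueError(f"malformed entry: {entry!r}")
        offsets[int(match.group(1))] = int(match.group(2))
    return offsets

print(parse_starting_offsets("partition:0,offset:1;partition:3,offset:3"))  # {0: 1, 3: 3}
```

A value such as partition:1:offset:3, with a colon in place of the comma, is rejected rather than silently misread.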
starting.position.timestamp.ms
Required: No, unless starting.position is timestamp
Default value: None
Type: Long
Valid values: [1, …]
source.deserialization.error.log.topic.partitions
This property specifies the number of partitions the topic associated with the source.deserialization.error.log.topic property should have.
Required: No, unless the topic named by source.deserialization.error.log.topic doesn't exist
Default value: None
Type: Integer
Valid values: [1, ...]
source.deserialization.error.log.topic.replicas
This property specifies the number of replicas the topic associated with the source.deserialization.error.log.topic property should have.
Required: No, unless the topic named by source.deserialization.error.log.topic doesn't exist
Default value: None
Type: Integer
Valid values: [1, ...]
source.kafka.partition.discovery.millis
This property specifies the interval, in milliseconds, at which the query checks the Kafka topic for new partitions. With the default of 10000, the query looks for new partitions to consume from every 10 seconds. If discovery is disabled, the query does not consume from partitions added while it is running.
Required: No
Default value: 10000
Type: Long
Valid values: [1, ...], or -1 to disable
starting.position
Required: No
Default value:
Type: String
Valid values:
earliest: Start reading from the earliest available sequence in each shard of the Kinesis data stream.
latest: Start reading newly arriving data for each shard of the Kinesis data stream.
timestamp: Start reading each shard of the Kinesis data stream from the timestamp specified by the starting.position.timestamp.ms property. The timestamp corresponds to the Kinesis records’ timestamp. If this mode is used, the starting.position.timestamp.ms property must be provided.
starting.position.timestamp.ms
Required: No, unless starting.position is timestamp
Default value: None
Type: Long
Valid values: [1, …]
source.deserialization.error.log.topic.shards
This property specifies the number of shards the stream associated with the source.deserialization.error.log.topic property should have.
Required: No, unless the stream named by source.deserialization.error.log.topic doesn't exist
Default value: None
Type: Integer
Valid values: [1, ...]
Using a window function, records are split into finite-size batches. Each record can belong to one or more windows, depending on the window type. A window defines a time interval with a start time and an end time, and a record belongs to a window instance if its timestamp falls between the window's start and end times. Applying a window to a Relation logically creates a new Relation that includes all columns of the original Relation plus two additional columns, window_start and window_end. These two columns can be used in the SELECT and/or WHERE clauses, just like the original Relation's columns, and they can be added to the GROUP BY clause to group records by time-based window. For every window type, OFFSET is an optional parameter; if provided, it specifies the time offset by which the start of each window is shifted. Currently, three types of windows are supported.
A hopping window models a fixed-size time interval and is defined by two parameters: SIZE, which defines a window’s time duration, and ADVANCE BY, which defines how far each window instance moves forward in time relative to the previous one. Depending on the ADVANCE BY value, HOP window instances can overlap, so a given record can belong to one or more window instances according to its timestamp. When using a HOP window, SIZE must be an integral multiple of ADVANCE BY.
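The HOP window-assignment rule can be sketched in Python. This is an illustration only. It assumes millisecond timestamps, half-open [window_start, window_end) intervals, and that OFFSET shifts the window grid forward by the given amount. The name hop_windows is hypothetical.

```python
def hop_windows(ts, size, advance, offset=0):
    """Return the (window_start, window_end) pairs of every HOP window containing ts."""
    if size % advance != 0:
        raise ValueError("SIZE must be an integral multiple of ADVANCE BY")
    # Latest window start at or before ts, on a grid of ADVANCE BY shifted by OFFSET.
    start = ts - (ts - offset) % advance
    windows = []
    # Walk back one ADVANCE BY at a time while the window still covers ts.
    while start > ts - size:
        windows.append((start, start + size))
        start -= advance
    return sorted(windows)

# SIZE 1 minute, ADVANCE BY 30 seconds: a record at 45 s falls into two windows.
print(hop_windows(45_000, size=60_000, advance=30_000))  # [(0, 60000), (30000, 90000)]
```

Because SIZE is an integral multiple of ADVANCE BY, every record lands in exactly SIZE / ADVANCE BY windows.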
The optional WITH (source_property = value [, ...]) clause specifies Source Properties.
Let’s use the following input records for this example:
When the following SELECT statement is run:
We get the following output:
NOTE: The output results in this example are not fully deterministic, as factors such as late-arriving data can alter the results.
The above example shows a sample of outputs for this aggregation query. Since this is a streaming query, more records are output as data arrives in the source. Note that the viewtime column, which is used as the timestamp for the pageviews source, represents milliseconds since epoch. In the output records, the window_start and window_end times match the HOP window’s 1-minute size, and the pgcnt value for each userid corresponds to the number of input records with the same userid whose viewtime falls within that window. There are no output records with window_start=1970-01-01T00:00:30 and window_end=1970-01-01T00:01:30, because no source records have a viewtime value within that window, so there is no data to output.
The following query counts the unique pages a particular userid visits in a 2-minute window, then only emits results with a count greater than 1. Because the HOP window’s ADVANCE BY parameter is 30 seconds, a new 2-minute window is created every 30 seconds, so each record falls into four overlapping windows.
A TUMBLE window models a fixed-size time interval. Its instances do not overlap and have no gaps between them. A TUMBLE window is defined by a single parameter, SIZE, which defines a window’s time duration. You can think of a TUMBLE window function as a special case of a HOP window function in which the SIZE and ADVANCE BY values are equal. Each record belongs to exactly one instance of a TUMBLE window, according to its timestamp.
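Because a TUMBLE window is a HOP window with ADVANCE BY equal to SIZE, assigning a record to its window reduces to a single alignment step. The Python sketch below assumes millisecond timestamps, half-open [window_start, window_end) intervals, and that OFFSET shifts the window grid forward. The name tumble_window is hypothetical.

```python
def tumble_window(ts, size, offset=0):
    """Return the single (window_start, window_end) TUMBLE window containing ts."""
    # Same grid as a HOP window whose ADVANCE BY equals SIZE.
    start = ts - (ts - offset) % size
    return (start, start + size)

print(tumble_window(45_000, size=30_000))                 # (30000, 60000)
print(tumble_window(45_000, size=30_000, offset=10_000))  # (40000, 70000)
```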
Let’s use the following input records for this example:
When the following SELECT statement is run:
We get the following output:
NOTE: The output results in this example are not fully deterministic, as factors such as late-arriving data can alter results.
The above example shows a sample of outputs for this aggregation query. Since this is a streaming query, more records are output as data arrives in the source. Note that the viewtime column, which is used as the timestamp for the pageviews source, represents milliseconds since epoch. In the output records, the window_start and window_end times match the TUMBLE window’s 30-second size, and the cnt value for each userid corresponds to the number of input records with the same userid whose viewtime falls within that window. There are no output records with window_start=1970-01-01T00:00:30 and window_end=1970-01-01T00:01:00, because no source records have a viewtime value within that window, so there is no data to output.
The following query counts the unique pages a particular userid visits in a 10-second window. With a TUMBLE window, a new 10-second window opens at the end of the previous one, so no windows overlap.
The following query also counts the unique pages a particular userid visits in a 10-second window, but it sets source properties to shorten the allowed latency (source.allow.latency.millis), telling the query to discard data that arrives more than 1 second late. In this example, if the greatest event timestamp ingested by the query so far is 10000ms, any event with a timestamp less than 9000ms is not processed.
A CUMULATE window defines a fixed-size time interval that is split into windows of increasing size, all with the same window_start. A CUMULATE window is defined by two parameters: SIZE, which defines a window’s maximum time duration, and STEP, which defines how much the window end advances between sequential CUMULATE windows within a given interval. You can think of a CUMULATE window function as a TUMBLE window function of width SIZE that is itself split into smaller windows of increasing size, all sharing the same window_start. When defining a CUMULATE window, SIZE must be an integral multiple of STEP.
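The CUMULATE assignment rule can be sketched in Python. The sketch assumes millisecond timestamps, half-open [window_start, window_end) intervals, and an OFFSET that shifts the enclosing TUMBLE grid. The name cumulate_windows is hypothetical.

```python
def cumulate_windows(ts, size, step, offset=0):
    """Return the CUMULATE windows containing ts; all share one window_start."""
    if size % step != 0:
        raise ValueError("SIZE must be an integral multiple of STEP")
    # window_start comes from the enclosing TUMBLE window of width SIZE.
    start = ts - (ts - offset) % size
    # Window ends grow by STEP up to SIZE; keep those that extend past ts.
    return [(start, start + k * step)
            for k in range(1, size // step + 1)
            if start + k * step > ts]

# SIZE 1 minute, STEP 20 seconds: a record at 25 s is in the 40 s and 60 s windows.
print(cumulate_windows(25_000, size=60_000, step=20_000))  # [(0, 40000), (0, 60000)]
```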
Let’s use the following input records for this example:
When the following SELECT statement is run:
We get the following output:
NOTE: The output results in this example are not fully deterministic, as factors such as late-arriving data can alter results.
The above example shows a sample of outputs for this aggregation query. Since this is a streaming query, more records are output as data arrives in the source. Note that the viewtime column, which is used as the timestamp for the pageviews source, represents milliseconds since epoch. In the output records, the window_start and window_end times match the CUMULATE window’s expected sizes: a maximum size of 1 minute, with intermediate windows 20 seconds and 40 seconds long. The cnt value for each userid corresponds to the number of input records with the same userid whose viewtime falls within their respective windows. There are no output records with window_start=1970-01-01T00:01:00 and window_end=1970-01-01T00:01:20, because no source records have a viewtime value within that window, so there is no data to output.
The following query counts the unique pages a particular userid visits in several cascading windows, the longest being 10 minutes. Because the CUMULATE window’s STEP parameter is 5 seconds, a window closes every 5 seconds. The query only emits results with a count of unique pageid values greater than 5.
Sub-queries add a layer of processing on top of an original source within the same SQL query, before its results are passed to an outer query, without storing the intermediate results in a Store. Because sub-queries are internal to a SQL query, they are not persisted as a Relation, and their data is not persisted in the Store the query runs on.
A sub-query has the following syntax and can be used in a FROM clause within a SELECT statement:
The rest of this section shows various use cases in which a sub-query can be useful.
One of the basic use cases of a sub-query is intermediary transformation or sanitization of data that is not needed outside the scope of the given SQL query. This reduces cost and overhead by removing the need for additional persisted Relations.
Here we use the pageviews stream and convert its userid and pageid to INTEGER to build a simple point system based on the page numbers visited by each user:
Sub-queries are also useful when events need to be structured within the scope of a SQL query. The following example shows how a self-JOIN on top of an aggregation that counts the number of pages visited removes the need for a separate stream just for the grouping sub-query:
Using a sub-query, events can also be denormalized within a single SQL query. The following query shows how a sub-query JOIN expands the user information needed by the outer JOIN to create shipment metrics that describe the user involved in each shipment:
This property specifies a column name in the source Relation that will be used for event-time processing. If this property is not provided, the timestamp associated with the source Relation’s records is used. The Relation’s timestamp column can be specified when the Relation is created; otherwise, the record’s timestamp is used by default. If the type of this timestamp field is BIGINT, the values are expected to be epoch milliseconds UTC.
This property sets the strategy for how the query handles invalid data when reading records from the source. For a detailed explanation with examples, see Configuring Deserialization Error Handling.
This property sets the strategy for where the query starts reading records from the source.
latest
for
earliest
for
This property specifies the partition-offset pairs from which the source’s Kafka topic will be read for this statement. This property must be provided when the starting.position property is set to offsets. Partition-offset values are passed as a ;-separated list of partition:<partition_num>,offset:<offset_num>. For example, to start reading from the first offset of partition 0 and the third offset of partition 3, set it to partition:0,offset:1;partition:3,offset:3.
This property specifies the timestamp from which the source’s Kafka topic will be read for this statement. The timestamp value is in epoch milliseconds, measured as the number of milliseconds since January 1, 1970 00:00:00.000 GMT.
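Computing an epoch-milliseconds value for starting.position.timestamp.ms, or interpreting a BIGINT timestamp column such as viewtime, is plain arithmetic. The Python sketch below uses only the standard library and treats naive datetimes as UTC, which is an assumption about the caller's input.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)

def to_epoch_millis(dt):
    """Milliseconds since 1970-01-01T00:00:00.000 UTC."""
    if dt.tzinfo is None:
        dt = dt.replace(tzinfo=timezone.utc)  # assumption: naive values are UTC
    delta = dt - EPOCH
    # Integer arithmetic on the timedelta avoids float rounding.
    return (delta.days * 86_400 + delta.seconds) * 1000 + delta.microseconds // 1000

def from_epoch_millis(ms):
    """Inverse conversion, e.g. for a BIGINT viewtime value."""
    return EPOCH + timedelta(milliseconds=ms)

print(to_epoch_millis(datetime(2024, 1, 1)))  # 1704067200000
print(from_epoch_millis(30_000).isoformat())  # 1970-01-01T00:00:30+00:00
```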
This property sets the strategy for where the query starts reading records from the source.
latest
for
earliest
for
This property specifies the timestamp from which the source’s Kinesis data stream will be read for this statement. The timestamp value is in epoch milliseconds, measured as the number of milliseconds since January 1, 1970 00:00:00.000 GMT.
Assume a pageviews Stream has been created by the following DDL:
Let’s use the pageviews Stream created by this DDL:
Just like any other Relation within the FROM clause, a sub-query may or may not have an alias, but its columns can be referred to by a qualified name only if an alias is provided for it. Additionally, the column names of a sub-query must be unique, and the columns become available to the outer query as named or aliased, just like any other Relation with a corresponding schema.