FROM

Syntax

FROM relation_reference [[AS] alias]
    [JOIN relation_reference [[AS] alias]]
    [WITH (source_property = value [, ...])]

Description

The FROM clause specifies the source to use in a SELECT statement. It can refer to a Relation, the result of a window function, or a JOIN of two Relations (Stream/Stream or Stream/Changelog) that combines records based on a given join criterion. The source can also be another SELECT statement, which applies additional processing to the original source events before the remaining operators and clauses of the outer statement are applied to the processed events. See the Sub-Queries examples below.

See Window Function below for more information about window functions.

See JOIN for more information about joining two Relations as sources in a statement.

Arguments

relation_reference

The reference to a source in the following syntax:

relation_name
  [MATCH_RECOGNIZE]
| window_function
| ( select_statement )

where relation_name can be a fully or partially qualified name that specifies the database_name and/or schema_name in the format [<database_name>.<schema_name>.]<relation_name>, e.g. db1.public.pageviews. If the name is not qualified, the current Database and Schema are used to identify the Relation. Case-sensitive names must be wrapped in double quotes; otherwise, the lowercase version of the name is used.

See also MATCH_RECOGNIZE.

WITH (source_property = value [, ...])

This optional clause specifies the Source Properties, along with any Store-specific properties documented below.

Source Properties

Property Name | Description

timestamp

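The name of the column to use as each record's timestamp, as in 'timestamp'='viewtime' in the window examples below.

Required: No Default value: Records’ timestamp Type: String Valid values: Name of a column of type BIGINT or TIMESTAMP. See Data Types.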

timestamp.format

The format to use for TIMESTAMP typed fields. See Data Types.

Required: No Default value: sql Type: String Valid values: sql, iso8601

source.allow.latency.millis

This property sets an upper bound, as a long value, on how late a record can be in milliseconds. Lateness is calculated based on the greatest chronological timestamp that has been read so far. A record that arrives with a timestamp (t) that is less than the difference between (a) the greatest chronological timestamp seen thus far and (b) the number of milliseconds specified by this property will be discarded (i.e. if t < a - b then the record with the timestamp t will be ignored because it is deemed too late). This property only applies for queries that are stateful and deal with windows.

Required: No Default value: 10000 Type: Long Valid values: [1, ...]
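The discard rule t < a - b can be sketched in a few lines of Python. This only illustrates the rule as stated above and is not the engine's implementation; the function names is_too_late and filter_late are hypothetical.

```python
def is_too_late(record_ts_ms, max_seen_ts_ms, allow_latency_ms=10_000):
    """Return True when a record would be discarded under the rule t < a - b."""
    return record_ts_ms < max_seen_ts_ms - allow_latency_ms


def filter_late(timestamps, allow_latency_ms=10_000):
    """Walk timestamps in arrival order and drop those deemed too late."""
    kept = []
    max_seen = None  # greatest timestamp read so far
    for t in timestamps:
        if max_seen is not None and is_too_late(t, max_seen, allow_latency_ms):
            continue  # record is ignored
        kept.append(t)
        max_seen = t if max_seen is None else max(max_seen, t)
    return kept


# With a 1000 ms bound and a greatest timestamp of 10000 ms,
# anything below 9000 ms is dropped.
print(filter_late([1000, 10000, 8999, 9000, 9500], allow_latency_ms=1000))
# -> [1000, 10000, 9000, 9500]
```

Note that a record exactly at the boundary (9000 ms here) is kept, because the comparison in the rule is strict.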

source.idle.timeout.millis

This property sets an upper bound, as a long value in milliseconds, on how long a source can go without new events before it is marked as idle. When one or more sources carry no events while another source still does, the query can stall because windows created by stateful queries cannot be closed. A source marked as idle is ignored when the query determines whether a window can be closed, so it won’t stall the query.

Required: No Default value: None Type: Long Valid values: [1, ...]

source.deserialization.error.handling

Required: No Default value: TERMINATE Type: String Valid values:

  • TERMINATE: When an error occurs when attempting to read records from the source, fail the query and return an error message to the user.

  • IGNORE: When an error occurs when attempting to read records from the source, skip any records that fail deserialization and continue executing the query.

  • IGNORE_AND_LOG: When an error occurs when attempting to read records from the source, skip and log any records that fail deserialization then continue executing the query. When this value is being used, the property source.deserialization.error.log.topic must also be provided.

source.deserialization.error.log.topic

This property specifies the name of the topic to use for logging records that a query's source has failed to deserialize when that query's source has the property source.deserialization.error.handling set to IGNORE_AND_LOG. The format for the records in this error log will depend on the underlying store type of the source Relation. If the topic already exists then it will be used. If it does not exist, it will be created with whichever additional properties are relevant for creation, depending on the error log topic's store.

Required: No, unless source.deserialization.error.handling is IGNORE_AND_LOG Default value: None Type: String

source.deserialization.error.log.store

This property specifies the name of the store associated with the source.deserialization.error.log.topic property.

Required: No Default value: Store name of the source Relation Type: String Valid values: Store name belonging to a Kafka or Kinesis typed Store.

Kafka Specific Source Properties

Property Name | Description

starting.position

Required: No Default value:

Type: String Valid values:

  • earliest: Start reading from the earliest available offsets in each partition of the Kafka topic.

  • latest: Start reading newly arriving data for each partition of the Kafka topic.

  • offsets: Start reading from the partition offsets specified by the starting.position.offsets property. Note that if this mode is being used, the starting.position.offsets property must be provided.

  • timestamp: Start reading from the timestamp specified by the starting.position.timestamp.ms property for each partition of the Kafka topic. The timestamp corresponds with the Kafka records’ timestamp. Note that if this mode is being used, the starting.position.timestamp.ms property must be provided.

starting.position.offsets

Required: No, unless starting.position is offsets

Default value: None

Type: String Valid values: A semicolon-separated (;) list of partition:<partition_num>,offset:<offset_num> entries. Both partition_num and offset_num must be in the range of [1, ...].
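To make the expected shape of the starting.position.offsets value concrete, here is a small Python sketch that parses such a string into a partition-to-offset mapping. The parser and its name parse_starting_offsets are hypothetical and exist only to illustrate the format described above; the partition and offset numbers are made-up sample values.

```python
def parse_starting_offsets(value):
    """Parse 'partition:<n>,offset:<n>' entries separated by ';' into a dict."""
    offsets = {}
    for entry in value.split(";"):
        # Each entry has exactly two comma-separated parts.
        partition_part, offset_part = (part.strip() for part in entry.split(","))
        partition_key, partition_num = partition_part.split(":")
        offset_key, offset_num = offset_part.split(":")
        if partition_key != "partition" or offset_key != "offset":
            raise ValueError(f"malformed entry: {entry!r}")
        offsets[int(partition_num)] = int(offset_num)
    return offsets


# Sample value (hypothetical numbers): two partitions with their offsets.
example = "partition:1,offset:100;partition:2,offset:250"
print(parse_starting_offsets(example))
# -> {1: 100, 2: 250}
```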

starting.position.timestamp.ms

Required: No, unless starting.position is timestamp Default value: None Type: Long Valid values: [1, ...]

source.deserialization.error.log.topic.partitions

This property specifies the number of partitions the topic associated with the source.deserialization.error.log.topic property should have.

Required: No, unless source.deserialization.error.log.topic doesn't exist Default value: None Type: Integer Valid values: [1, ...]

source.deserialization.error.log.topic.replicas

This property specifies the number of replicas the topic associated with the source.deserialization.error.log.topic property should have.

Required: No, unless source.deserialization.error.log.topic doesn't exist Default value: None Type: Integer Valid values: [1, ...]

source.kafka.partition.discovery.millis

This property specifies the interval, in milliseconds, at which the query discovers new partitions in the Kafka topic. By default this property is set to 10000, meaning the query looks for new partitions to consume from every 10 seconds. If disabled, the query will not consume from partitions added while the query is running.

Required: No Default value: 10000 Type: Long Valid values: [1, ...] or -1 to disable

Kinesis Specific Source Properties

Property Name | Description

starting.position

Required: No Default value:

Type: String Valid values:

  • earliest: Start reading from the earliest available sequence in each shard of the Kinesis data stream.

  • latest: Start reading newly arriving data for each shard of the Kinesis data stream.

  • timestamp: Start reading from the timestamp specified by the starting.position.timestamp.ms property for each shard of the Kinesis data stream. The timestamp corresponds with the Kinesis records’ timestamp. Note that if this mode is being used, the starting.position.timestamp.ms property must be provided.

starting.position.timestamp.ms

Required: No, unless starting.position is timestamp Default value: None Type: Long Valid values: [1, ...]

source.deserialization.error.log.topic.shards

This property specifies the number of shards the stream associated with the source.deserialization.error.log.topic property should have.

Required: No, unless source.deserialization.error.log.topic doesn't exist Default value: None Type: Integer Valid values: [1, ...]

Window Function

A window function splits records into finite-size batches. A window defines a time interval with a start time and an end time, and a record belongs to a window instance if its timestamp falls between that window's start time and end time. Each record can belong to one or multiple windows, depending on the window type.

Applying a window to a Relation logically creates a new Relation that includes all columns of the original Relation plus two additional columns named window_start and window_end. These two columns can be used in the SELECT and/or WHERE clauses like the original Relation's columns, and they can be added to the GROUP BY clause to group records by time-based window.

OFFSET is an optional parameter when defining any of the window types. If added, it specifies the time offset by which the start of a window is shifted.

Three types of windows are currently supported: HOP, TUMBLE, and CUMULATE.

HOP Window

Syntax

FROM HOP (relation_name, SIZE time_unit, ADVANCE BY time_unit)
   [WITH (source_property = value [, ...])]

A hopping window models a fixed-size time interval and is defined by two parameters: SIZE, which defines a window’s time duration, and the ADVANCE BY interval, which defines how much a window moves forward in time relative to the previous window instance. Depending on the ADVANCE BY value, HOP window instances could overlap with each other. Therefore, a given record could belong to one or more such window instances according to its timestamp. When using a HOP window, the SIZE must be an integral multiple of ADVANCE BY.

The optional WITH (source_property = value [, ...]) clause specifies Source Properties.

Example

Simple HOP window with SIZE and ADVANCE BY

Let’s use the following input records for this example:

+----------+--------+---------+
| viewtime | userid | pageid  |
+----------+--------+---------+
|     1000 | User_2 | Page_72 |
|     2000 | User_5 | Page_12 |
|     3000 | User_2 | Page_34 |
|    91000 | User_2 | Page_10 |
|    92000 | User_2 | Page_44 |
|   121000 | User_5 | Page_67 |
|   122000 | User_2 | Page_93 |
|   160000 | User_5 | Page_3  |
+----------+--------+---------+

When the following SELECT statement is run:

SELECT 
  window_start, 
  window_end,
  userid, 
  COUNT(pageid) AS pgcnt 
FROM HOP (pageviews, SIZE 1 minute, ADVANCE BY 30 second)
WITH ('timestamp'='viewtime')
GROUP BY window_start, window_end, userid;

We get the following output:

+---------------------+---------------------+--------+-------+
|    window_start     |     window_end      | userid | pgcnt |
+---------------------+---------------------+--------+-------+
| 1970-01-01T00:00:00 | 1970-01-01T00:01:00 | User_2 |     2 |
| 1970-01-01T00:00:00 | 1970-01-01T00:01:00 | User_5 |     1 |
| 1970-01-01T00:01:00 | 1970-01-01T00:02:00 | User_2 |     2 |
| 1970-01-01T00:01:30 | 1970-01-01T00:02:30 | User_2 |     3 |
| 1970-01-01T00:01:30 | 1970-01-01T00:02:30 | User_5 |     1 |
+---------------------+---------------------+--------+-------+

NOTE: The output results in this example are not fully deterministic as factors such as late arriving data can alter the results.

The above example shows a sample of outputs for this aggregation query. Since this is a streaming query, more records will be outputted as data arrives in the source. Note that the viewtime column, which is being used as the timestamp for the pageviews source, represents milliseconds since epoch. Looking at the output records, we can see the window_start and window_end times match the HOP window’s 1 minute size, and the pgcnt value for each userid corresponds with the number of input records with the same userid whose viewtime falls within that window. Notice there are no output records with a window_start=1970-01-01T00:00:30 and window_end=1970-01-01T00:01:30. This is because there are no records from the source whose viewtime value falls within that window and thus no data to output.
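The window assignment behind this HOP output can be reproduced with a short Python sketch. It models the semantics described in this section rather than the engine's implementation, and it assumes that windows are aligned to the epoch and that window_start is inclusive while window_end is exclusive. The helper name hop_windows is hypothetical.

```python
from collections import Counter


def hop_windows(ts_ms, size_ms, advance_ms):
    """Return every (start, end) hopping window that contains ts_ms.

    Assumes epoch-aligned windows with an inclusive start and exclusive end.
    """
    windows = []
    start = ts_ms - ts_ms % advance_ms  # latest aligned start <= ts_ms
    while start > ts_ms - size_ms:      # window still covers ts_ms
        windows.append((start, start + size_ms))
        start -= advance_ms
    return sorted(windows)


# Input records from the example: (viewtime in ms, userid)
records = [
    (1000, "User_2"), (2000, "User_5"), (3000, "User_2"),
    (91000, "User_2"), (92000, "User_2"), (121000, "User_5"),
    (122000, "User_2"), (160000, "User_5"),
]

# Mirrors COUNT(pageid) ... GROUP BY window_start, window_end, userid
# for SIZE 1 minute, ADVANCE BY 30 second.
hop_counts = Counter(
    (start, end, userid)
    for viewtime, userid in records
    for start, end in hop_windows(viewtime, 60_000, 30_000)
)

for (start, end, userid), pgcnt in sorted(hop_counts.items()):
    print(start, end, userid, pgcnt)
```

Because the sketch enumerates every window arithmetically, it also reports windows that the sample output does not list, such as the one that starts before the epoch and windows that later records could still update. The window from 30000 ms to 90000 ms is absent in both, since no viewtime falls inside it.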

HOP window with GROUP BY and HAVING clauses

The following counts the pages a particular userid visits in a 2 minute window, then emits only results with a count greater than 1. Because the ADVANCE BY parameter of the HOP window is 30 seconds, a new 2 minute window is created every 30 seconds.

SELECT 
  window_start, 
  window_end,
  userid, 
  COUNT(pageid) AS pgcnt 
FROM HOP(pageviews, size 2 minutes, advance by 30 seconds) 
GROUP BY window_start, window_end, userid 
HAVING COUNT(pageid) > 1;

TUMBLE Window

Syntax

FROM TUMBLE (relation_name, SIZE time_unit)
   [WITH (source_property = value [, ...])]

A TUMBLE window models a fixed-size time interval whose instances do not overlap and leave no gaps. A TUMBLE window is defined by a single parameter: SIZE, which defines a window’s time duration. You can think of a TUMBLE window function as a special case of a HOP window function in which the SIZE and ADVANCE BY values are equal. Each record belongs to exactly one instance of a TUMBLE window according to its timestamp.

Example

Simple TUMBLE window with SIZE

Assume a pageviews Stream has been created by the following DDL:

CREATE STREAM pageviews (
  viewtime BIGINT,
  userid VARCHAR,
  pageid VARCHAR
) WITH (
  'topic' = 'pageviews'
);

Let’s use the following input records for this example:

+----------+--------+---------+
| viewtime | userid | pageid  |
+----------+--------+---------+
|     1000 | User_2 | Page_72 |
|     2000 | User_5 | Page_12 |
|     3000 | User_2 | Page_34 |
|    61000 | User_5 | Page_67 |
|    62000 | User_2 | Page_67 |
|    91000 | User_2 | Page_10 |
+----------+--------+---------+

When the following SELECT statement is run:

SELECT 
  window_start,
  window_end,
  userid,
  COUNT(*) AS cnt
FROM TUMBLE (pageviews, SIZE 30 seconds)
WITH ('timestamp'='viewtime')
GROUP BY window_start, window_end, userid;

We get the following output:

+---------------------+---------------------+--------+-----+
|    window_start     |     window_end      | userid | cnt |
+---------------------+---------------------+--------+-----+
| 1970-01-01T00:00:00 | 1970-01-01T00:00:30 | User_2 |   2 |
| 1970-01-01T00:00:00 | 1970-01-01T00:00:30 | User_5 |   1 |
| 1970-01-01T00:01:00 | 1970-01-01T00:01:30 | User_2 |   1 |
| 1970-01-01T00:01:00 | 1970-01-01T00:01:30 | User_5 |   1 |
+---------------------+---------------------+--------+-----+

NOTE: The output results in this example are not fully deterministic as factors such as late arriving data can alter results.

The above example shows a sample of outputs for this aggregation query. Since this is a streaming query, more records will be outputted as data arrives in the source. Note that the viewtime column, which is being used as the timestamp for the pageviews source, represents milliseconds since epoch. Looking at the output records, we can see the window_start and window_end times match the TUMBLE window’s 30 second size, and the cnt value for each userid corresponds with the number of input records with the same userid whose viewtime falls within that window. Notice there are no output records with a window_start=1970-01-01T00:00:30 and window_end=1970-01-01T00:01:00. This is because there are no records from the source whose viewtime value falls within that window and thus no data to output.
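The TUMBLE assignment for this example can be checked with a short Python sketch. It models the semantics described in this section rather than the engine's implementation, and it assumes that windows are aligned to the epoch with an inclusive window_start and an exclusive window_end. The helper name tumble_window is hypothetical.

```python
from collections import Counter


def tumble_window(ts_ms, size_ms):
    """Return the single (start, end) tumbling window that contains ts_ms.

    Assumes epoch-aligned windows with an inclusive start and exclusive end.
    """
    start = ts_ms - ts_ms % size_ms
    return (start, start + size_ms)


# Input records from the example: (viewtime in ms, userid)
records = [
    (1000, "User_2"), (2000, "User_5"), (3000, "User_2"),
    (61000, "User_5"), (62000, "User_2"), (91000, "User_2"),
]

# Mirrors COUNT(*) ... GROUP BY window_start, window_end, userid
# for SIZE 30 seconds.
tumble_counts = Counter(
    tumble_window(viewtime, 30_000) + (userid,)
    for viewtime, userid in records
)

for (start, end, userid), cnt in sorted(tumble_counts.items()):
    print(start, end, userid, cnt)
```

The sketch also reports the window from 90000 ms to 120000 ms for the last record, which the sample output does not list. No entry appears for the window from 30000 ms to 60000 ms, because no viewtime falls inside it.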

TUMBLE window with WHERE and GROUP BY clauses

The following counts the pages a particular userid visits in a 10 second window, excluding records for User_6. With a TUMBLE window, a new 10 second window is opened at the end of the previous one, so no windows overlap.

SELECT 
  window_start,
  window_end,
  userid,
  COUNT(pageid) AS cnt
FROM TUMBLE (pageviews, SIZE 10 seconds)
WHERE userid != 'User_6' 
GROUP BY window_start, window_end, userid;

TUMBLE window with a shorter allowed latency

The following counts the pages a particular userid visits in a 10 second window. It also sets the source.allow.latency.millis source property to 1000, which tells the query to discard data that arrives more than 1 second late. In this example, if the greatest event timestamp ingested by the query thus far is 10000ms, then any events with a timestamp of less than 9000ms will not be processed.

SELECT 
  userid, 
  COUNT(pageid) AS pgcnt, 
  window_start,
  window_end
FROM TUMBLE(pageviews, size 10 seconds)
WITH (
  'source.allow.latency.millis' = '1000'
)
GROUP BY userid, window_start, window_end;

CUMULATE Window

Syntax

FROM CUMULATE (relation_name, SIZE time_unit, STEP time_unit)
   [WITH (source_property = value [, ...])]

A CUMULATE window defines a fixed-size time interval that is itself split into windows of increasing size, all sharing the same window_start. It is defined by two parameters: SIZE, which defines the maximum duration of a window, and STEP, which defines how far the window end advances between consecutive CUMULATE windows within that interval. You can think of a CUMULATE window function as a TUMBLE window function of width SIZE whose windows are each split into smaller windows of increasing size that all begin at window_start. When defining a CUMULATE window, SIZE must be an integral multiple of STEP.

Example

Simple CUMULATE window with SIZE and STEP

Let’s use the pageviews Stream that's created using this DDL:

CREATE STREAM pageviews (
  viewtime BIGINT,
  userid VARCHAR,
  pageid VARCHAR
) WITH (
  'topic' = 'pageviews'
);

with the following input records for this example:

+----------+--------+---------+
| viewtime | userid | pageid  |
+----------+--------+---------+
|     1000 | User_2 | Page_72 |
|     2000 | User_5 | Page_12 |
|     3000 | User_2 | Page_34 |
|    51000 | User_5 | Page_67 |
|    52000 | User_2 | Page_67 |
|    91000 | User_2 | Page_10 |
|   101000 | User_2 | Page_88 |
+----------+--------+---------+

When the following SELECT statement is run:

SELECT 
  window_start,
  window_end,
  userid,
  COUNT(*) AS cnt
FROM CUMULATE (pageviews, SIZE 1 minutes, STEP 20 seconds)
WITH ('timestamp'='viewtime')
GROUP BY window_start, window_end, userid;

We get the following output:

+---------------------+---------------------+--------+-----+
|    window_start     |     window_end      | userid | cnt |
+---------------------+---------------------+--------+-----+
| 1970-01-01T00:00:00 | 1970-01-01T00:00:20 | User_2 |   2 |
| 1970-01-01T00:00:00 | 1970-01-01T00:00:20 | User_5 |   1 |
| 1970-01-01T00:00:00 | 1970-01-01T00:00:40 | User_2 |   2 |
| 1970-01-01T00:00:00 | 1970-01-01T00:00:40 | User_5 |   1 |
| 1970-01-01T00:00:00 | 1970-01-01T00:01:00 | User_2 |   3 |
| 1970-01-01T00:00:00 | 1970-01-01T00:01:00 | User_5 |   2 |
| 1970-01-01T00:01:00 | 1970-01-01T00:01:40 | User_2 |   1 |
+---------------------+---------------------+--------+-----+

NOTE: The output results in this example are not fully deterministic as factors such as late arriving data can alter results.

The above example shows a sample of outputs for this aggregation query. Since this is a streaming query, more records will be outputted as data arrives in the source. Note that the viewtime column, which is being used as the timestamp for the pageviews source, represents milliseconds since epoch. Looking at the output records, we can see the window_start and window_end times match the CUMULATE window’s expected size, a max size of 1 minute with intermediate windows of 20 seconds and 40 seconds long. The cnt value for each userid corresponds with the number of input records with the same userid whose viewtime falls within their respective windows. Notice there are no output records with a window_start=1970-01-01T00:01:00 and window_end=1970-01-01T00:01:20. This is because there are no records from the source whose viewtime value falls within that window and thus no data to output.
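The CUMULATE assignment for this example can be checked with a short Python sketch. It models the semantics described in this section rather than the engine's implementation, and it assumes that windows are aligned to the epoch with an inclusive window_start and an exclusive window_end. The helper name cumulate_windows is hypothetical.

```python
from collections import Counter


def cumulate_windows(ts_ms, size_ms, step_ms):
    """Return every (start, end) cumulating window that contains ts_ms.

    All returned windows share the same start; their ends grow by step_ms
    up to start + size_ms. Assumes epoch-aligned windows with an inclusive
    start and exclusive end.
    """
    start = ts_ms - ts_ms % size_ms
    # Smallest window end strictly greater than ts_ms.
    first_end = start + ((ts_ms - start) // step_ms + 1) * step_ms
    return [(start, end) for end in range(first_end, start + size_ms + 1, step_ms)]


# Input records from the example: (viewtime in ms, userid)
records = [
    (1000, "User_2"), (2000, "User_5"), (3000, "User_2"),
    (51000, "User_5"), (52000, "User_2"), (91000, "User_2"),
    (101000, "User_2"),
]

# Mirrors COUNT(*) ... GROUP BY window_start, window_end, userid
# for SIZE 1 minutes, STEP 20 seconds.
cumulate_counts = Counter(
    (start, end, userid)
    for viewtime, userid in records
    for start, end in cumulate_windows(viewtime, 60_000, 20_000)
)

for (start, end, userid), cnt in sorted(cumulate_counts.items()):
    print(start, end, userid, cnt)
```

The sketch also reports the window from 60000 ms to 120000 ms, which the sample output does not list. No entry appears for the window from 60000 ms to 80000 ms, because no viewtime falls inside it.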

CUMULATE window with a GROUP BY and HAVING clause

The following counts the pages a particular userid visits in several cascading windows, the longest of which is 10 minutes. Because the STEP parameter of the CUMULATE window is 5 seconds, a window is closed every 5 seconds. The query then emits only results with a count greater than 5.

SELECT 
  window_start, 
  window_end, 
  userid, 
  COUNT(pageid) AS cnt
FROM 
  CUMULATE(PAGEVIEWS, size 10 minutes, step 5 SECOND)
GROUP BY window_start, window_end, userid
HAVING COUNT(pageid) > 5;

Sub-Queries

Sub-queries add an additional layer of processing on top of an original source, without storing the intermediate results in a Store, before providing them to an outer query within the same SQL query.

Just like any other Relation within the FROM clause, a sub-query may or may not have an alias, but its columns can only be referred to by a qualified name if an alias was provided. Additionally, the column names of a sub-query must be unique, and the columns become available to the outer query as they were named or aliased, just like any other Relation with a corresponding schema.

Since sub-queries are internal to a SQL query, they are not persisted as a Relation, and their data is not persisted in the Store on which the SQL query was run.

A sub-query has the following syntax and it can be used in a FROM clause within a SELECT statement:

SELECT ...
FROM ( sub-query );

The rest of this section shows various use cases in which a sub-query can be useful.

Transformation

One of the basic use cases of a sub-query is the intermediate transformation or sanitization of data that is not needed outside the scope of the given SQL query. This reduces cost and overhead for an organization by removing the need for additional persisted Relations.

Here we use the pageviews stream and convert its userid and pageid to INTEGER to build a simple point system based on the page numbers visited by each user:

SELECT uid, SUM(pid)*10 AS points
FROM (
    SELECT
        viewtime,
        CAST(SUBSTRING(userid FROM 6) AS INTEGER) AS uid,
        CAST(SUBSTRING(pageid FROM 6) AS INTEGER) AS pid
    FROM pageviews
) GROUP BY uid;

Structuring

Sub-queries are also useful when events need to be structured within the scope of a SQL query. The following example performs a self-JOIN on top of an aggregation that counts the number of pages visited, which removes the need for a separate Stream dedicated to the grouping sub-query:

SELECT
    pg.userid AS uid,
    p.pageid AS pid,
    pg.pages AS num_pages
FROM
  pageviews p
JOIN
  (SELECT userid, COUNT(pageid) AS pages FROM pageviews GROUP BY userid) pg
ON p.userid = pg.userid
WHERE pg.pages > 1;

Denormalization

Using a sub-query, events can also be denormalized within the scope of a SQL query. In the following query, a sub-query JOIN expands the user information that the outer JOIN needs in order to create shipment metrics describing the user involved in each shipment:

CREATE STREAM csas_mr_puo AS SELECT
    p_user.*,
    o.orderid AS orderid,
    o.shipmenttime AS shiptime
FROM (
    SELECT
        p.pageid,
        p.userid AS uid,
        u.contactinfo->city AS city,
        u.contactinfo->zipcode AS zipcode,
        p.viewtime AS viewtime
    FROM pageviews p
    JOIN "users" u
    WITHIN 10 SECONDS ON p.userid = u.userid
) p_user
JOIN
  orders o
WITHIN 10 SECONDS
ON p_user.city = o.address->city;
