
SELECT


Syntax

[WITH with_clause]
SELECT
    [DISTINCT] expression AS alias [, ...]
FROM from_clause
[WHERE where_clause]
[GROUP BY [GROUPING SETS | ROLLUP | CUBE] group_by_clause]
[HAVING having_clause]
[QUERY WITH (query_property = value [, ...])];

Description

The SELECT statement is used to retrieve data from one or more Relations: Streams and/or Changelogs. See SELECT (FROM MATERIALIZED VIEW) for retrieving data from a Materialized View.

Arguments

with_clause

See WITH (Common Table Expression) for more information about defining CTEs.
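
As an illustrative sketch, a CTE can prefilter a source before the main SELECT. The example assumes the pageviews Stream defined in the Examples section below; recent_views is a hypothetical CTE name:

WITH recent_views AS (
  SELECT userid, pageid
  FROM pageviews
  WHERE viewtime > 3000
)
SELECT userid, pageid
FROM recent_views;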

SELECT expression AS alias [, ...]

Each expression in this clause can refer to a column from one of the statement's sources, a primitive value, or the result of a Function call. Each expression can also be given an alias that sets the column name for the sink Relation. Case-sensitive column names must be wrapped in double quotes; otherwise, the lowercase name is assumed, and the statement may not find the column.

DISTINCT

If SELECT is specified with DISTINCT, duplicate rows are removed from the results of the query.

FROM relation_reference

See FROM for more information.

WHERE where_clause

Optionally, a source Relation’s records can be filtered according to predicates supplied in the WHERE clause. Predicates are specified as a Boolean expression, which can use columns from the source Relations.
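
For instance, a minimal sketch filtering the pageviews Stream (defined in the Examples section below) down to a single user:

SELECT viewtime, pageid
FROM pageviews
WHERE userid = 'User_1';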

GROUP BY group_by_clause

Optionally, this clause allows a grouping of records according to their values for a given set of columns to compute one or more aggregations. The GROUP BY clause in the statement defines a list of one or more columns (separated by commas) as grouping columns. Aggregate expressions are specified among the SELECT expressions. Currently, GROUP BY and aggregation are supported on Streams. The result of a group aggregation is a Changelog whose PRIMARY KEY is the list of GROUP BY columns.

HAVING having_clause

Optionally, this clause specifies aggregate filter conditions. The HAVING clause can only be used with GROUP BY to filter out groups of records that do not satisfy a condition. Any column referenced in the HAVING clause must either appear in the GROUP BY column list or be referenced within an aggregate function.

QUERY WITH (query_property = value [, ...])

Optionally, this clause specifies Query Properties.

See Query Name and Version and Resume Query to learn how to rebuild a new Query from a running one.

Query Properties


parallelism.max

This property provides a hint for the maximum number of compute units to use in parallel for processing data read from the source(s) and written into the sink within the scope of a query. All other operations between source and sink use the available resources to maximize their processing power.

Required: No Default value: 1 Type: Integer Valid values: [1, ..., 10]

state.ttl.millis

This property sets the minimum amount of time before a query’s state is considered stale and can be expired. By default, this value is infinite, meaning that state is never considered stale and won’t be cleaned up. This property is particularly relevant for queries, including windowed joins/aggregations, because the amount of state in these queries can grow unboundedly, eventually causing the query to fail due to resource exhaustion. See the example below for more details.

Required: No Default value: Infinite Type: Long Valid values: [1, ...]
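
As a sketch of how these properties are set, both can be supplied together in the QUERY WITH clause, using the same string-valued form as the state TTL example later on this page (the pageviews Stream is defined in the Examples section below):

SELECT userid, COUNT(pageid) AS pgcnt
FROM pageviews
GROUP BY userid
QUERY WITH ('parallelism.max' = '4', 'state.ttl.millis' = '10000');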

DISTINCT

SELECT DISTINCT removes duplicate rows from the query's results. A result row consists of value columns and row metadata (see Row Metadata Functions). When using SELECT DISTINCT, rows with duplicate value columns are removed from the results, and processing time is used as each result row's timestamp. Note that adding DISTINCT to SELECT makes the query stateful, and the state size for the query could grow infinitely, depending on the number of distinct rows in the result. The state.ttl.millis query property can be used to set an expiration time for the entries in the query's state so they can be cleaned up (see Query Properties).
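
For example, a minimal sketch that bounds the state of a SELECT DISTINCT query by expiring state entries after one minute (again assuming the pageviews Stream from the Examples section below):

SELECT DISTINCT userid, pageid
FROM pageviews
QUERY WITH ('state.ttl.millis' = '60000');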

Group Aggregation

For more complex use cases of the GROUP BY clause, the following extensions can be used to simplify these types of statements.

GROUPING SETS

This extension can be used to simplify the union of multiple statements, each with its own GROUP BY clause. The following grouping statements:

SELECT aggregate_function(col1), col2, col3
FROM relation
GROUP BY col2, col3;

SELECT aggregate_function(col1), col2, NULL
FROM relation
GROUP BY col2;

SELECT aggregate_function(col1), NULL, NULL
FROM relation;

can be simplified, using the GROUPING SETS extension to GROUP BY, into a single statement as follows:

SELECT aggregate_function(col1) AS agg, col2, col3
FROM relation
GROUP BY GROUPING SETS ((col2, col3), col2, ());

An empty subgroup means that all input rows are aggregated into a single group based on aggregate_function(col1). This aggregate group is returned even if no input rows were present; e.g., using COUNT(*) AS cnt, a resulting row may be {"cnt":XXX, NULL, NULL}.

ROLLUP

This extension is a shorthand for the GROUPING SETS extension:

GROUP BY ROLLUP (c1, c2, c3)

This can be rewritten using the GROUPING SETS extension as follows:

GROUP BY GROUPING SETS (
  (c1, c2, c3),
  (c1, c2),
  (c1),
  ()
)

CUBE

This extension is an alias for the GROUPING SETS extension, as follows:

GROUP BY CUBE (c1, c2, c3)

is rewritten as:

GROUP BY GROUPING SETS (
    ( c1, c2, c3 ),
    ( c1, c2     ),
    ( c1,     c3 ),
    ( c1         ),
    (     c2, c3 ),
    (     c2     ),
    (         c3 ),
    (            )
)

OVER Aggregation

In the SELECT clause, OVER aggregation is used to compute an aggregated value (via calling an aggregation function) for each record over a range of records, ordered according to the Relation’s timestamp.

SELECT ...,
  agg_function(col) OVER (
    [PARTITION BY col1 [, col2, ...]]
    ORDER BY [rowtime() | timestamp_col]
    range_definition
  ) AS alias,
  ...
FROM ...

If PARTITION BY is defined for an OVER aggregation, the aggregate value is computed per record over the records in its partition.

The range_definition is used to determine how many previous records should be used to compute the aggregate value for each given record. You can think of a range as an aggregation window, with lower and upper boundaries. Source records are ordered according to the ORDER BY clause, and for each record, the aggregate window defines the range of records to be used for aggregation. The upper boundary is always the current record, and it is inclusive. Two types of range can be defined in OVER aggregations:

RANGE interval

A RANGE interval uses a time interval over the expression used in the ORDER BY clause (i.e., the timestamp column or rowtime()) to collect records for computing the aggregation.

RANGE BETWEEN SIZE time_unit PRECEDING AND CURRENT ROW

The SIZE is a number that defines the length of the aggregation window according to the time_unit value. The aggregate value is computed over all records that are within this window, according to the current record’s timestamp.

ROWS interval

A ROWS interval uses a count-based interval over the records ordered according to the ORDER BY clause to collect records for computing aggregation.

ROWS BETWEEN number PRECEDING AND CURRENT ROW

OVER Aggregation in a Query

OVER aggregation is used in the SELECT clause. A query can have multiple OVER aggregation clauses in its SELECT; however, the range definition for all of them must be the same. This means there cannot be a mix of range definitions, or of partitioned and non-partitioned OVER aggregations, in a query. Moreover, the ORDER BY clause for all of them must be the same.

For example, assume orders is a Stream defined as:

CREATE STREAM orders (
    orderid BIGINT,
    category VARCHAR,
    total INTEGER)
WITH ('topic'='orders', 'value.format'='JSON');

The query below uses a RANGE interval to define an OVER aggregation. For each order record, it computes the sum() of the order totals over all records with the same category as the current record that were placed at most 30 seconds before the current order.

SELECT
 orderid,
 category, 
 sum(total) OVER(
  PARTITION BY category
  ORDER BY rowtime()
  RANGE BETWEEN 30 SECONDS PRECEDING AND CURRENT ROW) AS sum_total
FROM orders;

The query below uses a ROWS interval to define an OVER aggregation. This interval specifies that the current record and, among its 10 preceding records, all records with the same category as the current one should be included in the aggregate computation. Remember that the upper boundary of the aggregate window is inclusive for an OVER aggregation; hence, this query considers 11 records, the current record and its 10 preceding ones, when applying PARTITION BY and computing the aggregate value.

SELECT
 orderid,
 category,
 sum(total) OVER(
   PARTITION BY category
   ORDER BY rowtime()
   ROWS BETWEEN 10 PRECEDING AND CURRENT ROW) AS sum_total
FROM orders;
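
Since all OVER aggregations in a query must share the same partitioning, ordering, and range definition, a query computing several aggregates repeats the identical window. A minimal sketch over the same orders Stream:

SELECT
  orderid,
  category,
  sum(total) OVER(
    PARTITION BY category
    ORDER BY rowtime()
    ROWS BETWEEN 10 PRECEDING AND CURRENT ROW) AS sum_total,
  count(total) OVER(
    PARTITION BY category
    ORDER BY rowtime()
    ROWS BETWEEN 10 PRECEDING AND CURRENT ROW) AS cnt_total
FROM orders;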

Examples

For the following examples, assume a pageviews Stream and a users Changelog have been created by the following DDLs:

CREATE STREAM pageviews (
  viewtime BIGINT,
  userid VARCHAR,
  pageid VARCHAR
) WITH (
  'topic' = 'pageviews'
);
CREATE CHANGELOG "Users" (
  "RegisterTime" BIGINT, 
  userid VARCHAR, 
  regionid VARCHAR, 
  interests ARRAY < VARCHAR >,
  contactinfo STRUCT<phone VARCHAR, city VARCHAR, "State" VARCHAR, zipcode VARCHAR>,
  PRIMARY KEY(userid)
) WITH (
  'topic' = 'users',
  'timestamp' = 'RegisterTime'
);

Select all

The following query selects all the columns from the Users Changelog.

SELECT * FROM "Users";

Select DISTINCT

The following query selects DISTINCT rows from the "Users" Changelog according to the values for userid, interests[1], and contactinfo->city in each input row. This means there are no duplicate rows in the results of the query.

SELECT DISTINCT 
  userid,
  interests[1] AS hobby,
  contactinfo->city AS location
FROM "Users";

Select from complex types

The following is a simple projection query showing how to access data from more complex types. For arrays, you can access a particular element by indexing into the field with bracket syntax. For structs, you can use a similar bracket syntax or use the arrow operator to access the inner data.

SELECT
  "RegisterTime",
  userid,
  interests,
  interests[1] AS top_interest,
  contactinfo,
  contactinfo->city AS location,
  contactinfo['city'] AS location2,
  contactinfo->"State" AS "state"
FROM "Users";

Select all with source properties

The following selects all the columns from the pageviews Stream. It also sets source properties to treat the viewtime column as the value for the timestamp, to start reading from the beginning of the source’s underlying Kafka topic, and to skip any records that fail deserialization.

SELECT
  * 
FROM pageviews
WITH (
  'timestamp' = 'viewtime',
  'starting.position' = 'earliest',
  'source.deserialization.error.handling' = 'IGNORE'
);

Select all from specific Kafka offsets

The following selects all the columns from the pageviews Stream. It also sets source properties to treat the viewtime column as the value for the timestamp and to start reading from specific offsets in the source’s underlying Kafka topic. The offsets to start reading from are 1 in partition 0 and 3 in partition 1.

SELECT
  * 
FROM pageviews 
WITH (
    'timestamp' = 'viewtime',
    'starting.position' = 'specific-offsets',
    'starting.position.offsets' = 'partition:0,offset:1;partition:1,offset:3'
);

Aggregation with GROUP BY and HAVING clauses

The following aggregates the number of page visits by a particular userid, emitting only results having a count greater than 1.

SELECT 
  userid, 
  COUNT(pageid) AS pgcnt 
FROM pageviews 
GROUP BY userid 
HAVING COUNT(pageid) > 1;

Aggregation with state TTL query property

The following aggregates the number of page visits by a particular userid. The query property state.ttl.millis is also configured to 10000, or 10 seconds. This means that state that has not been updated in the last 10 seconds is considered stale and can be cleaned up. Note that state is not necessarily cleaned up immediately after it becomes stale.

SELECT 
  userid, 
  COUNT(pageid) AS pgcnt 
FROM pageviews 
WITH ('timestamp'='viewtime')
GROUP BY userid 
QUERY WITH ('state.ttl.millis' = '10000');

Given this input for pageviews:

+----------+--------+---------+
| viewtime | userid | pageid  |
+----------+--------+---------+
|     1000 | User_1 | Page_21 |
|     2000 | User_2 | Page_10 |
|     3000 | User_1 | Page_10 |
|    10000 | User_1 | Page_91 |
|    20000 | User_1 | Page_10 |
|    80000 | User_2 | Page_55 |
+----------+--------+---------+

We can get the following output:

+--------+-------+
| userid | pgcnt |
+--------+-------+
| User_1 |   1   |
| User_2 |   1   |
| User_1 |   2   |
| User_1 |   3   |
| User_1 |   4   |
| User_2 |   1   | // pgcnt=2 is also possible for this output
+--------+-------+

The pgcnt values associated with the User_2 outputs are both 1 in this case because of the state.ttl.millis configuration of 10000. Since there is a gap of more than 10000 milliseconds between the first and second User_2 input records, according to the viewtime column that is marked as the event time for the records, the state for User_2 will be marked stale by the time the query consumes the second User_2 record. Thus, the state for User_2 may be cleaned up. It's also possible that the second User_2 output will have a pgcnt of 2, as the state may be stale but not yet cleaned up before the second User_2 record is consumed by the query. The pgcnt values for the User_1 outputs are strictly increasing because that state is never marked stale.
