BEGIN APPLICATION application_name
  dsql_statement;
  ...
END APPLICATION;


By using an APPLICATION, you can create a set of DSQL statements (including DDL and Query statements) and run them as a unit of work with all-or-nothing effect:

  • In the case of DDL statements, either all of them succeed or none of them does, depending on whether any statement fails during processing. This means the metastore is updated only if all statements in the Application succeed.

  • In the case of Query statements, all queries run in a single job. This helps achieve better efficiency and resource utilization at runtime.

The order of statements in an application matters.
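For instance, a statement that reads from a relation must appear after the statement that defines it. The sketch below illustrates this ordering rule; the application, stream, and topic names here are placeholders, not from the original:

```sql
BEGIN APPLICATION application_name
  -- "clicks" and "filtered_clicks" are hypothetical names used for illustration.
  CREATE STREAM filtered_clicks AS
    SELECT * FROM clicks WHERE userid IS NOT NULL;

  -- This INSERT INTO refers to filtered_clicks, so it must appear after
  -- the CSAS statement above that defines it.
  INSERT INTO filtered_clicks
    SELECT * FROM clicks WHERE userid = 'User_1';
END APPLICATION;
```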

Supported Statements

Currently, the following statement types are supported in applications: DDL statements such as CREATE STREAM and CREATE CHANGELOG, CREATE ... AS SELECT statements (CSAS and CAS, including their virtual-relation variants), and INSERT INTO statements.

Benefits of Application

Applications help users achieve better efficiency and overall cost reduction in two ways:

  1. An Application enhances the data retrieval process from a Store by allowing users to execute multiple queries through a single job. This optimization lets DeltaStream read data from a given source Topic only once when multiple queries within an application refer to the same relation in their FROM clause; the retrieved data is shared across all these queries. This can lead to a significant reduction in storage and network overhead on the source Store. When the same set of queries is run separately, each query accesses the source Store on its own, reading the same records from the source Topic multiple times, once per job.

  2. A Stream or a Changelog can be defined as a virtual relation in an Application, using a CAS statement. A virtual relation is similar to a non-virtual one, except that it is not backed by a topic in a persistent Store. Virtual relations are used to create intermediate results that can be shared by multiple queries in the same Application. Given that records of a virtual Stream or Changelog are not written to a Store, they help simplify the processing logic and let queries reuse already computed results without extra cost. See Virtual Relation below for more details.

What Happens during an Application Failure

If a failure happens while processing one of the DDL or Query statements in an application (for example, due to a parse error), the whole application fails. This means none of the entities are created and no query starts.

If a runtime failure happens while the application job is running (for example, due to an authentication error when accessing a Store), all queries in the application fail; however, any relations that were created at the beginning of the application via DDL statements will remain.
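To make the two failure modes concrete, consider this sketch; the application, stream, and topic names are placeholders, not from the original:

```sql
BEGIN APPLICATION application_name
  -- DDL statement: if the application's queries later fail at runtime
  -- (e.g., a Store authentication error), this Stream still remains.
  CREATE STREAM events (ts BIGINT, payload VARCHAR)
    WITH ('topic'='events', 'value.format'='json');

  -- Query statement: a parse error here would fail the whole application
  -- up front, and nothing would be created, including the DDL above.
  CREATE STREAM recent_events AS
    SELECT * FROM events;
END APPLICATION;
```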

Virtual Relation

A virtual Stream or Changelog is similar to a non-virtual one, except that it is not backed by a Topic in a Store. This means records of a virtual relation are not written into a persistent storage layer. A virtual relation can only be created in an Application and is only available to queries within that same Application.

A virtual relation does not belong to any user-defined Database or Schema in DeltaStream. Every Application has a reserved Database named virtual with a Schema under it named public, which is managed by DeltaStream and stores the virtual relations' definitions in that Application. Users must always use a fully qualified name when defining a new virtual relation or referring to an existing one in a query. For example, if late_orders is the name of a virtual relation, its definition (in CAS) and usage (in a Query) should always refer to it as virtual.public.late_orders.

Here is the syntax to define a virtual relation:

BEGIN APPLICATION application_name
  CREATE VIRTUAL STREAM virtual.public.stream_name
  AS select_statement;

  CREATE VIRTUAL CHANGELOG virtual.public.changelog_name
  AS select_statement;
END APPLICATION;
Virtual relations define intermediate computation results that are used in one or more subsequent queries in the same Application. They help simplify the computation logic: common computation shared among queries is done only once and used multiple times, at no extra cost.
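As a quick illustration of the fully qualified naming rule, a hypothetical application might define and reuse the late_orders virtual relation mentioned above. The orders source Stream and its columns here are assumptions for the sketch, not from the original:

```sql
BEGIN APPLICATION application_name
  -- "orders", "status", and "regionid" are hypothetical; only the virtual
  -- relation's fully qualified name follows the documented rule.
  CREATE VIRTUAL STREAM virtual.public.late_orders AS
    SELECT * FROM orders WHERE status = 'LATE';

  -- Every reference to the virtual relation uses the full name:
  CREATE STREAM late_orders_by_region AS
    SELECT regionid, orderid FROM virtual.public.late_orders;
END APPLICATION;
```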


For the following examples, assume a Stream named pageviews has been created using the CREATE STREAM statement below:

CREATE STREAM pageviews (
  viewtime BIGINT,
  userid VARCHAR,
  pageid VARCHAR
) WITH (
  'topic' = 'pageviews',
  'value.format' = 'json'
);

Application with multiple statements

In the example below, two new relations are created: a Stream named pageviews_2 and a Changelog named view_log. Moreover, two queries start and both run in a single job. One query ingests data into pageviews_2 and the other into view_log. Note that since both queries read data from the same source relation (i.e. pageviews), the Application's job reads pageviews records once from its Topic and uses them for both queries. This improves resource utilization and reduces the storage and network overhead on the Store that pageviews is defined on.

BEGIN APPLICATION application_name
  CREATE STREAM pageviews_2 AS
    SELECT * FROM pageviews WHERE userid = 'User_2';

  CREATE CHANGELOG view_log AS
    SELECT pageid, count(userid) AS cnt
    FROM pageviews
    GROUP BY pageid;
END APPLICATION;

The example Application below defines several new streams, some of which are used in other queries in the same Application. For example, the INSERT INTO statement reads data from pv_copy, which is defined using a CREATE STREAM AS SELECT statement, and writes into pageviews2, which is defined with another CSAS in the same Application. Note that the order of statements matters in an Application; therefore, the statements defining pv_copy and pageviews2 must appear earlier in the Application body than the INSERT INTO statement that uses them. Moreover, the users Stream is defined via a CREATE STREAM statement and is used in the JOIN query and the last CSAS statement. Again, data records are read once from the pageviews and users Topics and are shared by the queries that refer to them in their FROM clauses. All the queries in the Application run within a single job.

BEGIN APPLICATION application_name
  CREATE STREAM "users" (
    registertime BIGINT,
    userid VARCHAR,
    regionid VARCHAR,
    gender VARCHAR,
    interests ARRAY<VARCHAR>,
    contactinfo STRUCT<phone VARCHAR, city VARCHAR, "state" VARCHAR, zipcode VARCHAR>)
    WITH ('topic'='users', 'value.format'='json');

  CREATE STREAM pv_copy AS
    SELECT * FROM pageviews;

  CREATE STREAM pageviews2 AS 
    SELECT userid, pageid FROM pageviews WHERE userid = 'User_2' OR userid = 'User_3';

  INSERT INTO pageviews2 SELECT userid, pageid FROM pv_copy;

  -- The original sink name for this JOIN query is not given; pv_users is a placeholder.
  CREATE STREAM pv_users AS
    SELECT p.userid AS pv_uid, u.gender, p.pageid, u.interests[1] AS top_interest
    FROM pageviews p JOIN "users" u
    WITHIN 10 seconds ON p.userid = u.userid;

  CREATE STREAM ca_users AS
    SELECT userid, contactinfo->city, interests[2] AS hobby
    FROM "users" WHERE contactinfo->state = 'CA';
END APPLICATION;

Application with virtual relations

In the example Application below, we assume the pageviews Stream is already defined in DeltaStream and we have a topic named users in our Store. In this Application, we first create a Changelog on the users topic to track changes in users' information. Given that we are interested in pageviews made by 3 specific users, we create a virtual Stream, virtual.public.v1, to filter records for those users from pageviews and use them in subsequent queries. We first find pageviews made by these 3 users every 30 seconds using a TUMBLE window function on virtual.public.v1 and write the results into a new topic, visit_freq. Then we create a new virtual Stream, virtual.public.v2, using a temporal join between virtual.public.v1 and the users_log Changelog to extend each record with its user's information. We then use virtual.public.v2 records in two subsequent queries: one to compute popular_pages, the pages visited by female users among our users of interest, and a second query to find pages visited by different users within a given duration and enrich the results with the latest information about the visiting users.

Note that although we have 5 CAS statements in this Application, only 3 new topics are created in the configured Store: visit_freq, popular_pages, and cross_region_pages. Virtual relations' records are not persisted in any topic; they are only fed to the subsequent queries that refer to them.


BEGIN APPLICATION application_name
 CREATE CHANGELOG users_log
    (registertime BIGINT, userid VARCHAR, gender VARCHAR, interests ARRAY<VARCHAR>, contactinfo STRUCT<phone VARCHAR, city VARCHAR, "state" VARCHAR, zipcode VARCHAR>, PRIMARY KEY(userid))
    WITH ('topic'='users', 'key.format'='json', 'key.type'='STRUCT<userid VARCHAR>', 'value.format'='json');

 CREATE VIRTUAL STREAM virtual.public.v1 AS 
   SELECT * FROM pageviews
   WHERE userid = 'User_1' OR userid = 'User_2' OR userid = 'User_3';
 CREATE CHANGELOG visit_freq WITH ('topic.partitions' = 1, 'topic.replicas' = 1) AS
  SELECT window_start, window_end, userid, count(pageid) AS cnt
  FROM TUMBLE(virtual.public.v1, SIZE 30 SECONDS)
  GROUP BY window_start, window_end, userid;

 CREATE VIRTUAL STREAM virtual.public.v2 AS
  SELECT v.userid, v.pageid, u.gender, u.interests[1] AS hobby, u.contactinfo->zipcode AS zipcode
   FROM virtual.public.v1 v JOIN users_log u
   ON u.userid = v.userid;

 CREATE CHANGELOG popular_pages WITH ('topic.partitions' = 1, 'topic.replicas' = 1) AS
  SELECT pageid, count(DISTINCT userid) AS cnt
  FROM virtual.public.v2
  WHERE gender = 'FEMALE'
  GROUP BY pageid
  HAVING count(DISTINCT userid) > 2;

  CREATE STREAM cross_region_pages WITH ('topic.partitions' = 1, 'topic.replicas' = 1) AS
   SELECT l.pageid,
    l.zipcode AS zipcode_1, l.hobby AS hobby_1,
    r.zipcode AS zipcode_2, r.hobby AS hobby_2
   FROM virtual.public.v2 l JOIN virtual.public.v2 r
   WITHIN 5 SECONDS ON l.pageid = r.pageid
   WHERE (l.userid <> r.userid);
END APPLICATION;

