APPLICATION

Syntax

BEGIN APPLICATION application_name
    statement;
    statement;
    ...
    statement;
END APPLICATION;

Description

By using APPLICATION, you can create a set of DSQL statements (including DDL and Query statements) and run them as a unit of work with all-or-nothing effect:

  • In the case of DDL statements, either all of them succeed or none do. If any statement in the application fails during processing, no DDL change is applied: the metastore is updated only if all statements in the application succeed.

  • In the case of Query statements, all queries run in a single job. This helps achieve better efficiency and resource utilization at runtime.

Note: The order of statements in an application matters.

Supported Statements

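DeltaStream supports a specific set of statement types in an application. The examples on this page use CREATE STREAM, CREATE CHANGELOG, CREATE STREAM AS SELECT, CREATE CHANGELOG AS SELECT, CREATE VIRTUAL STREAM AS SELECT, and INSERT INTO.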

Benefits of Application

Applications help you achieve better efficiency and overall cost reduction in two ways:

  1. You can define a stream or a changelog as a virtual relation in an application, using a CREATE ... AS SELECT (CAS) statement. A virtual relation is similar to a non-virtual one, except that it is not backed by a topic in a persistent store. Use virtual relations to create intermediate results that you can in turn use in multiple queries in the same application. Because records of a virtual stream or changelog aren't written to a store, virtual relations let you simplify the processing logic and reuse already-computed results without adding extra cost. See the Virtual Relation section below for more details.

  2. All queries in an application run in a single job. When several queries read from the same source relation, the job reads its records once and shares them among those queries, which improves resource utilization.

What Happens During an Application Failure

If a failure occurs when processing a DDL or Query statement in an application (for example, due to a parse error), then the whole application fails. This means no entities are created and no query will start.

If a runtime failure occurs while the application job is running (for example, due to an authentication error when accessing a store), then all queries in the application fail. However, any relations created at the beginning of the application via DDL statements remain.
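
The two failure cases can be annotated on a small application. This is only a sketch: the application name, the region_users stream, and the 'Region_1' value are hypothetical, and it assumes that standard -- line comments are accepted inside an application body.

```sql
BEGIN APPLICATION failure_demo

  -- DDL statement: defines a relation on the existing users topic.
  -- If the application job later hits a runtime failure, this relation remains.
  CREATE STREAM "users" (
    registertime BIGINT,
    userid VARCHAR,
    regionid VARCHAR)
    WITH ('topic'='users', 'value.format'='json');

  -- Query statement: runs in the application job.
  -- A runtime failure (for example, a store authentication error) fails
  -- this query together with every other query in the application.
  CREATE STREAM region_users AS
    SELECT userid, regionid FROM "users" WHERE regionid = 'Region_1';

END APPLICATION;
```

If either statement failed during processing instead (for example, with a parse error), the whole application would fail: no entities would be created and no query would start.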

Virtual Relation

You can create a virtual relation only in an application, and it is available only to queries within the same application. A virtual relation does not belong to any database or schema defined by the user in DeltaStream. Instead, every application has a reserved database named virtual with a schema under it named public, which DeltaStream manages and uses to store the definitions of that application's virtual relations. You must always use a fully-qualified name when you define a new virtual relation or refer to an existing one in a query. For example, if late_orders is the name of a virtual relation, both its definition (in a CREATE STREAM AS SELECT, or CSAS, statement) and its usage (in a query) must refer to it as virtual.public.late_orders.

Here is the syntax to define a virtual relation:

BEGIN APPLICATION application_name
...
CREATE VIRTUAL STREAM virtual.public.stream_name 
AS select_statement;
...
CREATE VIRTUAL CHANGELOG virtual.public.changelog_name 
AS select_statement;
...
END APPLICATION;

Virtual relations are for defining intermediate computation results used in one or more subsequent queries in the same application. They help simplify the computation logic. They also let a computation that is common to several queries run only once, with its result reused multiple times at no extra cost.
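
As a minimal sketch of the fully-qualified naming rule, the application below defines virtual.public.late_orders once and reads it from two queries. The orders stream, its columns (orderid, customerid, delay_minutes), the thresholds, and the sink relation names are hypothetical and used only for illustration. The sketch also assumes that -- line comments are accepted.

```sql
BEGIN APPLICATION late_orders_app

  -- Virtual stream: always defined with its fully-qualified name.
  -- Its records are not written to any topic.
  CREATE VIRTUAL STREAM virtual.public.late_orders AS
    SELECT orderid, customerid, delay_minutes
    FROM orders
    WHERE delay_minutes > 30;

  -- First consumer: counts late orders per customer.
  CREATE CHANGELOG late_orders_per_customer AS
    SELECT customerid, count(orderid) AS cnt
    FROM virtual.public.late_orders
    GROUP BY customerid;

  -- Second consumer: reuses the same intermediate result with a stricter filter.
  CREATE STREAM very_late_orders AS
    SELECT orderid, customerid
    FROM virtual.public.late_orders
    WHERE delay_minutes > 120;

END APPLICATION;
```

Both consumers refer to the virtual stream as virtual.public.late_orders, and the filter on orders is evaluated once rather than once per query.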

Examples

For the following examples, assume a stream named pageviews has been created using the below CREATE STREAM statement:

CREATE STREAM pageviews (
  viewtime BIGINT,
  userid VARCHAR,
  pageid VARCHAR
) WITH (
  'topic' = 'pageviews',
  'value.format' = 'json'
);

Application with multiple statements

In the example below, two new relations are created:

  1. a stream named pageviews_2

  2. a changelog named view_log

Two queries start and both run in a single job. One query writes data into pageviews_2 and the other into view_log. Because both queries read from the same source relation (pageviews), the application job reads the pageviews records from the underlying entity once and uses them for both queries. This improves resource utilization. It also reduces the storage and network overhead on the store in which pageviews is defined.

BEGIN APPLICATION app
  
  CREATE STREAM pageviews_2 AS 
    SELECT * FROM pageviews WHERE userid = 'User_2';
  
  CREATE CHANGELOG view_log AS
    SELECT pageid, count(userid) AS cnt
    FROM pageviews
    GROUP BY pageid;
    
END APPLICATION;

The example application below has 6 statements. It defines new streams, some of which are used by other queries in the same application. For example, the INSERT INTO statement reads data from pv_copy, which is defined with a CREATE STREAM AS SELECT (CSAS) statement, and writes into pageviews2, which is in turn defined with another CSAS in the same application. Because the order of statements in an application matters, the statements defining pv_copy and pageviews2 must appear before the INSERT INTO statement that uses them. Moreover, the users stream is defined with a CREATE STREAM statement and is used in the JOIN query and in the last CSAS statement. Again, data records are read once from the pageviews and users entities and are shared by the queries that refer to them in their FROM clause.

All the queries in the application run within a single job.

BEGIN APPLICATION app
  
  CREATE STREAM "users" (
    registertime BIGINT,
    userid VARCHAR,
    regionid VARCHAR,
    gender VARCHAR,
    interests ARRAY<VARCHAR>,
    contactinfo STRUCT<phone VARCHAR, city VARCHAR, "state" VARCHAR, zipcode VARCHAR>)
    WITH ('topic'='users', 'value.format'='json');

  CREATE STREAM pv_copy AS 
    SELECT * FROM pageviews;

  CREATE STREAM pageviews2 AS 
    SELECT userid, pageid FROM pageviews WHERE userid = 'User_2' OR userid = 'User_3';

  INSERT INTO pageviews2 SELECT userid, pageid FROM pv_copy;

  CREATE STREAM pvusers AS 
  SELECT p.userid AS pv_uid, u.gender, p.pageid, u.interests[1] AS top_interest  
  FROM pageviews p JOIN "users" u 
  WITHIN 10 seconds ON p.userid = u.userid;

  CREATE STREAM ca_users AS 
  SELECT userid, contactinfo->city, interests[2] AS hobby 
  FROM "users" WHERE contactinfo->state = 'CA';
    
END APPLICATION;

Application with virtual relations

In the example application below, we assume the pageviews stream is already defined in DeltaStream and we have a topic named users in our store. In this application, we first create a changelog, users_log, on the users topic to track changes in users' information. Because we are interested in pageviews made by 3 specific users, we create a virtual stream, virtual.public.v1, to filter the records for those users from pageviews and use them in subsequent queries.

We first use a TUMBLE window function on virtual.public.v1 to count the pageviews made by each of these 3 users in every 30-second window, and write the results into a new topic, visit_freq. Next, we create a new virtual stream, virtual.public.v2, using a temporal join between virtual.public.v1 and the users_log changelog to extend each record with its user's information. We then use virtual.public.v2 records in two subsequent queries:

  • One query to find the popular_pages visited by female users among our users of interest.

  • A second query to find pages visited by different users within a given duration, with the results enriched by the latest information about the visiting users.

Note that although this application has 5 CREATE ... AS SELECT (CAS) statements, only 3 new topics are created in the configured store:

  1. visit_freq

  2. popular_pages

  3. cross_region_pages

Virtual relations' records are not persisted in any topic and are only fed to subsequent queries that refer to them.

BEGIN APPLICATION app

  CREATE CHANGELOG users_log (
    registertime BIGINT,
    userid VARCHAR,
    gender VARCHAR,
    interests ARRAY<VARCHAR>,
    contactinfo STRUCT<phone VARCHAR, city VARCHAR, "state" VARCHAR, zipcode VARCHAR>,
    PRIMARY KEY(userid))
    WITH ('topic'='users', 'key.format'='json', 'key.type'='STRUCT<userid VARCHAR>', 'value.format'='json');

  CREATE VIRTUAL STREAM virtual.public.v1 AS
    SELECT * FROM pageviews
    WHERE userid = 'User_1' OR userid = 'User_2' OR userid = 'User_3';

  CREATE CHANGELOG visit_freq WITH ('topic.partitions' = 1, 'topic.replicas' = 1) AS
    SELECT window_start, window_end, userid, count(pageid) AS cnt
    FROM TUMBLE(virtual.public.v1, SIZE 30 SECONDS)
    GROUP BY window_start, window_end, userid;

  CREATE VIRTUAL STREAM virtual.public.v2 AS
    SELECT v.userid, v.pageid, u.gender, u.interests[1] AS hobby, u.contactinfo->zipcode AS zipcode
    FROM virtual.public.v1 v JOIN users_log u
    ON u.userid = v.userid;

  CREATE CHANGELOG popular_pages WITH ('topic.partitions' = 1, 'topic.replicas' = 1) AS
    SELECT pageid, count(DISTINCT userid) AS cnt
    FROM virtual.public.v2
    WHERE gender = 'FEMALE'
    GROUP BY pageid
    HAVING count(DISTINCT userid) > 2;

  CREATE STREAM cross_region_pages WITH ('topic.partitions' = 1, 'topic.replicas' = 1) AS
    SELECT l.pageid,
      l.zipcode AS zipcode_1, l.hobby AS hobby_1,
      r.zipcode AS zipcode_2, r.hobby AS hobby_2
    FROM virtual.public.v2 l JOIN virtual.public.v2 r
    WITHIN 5 SECONDS ON l.pageid = r.pageid
    WHERE (l.userid <> r.userid);

END APPLICATION;
