
APPLICATION

Syntax

BEGIN APPLICATION application_name
    statement;
    statement;
    ...
    statement;
END APPLICATION;

Description

Using APPLICATION, you can group a set of DSQL statements (both DDL and Query statements) and run them as a single unit of work with all-or-nothing effect:

  • For DDL statements, either all succeed or none do. The metastore is updated only if every statement in the application processes successfully.

  • For Query statements, all queries run in a single job, which yields better efficiency and resource utilization at runtime.

Note: The order of statements in an application matters.

Supported Statements

DeltaStream supports the following statement types in an application:

  • USE (USE database, USE schema, USE store)

  • CREATE STREAM

  • CREATE CHANGELOG

  • CREATE STREAM AS SELECT (virtual and non-virtual)

  • CREATE CHANGELOG AS SELECT (virtual and non-virtual)

  • INSERT INTO
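The supported statement types above can be combined in a single application. The sketch below is illustrative only: it assumes a database named analytics, a store named kafka_store, and a JSON topic named clicks already exist, and all relation names are hypothetical.

```sql
BEGIN APPLICATION clicks_app
  -- Set the current database and store for the statements that follow
  USE DATABASE analytics;
  USE STORE kafka_store;

  -- DDL: define a stream over the existing clicks topic
  CREATE STREAM clicks (
    ts BIGINT,
    userid VARCHAR,
    url VARCHAR
  ) WITH ('topic' = 'clicks', 'value.format' = 'json');

  -- CSAS: derive a filtered stream from the source
  CREATE STREAM error_clicks AS
    SELECT * FROM clicks WHERE url = '/error';

  -- INSERT INTO: feed the derived stream from another query
  INSERT INTO error_clicks
    SELECT * FROM clicks WHERE url = '/404';

END APPLICATION;
```

Because the two queries share clicks in their FROM clauses, the application job reads the clicks records once and feeds them to both.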

Benefits of Application

Applications help you achieve better efficiency and overall cost reduction in two ways:

  • Shared source reads. An application runs all of its queries in a single job. When multiple queries in an application refer to the same relation in their FROM clause, DeltaStream reads the data from that source only once and shares the retrieved records across those queries. This can significantly reduce storage and network overhead on the source store. If you instead run the same queries as separate jobs, each job accesses the source store separately, and the system reads the same records from the source entity multiple times -- once per job.

  • Virtual relations. You can define intermediate results as virtual relations and reuse them in multiple queries within the same application. Because the records of a virtual relation are never written to a store, a computation common to several queries runs only once, and its results are reused at no extra storage cost.

What Happens During an Application Failure

If a failure occurs while processing a DDL or Query statement in an application (for example, due to a parse error), the whole application fails: no entities are created and no queries start.

If a runtime failure occurs while the application job is running (for example, due to an authentication error when accessing a store), all queries in the application fail. However, any relations created via DDL statements at the beginning of the application remain.
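To make these failure semantics concrete, here is an annotated sketch. The topic and relation names are hypothetical, and the comments restate the behavior described above.

```sql
BEGIN APPLICATION failure_demo
  -- DDL: registers a relation over an existing topic. If the
  -- application's job later fails at runtime, this relation remains.
  CREATE STREAM clicks (
    ts BIGINT, userid VARCHAR, url VARCHAR
  ) WITH ('topic' = 'clicks', 'value.format' = 'json');

  -- Query: if this statement fails to process (e.g., a parse error),
  -- the whole application fails up front -- clicks is not created and
  -- no query starts. If the job fails at runtime instead (e.g., an
  -- authentication error on the store), the query fails but the
  -- clicks relation created by the DDL step survives.
  CREATE STREAM error_clicks AS
    SELECT * FROM clicks WHERE url = '/error';
END APPLICATION;
```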

Virtual Relation

Here is the syntax to define a virtual relation:

BEGIN APPLICATION application_name
...
CREATE VIRTUAL STREAM virtual.public.stream_name 
AS select_statement;
...
CREATE VIRTUAL CHANGELOG virtual.public.changelog_name 
AS select_statement;
...
END APPLICATION;

Virtual relations define intermediate computation results for use in one or more subsequent queries within the same application. They help simplify the computation logic, and they let the application perform a computation common to several queries only once and reuse its result, at no extra cost.

A virtual stream or changelog is similar to a non-virtual one, except that it is not backed by an entity in a store: records of a virtual relation are never written to a persistent storage layer.

You can create a virtual relation only inside an application, and it is available only to queries within that same application. A virtual relation does not belong to any user-defined database in DeltaStream. Instead, every application has a reserved database named virtual, with a schema under it named public, which DeltaStream manages and uses to store the definitions of that application's virtual relations. Always use a fully-qualified name when defining a new virtual relation or referring to an existing one in a query. For example, if late_orders is the name of a virtual relation, its definition (in CSAS) and any usage in a query must refer to it as virtual.public.late_orders.

Examples

For the following examples, assume a stream named pageviews has been created with the following CREATE STREAM statement:

CREATE STREAM pageviews (
  viewtime BIGINT,
  userid VARCHAR,
  pageid VARCHAR
) WITH (
  'topic' = 'pageviews',
  'value.format' = 'json'
);

Application with multiple statements

In the example below, two new relations are created:

  • a stream named pageviews_2

  • a changelog named view_log

Two queries start, and both run in a single job: one query ingests data into pageviews_2 and the other into view_log. Since both queries read from the same source relation, pageviews, the application job reads pageviews records once from its entity and feeds them to both queries. This improves resource utilization and reduces the storage and network overhead on the store where pageviews is defined.

BEGIN APPLICATION app
  
  CREATE STREAM pageviews_2 AS 
    SELECT * FROM pageviews WHERE userid = 'User_2';
  
  CREATE CHANGELOG view_log AS
    SELECT pageid, count(userid) AS cnt
    FROM pageviews
    GROUP BY pageid;
    
END APPLICATION;

Application with multiple related statements

The example application below has 5 statements. It defines new streams, some of which other queries in the same application then use. For example, the INSERT INTO statement reads data from pv_copy, defined by one CREATE STREAM AS SELECT (CSAS) statement, and writes into pageviews2, defined by another CSAS in the same application. Because the order of statements in an application matters, the statements defining pv_copy and pageviews2 must appear in the application body before the INSERT INTO statement that uses them. Similarly, the users stream is defined via a CREATE STREAM statement and is used in the JOIN query and in the last CSAS statement. As before, data records are read only once from the pageviews and users entities and shared among the queries that refer to them in their FROM clause.

All the queries in the application run within a single job.

BEGIN APPLICATION app
  
  CREATE STREAM "users" (
    registertime BIGINT,
    userid VARCHAR,
    regionid VARCHAR,
    gender VARCHAR,
    interests ARRAY<VARCHAR>,
    contactinfo STRUCT<phone VARCHAR, city VARCHAR, "state" VARCHAR, zipcode VARCHAR>)
    WITH ('topic'='users', 'value.format'='json');

  CREATE STREAM pv_copy AS 
    SELECT * FROM pageviews;

  CREATE STREAM pageviews2 AS 
    SELECT userid, pageid FROM pageviews WHERE userid = 'User_2' OR userid = 'User_3';

  INSERT INTO pageviews2 SELECT userid, pageid FROM pv_copy;

  CREATE STREAM pvusers AS 
  SELECT p.userid AS pv_uid, u.gender, p.pageid, u.interests[1] AS top_interest  
  FROM pageviews p JOIN "users" u 
  WITHIN 10 seconds ON p.userid = u.userid;

  CREATE STREAM ca_users AS 
  SELECT userid, contactinfo->city, interests[2] AS hobby 
  FROM "users" WHERE contactinfo->state = 'CA';
    
END APPLICATION;

Application with virtual relations

In the example application below, we assume the pageviews stream is already defined in DeltaStream and there is a topic named users in our store. The application first creates a changelog on the users topic to track changes in users' information. Since we are interested in pageviews from 3 specific users, we create a virtual stream, virtual.public.v1, to filter the records for those users from pageviews and use them in subsequent queries.

We first use a TUMBLE window function on virtual.public.v1 to count the pageviews from these 3 users every 30 seconds and write the results into a new topic, visit_freq. Next, we create another virtual stream, virtual.public.v2, using a temporal join between virtual.public.v1 and the users_log changelog to extend each record with its user's information. We then use virtual.public.v2 records in two subsequent queries:

  • One query to find the popular_pages visited by female users among our users of interest.

  • A second query to find pages visited by different users within a given duration, enriching the results with the latest information about the visiting users.

Note that although we have 5 CSAS statements in this application, only 3 new topics are created in the configured store:

  1. visit_freq

  2. popular_pages

  3. cross_region_pages

Records of virtual relations are not persisted in any topic; they are fed only to the subsequent queries that refer to them.

BEGIN APPLICATION app

 CREATE CHANGELOG users_log
    (registertime BIGINT, userid VARCHAR, gender VARCHAR, interests ARRAY<VARCHAR>, contactinfo STRUCT<phone VARCHAR, city VARCHAR, "state" VARCHAR, zipcode VARCHAR>, PRIMARY KEY(userid))
    WITH ('topic'='users', 'key.format'='json', 'key.type'='STRUCT<userid VARCHAR>', 'value.format'='json');

 CREATE VIRTUAL STREAM virtual.public.v1 AS 
   SELECT * FROM pageviews
   WHERE userid = 'User_1' OR userid = 'User_2' OR userid = 'User_3';
 
 CREATE CHANGELOG visit_freq WITH ('topic.partitions' = 1, 'topic.replicas' = 1) AS
  SELECT window_start, window_end, userid, count(pageid) AS cnt
  FROM TUMBLE(virtual.public.v1, SIZE 30 SECONDS)
  GROUP BY window_start, window_end, userid;

 CREATE VIRTUAL STREAM virtual.public.v2 AS
  SELECT v.userid, v.pageid, u.gender, u.interests[1] AS hobby, u.contactinfo->zipcode AS zipcode
   FROM virtual.public.v1 v JOIN users_log u
   ON u.userid = v.userid;

 CREATE CHANGELOG popular_pages WITH ('topic.partitions' = 1, 'topic.replicas' = 1) AS
  SELECT pageid, count(DISTINCT userid) AS cnt
  FROM virtual.public.v2
  WHERE gender = 'FEMALE'
  GROUP BY pageid
  HAVING count(DISTINCT userid) > 2;

  CREATE STREAM cross_region_pages WITH ('topic.partitions' = 1, 'topic.replicas' = 1) AS
   SELECT l.pageid,
    l.zipcode AS zipcode_1, l.hobby AS hobby_1,
    r.zipcode AS zipcode_2, r.hobby AS hobby_2
   FROM virtual.public.v2 l JOIN virtual.public.v2 r
   WITHIN 5 SECONDS ON l.pageid = r.pageid
   WHERE (l.userid <> r.userid);

END APPLICATION;

