Starting with the CLI

DeltaStream provides a Command Line Interface (CLI) for interacting with the platform from the terminal. This guide walks through the steps to build a streaming application with DeltaStream’s CLI. If you would rather use DeltaStream’s web application, refer to Starting with the Web App. By the end of this guide, you will have hands-on experience with the foundational concepts in DeltaStream and will be able to build similar applications. This guide uses topics in Apache Kafka, but the steps should be the same if your data is in another streaming store such as Amazon Kinesis or Redpanda.

We assume you have already created your account and signed in to DeltaStream, and that you have created an Organization or joined an existing one.

You will accomplish the following steps in this guide for the CLI:

  • Connect to your streaming store (in this case, Apache Kafka) by creating a Store in DeltaStream.

  • Create your first Database.

  • Create Streams and Changelogs for your Kafka topics.

  • Create new Streams, Changelogs, and Materialized Views using DeltaStream’s continuous queries.

  • (Optional) Share your streaming data with other users in your Organization.

Create a Store

The first step in the guide is to create a Store in DeltaStream. A Store is a streaming store service such as Apache Kafka or Amazon Kinesis where your streaming data resides. You can create a Store in the DeltaStream CLI using the CREATE STORE statement:

<no-db>/<no-store># CREATE STORE MSK 
WITH ( 
 'type' = KAFKA , 
 'access_region' = "AWS us-east-1", 
 'kafka.sasl.hash_function' = SHA512, 
 'kafka.sasl.password' = '**********', 
 'kafka.sasl.username' = 'mskconsumer', 
 'uris'='b-1-public.cmnuseast1datagenmsk.89yho3.c26.kafka.us-east-1.amazonaws.com:9196'
);

<no-db>/msk# SHOW STORES;
  Name | Kind  | Access Region | Metadata |  Owner   |      Created at      |      Updated at       
-------+-------+---------------+----------+----------+----------------------+-----------------------
  msk  | Kafka | AWS us-east-1 | {}       | sysadmin | 2023-02-20T11:03:18Z | 2023-02-20T11:03:18Z  
<no-db>/msk#

Once you create the Store, it is shown as the default Store in the CLI prompt. Before you create your first Store, the prompt shows <no-store> because there is no default Store. The SHOW STORES command, used above, lists the Stores that you have created in DeltaStream.

A DeltaStream Store holds the metadata for your existing streaming store, such as Apache Kafka or Amazon Kinesis. As the statement above shows, declaring a Store means providing the configurations required to connect to and use the streaming store from DeltaStream. You can now inspect the Store and print the content of topics in the Store:

<no-db>/msk# SHOW ENTITIES;
      Entity name       
----------------------- 
  ds_pageviews         
  ds_pageviews_pb      
  ds_shipments         
  ds_users             
<no-db>/msk# PRINT ENTITY ds_pageviews;
{"userid":"User_9"} | {"viewtime":1676891565084,"userid":"User_9","pageid":"Page_28"}
{"userid":"User_7"} | {"viewtime":1676891565264,"userid":"User_7","pageid":"Page_16"}
{"userid":"User_7"} | {"viewtime":1676891565404,"userid":"User_7","pageid":"Page_84"}
{"userid":"User_4"} | {"viewtime":1676891565524,"userid":"User_4","pageid":"Page_17"}
{"userid":"User_3"} | {"viewtime":1676891565664,"userid":"User_3","pageid":"Page_47"}
{"userid":"User_5"} | {"viewtime":1676891565784,"userid":"User_5","pageid":"Page_72"}

Create a Database

Databases are used to organize your streaming data in an intuitive namespace. You can create a Database in the CLI using the CREATE DATABASE statement. Once you create a new Database, DeltaStream creates a default Schema, named public, in that Database. The following statement creates a new Database with the name TestDB:

<no-db>/msk# CREATE DATABASE TestDB;
testdb.public/msk# SHOW DATABASES;
   Name  | Default |  Owner   |      Created at      |      Updated at       
---------+---------+----------+----------------------+-----------------------
  testdb |        | sysadmin | 2023-02-20T11:18:46Z | 2023-02-20T11:18:46Z  
testdb.public/msk# SHOW SCHEMAS;
   Name  | Default |  Owner   |      Created at      |      Updated at       
---------+---------+----------+----------------------+-----------------------
  public |        | sysadmin | 2023-02-20T11:18:46Z | 2023-02-20T11:18:46Z  

Note that the CLI prompt shows the current Database and Schema.

Create Streams and Changelogs

We will now create Relations on top of our Kafka topics using DeltaStream’s DDL statements. If you want to treat the data in an Entity as an append-only sequence in which each event is independent of the others, define it as a Stream. In this guide, we declare a Stream on the ds_pageviews topic since each pageview event is an independent event:

testdb.public/msk# CREATE STREAM pageviews (
    viewtime BIGINT, 
    userid VARCHAR, 
    pageid VARCHAR
) WITH (
    'topic'='ds_pageviews', 
    'value.format'='JSON'
 );
testdb.public/msk# SHOW RELATIONS;
    Name    |  Type  |  Owner   |      Created at      |      Updated at       
------------+--------+----------+----------------------+-----------------------
  pageviews | Stream | sysadmin | 2023-02-20T11:21:58Z | 2023-02-20T11:21:58Z  
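
To make the column declarations in CREATE STREAM more concrete, here is a minimal Python sketch of how a JSON record value could be read into a row with the declared columns. It is only an illustration, not DeltaStream's implementation: the function name parse_pageview, the Python stand-ins for BIGINT and VARCHAR, and the treatment of a missing field as None are all assumptions made for this example.

```python
import json

# Columns declared in CREATE STREAM pageviews, with Python stand-ins
# for the SQL types (BIGINT -> int, VARCHAR -> str). Illustrative only.
PAGEVIEWS_COLUMNS = {"viewtime": int, "userid": str, "pageid": str}


def parse_pageview(raw_value: str) -> dict:
    """Decode one JSON-encoded record value into a row with the declared columns."""
    record = json.loads(raw_value)
    row = {}
    for name, py_type in PAGEVIEWS_COLUMNS.items():
        value = record.get(name)
        # Assumption of this sketch: a missing field is represented as None.
        row[name] = None if value is None else py_type(value)
    return row


# Record value taken from the PRINT ENTITY ds_pageviews output earlier in this guide.
row = parse_pageview('{"viewtime":1676891565084,"userid":"User_9","pageid":"Page_28"}')
print(row)
```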

We then declare a Changelog for the ds_users topic. A Changelog indicates that we want to interpret events in an Entity as UPSERT events; therefore, the events should have a primary key, and each event will be interpreted as an insert or update for the given primary key. Use the CREATE CHANGELOG command to declare the users Changelog:

testdb.public/msk# CREATE CHANGELOG users(
    registertime BIGINT, 
    userid VARCHAR, 
    regionid VARCHAR, 
    gender VARCHAR, 
    interests ARRAY<VARCHAR>, 
    contactinfo STRUCT<phone VARCHAR, city VARCHAR, "state" VARCHAR, zipcode VARCHAR>, 
    PRIMARY KEY(userid)
) WITH (
    'topic'='ds_users', 
    'key.format'='json', 
    'key.type'='STRUCT<userid VARCHAR>', 
    'value.format'='json'
);

testdb.public/msk# SHOW RELATIONS;
    Name    |   Type    |  Owner   |      Created at      |      Updated at       
------------+-----------+----------+----------------------+-----------------------
  pageviews | Stream    | sysadmin | 2023-02-20T11:21:58Z | 2023-02-20T11:21:58Z  
  users     | Changelog | sysadmin | 2023-02-20T11:29:58Z | 2023-02-20T11:29:58Z  
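
The difference between a Stream and a Changelog can be sketched in a few lines of Python. The sketch below is a simplified model of UPSERT interpretation, not DeltaStream's implementation; the function apply_changelog and the sample events (including the regionid values) are hypothetical.

```python
def apply_changelog(events, key="userid"):
    """Fold events into the latest state per primary key (UPSERT semantics)."""
    state = {}
    for event in events:
        # First event for a key acts as an insert; later ones overwrite it (update).
        state[event[key]] = event
    return state


# Hypothetical sample events for the users Changelog.
events = [
    {"userid": "User_1", "regionid": "Region_1"},
    {"userid": "User_2", "regionid": "Region_4"},
    {"userid": "User_1", "regionid": "Region_7"},  # update for User_1
]

state = apply_changelog(events)

# Read as a Stream, all three events are independent facts.
# Read as a Changelog, they describe two users, with User_1 updated once.
print(len(events), len(state))
print(state["User_1"]["regionid"])
```

Here the same three events yield three rows when treated as a Stream but only two current rows when treated as a Changelog, because the second User_1 event replaces the first.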

Run Queries

Once you have declared Streams and Changelogs, you can write continuous queries in SQL to process this streaming data in real time. Let’s start with an interactive query, where the result of the query is streamed back to the user. Such queries can be used to inspect your Streams and Changelogs or to build queries iteratively by inspecting their results. As an example, let’s inspect the pageviews Stream using the following interactive query:

testdb.public/msk# SELECT * FROM pageviews;
^C to exit
Waiting for sandbox to be in running state 'defined'................
Waiting for interactive query('defined') to be in running state...
Interactive query is running
 | {"viewtime":1676893147888,"userid":"User_8","pageid":"Page_39"}
 | {"viewtime":1676893148008,"userid":"User_4","pageid":"Page_72"}
 | {"viewtime":1676893148148,"userid":"User_7","pageid":"Page_41"}
 | {"viewtime":1676893148268,"userid":"User_5","pageid":"Page_98"}
 | {"viewtime":1676893148243,"userid":"User_7","pageid":"Page_12"}
 | {"viewtime":1676893148369,"userid":"User_1","pageid":"Page_10"}
 | {"viewtime":1676893148423,"userid":"User_9","pageid":"Page_12"}
 | {"viewtime":1676893148488,"userid":"User_8","pageid":"Page_75"}

While interactive query results are streamed to the user, DeltaStream also provides persistent queries, where the query results are stored back in a Store or a Materialized View. As the first step, let’s write a persistent query that joins the pageviews Stream with the users Changelog to create an enriched pageviews Stream that includes user details for each pageview event. We will also convert the epoch time to a timestamp with a time zone using the TO_TIMESTAMP_LTZ function.

CREATE STREAM csas_enriched_pv AS 
SELECT 
    TO_TIMESTAMP_LTZ(viewtime, 3) AS viewtime,  
    p.userid AS userid, 
    pageid, 
    TO_TIMESTAMP_LTZ(registertime, 3) AS registertime, 
    regionid, 
    gender, 
    interests, 
    contactinfo
FROM pageviews p
    JOIN "users" u ON u.userid = p.userid;
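
Conceptually, the join above enriches each pageview with the latest known state of the matching user. The following Python sketch models that idea under strong simplifying assumptions: events are processed strictly in arrival order, and event time, watermarks, and late data are ignored. The function enrich and the sample records are hypothetical and do not reflect how the actual streaming job is implemented.

```python
def enrich(events):
    """Join pageviews against the latest user state, in arrival order.

    events: iterable of ("user", record) or ("pageview", record) tuples.
    """
    users = {}      # latest record per userid, as in a Changelog
    enriched = []
    for kind, record in events:
        if kind == "user":
            users[record["userid"]] = record  # upsert the user state
        else:
            user = users.get(record["userid"])
            if user is not None:
                # Inner join: emit only when a matching user is known.
                enriched.append({**record, **user})
    return enriched


# Hypothetical interleaving of user updates and pageviews.
events = [
    ("pageview", {"userid": "User_3", "pageid": "Page_47"}),  # no user yet, dropped
    ("user", {"userid": "User_3", "regionid": "Region_2"}),
    ("pageview", {"userid": "User_3", "pageid": "Page_12"}),
    ("user", {"userid": "User_3", "regionid": "Region_5"}),   # update
    ("pageview", {"userid": "User_3", "pageid": "Page_84"}),
]

result = enrich(events)
for row in result:
    print(row)
```

In this model, each output row carries the user details that were current when the pageview arrived, which is why the two emitted rows have different regionid values.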

Under the hood, DeltaStream will compile and launch the query as an Apache Flink streaming job. You should be able to see the query along with its status using the SHOW QUERIES command in the CLI:

testdb.public/msk# 
testdb.public/msk# SHOW QUERIES;
                   ID                  |  State  |                             DSQL                             |  Owner   |      Created at      |      Updated at       
---------------------------------------+---------+--------------------------------------------------------------+----------+----------------------+-----------------------
  a913595a-ad09-452e-81e1-3f440b56fae2 | RUNNING | CREATE STREAM                                                | sysadmin | 2023-02-20T11:42:53Z | 2023-02-20T11:42:53Z  
                                       |         | csas_enriched_pv AS SELECT                                   |          |                      |                       
                                       |         | TO_TIMESTAMP_LTZ(viewtime,                                   |          |                      |                       
                                       |         | 3) AS viewtime, p.userid                                     |          |                      |                       
                                       |         | AS userid, pageid,                                           |          |                      |                       
                                       |         | TO_TIMESTAMP_LTZ(registertime,                               |          |                      |                       
                                       |         | 3) AS registertime, regionid,                                |          |                      |                       
                                       |         | gender, interests, contactinfo                               |          |                      |                       
                                       |         | FROM pageviews p JOIN users u                                |          |                      |                       
                                       |         | ON u.userid = p.userid;                                      |          |                      |                       
testdb.public/msk# 

Once the query successfully runs, you will have a new Kafka topic named csas_enriched_pv in your Kafka cluster, and a new Stream will also be added to the Streams in your Database, TestDB. You can examine the content of the new Stream by running the following query from the CLI:

testdb.public/msk# 
testdb.public/msk# SELECT * FROM csas_enriched_pv;
^C to exit
Interactive query is running
 | {"viewtime":"2023-02-20T11:47:05.717Z","userid":"User_6","pageid":"Page_99","registertime":"2023-02-20T11:47:05.676Z","regionid":"Region_6","gender":"FEMALE","interests":["News","Movies"],"contactinfo":{"phone":"9492229999","city":"Irvine","state":"CA","zipcode":"92617"}}
 | {"viewtime":"2023-02-20T11:47:05.856Z","userid":"User_5","pageid":"Page_77","registertime":"2023-02-20T11:46:44.951Z","regionid":"Region_4","gender":"OTHER","interests":["Game","Sport"],"contactinfo":{"phone":"6503889999","city":"Palo Alto","state":"CA","zipcode":"94301"}}
 | {"viewtime":"2023-02-20T11:47:06.056Z","userid":"User_9","pageid":"Page_24","registertime":"2023-02-20T11:46:47.991Z","regionid":"Region_9","gender":"OTHER","interests":["News","Travel"],"contactinfo":{"phone":"6503349999","city":"San Mateo","state":"CA","zipcode":"94403"}}
 | {"viewtime":"2023-02-20T11:47:06.196Z","userid":"User_4","pageid":"Page_45","registertime":"2023-02-20T11:46:36.748Z","regionid":"Region_5","gender":"FEMALE","interests":["Game","Sport"],"contactinfo":{"phone":"4083366881","city":"San Jose","state":"CA","zipcode":"95112"}}
 testdb.public/msk# 
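
The viewtime and registertime values in this output are timestamps, whereas the source topics carry epoch values such as 1676891565084. The Python sketch below shows the equivalent conversion, assuming that the second argument 3 in TO_TIMESTAMP_LTZ(viewtime, 3) denotes millisecond precision, as it does for the Flink SQL function of the same name. The helper epoch_millis_to_utc is a hypothetical name used only for this illustration.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def epoch_millis_to_utc(ms: int) -> datetime:
    """Convert milliseconds since the Unix epoch to a timezone-aware UTC datetime."""
    # Integer timedelta arithmetic avoids floating-point rounding of the milliseconds.
    return EPOCH + timedelta(milliseconds=ms)


# viewtime from the first record printed from ds_pageviews earlier in this guide.
ts = epoch_millis_to_utc(1676891565084)
print(ts.isoformat(timespec="milliseconds"))  # 2023-02-20T11:12:45.084+00:00
```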

Now that we have the enriched pageviews stream, let’s build a Materialized View where we compute the number of pageviews per user. Type the following statement in the CLI to create this Materialized View:

CREATE MATERIALIZED VIEW user_view_count AS 
SELECT
    userid, 
    COUNT(*) AS view_count 
FROM csas_enriched_pv 
GROUP BY userid;

Once you run the above query, DeltaStream will launch a streaming job that runs the SELECT statement and materializes the result of the query. You should be able to see this query in the CLI using the SHOW QUERIES command:

testdb.public/msk# 
testdb.public/msk# SHOW QUERIES;
                   ID                  |  State  |                             DSQL                             |  Owner   |      Created at      |      Updated at       
---------------------------------------+---------+--------------------------------------------------------------+----------+----------------------+-----------------------
  cf6d2092-ff25-461f-801d-5fbb0d5ceb58 | RUNNING | CREATE MATERIALIZED VIEW                                     | sysadmin | 2023-02-20T12:56:53Z | 2023-02-20T12:56:53Z  
                                       |         | user_view_count AS SELECT                                    |          |                      |                       
                                       |         | userid, COUNT(*) AS view_count                               |          |                      |                       
                                       |         | FROM csas_enriched_pv GROUP BY                               |          |                      |                       
                                       |         | userid;                                                      |          |                      |                       
  a913595a-ad09-452e-81e1-3f440b56fae2 | RUNNING | CREATE STREAM                                                | sysadmin | 2023-02-20T11:42:53Z | 2023-02-20T11:42:53Z  
                                       |         | csas_enriched_pv AS SELECT                                   |          |                      |                       
                                       |         | TO_TIMESTAMP_LTZ(viewtime,                                   |          |                      |                       
                                       |         | 3) AS viewtime, p.userid                                     |          |                      |                       
                                       |         | AS userid, pageid,                                           |          |                      |                       
                                       |         | TO_TIMESTAMP_LTZ(registertime,                               |          |                      |                       
                                       |         | 3) AS registertime, regionid,                                |          |                      |                       
                                       |         | gender, interests, contactinfo                               |          |                      |                       
                                       |         | FROM pageviews p JOIN users u                                |          |                      |                       
                                       |         | ON u.userid = p.userid;                                      |          |                      |                       
testdb.public/msk# 

The resulting Materialized View can then be queried the same way you would query a Materialized View in traditional relational databases; however, in DeltaStream, the data in the Materialized View is always kept fresh by the streaming job. The following is a simple query to get the current view count for a user with the userid of User_2:

testdb.public/msk# 
testdb.public/msk# SELECT * FROM user_view_count WHERE userid = 'User_2';
  userid | view_count  
---------+-------------
  User_2 |        580  
testdb.public/msk# 

As you can see, the number of pageviews for User_2 is 580 at the time of running the above query. Now run the query again. You should see an updated pageview count for the user: every time you query a Materialized View, you get the most up-to-date result, because DeltaStream keeps the data in the view continuously updated using the continuous query that declared the Materialized View. Running the same query a few seconds later should produce something similar to the following:

testdb.public/msk# 
testdb.public/msk# SELECT * FROM user_view_count WHERE userid = 'User_2';
  userid | view_count  
---------+-------------
  User_2 |        580  
testdb.public/msk# 
testdb.public/msk# 
testdb.public/msk# SELECT * FROM user_view_count WHERE userid = 'User_2';
  userid | view_count  
---------+-------------
  User_2 |        818  
testdb.public/msk# 

As you can see, the result is updated to 818 from the previous value of 580.
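
The behavior of the Materialized View can be modeled as a count per user that is updated as each event arrives, so that a lookup at any moment returns the current value. The Python sketch below illustrates that idea only; the class UserViewCount and its methods are hypothetical and say nothing about how DeltaStream stores or serves the view.

```python
from collections import Counter


class UserViewCount:
    """Toy model of: SELECT userid, COUNT(*) AS view_count ... GROUP BY userid."""

    def __init__(self):
        self._counts = Counter()

    def on_event(self, pageview: dict) -> None:
        # The continuous query updates the view for every incoming event.
        self._counts[pageview["userid"]] += 1

    def query(self, userid: str) -> int:
        # A lookup reads whatever the view holds at this moment.
        return self._counts[userid]


view = UserViewCount()

for _ in range(3):
    view.on_event({"userid": "User_2", "pageid": "Page_10"})
first = view.query("User_2")

# More events arrive between the two lookups.
for _ in range(2):
    view.on_event({"userid": "User_2", "pageid": "Page_12"})
second = view.query("User_2")

print(first, second)
```

The two lookups return different counts even though the query text is identical, which mirrors the change from 580 to 818 in the CLI session.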

In summary, we have shown how DeltaStream makes it easy to build stream processing applications using our CLI tool. We created a query that joins pageviews and users and creates a new Stream called csas_enriched_pv. We also created another query that builds a Materialized View named user_view_count from csas_enriched_pv.

Clean Up

Now that we have seen a basic use case, let’s clean up our environment. To do so, we first terminate the queries and then drop the created Streams, Changelogs, and Materialized Views. To terminate your queries, go to the Queries page. Then you can go to the corresponding Database and Schema to drop the Streams, Changelogs, and Materialized Views. Note that if a query uses a Stream, Changelog, or Materialized View, you must terminate that query before dropping the Relation.
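
The ordering rule for clean-up can be expressed as a simple check: a Relation is safe to drop only when no running query uses it. The Python sketch below models the two queries from this guide. The function droppable, the dictionary layout, and the TERMINATED state name are assumptions made for illustration, as is the choice to count a query's output Relation as "used" by that query.

```python
def droppable(relation: str, queries: list) -> bool:
    """Return True when no running query reads from or writes to the relation."""
    return not any(
        q["state"] == "RUNNING" and relation in (q["sources"] | {q["sink"]})
        for q in queries
    )


# The two persistent queries created in this guide.
queries = [
    {"sink": "csas_enriched_pv", "sources": {"pageviews", "users"}, "state": "RUNNING"},
    {"sink": "user_view_count", "sources": {"csas_enriched_pv"}, "state": "RUNNING"},
]

before = droppable("pageviews", queries)

# Terminate every query first (hypothetical state name).
for q in queries:
    q["state"] = "TERMINATED"

after = droppable("pageviews", queries)
print(before, after)
```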
