DeltaStream provides a Command Line Interface (CLI) you can use to interact with the platform from the terminal. This guide walks you through the steps to build a streaming application with DeltaStream’s CLI. Finish these steps to:
Get hands-on experience with foundational concepts in DeltaStream
Gain the knowledge to build applications similar to the one in this guide
While this guide uses topics in Apache Kafka, the steps should be the same regardless of whether you have data in other streaming stores such as Amazon Kinesis or Redpanda.
Note If you prefer to use DeltaStream’s web application, see Starting with the Web App for those details and procedures.
We assume you already have created your account and signed into DeltaStream. Also, we assume you already have created a DeltaStream organization or have joined an existing organization.
You accomplish the following steps in this guide for the CLI:
Download the DeltaStream CLI.
Connect to your streaming store (in this case, Apache Kafka) by creating a store in DeltaStream.
Create your first database.
Create streams and changelogs for your Kafka topics.
Create new streams, changelogs, and materialized views using DeltaStream’s continuous queries.
(Optional) Share your streaming data with other users in your organization.
Download the DeltaStream CLI
Log into DeltaStream.
At the bottom of the lefthand navigation, click Support Center ( ). The Support menu displays.
Click the OS you want. The DeltaStream CLI begins downloading automatically into a dscli folder.
Unzip and deploy the files as you would with any compressed application.
Create a Store
The first step is to create a store in DeltaStream. A store is a streaming store service such as Apache Kafka or Amazon Kinesis where your streaming data resides.
Use the CREATE STORE statement to create a store in the DeltaStream CLI:
<no-db>/<no-store># CREATE STORE MSK WITH ( 'type'=KAFKA,'access_region'="AWS us-east-1",'kafka.sasl.hash_function'=SHA512,'kafka.sasl.password'='**********','kafka.sasl.username'='mskconsumer','uris'='b-1-public.cmnuseast1datagenmsk.89yho3.c26.kafka.us-east-1.amazonaws.com:9196');<no-db>/msk#SHOWSTORES;Name|Kind|AccessRegion|Metadata|Owner|Createdat|Updatedat-------+-------+---------------+----------+----------+----------------------+-----------------------msk|Kafka|AWSus-east-1| {} |sysadmin|2023-02-20T11:03:18Z|2023-02-20T11:03:18Z<no-db>/msk#
Your new store displays as the default store in the prompt. Use the LIST STORES command to view the available stores you have created in DeltaStream.
Note No default store indicates in the prompt until you create your first store.
As is indicated below, in declaring the store, DeltaStream provides the required configurations to connect and use the streaming store from DeltaStream.
You can now inspect the store and print to view the content of its topics:
In DeltaStream, you use databases to organize your streaming data in an intuitive namespace. Use the CREATE DATABASE statement to create a database. When you create a database, DeltaStream in turn creates a default schema, named public, in the database. The following shows the statement that creates a new database labelled TestDB:
Note that the prompt shows the current database and schema.
Create Streams and Changelogs
Now create relations on top of your Kafka topics. You do this using DeltaStream’s DDL statements. To manage streaming data in an entity as an append-only stream, in which each streaming event is independent, define it as a stream. In the example below, you declare a stream on the ds_pageviews topic, as each pageview event is an independent event:
testdb.public/msk#CREATESTREAMpageviews (viewtimeBIGINT,useridVARCHAR,pageidVARCHAR) WITH ('topic'='ds_pageviews','value.format'='JSON' );testdb.public/msk#SHOWRELATIONS;Name|Type|Owner|Createdat|Updatedat------------+--------+----------+----------------------+-----------------------pageviews|Stream|sysadmin|2023-02-20T11:21:58Z|2023-02-20T11:21:58Z
Next, declare a changelog for the ds_users topic. A changelog indicates that you wish to interpret events in an entity as UPSERT events. The events should have a primary key, and each event is interpreted as an insert or update for the given primary key. Use the CREATE CHANGELOG command to declare the users changelog:
testdb.public/msk#CREATECHANGELOGusers(registertimeBIGINT,useridVARCHAR,regionidVARCHAR,genderVARCHAR,interestsARRAY<VARCHAR>,contactinfoSTRUCT<phoneVARCHAR,cityVARCHAR,"state"VARCHAR,zipcodeVARCHAR>,PRIMARYKEY(userid)) WITH ('topic'='ds_users','key.format'='json','key.type'='STRUCT<userid VARCHAR>','value.format'='json');testdb.public/msk#SHOWRELATIONS;Name|Type|Owner|Createdat|Updatedat------------+-----------+----------+----------------------+-----------------------pageviews|Stream|sysadmin|2023-02-20T11:21:58Z|2023-02-20T11:21:58Zusers|Changelog|sysadmin|2023-02-20T11:29:58Z|2023-02-20T11:29:58Z
Run Queries
Now that you have declared streams and changelogs, you can write continuous queries in SQL to process this streaming data in real time. Start with a Streaming or Continuous Query, wherein the query results stream back to you. You can use such queries to inspect your streams and changelogs, or to build queries iteratively by inspecting the query’s result.
Here's an example: Use the following interactive query to inspect the pageviews stream:
While interactive query results display, DeltaStream provides a Streaming or Continuous Query, wherein the query results are saved back in a store or a materialized view. To proceed, write a persistent query that joins the pageviews stream with the changelog to create an enriched pageviews stream that includes user details for each pageview event. While you're at it, also convert the epoch time to a timestamp with a time zone via the TO_TIMESTAMP_LTZ function.
CREATE STREAM csas_enriched_pv ASSELECT TO_TIMESTAMP_LTZ(viewtime, 3) AS viewtime, p.userid AS userid, pageid, TO_TIMESTAMP_LTZ(registertime, 3) AS registertime, regionid, gender, interests, contactinfoFROM pageviews pJOIN"users" u ON u.userid = p.userid;
During this time, DeltaStream compiles and launches the query as an Apache Flink streaming job.
You should be able to use the LIST QUERIES command to view the query along with its status:
testdb.public/msk#testdb.public/msk#SHOWQUERIES;ID|State|DSQL|Owner|Createdat|Updatedat---------------------------------------+---------+--------------------------------------------------------------+----------+----------------------+-----------------------a913595a-ad09-452e-81e1-3f440b56fae2|RUNNING|CREATESTREAM|sysadmin|2023-02-20T11:42:53Z|2023-02-20T11:42:53Z||csas_enriched_pvASSELECT|||||TO_TIMESTAMP_LTZ(viewtime,|||||3) AS viewtime, p.userid |||||ASuserid,pageid,|||||TO_TIMESTAMP_LTZ(registertime,|||||3) AS registertime, regionid, |||||gender,interests,contactinfo|||||FROMpageviewspJOINusersu|||||ONu.userid=p.userid; |||testdb.public/msk#
When the query runs successfully, you have a new Kafka topic named csas_enriched_pv in your Kafka cluster, plus a new stream added to the streams in your TestDB database. Run the following query to examine the contents of the new stream:
Now that you have the enriched pageviews stream, you can build a materialized view in which you compute the number of pageviews per user. To create this materialized view, type the following statement:
CREATE MATERIALIZED VIEW user_view_count ASSELECT userid, COUNT(*) AS view_count FROM csas_enriched_pv GROUP BY userid;
When you run the above query, DeltaStream launches a streaming job that runs the SELECT statement and materializes the result of the query. Use the LIST QUERIES command to view this query:
testdb.public/msk#testdb.public/msk#SHOWQUERIES;ID|State|DSQL|Owner|Createdat|Updatedat---------------------------------------+---------+--------------------------------------------------------------+----------+----------------------+-----------------------cf6d2092-ff25-461f-801d-5fbb0d5ceb58|RUNNING|CREATEMATERIALIZEDVIEW|sysadmin|2023-02-20T12:56:53Z|2023-02-20T12:56:53Z||user_view_countASSELECT|||||userid,COUNT(*) ASview_count|||||FROMcsas_enriched_pvGROUPBY|||||userid; |||a913595a-ad09-452e-81e1-3f440b56fae2|RUNNING|CREATESTREAM|sysadmin|2023-02-20T11:42:53Z|2023-02-20T11:42:53Z||csas_enriched_pvASSELECT|||||TO_TIMESTAMP_LTZ(viewtime,|||||3) AS viewtime, p.userid |||||ASuserid,pageid,|||||TO_TIMESTAMP_LTZ(registertime,|||||3) AS registertime, regionid, |||||gender,interests,contactinfo|||||FROMpageviewspJOINusersu|||||ONu.userid=p.userid; |||testdb.public/msk#
You can query the resulting materialized view the same way you'd query a materialized view in traditional relational databases -- except that in DeltaStream, the streaming job always keeps the data in the materialized view fresh.
Below is a simple query to get the current view count for a user with the userid of User_2:
As you see, the number of pageviews for User_2 is 580 at the time you run the above query. Run the query again, and you see an updated result for the pageview count for the user. This demonstrates that every time you run a query on a materialized view, you receive the most up-to-date result. DeltaStream ensures the data in the view is continuously updated using the continuous query that declared the materialized view.
Now wait a few more seconds. Then run the same query on the materialized view. You should see something similar to the below:
As you see, the result is updated to 818 from the previous value of 580.
In summary, this guide has demonstrated how DeltaStream makes it easy to build a stream processing applications using the CLI tool. You've created a query that joins pageviews and users and creates a new stream called csas_enriched_pv. You also ran another query that creates a materialized view named user_view_count from csas_enriched_pv.
Clean Up
Now it's time to clean up your environment. To do this:
Terminate the queries.
Drop the created streams, changelogs, and materialized views.
Go to the corresponding database and schema to drop the streams, changelogs, and materialized views.
Note If there is a query that uses a stream, changelog, or materialized view, terminate the query before you drop the relation.