Starting with the CLI

How to get started using the DeltaStream command line interface

DeltaStream provides a Command Line Interface (CLI) you can use to interact with the platform from a terminal. This guide walks you through the steps to build a streaming application with DeltaStream’s CLI. Follow these steps to:

  • Get hands-on experience with foundational concepts in DeltaStream.

  • Gain the knowledge to build applications similar to the one in this guide.

While this guide uses topics in Apache Kafka, the steps are essentially the same if your data resides in another streaming data store such as Amazon Kinesis or Redpanda.

Note If you prefer to use DeltaStream’s Web application, see Starting with the Web App for those details and procedures.

This guide assumes you have already created your account and signed in to DeltaStream, and that you have created a DeltaStream organization or joined an existing one.

In this guide, you use the CLI to do the following:

  • Download the DeltaStream CLI.

  • Connect to your streaming data store (in this case, Apache Kafka) by creating a data store in DeltaStream.

  • Create your first database.

  • Create streams and changelogs for your Kafka topics.

  • Create new streams, changelogs, and materialized views using DeltaStream’s continuous queries.

  • (Optional) Share your streaming data with other users in your organization.

Download the DeltaStream CLI

  1. Log into DeltaStream.

  2. At the bottom of the left-hand navigation, click Support Center. The Support menu displays.

  3. Click the OS you want. The DeltaStream CLI begins downloading automatically into a dscli folder.

  4. Unzip and deploy the files as you would with any compressed application.

Create a Data Store

The first step is to create a data store in DeltaStream. A data store is a streaming service such as Apache Kafka or Amazon Kinesis where your streaming data resides.

Note The CLI refers to data stores simply as "stores."

Use the CREATE STORE statement to create a data store in the DeltaStream CLI:

<no-db>/<no-store># CREATE STORE MSK 
WITH ( 
 'type' = KAFKA ,
 'kafka.sasl.hash_function' = SHA512, 
 'kafka.sasl.password' = '**********', 
 'kafka.sasl.username' = 'mskconsumer', 
 'uris'='b-1-public.cmnuseast1datagenmsk.89yho3.c26.kafka.us-east-1.amazonaws.com:9196'
);

<no-db>/msk# SHOW STORES;
  Name | Kind  | Metadata |  Owner   |      Created at      |      Updated at       
-------+-------+----------+----------+----------------------+-----------------------
  msk  | Kafka | {}       | sysadmin | 2023-02-20T11:03:18Z | 2023-02-20T11:03:18Z  
<no-db>/msk#

Note The prompt does not indicate a default data store until you create your first data store.

In declaring the data store, you provide the configurations DeltaStream needs to connect to and use your streaming data store. After you create it, your new data store displays as the default data store in the prompt. Use the LIST STORES command to view the data stores you have created in DeltaStream.
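
If you want to double-check the configuration recorded for the store, you can also describe it. Below is a minimal example; the exact columns in the output depend on the store type and your DeltaStream version:

<no-db>/msk# DESCRIBE STORE msk;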

You can now inspect the data store and use PRINT ENTITY to view the contents of its topics:

<no-db>/msk# SHOW ENTITIES;
      Entity name       
----------------------- 
  ds_pageviews         
  ds_pageviews_pb      
  ds_shipments         
  ds_users             
<no-db>/msk# PRINT ENTITY ds_pageviews;
{"userid":"User_9"} | {"viewtime":1676891565084,"userid":"User_9","pageid":"Page_28"}
{"userid":"User_7"} | {"viewtime":1676891565264,"userid":"User_7","pageid":"Page_16"}
{"userid":"User_7"} | {"viewtime":1676891565404,"userid":"User_7","pageid":"Page_84"}
{"userid":"User_4"} | {"viewtime":1676891565524,"userid":"User_4","pageid":"Page_17"}
{"userid":"User_3"} | {"viewtime":1676891565664,"userid":"User_3","pageid":"Page_47"}
{"userid":"User_5"} | {"viewtime":1676891565784,"userid":"User_5","pageid":"Page_72"}

Create a Database

In DeltaStream, you use databases to organize your streaming data in an intuitive namespace. Use the CREATE DATABASE statement to create a database. When you create a database, DeltaStream also creates a default namespace, named public, in the database. The following statement creates a new database named TestDB:

<no-db>/msk# CREATE DATABASE TestDB;
testdb.public/msk# SHOW DATABASES;
   Name  | Default |  Owner   |      Created at      |      Updated at       
---------+---------+----------+----------------------+-----------------------
  testdb | ✓       | sysadmin | 2023-02-20T11:18:46Z | 2023-02-20T11:18:46Z  
testdb.public/msk# SHOW NAMESPACES;
   Name  | Default |  Owner   |      Created at      |      Updated at       
---------+---------+----------+----------------------+-----------------------
  public | ✓       | sysadmin | 2023-02-20T11:18:46Z | 2023-02-20T11:18:46Z  

The prompt displays the current database and namespace.

Create Streams and Changelogs

Now create DeltaStream objects on top of your Kafka topics. You do this using DeltaStream’s DDL statements.

Note In the CLI, you can use the SHOW command and the LIST command interchangeably.

To manage streaming data in an entity as an append-only stream, in which each streaming event is independent, define it as a stream. In the example below, you declare a stream on the ds_pageviews topic, as each pageview event is an independent event:

testdb.public/msk# CREATE STREAM pageviews (
    viewtime BIGINT, 
    userid VARCHAR, 
    pageid VARCHAR
) WITH (
    'topic'='ds_pageviews', 
    'value.format'='JSON'
 );
testdb.public/msk# SHOW RELATIONS;
    Name    |  Type  |  Owner   |      Created at      |      Updated at       
------------+--------+----------+----------------------+-----------------------
  pageviews | Stream | sysadmin | 2023-02-20T11:21:58Z | 2023-02-20T11:21:58Z  

Note The DeltaStream UI uses the term OBJECTS instead of RELATIONS.

Next, declare a changelog for the ds_users topic. A changelog indicates that you wish to interpret events in an entity as UPSERT events. The events should have a primary key, and each event is interpreted as an insert or update for the given primary key. Use the CREATE CHANGELOG command to declare the users changelog:

testdb.public/msk# CREATE CHANGELOG users(
    registertime BIGINT, 
    userid VARCHAR, 
    regionid VARCHAR, 
    gender VARCHAR, 
    interests ARRAY<VARCHAR>, 
    contactinfo STRUCT<phone VARCHAR, city VARCHAR, "state" VARCHAR, zipcode VARCHAR>, 
    PRIMARY KEY(userid)
) WITH (
    'topic'='ds_users', 
    'key.format'='json', 
    'key.type'='STRUCT<userid VARCHAR>', 
    'value.format'='json'
);

testdb.public/msk# SHOW RELATIONS;
    Name    |   Type    |  Owner   |      Created at      |      Updated at       
------------+-----------+----------+----------------------+-----------------------
  pageviews | Stream    | sysadmin | 2023-02-20T11:21:58Z | 2023-02-20T11:21:58Z  
  users     | Changelog | sysadmin | 2023-02-20T11:29:58Z | 2023-02-20T11:29:58Z  

Run Queries

Now that you have declared streams and changelogs, you can write continuous queries in SQL to process this streaming data in real time. Start with an interactive query, in which the query results stream back to you. You can use such queries to inspect your streams and changelogs, or to build queries iteratively by inspecting their results. Here's an example: use the following interactive query to inspect the pageviews stream:

testdb.public/msk# SELECT * FROM pageviews;
^C to exit
Waiting for sandbox to be in running state 'defined'................
Waiting for interactive query('defined') to be in running state...
Interactive query is running
 | {"viewtime":1676893147888,"userid":"User_8","pageid":"Page_39"}
 | {"viewtime":1676893148008,"userid":"User_4","pageid":"Page_72"}
 | {"viewtime":1676893148148,"userid":"User_7","pageid":"Page_41"}
 | {"viewtime":1676893148268,"userid":"User_5","pageid":"Page_98"}
 | {"viewtime":1676893148243,"userid":"User_7","pageid":"Page_12"}
 | {"viewtime":1676893148369,"userid":"User_1","pageid":"Page_10"}
 | {"viewtime":1676893148423,"userid":"User_9","pageid":"Page_12"}
 | {"viewtime":1676893148488,"userid":"User_8","pageid":"Page_75"}
CREATE STREAM csas_enriched_pv AS 
SELECT 
    TO_TIMESTAMP_LTZ(viewtime, 3) AS viewtime,  
    p.userid AS userid, 
    pageid, 
    TO_TIMESTAMP_LTZ(registertime, 3) AS registertime, 
    regionid, 
    gender, 
    interests, 
    contactinfo
FROM pageviews p
    JOIN "users" u ON u.userid = p.userid;

When you run this statement, DeltaStream compiles and launches the query as an Apache Flink streaming job. You can use the SHOW QUERIES command to view the query along with its status:

testdb.public/msk# 
testdb.public/msk# SHOW QUERIES;
                   ID                  |  State  |                             DSQL                             |  Owner   |      Created at      |      Updated at       
---------------------------------------+---------+--------------------------------------------------------------+----------+----------------------+-----------------------
  a913595a-ad09-452e-81e1-3f440b56fae2 | RUNNING | CREATE STREAM                                                | sysadmin | 2023-02-20T11:42:53Z | 2023-02-20T11:42:53Z  
                                       |         | csas_enriched_pv AS SELECT                                   |          |                      |                       
                                       |         | TO_TIMESTAMP_LTZ(viewtime,                                   |          |                      |                       
                                       |         | 3) AS viewtime, p.userid                                     |          |                      |                       
                                       |         | AS userid, pageid,                                           |          |                      |                       
                                       |         | TO_TIMESTAMP_LTZ(registertime,                               |          |                      |                       
                                       |         | 3) AS registertime, regionid,                                |          |                      |                       
                                       |         | gender, interests, contactinfo                               |          |                      |                       
                                       |         | FROM pageviews p JOIN users u                                |          |                      |                       
                                       |         | ON u.userid = p.userid;                                      |          |                      |                       
testdb.public/msk# 

When the query runs successfully, you have a new Kafka topic named csas_enriched_pv in your Kafka cluster, plus a new stream added to the streams in your TestDB database. To examine the contents of the new stream, run the following query:

testdb.public/msk# 
testdb.public/msk# SELECT * FROM csas_enriched_pv;
^C to exit
Interactive query is running
 | {"viewtime":"2023-02-20T11:47:05.717Z","userid":"User_6","pageid":"Page_99","registertime":"2023-02-20T11:47:05.676Z","regionid":"Region_6","gender":"FEMALE","interests":["News","Movies"],"contactinfo":{"phone":"9492229999","city":"Irvine","state":"CA","zipcode":"92617"}}
 | {"viewtime":"2023-02-20T11:47:05.856Z","userid":"User_5","pageid":"Page_77","registertime":"2023-02-20T11:46:44.951Z","regionid":"Region_4","gender":"OTHER","interests":["Game","Sport"],"contactinfo":{"phone":"6503889999","city":"Palo Alto","state":"CA","zipcode":"94301"}}
 | {"viewtime":"2023-02-20T11:47:06.056Z","userid":"User_9","pageid":"Page_24","registertime":"2023-02-20T11:46:47.991Z","regionid":"Region_9","gender":"OTHER","interests":["News","Travel"],"contactinfo":{"phone":"6503349999","city":"San Mateo","state":"CA","zipcode":"94403"}}
 | {"viewtime":"2023-02-20T11:47:06.196Z","userid":"User_4","pageid":"Page_45","registertime":"2023-02-20T11:46:36.748Z","regionid":"Region_5","gender":"FEMALE","interests":["Game","Sport"],"contactinfo":{"phone":"4083366881","city":"San Jose","state":"CA","zipcode":"95112"}}
 testdb.public/msk# 

Now that you have the enriched pageviews stream, you can build a materialized view in which you compute the number of pageviews per user. To create this materialized view, type the following statement:

CREATE MATERIALIZED VIEW user_view_count AS 
SELECT
    userid, 
    COUNT(*) AS view_count 
FROM csas_enriched_pv 
GROUP BY userid;

When you run the above query, DeltaStream launches a streaming job that runs the statement and materializes the result of the query. To view this query, use the SHOW QUERIES command:

testdb.public/msk# 
testdb.public/msk# SHOW QUERIES;
                   ID                  |  State  |                             DSQL                             |  Owner   |      Created at      |      Updated at       
---------------------------------------+---------+--------------------------------------------------------------+----------+----------------------+-----------------------
  cf6d2092-ff25-461f-801d-5fbb0d5ceb58 | RUNNING | CREATE MATERIALIZED VIEW                                     | sysadmin | 2023-02-20T12:56:53Z | 2023-02-20T12:56:53Z  
                                       |         | user_view_count AS SELECT                                    |          |                      |                       
                                       |         | userid, COUNT(*) AS view_count                               |          |                      |                       
                                       |         | FROM csas_enriched_pv GROUP BY                               |          |                      |                       
                                       |         | userid;                                                      |          |                      |                       
  a913595a-ad09-452e-81e1-3f440b56fae2 | RUNNING | CREATE STREAM                                                | sysadmin | 2023-02-20T11:42:53Z | 2023-02-20T11:42:53Z  
                                       |         | csas_enriched_pv AS SELECT                                   |          |                      |                       
                                       |         | TO_TIMESTAMP_LTZ(viewtime,                                   |          |                      |                       
                                       |         | 3) AS viewtime, p.userid                                     |          |                      |                       
                                       |         | AS userid, pageid,                                           |          |                      |                       
                                       |         | TO_TIMESTAMP_LTZ(registertime,                               |          |                      |                       
                                       |         | 3) AS registertime, regionid,                                |          |                      |                       
                                       |         | gender, interests, contactinfo                               |          |                      |                       
                                       |         | FROM pageviews p JOIN users u                                |          |                      |                       
                                       |         | ON u.userid = p.userid;                                      |          |                      |                       
testdb.public/msk# 

You can query the resulting materialized view the same way you'd query a materialized view in a traditional relational database -- except that in DeltaStream, the streaming job always keeps the data in the materialized view fresh.

Below is a simple query to get the current view count for a user with the userid of User_2:

testdb.public/msk# 
testdb.public/msk# SELECT * FROM user_view_count WHERE userid = 'User_2';
  userid | view_count  
---------+-------------
  User_2 |        580  
testdb.public/msk# 

As you see, the number of pageviews for User_2 is 580 at the time you run the above query. Run the query again, and you see an updated result for the pageview count for the user. This demonstrates that every time you run a query on a materialized view, you receive the most up-to-date result. DeltaStream ensures the data in the view is continuously updated using the continuous query that declared the materialized view.

Now wait a few more seconds. Then run the same query on the materialized view. You should see something similar to the below:

testdb.public/msk# 
testdb.public/msk# SELECT * FROM user_view_count WHERE userid = 'User_2';
  userid | view_count  
---------+-------------
  User_2 |        580  
testdb.public/msk# 
testdb.public/msk# 
testdb.public/msk# SELECT * FROM user_view_count WHERE userid = 'User_2';
  userid | view_count  
---------+-------------
  User_2 |        818  
testdb.public/msk# 

As you see, the result is updated to 818 from the previous value of 580.
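
Because the materialized view behaves like a relational table, you are not limited to point lookups on userid. The query below is only a sketch, assuming the aggregate and filter clauses described in the SELECT (FROM MATERIALIZED VIEW) reference are available in your version:

SELECT COUNT(*) AS active_users
FROM user_view_count
WHERE view_count > 100;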

In summary, this guide has demonstrated how DeltaStream makes it easy to build stream processing applications using the CLI. You created a query that joins pageviews and users and writes the result to a new stream called csas_enriched_pv. You also ran a query that creates a materialized view named user_view_count from csas_enriched_pv.

Clean Up

Now it's time to clean up your environment. To do this:

  1. Terminate the queries.

  2. Navigate to the database and namespace that contain your streams, changelogs, and materialized views.

  3. Drop the streams, changelogs, and materialized views you created.

Important If there is a query that uses a stream, changelog, or materialized view, terminate the query before you drop the relation.
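
Below is a rough sketch of these steps, using the query IDs and object names from this guide; run LIST QUERIES and LIST RELATIONS to find your own, and see the TERMINATE QUERY and DROP reference pages for the exact syntax:

-- Terminate the running queries first; find the IDs with LIST QUERIES
TERMINATE QUERY cf6d2092-ff25-461f-801d-5fbb0d5ceb58;
TERMINATE QUERY a913595a-ad09-452e-81e1-3f440b56fae2;
-- Then drop the objects created in this guide, from testdb.public
DROP RELATION user_view_count;
DROP STREAM csas_enriched_pv;
DROP STREAM pageviews;
DROP CHANGELOG "users";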
