LogoLogo
Start Trial
  • Overview
    • What is DeltaStream?
    • Core Concepts
      • Access Control
      • Compute Pools
      • Data Store
      • Database
      • Function
      • Query
      • SQL
      • Visualizing Data Lineage
  • Getting Started
    • Free Trial Quick Start
    • Starting with the Web App
    • Starting with the CLI
  • How do I...?
    • Create and Manage Data Stores
      • Create Data Stores for Streaming Data
      • Explore Data Store and Topic Details
      • Use Multiple Data Stores in Queries
    • Manage Users and User Roles
      • Inviting Users to an Organization
      • Administering Users in your Organization
      • Using the CLI to Manage User Roles
      • Example: Setting Up Custom Roles for Production and Stage
    • Create DeltaStream Objects to Structure Raw Data
    • Use Namespacing for Organizing Data
    • Create and Query Materialized Views
    • Create a Compute Pool to Work with Iceberg
    • Create a Function
    • Secure my Connection to a Data Store
      • Introducing DeltaStream Private Links
      • Creating an AWS Private Link from DeltaStream to your Confluent Kafka Dedicated Cluster
      • Enabling Private Link Connectivity to Confluent Enterprise Cluster and Schema Registry
      • Creating a Private Link from DeltaStream to Amazon MSK
      • Creating a Private Link for RDS Databases
      • Deleting a Private Link
    • Serialize my Data
      • Working with ProtoBuf Serialized Data and DeltaStream Descriptors
      • Working with Avro Serialized Data and Schema Registries
      • Configuring Deserialization Error Handling
  • Integrations
    • Setting up Data Store Integrations
      • AWS S3
      • ClickHouse
      • Confluent Cloud
      • Databricks
      • Iceberg REST Catalog
      • PostgreSQL
      • Snowflake
      • WarpStream
  • Setting up Enterprise Security Integrations
    • Okta SAML Integration
    • Okta SCIM Integration
  • use cases
    • Using an AWS S3 Store as a Source to Feed an MSK Topic
  • Reference
    • Metrics
      • Prometheus Integration
      • Built-In Metrics
      • Custom Metrics in Functions
    • SQL Syntax
      • Data Formats (Serialization)
        • Serializing with JSON
        • Serializing with Primitive Data Types
        • Serializing with Protobuf
      • Data Types
      • Identifiers and Keywords
      • Command
        • ACCEPT INVITATION
        • CAN I
        • COPY DESCRIPTOR_SOURCE
        • COPY FUNCTION_SOURCE
        • DESCRIBE ENTITY
        • DESCRIBE QUERY
        • DESCRIBE QUERY METRICS
        • DESCRIBE QUERY EVENTS
        • DESCRIBE QUERY STATE
        • DESCRIBE RELATION
        • DESCRIBE RELATION COLUMNS
        • DESCRIBE ROLE
        • DESCRIBE SECURITY INTEGRATION
        • DESCRIBE <statement>
        • DESCRIBE STORE
        • DESCRIBE USER
        • GENERATE COLUMNS
        • GENERATE TEMPLATE
        • GRANT OWNERSHIP
        • GRANT PRIVILEGES
        • GRANT ROLE
        • INVITE USER
        • LIST API_TOKENS
        • LIST COMPUTE_POOLS
        • LIST DATABASES
        • LIST DESCRIPTORS
        • LIST DESCRIPTOR_SOURCES
        • LIST ENTITIES
        • LIST FUNCTIONS
        • LIST FUNCTION_SOURCES
        • LIST INVITATIONS
        • LIST METRICS INTEGRATIONS
        • LIST ORGANIZATIONS
        • LIST QUERIES
        • LIST RELATIONS
        • LIST ROLES
        • LIST SCHEMAS
        • LIST SCHEMA_REGISTRIES
        • LIST SECRETS
        • LIST SECURITY INTEGRATIONS
        • LIST STORES
        • LIST USERS
        • PRINT ENTITY
        • REJECT INVITATION
        • REVOKE INVITATION
        • REVOKE PRIVILEGES
        • REVOKE ROLE
        • SET DEFAULT
        • USE
        • START COMPUTE_POOL
        • STOP COMPUTE_POOL
      • DDL
        • ALTER API_TOKEN
        • ALTER SECURITY INTEGRATION
        • CREATE API_TOKEN
        • CREATE CHANGELOG
        • CREATE COMPUTE_POOL
        • CREATE DATABASE
        • CREATE DESCRIPTOR_SOURCE
        • CREATE ENTITY
        • CREATE FUNCTION_SOURCE
        • CREATE FUNCTION
        • CREATE INDEX
        • CREATE METRICS INTEGRATION
        • CREATE ORGANIZATION
        • CREATE ROLE
        • CREATE SCHEMA_REGISTRY
        • CREATE SCHEMA
        • CREATE SECRET
        • CREATE SECURITY INTEGRATION
        • CREATE STORE
        • CREATE STREAM
        • CREATE TABLE
        • DROP API_TOKEN
        • DROP CHANGELOG
        • DROP COMPUTE_POOL
        • DROP DATABASE
        • DROP DESCRIPTOR_SOURCE
        • DROP ENTITY
        • DROP FUNCTION_SOURCE
        • DROP FUNCTION
        • DROP METRICS INTEGRATION
        • DROP RELATION
        • DROP ROLE
        • DROP SCHEMA
        • DROP SCHEMA_REGISTRY
        • DROP SECRET
        • DROP SECURITY INTEGRATION
        • DROP STORE
        • DROP STREAM
        • DROP USER
        • START/STOP COMPUTE_POOL
        • UPDATE COMPUTE_POOL
        • UPDATE ENTITY
        • UPDATE SCHEMA_REGISTRY
        • UPDATE SECRET
        • UPDATE STORE
      • Query
        • APPLICATION
        • Change Data Capture (CDC)
        • CREATE CHANGELOG AS SELECT
        • CREATE STREAM AS SELECT
        • CREATE TABLE AS SELECT
        • Function
          • Built-in Functions
          • Row Metadata Functions
        • INSERT INTO
        • Materialized View
          • CREATE MATERIALIZED VIEW AS
          • SELECT (FROM MATERIALIZED VIEW)
        • Query Name and Version
        • Resume Query
        • RESTART QUERY
        • SELECT
          • FROM
          • JOIN
          • MATCH_RECOGNIZE
          • WITH (Common Table Expression)
        • TERMINATE QUERY
      • Sandbox
        • START SANDBOX
        • DESCRIBE SANDBOX
        • STOP SANDBOX
      • Row Key Definition
    • DeltaStream OpenAPI
      • Deltastream
      • Models
Powered by GitBook
On this page
  • Understanding the Data
  • Defining DeltaStream Objects
  • Using DeltaStream Objects
  1. How do I...?

Create DeltaStream Objects to Structure Raw Data

In DeltaStream a Data Store provides a layer of abstraction around the raw streaming data. To process that data in queries, we must make sense of the data. To this end, we use DeltaStream objects for defining the metadata and data format that in turn describe the structure of the data for its native format.

Understanding the Data

As an example, below is a defined Apache Kafka store that contains several entities:

demodb.public/msk_public# LIST ENTITIES;
      Entity name       
-----------------------
  ds_syslogs      
  ds_pageviews         
  ds_shipments         
  ds_users             

Now assume all entities are in JSON format. See CREATE STORE and UPDATE ENTITY for using other serialization formats. For information around data formats -- for example, CREATE STREAM -- refer to the relation’s DDL statements.

You can inspect the entities to understand the kind of data you have -- for example the ds_pageviews entity:

demodb.public/msk# PRINT ENTITY ds_pageviews;
{"userid":"User_7"} | {"viewtime":1677196372920,"userid":"User_7","pageid":"Page_82"}
{"userid":"User_3"} | {"viewtime":1677196372962,"userid":"User_3","pageid":"Page_97"}
{"userid":"User_6"} | {"viewtime":1677196373021,"userid":"User_6","pageid":"Page_80"}
{"userid":"User_1"} | {"viewtime":1677196373081,"userid":"User_1","pageid":"Page_73"}
{"userid":"User_2"} | {"viewtime":1677196373122,"userid":"User_2","pageid":"Page_35"}
{"userid":"User_7"} | {"viewtime":1677196373182,"userid":"User_7","pageid":"Page_58"}

Here is the ds_users entity:

demodb.public/msk# PRINT ENTITY ds_users;
{"userid":"User_6"} | {"registertime":1677196517022,"userid":"User_6","regionid":"Region_9","gender":"OTHER","interests":["News","Movies"],"contactinfo":{"phone":"6503889999","city":"Palo Alto","state":"CA","zipcode":"94301"}}
{"userid":"User_8"} | {"registertime":1677196517619,"userid":"User_8","regionid":"Region_5","gender":"FEMALE","interests":["News","Movies"],"contactinfo":{"phone":"6502215368","city":"San Carlos","state":"CA","zipcode":"94070"}}
{"userid":"User_1"} | {"registertime":1677196518042,"userid":"User_1","regionid":"Region_3","gender":"FEMALE","interests":["News","Movies"],"contactinfo":{"phone":"9492229999","city":"Irvine","state":"CA","zipcode":"92617"}}
{"userid":"User_4"} | {"registertime":1677196518620,"userid":"User_4","regionid":"Region_6","gender":"OTHER","interests":["News","Movies"],"contactinfo":{"phone":"6503889999","city":"Palo Alto","state":"CA","zipcode":"94301"}}

Defining DeltaStream Objects

CREATE STREAM pageviews (
    viewtime BIGINT, userid VARCHAR, pageid VARCHAR
) WITH ('topic'='ds_pageviews', 'value.format'='JSON');

See CREATE STREAM for more information.

CREATE CHANGELOG users_log (
    registertime BIGINT, userid VARCHAR, regionid VARCHAR, gender VARCHAR, interests ARRAY<VARCHAR>, contactinfo STRUCT<phone VARCHAR, city VARCHAR, "state" VARCHAR, zipcode VARCHAR>,
    PRIMARY KEY(userid)
) WITH ('topic'='ds_users', 'key.format'='JSON', 'key.type'='STRUCT<userid VARCHAR>', 'value.format'='JSON');

See CREATE CHANGELOG for more information.

When you have defined objects, you can list them through their database and namespace:

demodb.public/msk# LIST OBJECTS;
          Name         |       Type       |  Owner   |      Created at      |      Updated at       
-----------------------+------------------+----------+----------------------+-----------------------
  users_log            | Changelog        | sysadmin | 2023-01-12T20:41:00Z | 2023-01-12T20:41:00Z  
  pageviews            | Stream           | sysadmin | 2023-01-12T20:39:02Z | 2023-01-12T20:39:02Z  

In addition to listing them, you can also use their database and namespace to describe them:

demodb.public/msk# DESCRIBE OBJECT pageviews;
    Name    |  Type  |                           Metadata                           |                 Columns                  |      Details       | Primary key |  Owner   |      Created at      |      Updated at       
------------+--------+--------------------------------------------------------------+------------------------------------------+--------------------+-------------+----------+----------------------+-----------------------
  pageviews | Stream | {value.format : json,store : msk,topic : pageviews}          | viewtime  BIGINT                         | store=msk          |             | sysadmin | 2023-01-12T20:39:02Z | 2023-01-12T20:39:02Z  
            |        |                                                              | userid  VARCHAR                          | topic=pageviews    |             |          |                      |                       
            |        |                                                              | pageid  VARCHAR                          |                    |             |          |                      |                       

Using DeltaStream Objects

When you define objects for an entity, it becomes available to DeltaStream as a consumable entity. For example, you can use them in interactive queries:

demodb.public/msk# SELECT * FROM pageviews;
{"userid":"User_5"} | {"viewtime":1677274911334,"userid":"User_5","pageid":"Page_14"}
{"userid":"User_8"} | {"viewtime":1677274911528,"userid":"User_8","pageid":"Page_65"}
{"userid":"User_9"} | {"viewtime":1677274911766,"userid":"User_9","pageid":"Page_49"}
{"userid":"User_3"} | {"viewtime":1677274911812,"userid":"User_3","pageid":"Page_21"}
{"userid":"User_3"} | {"viewtime":1677274912412,"userid":"User_3","pageid":"Page_25"}
{"userid":"User_1"} | {"viewtime":1677274912569,"userid":"User_1","pageid":"Page_56"}
{"userid":"User_6"} | {"viewtime":1677274912819,"userid":"User_6","pageid":"Page_20"}

You can also use an entity in persistent queries, where they are continuously used as a source or sink:

CREATE STREAM user2_views
    AS SELECT userid, pageid
    FROM pageviews
    WHERE userid = 'User_2';
PreviousExample: Setting Up Custom Roles for Production and StageNextUse Namespacing for Organizing Data

Last updated 16 days ago

When you know what the data looks like in your entities, you can attach a structure to them for reference in queries. In the example below, as ds_pageviews is a continuous stream of immutable page events from your users, you can define a for it:

Since the ds_users entity hosts user information that changes over time, you can define a to capture ongoing changes to each userid:

Note For certain applications, it may be more useful to have access to a snapshot of the resulting data. See and CREATE MATERIALIZED VIEW AS for more information on how to create a view for the data.

Stream
Changelog
Materialized View