LogoLogo
Start Trial
  • Overview
    • What is DeltaStream?
    • Core Concepts
      • Access Control
      • Region
      • SQL
      • Store
      • Database
      • Query
      • Visualizing Data Lineage
      • Function
  • Getting Started
    • Free Trial Quick Start
    • Starting with the Web App
    • Starting with the CLI
  • Tutorials
    • Managing Users and User Roles
      • Inviting Users to an Organization
      • Administering Users in your Organization
      • Using the CLI to Manage User Roles
      • Example: Setting Up Custom Roles for Production and Stage
    • Creating Stores for Streaming Data
    • Using Multiple Stores in Queries
    • Creating Relations to Structure Raw Data
    • Namespacing with Database and Schema
    • Creating and Querying Materialized Views
    • Creating a Function
    • Securing Your Connections to Data Stores
      • Introducing DeltaStream Private Links
      • Creating an AWS Private Link from DeltaStream to your Confluent Kafka Dedicated Cluster
      • Enabling Private Link Connectivity to Confluent Enterprise Cluster and Schema Registry
      • Creating a Private Link from DeltaStream to Amazon MSK
      • Creating a Private Link for RDS Databases
      • Deleting a Private Link
    • Integrations
      • Connecting to Confluent Cloud
      • Databricks
      • PostgreSQL
      • Snowflake
      • WarpStream
    • Serialization
      • Working with ProtoBuf Serialized Data and DeltaStream Descriptors
      • Working with Avro Serialized Data and Schema Registries
      • Configuring Deserialization Error Handling
  • Reference
    • Enterprise Security Integrations
      • Okta SAML Integration
      • Okta SCIM Integration
    • Metrics
      • Prometheus Integration
      • Built-In Metrics
      • Custom Metrics in Functions
    • SQL Syntax
      • Data Formats (Serialization)
        • Serializing with JSON
        • Serializing with Primitive Data Types
        • Serializing with Protobuf
      • Data Types
      • Identifiers and Keywords
      • Command
        • ACCEPT INVITATION
        • CAN I
        • COPY DESCRIPTOR_SOURCE
        • COPY FUNCTION_SOURCE
        • DESCRIBE ENTITY
        • DESCRIBE QUERY
        • DESCRIBE QUERY METRICS
        • DESCRIBE QUERY EVENTS
        • DESCRIBE QUERY STATE
        • DESCRIBE RELATION
        • DESCRIBE RELATION COLUMNS
        • DESCRIBE ROLE
        • DESCRIBE SECURITY INTEGRATION
        • DESCRIBE <statement>
        • DESCRIBE STORE
        • DESCRIBE USER
        • GENERATE COLUMNS
        • GENERATE TEMPLATE
        • GRANT OWNERSHIP
        • GRANT PRIVILEGES
        • GRANT ROLE
        • INVITE USER
        • LIST API_TOKENS
        • LIST DATABASES
        • LIST DESCRIPTORS
        • LIST DESCRIPTOR_SOURCES
        • LIST ENTITIES
        • LIST FUNCTIONS
        • LIST FUNCTION_SOURCES
        • LIST INVITATIONS
        • LIST METRICS INTEGRATIONS
        • LIST ORGANIZATIONS
        • LIST QUERIES
        • LIST REGIONS
        • LIST RELATIONS
        • LIST ROLES
        • LIST SCHEMAS
        • LIST SCHEMA_REGISTRIES
        • LIST SECRETS
        • LIST SECURITY INTEGRATIONS
        • LIST STORES
        • LIST USERS
        • PRINT ENTITY
        • REJECT INVITATION
        • REVOKE INVITATION
        • REVOKE PRIVILEGES
        • REVOKE ROLE
        • SET DEFAULT
        • USE
      • DDL
        • ALTER API_TOKEN
        • ALTER SECURITY INTEGRATION
        • CREATE API_TOKEN
        • CREATE CHANGELOG
        • CREATE DATABASE
        • CREATE DESCRIPTOR_SOURCE
        • CREATE ENTITY
        • CREATE FUNCTION_SOURCE
        • CREATE FUNCTION
        • CREATE INDEX
        • CREATE METRICS INTEGRATION
        • CREATE ORGANIZATION
        • CREATE ROLE
        • CREATE SCHEMA_REGISTRY
        • CREATE SCHEMA
        • CREATE SECRET
        • CREATE SECURITY INTEGRATION
        • CREATE STORE
        • CREATE STREAM
        • CREATE TABLE
        • DROP API_TOKEN
        • DROP CHANGELOG
        • DROP DATABASE
        • DROP DESCRIPTOR_SOURCE
        • DROP ENTITY
        • DROP FUNCTION_SOURCE
        • DROP FUNCTION
        • DROP METRICS INTEGRATION
        • DROP RELATION
        • DROP ROLE
        • DROP SCHEMA
        • DROP SCHEMA_REGISTRY
        • DROP SECRET
        • DROP SECURITY INTEGRATION
        • DROP STORE
        • DROP STREAM
        • DROP USER
        • UPDATE ENTITY
        • UPDATE SCHEMA_REGISTRY
        • UPDATE SECRET
        • UPDATE STORE
      • Query
        • APPLICATION
        • Change Data Capture (CDC)
        • CREATE CHANGELOG AS SELECT
        • CREATE STREAM AS SELECT
        • CREATE TABLE AS SELECT
        • Function
          • Built-in Functions
          • Row Metadata Functions
        • INSERT INTO
        • Materialized View
          • CREATE MATERIALIZED VIEW AS
          • SELECT (FROM MATERIALIZED VIEW)
        • Query Name and Version
        • Resume Query
        • RESTART QUERY
        • SELECT
          • FROM
          • JOIN
          • MATCH_RECOGNIZE
          • WITH (Common Table Expression)
        • TERMINATE QUERY
      • Sandbox
        • START SANDBOX
        • DESCRIBE SANDBOX
        • STOP SANDBOX
      • Row Key Definition
    • Rest API
Powered by GitBook
On this page
  • Understanding the Data
  • Defining Relations
  • Using Relations
  1. Tutorials

Creating Relations to Structure Raw Data

PreviousUsing Multiple Stores in QueriesNextNamespacing with Database and Schema

Last updated 5 months ago

In DeltaStream a Store provides a layer of abstraction around the raw streaming data. To process that data in queries, we must make sense of the data. To this end, we use for defining the metadata and data format that in turn describe the structure of the data for its native format.

Understanding the Data

As an example, below is a defined Apache Kafka store that contains several entities:

demodb.public/msk_public# LIST ENTITIES;
      Entity name       
-----------------------
  ds_syslogs      
  ds_pageviews         
  ds_shipments         
  ds_users             

Now assume all entities are in JSON format. See CREATE STORE and UPDATE ENTITY for using other serialization formats. For information around data formats -- for example, CREATE STREAM -- refer to the relation’s DDL statements.

You can inspect the entities to understand the kind of data you have -- for example the ds_pageviews entity:

demodb.public/msk# PRINT ENTITY ds_pageviews;
{"userid":"User_7"} | {"viewtime":1677196372920,"userid":"User_7","pageid":"Page_82"}
{"userid":"User_3"} | {"viewtime":1677196372962,"userid":"User_3","pageid":"Page_97"}
{"userid":"User_6"} | {"viewtime":1677196373021,"userid":"User_6","pageid":"Page_80"}
{"userid":"User_1"} | {"viewtime":1677196373081,"userid":"User_1","pageid":"Page_73"}
{"userid":"User_2"} | {"viewtime":1677196373122,"userid":"User_2","pageid":"Page_35"}
{"userid":"User_7"} | {"viewtime":1677196373182,"userid":"User_7","pageid":"Page_58"}

Here is the ds_users entity:

demodb.public/msk# PRINT ENTITY ds_users;
{"userid":"User_6"} | {"registertime":1677196517022,"userid":"User_6","regionid":"Region_9","gender":"OTHER","interests":["News","Movies"],"contactinfo":{"phone":"6503889999","city":"Palo Alto","state":"CA","zipcode":"94301"}}
{"userid":"User_8"} | {"registertime":1677196517619,"userid":"User_8","regionid":"Region_5","gender":"FEMALE","interests":["News","Movies"],"contactinfo":{"phone":"6502215368","city":"San Carlos","state":"CA","zipcode":"94070"}}
{"userid":"User_1"} | {"registertime":1677196518042,"userid":"User_1","regionid":"Region_3","gender":"FEMALE","interests":["News","Movies"],"contactinfo":{"phone":"9492229999","city":"Irvine","state":"CA","zipcode":"92617"}}
{"userid":"User_4"} | {"registertime":1677196518620,"userid":"User_4","regionid":"Region_6","gender":"OTHER","interests":["News","Movies"],"contactinfo":{"phone":"6503889999","city":"Palo Alto","state":"CA","zipcode":"94301"}}

Defining Relations

CREATE STREAM pageviews (
    viewtime BIGINT, userid VARCHAR, pageid VARCHAR
) WITH ('topic'='ds_pageviews', 'value.format'='JSON');

See CREATE STREAM for more information.

CREATE CHANGELOG users_log (
    registertime BIGINT, userid VARCHAR, regionid VARCHAR, gender VARCHAR, interests ARRAY<VARCHAR>, contactinfo STRUCT<phone VARCHAR, city VARCHAR, "state" VARCHAR, zipcode VARCHAR>,
    PRIMARY KEY(userid)
) WITH ('topic'='ds_users', 'key.format'='JSON', 'key.type'='STRUCT<userid VARCHAR>', 'value.format'='JSON');

See CREATE CHANGELOG for more information.

When you have defined relations, you can list them through their database and schema:

demodb.public/msk# LIST RELATIONS;
          Name         |       Type       |  Owner   |      Created at      |      Updated at       
-----------------------+------------------+----------+----------------------+-----------------------
  users_log            | Changelog        | sysadmin | 2023-01-12T20:41:00Z | 2023-01-12T20:41:00Z  
  pageviews            | Stream           | sysadmin | 2023-01-12T20:39:02Z | 2023-01-12T20:39:02Z  

In addition to listing them, you can also use their database and schema to describe them:

demodb.public/msk# DESCRIBE RELATION pageviews;
    Name    |  Type  |                           Metadata                           |                 Columns                  |      Details       | Primary key |  Owner   |      Created at      |      Updated at       
------------+--------+--------------------------------------------------------------+------------------------------------------+--------------------+-------------+----------+----------------------+-----------------------
  pageviews | Stream | {value.format : json,store : msk,topic : pageviews}          | viewtime  BIGINT                         | store=msk          |             | sysadmin | 2023-01-12T20:39:02Z | 2023-01-12T20:39:02Z  
            |        |                                                              | userid  VARCHAR                          | topic=pageviews    |             |          |                      |                       
            |        |                                                              | pageid  VARCHAR                          |                    |             |          |                      |                       

Using Relations

When you define relations for an entity, it becomes available to DeltaStream as a consumable entity. For example, you can use them in interactive queries:

demodb.public/msk# SELECT * FROM pageviews;
{"userid":"User_5"} | {"viewtime":1677274911334,"userid":"User_5","pageid":"Page_14"}
{"userid":"User_8"} | {"viewtime":1677274911528,"userid":"User_8","pageid":"Page_65"}
{"userid":"User_9"} | {"viewtime":1677274911766,"userid":"User_9","pageid":"Page_49"}
{"userid":"User_3"} | {"viewtime":1677274911812,"userid":"User_3","pageid":"Page_21"}
{"userid":"User_3"} | {"viewtime":1677274912412,"userid":"User_3","pageid":"Page_25"}
{"userid":"User_1"} | {"viewtime":1677274912569,"userid":"User_1","pageid":"Page_56"}
{"userid":"User_6"} | {"viewtime":1677274912819,"userid":"User_6","pageid":"Page_20"}

You can also use an entity in persistent queries, where they are continuously used as a source or sink:

CREATE STREAM user2_views
    AS SELECT userid, pageid
    FROM pageviews
    WHERE userid = 'User_2';

When you know what the data looks like in your entities, you can attach a structure to them for reference in queries. In the example below, as ds_pageviews is a continuous stream of immutable page events from your users, you can define a for it:

Since the ds_users entity hosts user information that changes over time, you can define a to capture ongoing changes to each userid:

Note For certain applications, it may be more useful to have access to a snapshot of the resulting data. See and CREATE MATERIALIZED VIEW AS for more information on how to create a view for the data.

relations
Stream
Changelog
Materialized View