Creating Relations to Structure Raw Data

In DeltaStream, a Store provides a layer of abstraction over raw streaming data. Before you can process that data in queries, you must describe its structure. Relations serve this purpose: they define the metadata and data format that describe how the data is structured in its native format.

Understanding the Data

As an example, consider an Apache Kafka store that contains several entities:

demodb.public/msk_public# LIST ENTITIES;
      Entity name       
-----------------------
  ds_syslogs      
  ds_pageviews         
  ds_shipments         
  ds_users             

Assume that all of these entities are in JSON format. See CREATE STORE and UPDATE ENTITY for using other serialization formats. For information about specifying data formats, refer to the relation’s DDL statement, for example CREATE STREAM.

You can inspect the entities to understand the kind of data they hold. For example, here is the ds_pageviews entity:

demodb.public/msk# PRINT ENTITY ds_pageviews;
{"userid":"User_7"} | {"viewtime":1677196372920,"userid":"User_7","pageid":"Page_82"}
{"userid":"User_3"} | {"viewtime":1677196372962,"userid":"User_3","pageid":"Page_97"}
{"userid":"User_6"} | {"viewtime":1677196373021,"userid":"User_6","pageid":"Page_80"}
{"userid":"User_1"} | {"viewtime":1677196373081,"userid":"User_1","pageid":"Page_73"}
{"userid":"User_2"} | {"viewtime":1677196373122,"userid":"User_2","pageid":"Page_35"}
{"userid":"User_7"} | {"viewtime":1677196373182,"userid":"User_7","pageid":"Page_58"}

Here is the ds_users entity:

demodb.public/msk# PRINT ENTITY ds_users;
{"userid":"User_6"} | {"registertime":1677196517022,"userid":"User_6","regionid":"Region_9","gender":"OTHER","interests":["News","Movies"],"contactinfo":{"phone":"6503889999","city":"Palo Alto","state":"CA","zipcode":"94301"}}
{"userid":"User_8"} | {"registertime":1677196517619,"userid":"User_8","regionid":"Region_5","gender":"FEMALE","interests":["News","Movies"],"contactinfo":{"phone":"6502215368","city":"San Carlos","state":"CA","zipcode":"94070"}}
{"userid":"User_1"} | {"registertime":1677196518042,"userid":"User_1","regionid":"Region_3","gender":"FEMALE","interests":["News","Movies"],"contactinfo":{"phone":"9492229999","city":"Irvine","state":"CA","zipcode":"92617"}}
{"userid":"User_4"} | {"registertime":1677196518620,"userid":"User_4","regionid":"Region_6","gender":"OTHER","interests":["News","Movies"],"contactinfo":{"phone":"6503889999","city":"Palo Alto","state":"CA","zipcode":"94301"}}

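Defining Relations

With the structure of the data understood, you can define relations over the entities. The following statement defines a stream named pageviews over the ds_pageviews entity: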

CREATE STREAM pageviews (
    viewtime BIGINT, userid VARCHAR, pageid VARCHAR
) WITH ('topic'='ds_pageviews', 'value.format'='JSON');

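See CREATE STREAM for more information.

Similarly, the following statement defines a changelog named users_log over the ds_users entity, with userid as the primary key: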

CREATE CHANGELOG users_log (
    registertime BIGINT, userid VARCHAR, regionid VARCHAR, gender VARCHAR, interests ARRAY<VARCHAR>, contactinfo STRUCT<phone VARCHAR, city VARCHAR, "state" VARCHAR, zipcode VARCHAR>,
    PRIMARY KEY(userid)
) WITH ('topic'='ds_users', 'key.format'='JSON', 'key.type'='STRUCT<userid VARCHAR>', 'value.format'='JSON');

See CREATE CHANGELOG for more information.

Once relations are defined, you can list them within their database and schema:

demodb.public/msk# LIST RELATIONS;
          Name         |       Type       |  Owner   |      Created at      |      Updated at       
-----------------------+------------------+----------+----------------------+-----------------------
  users_log            | Changelog        | sysadmin | 2023-01-12T20:41:00Z | 2023-01-12T20:41:00Z  
  pageviews            | Stream           | sysadmin | 2023-01-12T20:39:02Z | 2023-01-12T20:39:02Z  

You can also describe an individual relation within its database and schema:

demodb.public/msk# DESCRIBE RELATION pageviews;
    Name    |  Type  |                           Metadata                           |                 Columns                  |      Details       | Primary key |  Owner   |      Created at      |      Updated at       
------------+--------+--------------------------------------------------------------+------------------------------------------+--------------------+-------------+----------+----------------------+-----------------------
  pageviews | Stream | {value.format : json,store : msk,topic : pageviews}          | viewtime  BIGINT                         | store=msk          |             | sysadmin | 2023-01-12T20:39:02Z | 2023-01-12T20:39:02Z  
            |        |                                                              | userid  VARCHAR                          | topic=pageviews    |             |          |                      |                       
            |        |                                                              | pageid  VARCHAR                          |                    |             |          |                      |                       
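
The same statement should work for the changelog defined earlier. A minimal sketch, assuming users_log exists in the current database and schema (output omitted here):

```sql
-- Describe the changelog defined over the ds_users entity.
DESCRIBE RELATION users_log;
```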

Using Relations

Once you define a relation for an entity, the entity's data becomes consumable in DeltaStream through that relation. For example, you can use a relation in interactive queries:

demodb.public/msk# SELECT * FROM pageviews;
{"userid":"User_5"} | {"viewtime":1677274911334,"userid":"User_5","pageid":"Page_14"}
{"userid":"User_8"} | {"viewtime":1677274911528,"userid":"User_8","pageid":"Page_65"}
{"userid":"User_9"} | {"viewtime":1677274911766,"userid":"User_9","pageid":"Page_49"}
{"userid":"User_3"} | {"viewtime":1677274911812,"userid":"User_3","pageid":"Page_21"}
{"userid":"User_3"} | {"viewtime":1677274912412,"userid":"User_3","pageid":"Page_25"}
{"userid":"User_1"} | {"viewtime":1677274912569,"userid":"User_1","pageid":"Page_56"}
{"userid":"User_6"} | {"viewtime":1677274912819,"userid":"User_6","pageid":"Page_20"}
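
An interactive query does not have to return every column or every record. As a minimal sketch, assuming the pageviews stream defined above, the following query projects two columns and filters on userid (the filter value is only illustrative):

```sql
-- Return only the view time and page for a single user.
SELECT viewtime, pageid
FROM pageviews
WHERE userid = 'User_3';
```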

You can also use a relation in persistent queries, where it is continuously used as a source or sink:

CREATE STREAM user2_views
    AS SELECT userid, pageid
    FROM pageviews
    WHERE userid = 'User_2';
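
The stream created by this statement is itself a relation. Assuming the persistent query above is running, it should be possible to read user2_views like any other stream, for example with an interactive query:

```sql
-- Read from the stream that the persistent query writes to.
SELECT * FROM user2_views;
```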
