Creating Relations to Structure Raw Data
Let’s say we have defined an Apache Kafka Store, and it has a few Topics:
demodb.public/msk# LIST TOPICS;
Topic name
-----------------------
ds_syslogs
ds_pageviews
ds_shipments
ds_users
Let’s assume all Topics are in JSON format. See CREATE STORE and UPDATE TOPIC for using other serialization formats, and refer to the Relation DDL statements, such as CREATE STREAM, for more information on data formats.
We can inspect the Topics to understand what kind of data they contain. Here is the ds_pageviews Topic:
demodb.public/msk# PRINT TOPIC ds_pageviews;
{"userid":"User_7"} | {"viewtime":1677196372920,"userid":"User_7","pageid":"Page_82"}
{"userid":"User_3"} | {"viewtime":1677196372962,"userid":"User_3","pageid":"Page_97"}
{"userid":"User_6"} | {"viewtime":1677196373021,"userid":"User_6","pageid":"Page_80"}
{"userid":"User_1"} | {"viewtime":1677196373081,"userid":"User_1","pageid":"Page_73"}
{"userid":"User_2"} | {"viewtime":1677196373122,"userid":"User_2","pageid":"Page_35"}
{"userid":"User_7"} | {"viewtime":1677196373182,"userid":"User_7","pageid":"Page_58"}
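Each line printed by PRINT TOPIC shows the record key, a ` | ` separator, and the record value. The following Python sketch splits such a printed line back into its key and value objects. It assumes exactly the display layout shown above, with JSON on both sides of the first ` | `. The helper name `parse_printed_record` is hypothetical and is not part of DeltaStream.

```python
import json


def parse_printed_record(line: str) -> tuple[dict, dict]:
    """Split a 'key | value' line, as printed by PRINT TOPIC, into two JSON objects.

    Assumes both sides are JSON and that the first ' | ' is the separator.
    """
    key_text, value_text = line.split(" | ", 1)
    return json.loads(key_text), json.loads(value_text)


line = '{"userid":"User_7"} | {"viewtime":1677196372920,"userid":"User_7","pageid":"Page_82"}'
key, value = parse_printed_record(line)
print(key["userid"], value["pageid"], value["viewtime"])  # User_7 Page_82 1677196372920
```

Note that in this Topic the key repeats the `userid` field that also appears in the value.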
And here is the ds_users Topic:
demodb.public/msk# PRINT TOPIC ds_users;
{"userid":"User_6"} | {"registertime":1677196517022,"userid":"User_6","regionid":"Region_9","gender":"OTHER","interests":["News","Movies"],"contactinfo":{"phone":"6503889999","city":"Palo Alto","state":"CA","zipcode":"94301"}}
{"userid":"User_8"} | {"registertime":1677196517619,"userid":"User_8","regionid":"Region_5","gender":"FEMALE","interests":["News","Movies"],"contactinfo":{"phone":"6502215368","city":"San Carlos","state":"CA","zipcode":"94070"}}
{"userid":"User_1"} | {"registertime":1677196518042,"userid":"User_1","regionid":"Region_3","gender":"FEMALE","interests":["News","Movies"],"contactinfo":{"phone":"9492229999","city":"Irvine","state":"CA","zipcode":"92617"}}
{"userid":"User_4"} | {"registertime":1677196518620,"userid":"User_4","regionid":"Region_6","gender":"OTHER","interests":["News","Movies"],"contactinfo":{"phone":"6503889999","city":"Palo Alto","state":"CA","zipcode":"94301"}}
Now that we know what the data in our Topics looks like, we can attach a structure to it so that it can be referenced in queries. In our example, ds_pageviews is a continuous Stream of immutable page-view events from our users, so we define a Stream for it:
CREATE STREAM pageviews (
viewtime BIGINT, userid VARCHAR, pageid VARCHAR
) WITH ('topic'='ds_pageviews', 'value.format'='JSON');
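The column list in the DDL mirrors the fields of the JSON values printed by PRINT TOPIC. The following Python sketch derives such a column list from a sample value. It is a hypothetical helper, not a DeltaStream feature, and it assumes the type mapping used on this page: integers map to BIGINT, strings to VARCHAR, arrays to ARRAY<...>, and nested objects to STRUCT<...>.

```python
import json


def sql_type(value) -> str:
    """Map a sample JSON value to a column type as written in this page's DDL."""
    if isinstance(value, bool):
        # bool is a subclass of int in Python; booleans are out of scope here.
        raise TypeError("booleans are not covered by this sketch")
    if isinstance(value, int):
        return "BIGINT"
    if isinstance(value, str):
        return "VARCHAR"
    if isinstance(value, list):
        # Assumes a non-empty array whose elements all share one type.
        if not value:
            raise TypeError("cannot infer the element type of an empty array")
        return f"ARRAY<{sql_type(value[0])}>"
    if isinstance(value, dict):
        return f"STRUCT<{column_list(value)}>"
    raise TypeError(f"unsupported JSON value: {value!r}")


def column_list(record: dict) -> str:
    """Render 'name TYPE, ...' in the field order of the sample record."""
    return ", ".join(f"{name} {sql_type(v)}" for name, v in record.items())


pageview = json.loads('{"viewtime":1677196372920,"userid":"User_7","pageid":"Page_82"}')
print(column_list(pageview))  # viewtime BIGINT, userid VARCHAR, pageid VARCHAR
```

Applied to a ds_users value, the same helper yields the nested ARRAY and STRUCT columns used for the Changelog below. However, it does not quote identifiers. The DDL quotes "state", presumably because it is a reserved word.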
Since the ds_users Topic holds user information that changes over time, we define a Changelog for it so that it captures the ongoing changes for each userid:
CREATE CHANGELOG users_log (
registertime BIGINT, userid VARCHAR, regionid VARCHAR, gender VARCHAR, interests ARRAY<VARCHAR>, contactinfo STRUCT<phone VARCHAR, city VARCHAR, "state" VARCHAR, zipcode VARCHAR>,
PRIMARY KEY(userid)
) WITH ('topic'='ds_users', 'key.format'='JSON', 'key.type'='STRUCT<userid VARCHAR>', 'value.format'='JSON');
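Conceptually, a Stream treats every record as an independent event, while a Changelog treats each record as the latest state for its primary key. The following Python sketch models that difference with plain lists and dicts. The functions `as_stream` and `as_changelog` are hypothetical illustrations of the semantics only. They say nothing about how DeltaStream stores or processes data, and deletions are not modeled.

```python
def as_stream(records: list[dict]) -> list[dict]:
    """Stream semantics: every record is kept as an independent, immutable event."""
    return list(records)


def as_changelog(records: list[dict], key: str) -> dict[str, dict]:
    """Changelog semantics: a later record for the same primary key replaces the earlier one.

    Deletions are not modeled in this sketch.
    """
    state: dict[str, dict] = {}
    for record in records:
        state[record[key]] = record  # upsert by primary key
    return state


records = [
    {"userid": "User_6", "regionid": "Region_9"},
    {"userid": "User_8", "regionid": "Region_5"},
    {"userid": "User_6", "regionid": "Region_2"},  # hypothetical later update for User_6
]
print(len(as_stream(records)))                                # 3
print(as_changelog(records, "userid")["User_6"]["regionid"])  # Region_2
```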
For some applications, it is more useful to have access to a snapshot of the resulting data than to the continuous flow of records. See Materialized View and CREATE MATERIALIZED VIEW AS for more information on how to create a view of the data.
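A snapshot holds only the current result, and that result can be read at any point while new records keep arriving. The following Python sketch illustrates the idea with a hypothetical view that counts page views per user. Each incoming event updates the stored result in place, and `snapshot()` returns a copy of the current state. The class `ViewCountSnapshot` is an assumption made for illustration. It is not the Materialized View API.

```python
from collections import Counter


class ViewCountSnapshot:
    """Hypothetical incrementally maintained result: number of page views per userid."""

    def __init__(self) -> None:
        self._counts = Counter()

    def apply(self, pageview: dict) -> None:
        # Each new event updates the stored result instead of being appended to it.
        self._counts[pageview["userid"]] += 1

    def snapshot(self) -> dict[str, int]:
        # Return a copy so the caller sees a stable point-in-time result.
        return dict(self._counts)


view = ViewCountSnapshot()
for event in [
    {"userid": "User_7", "pageid": "Page_82"},
    {"userid": "User_3", "pageid": "Page_97"},
    {"userid": "User_7", "pageid": "Page_58"},
]:
    view.apply(event)
print(view.snapshot())  # {'User_7': 2, 'User_3': 1}
```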
Once Relations are defined, they can be listed through their Database and Schema:
demodb.public/msk# LIST RELATIONS;
Name | Type | Owner | Created at | Updated at
-----------------------+------------------+----------+----------------------+-----------------------
users_log | Changelog | sysadmin | 2023-01-12T20:41:00Z | 2023-01-12T20:41:00Z
pageviews | Stream | sysadmin | 2023-01-12T20:39:02Z | 2023-01-12T20:39:02Z
They can also be described using their Database and Schema:
demodb.public/msk# DESCRIBE RELATION pageviews;
Name | Type | Metadata | Columns | Details | Primary key | Owner | Created at | Updated at
------------+--------+--------------------------------------------------------------+------------------------------------------+--------------------+-------------+----------+----------------------+-----------------------
pageviews | Stream | {value.format : json,store : msk,topic : pageviews} | viewtime BIGINT | store=msk | | sysadmin | 2023-01-12T20:39:02Z | 2023-01-12T20:39:02Z
| | | userid VARCHAR | topic=pageviews | | | |
| | | pageid VARCHAR | | | | |
Once Relations are defined on Topics, DeltaStream can use them as sources of data. For example, they can be used in interactive queries:
demodb.public/msk# SELECT * FROM pageviews;
^C to exit
Waiting for sandbox to be in running state 'defined'..................
Waiting for interactive query('defined') to be in running state......
Interactive query is running
{"userid":"User_5"} | {"viewtime":1677274911334,"userid":"User_5","pageid":"Page_14"}
{"userid":"User_8"} | {"viewtime":1677274911528,"userid":"User_8","pageid":"Page_65"}
{"userid":"User_9"} | {"viewtime":1677274911766,"userid":"User_9","pageid":"Page_49"}
{"userid":"User_3"} | {"viewtime":1677274911812,"userid":"User_3","pageid":"Page_21"}
{"userid":"User_3"} | {"viewtime":1677274912412,"userid":"User_3","pageid":"Page_25"}
{"userid":"User_1"} | {"viewtime":1677274912569,"userid":"User_1","pageid":"Page_56"}
{"userid":"User_6"} | {"viewtime":1677274912819,"userid":"User_6","pageid":"Page_20"}
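They can also be used in persistent queries, which run continuously and use Relations as sources or sinks. The following query reads from pageviews and writes the page views of User_2 to a new Stream, user2_views: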
CREATE STREAM user2_views
AS SELECT userid, pageid
FROM pageviews
WHERE userid = 'User_2';
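Because the source is unbounded, a persistent query like this one handles each record as it arrives. A Python generator gives a rough, single-process analogy for the per-record filter and projection. The function name `user2_views_model` is hypothetical. The sketch ignores everything that makes the real query persistent, such as running on the service, consuming from the Store, and writing results to a sink.

```python
from collections.abc import Iterable, Iterator


def user2_views_model(pageviews: Iterable[dict]) -> Iterator[dict]:
    """Per-record analogy of: SELECT userid, pageid FROM pageviews WHERE userid = 'User_2'."""
    for event in pageviews:
        if event["userid"] == "User_2":  # WHERE userid = 'User_2'
            yield {"userid": event["userid"], "pageid": event["pageid"]}  # SELECT userid, pageid


events = [
    {"viewtime": 1677196373081, "userid": "User_1", "pageid": "Page_73"},
    {"viewtime": 1677196373122, "userid": "User_2", "pageid": "Page_35"},
    {"viewtime": 1677196373182, "userid": "User_7", "pageid": "Page_58"},
]
print(list(user2_views_model(events)))  # [{'userid': 'User_2', 'pageid': 'Page_35'}]
```

Because it is a generator, the same function works on an endless iterator. It emits each match as soon as it is read, rather than waiting for the input to end.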