Row Key Definition
Any Stream or Changelog can have a Row Key, which is derived from the keys of the messages in its Topic. Defining a row key in a DDL statement is optional, and it is currently supported only for relations backed by a Kafka topic. If a row key is defined for a relation, the keys of the source Kafka messages are used as the row keys of the relation’s records. The row key of a record can be accessed via the rowkey function, listed in the Row Metadata Functions.

To define the row key for a relation, add the following properties to the WITH clause of the relation’s DDL statement:

key.format: The data format of the row key (i.e., how the key bytes are serialized) in the messages from the relation’s topic. Supported row key formats are primitive, json, protobuf, and avro.
key.type: The structure of the row key’s fields:
- For a row key in the primitive format, key.type defines the data type of the key values. Valid data types for a primitive row key are SMALLINT, INTEGER, BIGINT, DOUBLE, and VARCHAR.
- For a row key in the json, avro, or protobuf format, key.type is a struct (for example, STRUCT<id INTEGER>) whose fields define the names and data types of the key’s fields.
key.descriptor.name: When defining a relation with a row key in the protobuf format, a Descriptor for the key must be uploaded to DeltaStream before running the DDL statement, and this descriptor must be associated with the relation’s Topic. In the DDL statement, the key.descriptor.name parameter specifies the name of the protobuf message descriptor in the descriptor source that is used to read and write the records’ row keys.
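The rules for these three properties can be summarized as a small validation routine. The Python sketch below is not DeltaStream code and does not reflect how DeltaStream actually parses a DDL statement. check_row_key_properties and parse_struct_fields are hypothetical helpers, and the struct parser handles only flat STRUCT<name TYPE, ...> strings with simple field types. Treating key.type as required whenever key.format is set is an assumption based on the DDL examples on this page. The sketch also cannot confirm that a key descriptor was actually uploaded to DeltaStream.

```python
import re

SUPPORTED_KEY_FORMATS = {"primitive", "json", "protobuf", "avro"}
PRIMITIVE_KEY_TYPES = {"SMALLINT", "INTEGER", "BIGINT", "DOUBLE", "VARCHAR"}


def parse_struct_fields(key_type: str) -> list:
    """Split a flat 'STRUCT<name TYPE, ...>' string into (name, TYPE) pairs."""
    match = re.fullmatch(r"\s*STRUCT<(.+)>\s*", key_type, flags=re.IGNORECASE)
    if match is None:
        raise ValueError(f"expected a STRUCT key.type, got {key_type!r}")
    fields = []
    for part in match.group(1).split(","):
        name, field_type = part.split()  # ValueError if not exactly "name TYPE"
        fields.append((name, field_type.upper()))
    return fields


def check_row_key_properties(props: dict):
    """Validate the row-key properties of a DDL WITH clause (hypothetical sketch).

    Returns None when no row key is defined, the type name for a primitive
    key, or a list of (field, type) pairs for a json/avro/protobuf key.
    """
    key_format = props.get("key.format")
    if key_format is None:
        return None  # defining a row key is optional
    if key_format not in SUPPORTED_KEY_FORMATS:
        raise ValueError(f"unsupported key.format: {key_format!r}")
    key_type = props.get("key.type")
    if key_type is None:
        # Assumption: mirrors the DDL examples, which pair key.format with key.type.
        raise ValueError("key.type is required when key.format is set")
    if key_format == "protobuf" and not props.get("key.descriptor.name"):
        raise ValueError("protobuf row keys need key.descriptor.name")
    if key_format == "primitive":
        primitive_type = key_type.strip().upper()
        if primitive_type not in PRIMITIVE_KEY_TYPES:
            raise ValueError(f"unsupported primitive key type: {key_type!r}")
        return primitive_type
    return parse_struct_fields(key_type)
```

For instance, {'key.format': 'primitive', 'key.type': 'VARCHAR'} yields 'VARCHAR'. A protobuf key declared without key.descriptor.name raises ValueError.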
The following DDL statement defines a stream that has a primitive row key of the VARCHAR type:

CREATE STREAM pageviews (
viewtime BIGINT,
userid VARCHAR,
pageid VARCHAR
)
WITH (
'topic'='pageviews',
'value.format'='json',
'key.format'='primitive',
'key.type'='VARCHAR'
);
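To make key.format more concrete, the sketch below decodes raw Kafka key bytes for a primitive row key, such as the VARCHAR key of pageviews. It assumes that DeltaStream’s primitive format uses the same byte layout as Kafka’s built-in serializers (UTF-8 strings and big-endian fixed-width numbers). This page does not confirm that assumption. decode_primitive_key is a hypothetical helper.

```python
import struct

# Assumption: byte layouts follow Kafka's built-in serializers, not a documented
# DeltaStream specification.
_PRIMITIVE_LAYOUTS = {
    "SMALLINT": ">h",  # 2-byte signed big-endian integer
    "INTEGER": ">i",   # 4-byte signed big-endian integer
    "BIGINT": ">q",    # 8-byte signed big-endian integer
    "DOUBLE": ">d",    # 8-byte big-endian IEEE 754 double
}


def decode_primitive_key(key_bytes: bytes, key_type: str):
    """Decode Kafka message key bytes for a primitive row key (sketch)."""
    if key_type == "VARCHAR":
        return key_bytes.decode("utf-8")
    layout = _PRIMITIVE_LAYOUTS.get(key_type)
    if layout is None:
        raise ValueError(f"unsupported primitive key type: {key_type}")
    (value,) = struct.unpack(layout, key_bytes)  # struct.error on a length mismatch
    return value


user_key = decode_primitive_key(b"User_2", "VARCHAR")
```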
The following DDL statement defines a stream that has a row key in the json format. The row key of each record, which can be accessed via the rowkey function, is a struct with a field named id of the INTEGER data type:

CREATE STREAM orders (
order_time BIGINT,
item_name VARCHAR,
price BIGINT
)
WITH (
'topic'='sales',
'value.format'='json',
'key.format'='json',
'key.type'='STRUCT<id INTEGER>'
);
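Assuming a json row key is a JSON object whose top-level fields correspond to the STRUCT in key.type, decoding it amounts to parsing the key bytes and keeping the declared fields. The Python sketch below uses a hypothetical decode_json_key helper and is not DeltaStream’s implementation. It checks only INTEGER and VARCHAR fields and treats a missing or mistyped field as an error. How DeltaStream itself handles such keys is not covered here.

```python
import json
from typing import Dict


def decode_json_key(key_bytes: bytes, fields: Dict[str, str]) -> Dict[str, object]:
    """Decode json key bytes and keep only the fields declared in key.type."""
    decoded = json.loads(key_bytes)  # json.loads accepts UTF-8 encoded bytes
    if not isinstance(decoded, dict):
        raise ValueError("expected the json row key to be a JSON object")
    row_key: Dict[str, object] = {}
    for name, sql_type in fields.items():
        value = decoded.get(name)
        if sql_type == "INTEGER":
            # bool is a subclass of int in Python, so exclude it explicitly.
            if isinstance(value, bool) or not isinstance(value, int):
                raise ValueError(f"field {name!r} is not an INTEGER: {value!r}")
        elif sql_type == "VARCHAR":
            if not isinstance(value, str):
                raise ValueError(f"field {name!r} is not a VARCHAR: {value!r}")
        # Other SQL types are passed through without checking.
        row_key[name] = value
    return row_key


# key.type STRUCT<id INTEGER>, expressed as a field map; undeclared fields are dropped.
orders_key = decode_json_key(b'{"id": 42, "note": "ignored"}', {"id": "INTEGER"})
```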
The following DDL statement defines a stream that has a row key in the protobuf format. The row key in each record has two fields: an INTEGER field named id and a VARCHAR field named region. A protobuf descriptor for the row key is defined with the name customer_key_msg within the descriptor source of the relation’s topic:

CREATE STREAM customers (
name VARCHAR,
address VARCHAR,
acct_balance INTEGER
)
WITH (
'topic' = 'customers',
'value.format' = 'protobuf',
'key.format' = 'protobuf',
'key.type' = 'STRUCT<id INTEGER, region VARCHAR>',
'value.descriptor.name' = 'customer_value_msg',
'key.descriptor.name' = 'customer_key_msg'
);
In addition to a DDL statement, a new Stream or Changelog can be defined with a CREATE AS SELECT (CAS) statement, namely CREATE STREAM AS SELECT (CSAS) or CREATE CHANGELOG AS SELECT, respectively. The query in a CAS statement determines the row key definition of the new relation.

If the query in a CAS statement consists only of projection and filter operators (a SELECT clause and a WHERE clause, respectively), the new relation has the same row key definition as its source, if the source has one.

For example, the CSAS statement below creates a new pagevisits stream by filtering records from the pageviews stream defined above. Because pageviews records have row keys in the primitive format, pagevisits records have row keys with the same definition:

CREATE STREAM
pagevisits
AS
SELECT userid, pageid
FROM pageviews
WHERE userid != 'User_2';
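The key-preservation rule for projection and filter can be modeled in a few lines of Python. In this sketch, records are hypothetical (row_key, value) pairs rather than DeltaStream’s actual data model. The function filters records and projects their value columns, but it copies each row key through untouched. This is why pagevisits inherits the row key definition of pageviews.

```python
from typing import Callable, Dict, Iterable, Iterator, List, Tuple

# A record is modeled as (row_key, value_columns); this is an illustration only.
Record = Tuple[object, Dict[str, object]]


def project_and_filter(
    records: Iterable[Record],
    columns: List[str],
    predicate: Callable[[Dict[str, object]], bool],
) -> Iterator[Record]:
    """Model SELECT <columns> ... WHERE <predicate> over keyed records."""
    for row_key, value in records:
        if predicate(value):  # WHERE clause
            projected = {c: value[c] for c in columns}  # SELECT list
            yield row_key, projected  # row key passes through unchanged


pageviews = [
    ("key-1", {"viewtime": 1, "userid": "User_1", "pageid": "Page_1"}),
    ("key-2", {"viewtime": 2, "userid": "User_2", "pageid": "Page_2"}),
]
pagevisits = list(
    project_and_filter(pageviews, ["userid", "pageid"], lambda v: v["userid"] != "User_2")
)
```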
When running a CAS statement, the key.format of the new relation (the sink) can be specified in the sink’s WITH clause. For example, the CSAS statement below is similar to the previous one, but it changes the row key format of the new pagevisits stream to json, while the source relation pageviews has a row key in the primitive format:

CREATE STREAM
pagevisits
WITH ('key.format'='json')
AS
SELECT userid, pageid
FROM pageviews
WHERE userid != 'User_2';
If the query in a CREATE CHANGELOG AS SELECT statement has a GROUP BY clause, the row key of the new relation consists of the columns in the GROUP BY clause. The key.format of the new relation is the same as the source relation’s key.format; as in the previous example, it can be changed in the sink’s WITH clause.

For example, the query below groups and aggregates pageviews over a tumbling window to create a new changelog, visits_rate. The row key of visits_rate has three fields, window_start, window_end, and userid, because these are the columns referenced in the GROUP BY clause:

CREATE CHANGELOG
visits_rate
AS
SELECT window_start, window_end, userid, count(pageid) AS page_count
FROM TUMBLE(pageviews, size 5 second)
GROUP BY window_start, window_end, userid;
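The sketch below models how visits_rate gets its three-field row key. Each record is assigned to a fixed, non-overlapping window, and the count is kept per (window_start, window_end, userid) tuple, matching the GROUP BY columns. This is a Python illustration, not DeltaStream’s implementation. It assumes event times in milliseconds and half-open windows [window_start, window_end). It also computes only the final counts, whereas a changelog would emit updated rows as records arrive. tumbling_page_counts is a hypothetical helper.

```python
from collections import Counter
from typing import Dict, Iterable, Optional, Tuple

RowKey = Tuple[int, int, str]  # (window_start, window_end, userid)


def tumbling_page_counts(
    records: Iterable[Tuple[int, str, Optional[str]]], size_ms: int
) -> Dict[RowKey, int]:
    """Count pageid per userid in tumbling windows of size_ms milliseconds."""
    counts: Counter = Counter()
    for event_time, userid, pageid in records:
        if pageid is None:
            continue  # count(pageid) skips NULL values
        window_start = event_time - event_time % size_ms
        window_end = window_start + size_ms
        counts[(window_start, window_end, userid)] += 1
    return dict(counts)


views = [
    (1_000, "User_1", "Page_1"),
    (4_999, "User_1", "Page_2"),
    (5_000, "User_1", "Page_3"),
    (2_000, "User_2", "Page_1"),
]
visits_rate = tumbling_page_counts(views, size_ms=5_000)  # 5-second windows
```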
The query below creates a new changelog, region_stats, by grouping and aggregating the source stream users. Each record in region_stats has a row key with a single field named location:

CREATE CHANGELOG
region_stats
AS
SELECT contactinfo->city AS location, count(userid) AS usr_cnt
FROM "users"
WHERE interests[2] != 'Game'
GROUP BY contactinfo->city;
If the query in a CREATE STREAM AS SELECT statement has a JOIN clause, the row key of the new relation consists of the column from the left relation in the JOIN criteria. The key.format of the new relation is the same as the left source relation’s key.format; as in the examples above, it can be changed in the sink’s WITH clause.

For example, the query below runs an interval join on two streams, pageviews and users, to create a new stream named pvusers. Each record in pvusers has a row key with one field, userid, because userid is the column of pageviews (the left side of the join) that is referenced in the join criteria p.userid = u.userid:

CREATE STREAM
pvusers
AS
SELECT p.userid, u.registertime
FROM pageviews p JOIN "users" u WITHIN 5 minutes
ON p.userid = u.userid
WHERE p.userid != 'User_5';
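The row key rule for joins can be modeled with a small interval join. This Python sketch is an illustration under stated assumptions, not DeltaStream’s implementation. interval_join is a hypothetical helper, event times are in milliseconds, and WITHIN is assumed to bound the time difference in both directions. Each output row is keyed by the left side’s join column, as pvusers is keyed by p.userid. The WHERE filter from the statement is left out.

```python
from typing import Iterator, List, Tuple


def interval_join(
    left: List[Tuple[int, str]],
    right: List[Tuple[int, str, int]],
    within_ms: int,
) -> Iterator[Tuple[str, dict]]:
    """Join (time, userid) rows with (time, userid, registertime) rows.

    Yields (row_key, value) pairs whose row key is the left side's join column.
    """
    for p_time, p_userid in left:
        for u_time, u_userid, registertime in right:
            # ON p.userid = u.userid, plus the assumed symmetric time bound.
            if p_userid == u_userid and abs(p_time - u_time) <= within_ms:
                yield p_userid, {"userid": p_userid, "registertime": registertime}


FIVE_MINUTES_MS = 5 * 60 * 1000
pageviews = [(0, "User_1"), (1_000, "User_2")]
users = [(60_000, "User_1", 1_500_000_000_000), (600_000, "User_2", 1_600_000_000_000)]
pvusers = list(interval_join(pageviews, users, FIVE_MINUTES_MS))
```

Here User_2 produces no output because its two records are more than five minutes apart.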