Row Key Definition
Define a Row Key in a DDL statement
Any stream or changelog can have a row key according to the messages in its Data Store. Row key definition in a DDL statement is optional and currently is only available for relations that are backed by a Kafka topic. If a row key is defined for a relation, the keys of the source Kafka messages are used to set the row keys in the relation’s records. The row key in a record can be accessed via the `rowkey` function listed in the Row Metadata Functions.
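As a quick sketch of reading the key back in a query, assuming the `rowkey` function is called with no arguments (see the Row Metadata Functions reference for the exact signature) and a `pageviews` stream defined with a primitive `VARCHAR` row key, as in the first example below:

```sql
-- Hypothetical query: read each record's row key alongside its value columns.
-- Assumes pageviews was defined with 'key.format'='primitive', 'key.type'='VARCHAR'.
SELECT rowkey() AS key_userid, viewtime, pageid
FROM pageviews;
```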
You can define the row key for a relation by adding certain properties to the relation’s DDL statement in the `WITH` clause:
`key.format`: This is the data format for the row key (that is, how key bytes are serialized) in the messages coming from the relation’s topic. Supported formats for a row key are:
- `primitive`
- `json`
- `protobuf`
- `avro`
`key.type`: This defines the structure of the row key’s fields when reading records from a source topic:
- For a row key with a `primitive` format, the `key.type` defines the data type of key values. Valid data types for a primitive row key are `SMALLINT`, `INTEGER`, `BIGINT`, `DOUBLE`, and `VARCHAR`.
- For a row key in the `json`, `avro`, or `protobuf` format, the `key.type` is a `STRUCT` whose fields define the names and data types of the key’s fields.
`key.columns`: This defines the name(s) of the value columns, separated by commas, that are used to construct the row key when writing to a sink. Note that when `key.columns` is set, the `key.type` property is inferred from it. You cannot set `key.type` and `key.columns` together when you define a stream or changelog.
- For a non-primitive `key.format`, the record key is created as a `STRUCT` whose fields are the columns listed in this property.
- For a primitive `key.format`, this property must contain exactly one column with a primitive data type.
`value.columns.exclude`: This defines the name(s) of the columns, separated by commas, that should be excluded from the record’s value and included only in its key. You can only set this property if you have already defined `key.columns`. The excluded columns must appear at the end of the relation’s column list and must also be listed in `key.columns`.
`key.descriptor.name`: When you define a relation with a row key in the `protobuf` format, you must upload a descriptor for the key to DeltaStream prior to running the DDL statement. This descriptor must be associated with the relation’s Data Store. In the DDL statement, the `key.descriptor.name` parameter defines the name of the `protobuf` message descriptor in the descriptor source, which is used for reading and writing records’ row keys.
Examples
The following DDL statement defines a stream that has a `primitive` row key of the `VARCHAR` type:
```sql
CREATE STREAM pageviews (
  viewtime BIGINT,
  userid VARCHAR,
  pageid VARCHAR
)
WITH (
  'topic'='pageviews',
  'value.format'='json',
  'key.format'='primitive',
  'key.type'='VARCHAR'
);
```
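The example above sets the primitive key’s data type directly via `key.type`. When writing to a sink, the same shape can instead be derived from a value column via `key.columns`, which for a primitive `key.format` must name exactly one column with a primitive data type. A sketch, assuming the same `pageviews` topic and a hypothetical stream name `pageviews_keyed`:

```sql
CREATE STREAM pageviews_keyed (
  viewtime BIGINT,
  userid VARCHAR,
  pageid VARCHAR
)
WITH (
  'topic'='pageviews',
  'value.format'='json',
  'key.format'='primitive',
  'key.columns'='userid'
);
```

Here the `key.type` is inferred from the `userid` column, so it is not (and cannot be) set explicitly.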
The following DDL statement defines a stream that has a row key in the `json` format. The row key in each record can be accessed via the `rowkey` function. It is a struct that has a field named `id` of the `INTEGER` data type:
```sql
CREATE STREAM orders (
  order_time BIGINT,
  item_name VARCHAR,
  price BIGINT
)
WITH (
  'topic'='sales',
  'value.format'='json',
  'key.format'='json',
  'key.type'='STRUCT<id INTEGER>'
);
```
The following DDL statement defines a stream whose row key is set using the value of the `userid` column. Given that the key is in the `json` format, the row key value is captured as `STRUCT<userid VARCHAR>`:
```sql
CREATE STREAM visits (
  viewtime BIGINT,
  userid VARCHAR,
  pageid VARCHAR
)
WITH (
  'topic'='pageviews',
  'value.format'='json',
  'key.format'='json',
  'key.columns'='userid'
);
```
The following DDL statement defines a stream that has a row key in the `protobuf` format. The row key in each record has two fields:
- an `INTEGER` field named `id`
- a `VARCHAR` field named `region`

A protobuf descriptor for the row key is defined with the name `customer_key_msg` within the descriptor source of the relation’s topic.
```sql
CREATE STREAM customers (
  name VARCHAR,
  address VARCHAR,
  acct_balance INTEGER
)
WITH (
  'topic' = 'customers',
  'value.format' = 'protobuf',
  'key.format' = 'protobuf',
  'key.type' = 'STRUCT<id INTEGER, region VARCHAR>',
  'value.descriptor.name' = 'customer_value_msg',
  'key.descriptor.name' = 'customer_key_msg'
);
```
The following DDL statement defines a changelog whose row key is set as `STRUCT<currency VARCHAR, district VARCHAR>` using the values of the `currency` and `district` columns. Moreover, `district` is listed in `value.columns.exclude`, which means it appears only in the row key and not in the value. Therefore, the `rates` changelog has three value columns:
- `ts`
- `currency`
- `conversion_rate`
```sql
CREATE CHANGELOG rates (
  ts BIGINT,
  currency VARCHAR,
  conversion_rate DECIMAL(8, 2),
  district VARCHAR,
  PRIMARY KEY(currency)
)
WITH (
  'topic'='exchanges',
  'value.format'='json',
  'key.format'='json',
  'key.columns'='currency,district',
  'value.columns.exclude'='district'
);
```
Row Key in CREATE AS SELECT Statements

In addition to a DDL statement, you can define a new stream or changelog using a `CREATE AS SELECT` (CAS) statement, in CREATE STREAM AS SELECT and CREATE CHANGELOG AS SELECT, respectively. In a CAS statement whose SELECT does not have a row key definition property set for the sink, the sink relation also won't have a row key, irrespective of whether the source relation(s) have a row key definition.
You can set the below properties as sink properties to define the row key for the new object that you used CAS to create:
- `key.format`
- `key.columns`
- `value.columns.exclude`
Note that when running CAS, you can specify the `key.format` for the new object (sink) in the sink’s `WITH` clause. If you have not defined a `key.format` for a sink that has key properties, the `value.format` is used as the `key.format`.
For example, the below CSAS statement creates a new stream, `pagevisits`, whose row key is in the `avro` format and is generated using the value of the `userid` column.
```sql
CREATE STREAM pagevisits
WITH ('key.format'='avro', 'key.columns'='userid')
AS SELECT userid, pageid
FROM pageviews
WHERE userid != 'User_2';
```
The below query runs grouping and aggregation on `pageviews`. It uses a tumbling window to create a new changelog, `visits_rate`. The row key for `visits_rate` is defined using sink properties. Since no `key.format` is set, the `value.format` for `visits_rate` is used as the `key.format`.
```sql
CREATE CHANGELOG visits_rate
WITH ('key.columns'='window_start,window_end,userid')
AS SELECT window_start, window_end, userid, count(pageid) AS page_count
FROM TUMBLE(pageviews, size 5 second)
GROUP BY window_start, window_end, userid;
```
In the next example, an interval join on two streams, `pageviews` and `users`, creates a new stream named `pvusers`. Each record in `pvusers` has a row key with two fields:
- `userid`
- `city`

Further, given that `city` is listed in `value.columns.exclude`, it appears only in the row key. It is not included in the value columns.
```sql
CREATE STREAM pvusers
WITH ('key.format'='json','key.columns'='userid,city','value.columns.exclude'='city')
AS SELECT u.registertime AS ts,
          p.userid,
          p.pageid,
          u.contactinfo->city AS city
FROM pageviews p JOIN "users" u WITHIN 5 minutes
ON p.userid = u.userid;
```