Row Key Definition

Define a Row Key in a DDL statement

Any stream or changelog can have a row key, based on the keys of the messages in its backing topic. Defining a row key in a DDL statement is optional and is currently available only for relations that are backed by a Kafka topic. If a row key is defined for a relation, the keys of the source Kafka messages are used to set the row keys in the relation’s records. The row key in a record can be accessed via the rowkey function listed in the Row Metadata Functions.

You can define the row key for a relation by adding certain properties to the relation’s DDL statement in the WITH clause:

  • key.format: This is the data format for the row key (that is, how key bytes are serialized) in the messages coming from the relation’s topic. Supported formats for a row key are:

    • primitive

    • json

    • protobuf

    • avro

  • key.type: This defines the structure of the row key’s fields when reading records from a source topic:

    • For a row key with a primitive format, the key.type defines the data type of key values. Valid data types for a primitive row key are:

      • SMALLINT

      • INTEGER

      • BIGINT

      • DOUBLE

      • VARCHAR

    • For a row key in the json, avro, or protobuf format, the key.type is a struct whose fields define the names and data types of the key’s fields.

  • key.columns: This defines the name(s) of the value columns, separated by commas, that are used to construct the row key when writing to a sink. Note that when key.columns is set, the key.type property is inferred from it. You cannot set key.type and key.columns together when you define a stream or changelog.

    • For a non-primitive key.format, the record key is created as a STRUCT whose fields are the columns listed in this property.

    • For a primitive key.format, this property must contain exactly one column with a primitive data type.

  • value.columns.exclude: This defines the name(s) of the columns, separated by commas, that should be excluded from the record’s value and included only in its key.

    • You can only set this property if you have already defined key.columns.

    • The excluded columns must appear at the end of the relation’s column list and must also be listed in key.columns.

  • key.descriptor.name: When you define a relation with a row key in the protobuf format, you must upload a descriptor for the key to DeltaStream prior to running the DDL statement. This descriptor must be associated with the relation’s Data Store. In the DDL statement, the key.descriptor.name parameter defines the name of the protobuf message descriptor in the descriptor source, which should be used for reading and writing records’ row keys.

Examples

The following DDL statement defines a stream that has a primitive row key of the VARCHAR type:

CREATE STREAM pageviews (
   viewtime BIGINT,
   userid VARCHAR,
   pageid VARCHAR
)
WITH (
   'topic'='pageviews',
   'value.format'='json',
   'key.format'='primitive',
   'key.type'='VARCHAR'
);

The following DDL statement defines a stream that has a row key in the json format. The row key in each record can be accessed via the rowkey function. It is a struct that has a field named id of the INTEGER data type:

CREATE STREAM orders (
   order_time BIGINT,
   item_name VARCHAR,
   price BIGINT
)
WITH (
   'topic'='sales',
   'value.format'='json',
   'key.format'='json',
   'key.type'='STRUCT<id INTEGER>'
);
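
As a rough sketch of reading that key, the query below projects the id field out of the row key of the orders stream. It assumes the rowkey function is invoked with empty parentheses and that the -> operator can be applied to its result to access a struct field; the order_id alias is hypothetical. Check the Row Metadata Functions reference for the exact syntax.

```sql
-- Assumption: rowkey() takes no arguments and returns the key struct,
-- here STRUCT<id INTEGER>, so ->id reads its single field.
SELECT rowkey()->id AS order_id, item_name, price
FROM orders;
```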

The following DDL statement defines a stream whose row key is set using the value of the userid column. Given that the key is in the json format, the row key value is captured as STRUCT<userid VARCHAR>.

CREATE STREAM visits (
   viewtime BIGINT,
   userid VARCHAR,
   pageid VARCHAR
)
WITH (
   'topic'='pageviews',
   'value.format'='json',
   'key.format'='json',
   'key.columns'='userid'
);
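
For comparison, the following is a minimal sketch of the same definition with a primitive key.format. In that case key.columns must name exactly one column with a primitive data type, and the row key is that column’s value rather than a struct. The stream name visits_by_user is hypothetical and used only for illustration.

```sql
-- Hypothetical variant of the visits stream: the key is the plain
-- VARCHAR value of userid, not STRUCT<userid VARCHAR>.
CREATE STREAM visits_by_user (
   viewtime BIGINT,
   userid VARCHAR,
   pageid VARCHAR
)
WITH (
   'topic'='pageviews',
   'value.format'='json',
   'key.format'='primitive',
   'key.columns'='userid'
);
```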

The following DDL statement defines a stream that has a row key in the protobuf format. The row key in each record has 2 fields:

  1. an INTEGER field named id

  2. a VARCHAR field named region.

A protobuf descriptor for the row key is defined with the name customer_key_msg within the descriptor source of the relation’s topic.

CREATE STREAM customers (
   name VARCHAR,
   address VARCHAR,
   acct_balance INTEGER
)
WITH (
   'topic' = 'customers',
   'value.format' = 'protobuf',
   'key.format' = 'protobuf',
   'key.type' = 'STRUCT<id INTEGER, region VARCHAR>',
   'value.descriptor.name' = 'customer_value_msg',
   'key.descriptor.name' = 'customer_key_msg'
);

The following DDL statement defines a changelog whose row key is set as STRUCT<currency VARCHAR, district VARCHAR> using the values of the currency and district columns. Moreover, district is listed in value.columns.exclude, which means it appears only in the row key and not in the value. Therefore, the rates changelog has 3 value columns:

  1. ts

  2. currency

  3. conversion_rate

CREATE CHANGELOG rates (
   ts BIGINT,
   currency VARCHAR,
   conversion_rate DECIMAL(8, 2),
   district VARCHAR,
   PRIMARY KEY(currency)
)
WITH (
   'topic'='exchanges',
   'value.format'='json',
   'key.format'='json',
   'key.columns'='currency,district',
   'value.columns.exclude'='district'
);

Row Key in CREATE AS SELECT Statements

In addition to a DDL statement, you can define a new stream or changelog using a CREATE AS SELECT (CAS) statement: CREATE STREAM AS SELECT or CREATE CHANGELOG AS SELECT, respectively. If a CAS statement does not set any row key property for the sink, the sink relation has no row key, regardless of whether the source relation(s) have a row key definition.
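
As a minimal sketch of a CAS statement without key properties, the statement below creates a stream from pageviews and sets no row key property on the sink. The sink name pageviews_copy is hypothetical. Even if pageviews has a row key, the new stream does not.

```sql
-- No key.format, key.columns, or value.columns.exclude is set on the sink,
-- so pageviews_copy (a hypothetical name) is created without a row key.
CREATE STREAM
    pageviews_copy
AS
    SELECT viewtime, userid, pageid
    FROM pageviews;
```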

You can set the following sink properties to define the row key for the new relation created by a CAS statement:

  • key.format

  • key.columns

  • value.columns.exclude

Note that when running CAS, you can specify the key.format for the new object (sink) in the sink’s WITH clause. If you have not defined a key.format for a sink that has key properties, the value.format is used as the key.format.

For example, the below CSAS statement creates a new stream, pagevisits, whose row key is in avro format and is generated using the value of the userid column.

CREATE STREAM
    pagevisits
WITH ('key.format'='avro', 'key.columns'='userid')
AS
    SELECT userid, pageid
    FROM pageviews
    WHERE userid != 'User_2';

The query below groups and aggregates records from pageviews, using a tumbling window, to create a new changelog named visits_rate. The row key for visits_rate is defined using sink properties. Since no key.format is set, the value.format for visits_rate is used as the key.format.

CREATE CHANGELOG
    visits_rate WITH ('key.columns'='window_start,window_end,userid')
AS
    SELECT window_start, window_end, userid, count(pageid) AS page_count
    FROM TUMBLE(pageviews, size 5 second)
    GROUP BY window_start, window_end, userid;

In the next example, an interval join on two streams — pageviews and users — creates a new stream named pvusers. Each record in pvusers has a row key with 2 fields:

  1. userid

  2. city

Further, given that city is listed in value.columns.exclude, it appears only in the row key and is not included in the value columns.

CREATE STREAM
    pvusers WITH ('key.format'='json','key.columns'='userid,city','value.columns.exclude'='city')
AS
    SELECT u.registertime AS ts, 
           p.userid,
           p.pageid,
           u.contactinfo->city AS city
    FROM pageviews p JOIN "users" u WITHIN 5 minutes
    ON p.userid = u.userid;
