# Row Key Definition

## Define a Row Key in a DDL statement

Row key definition in a DDL statement is optional and is currently only available for relations backed by a Kafka topic. If a row key is defined for a relation, the keys of the source Kafka messages are used to set the row keys in the relation’s records. The row key in a record can be accessed via the `rowkey` function listed in the [row-functions](https://docs.deltastream.io/reference/sql-syntax/query/functions/row-functions "mention").
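As a sketch of what that access looks like, the query below reads the row key alongside value columns. It assumes a stream `pageviews` defined with a row key (as in the examples further down); check the linked row-functions reference for the exact `rowkey` signature.

```sql
-- Sketch: read each record's row key next to its value columns.
-- The exact rowkey invocation is an assumption; see the row-functions reference.
SELECT rowkey() AS pv_key,
       viewtime,
       pageid
FROM pageviews;
```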

You can define the row key for a relation by adding certain properties to the relation’s DDL statement in the `WITH` clause:

* `key.format`: This is the data format for the row key (that is, how key bytes are serialized) in the messages coming from the relation’s topic. Supported formats for a row key are:
  * `primitive`
  * `json`
  * `protobuf`
  * `avro`
* `key.type`: This defines the structure of the row key’s fields when reading records from a source topic:
  * For a row key with a `primitive` format, the `key.type` defines the data type of key values. Valid data types for a primitive row key are:
    * `SMALLINT`
    * `INTEGER`
    * `BIGINT`
    * `DOUBLE`
    * `VARCHAR`
  * For a row key in the `json`, `avro`, or `protobuf` format, the `key.type` is a `struct` whose fields define the names and data types of the key’s fields.
* `key.columns`: This defines the name(s) of the value columns, separated by commas, that are used to construct the row key when writing to a sink. Note that when `key.columns` is set, the `key.type` property is inferred from it; you cannot set `key.type` and `key.columns` together when you define a stream or changelog.
  * For a non-primitive `key.format`, the record key is created as a `STRUCT` whose fields are the columns listed in this property.
  * For a primitive `key.format`, this property must contain exactly one column with a primitive data type.
* `value.columns.exclude`: This defines the name(s) of the columns, separated by commas, that should be excluded from the record’s value and included only in its key.
  * You can only set this property if you have already defined `key.columns` .
  * The excluded columns must appear at the end of the relation’s column list and must also be listed in `key.columns`.
* `key.descriptor.name`: When you define a relation with a row key in the `protobuf` format, you must upload a descriptor for the key to DeltaStream prior to running the DDL statement. This descriptor must be associated with the relation’s [#entity](https://docs.deltastream.io/overview/core-concepts/store#entity "mention"). In the DDL statement, the `key.descriptor.name` parameter defines the name of the `protobuf` message descriptor in the descriptor source that should be used for reading and writing records’ row keys.

### Examples

The following DDL statement defines a stream that has a `primitive` row key of the `VARCHAR` type:

```sql
CREATE STREAM pageviews (
   viewtime BIGINT,
   userid VARCHAR,
   pageid VARCHAR
)
WITH (
   'topic'='pageviews',
   'value.format'='json',
   'key.format'='primitive',
   'key.type'='VARCHAR'
);
```

The following DDL statement defines a stream that has a row key in the `json` format. The row key in each record can be accessed via the `rowkey` [function](https://docs.deltastream.io/reference/sql-syntax/query/functions/row-functions). It is a `struct` that has a field named `id` of the `INTEGER` data type:

```sql
CREATE STREAM orders (
   order_time BIGINT,
   item_name VARCHAR,
   price BIGINT
)
WITH (
   'topic'='sales',
   'value.format'='json',
   'key.format'='json',
   'key.type'='STRUCT<`id` INTEGER>'
);
```

The following DDL statement defines a stream whose row key is set using the value of the `userid` column. Given that the key is in the `json` format, the row key value is captured as `STRUCT<userid VARCHAR>`.

```sql
CREATE STREAM visits (
   viewtime BIGINT,
   userid VARCHAR,
   pageid VARCHAR
)
WITH (
   'topic'='pageviews',
   'value.format'='json',
   'key.format'='json',
   'key.columns'='userid'
);
```

The following DDL statement defines a stream that has a row key in the `protobuf` format. The row key in each record has 2 fields:

1. an `INTEGER` field named `id`
2. a `VARCHAR` field named `region`.

A protobuf descriptor for the row key is defined with the name `customer_key_msg` within the descriptor source of the relation’s topic.

```sql
CREATE STREAM customers (
   name VARCHAR,
   address VARCHAR,
   acct_balance INTEGER
)
WITH (
   'topic' = 'customers',
   'value.format' = 'protobuf',
   'key.format' = 'protobuf',
   'key.type' = 'STRUCT<id INTEGER, region VARCHAR>',
   'value.descriptor.name' = 'customer_value_msg',
   'key.descriptor.name' = 'customer_key_msg'
);
```

The following DDL statement defines a changelog whose row key is set as `STRUCT<currency VARCHAR, district VARCHAR>` using the values of the `currency` and `district` columns. Moreover, `district` is set in `value.columns.exclude`, which means it appears only in the row key and not in the value. Therefore, the `rates` changelog has 3 value columns:

1. `ts`
2. `currency`
3. `conversion_rate`

```sql
CREATE CHANGELOG rates (
   ts BIGINT,
   currency VARCHAR,
   conversion_rate DECIMAL(8, 2),
   district VARCHAR,
   PRIMARY KEY(currency)
)
WITH (
   'topic'='exchanges',
   'value.format'='json',
   'key.format'='json',
   'key.columns'='currency,district',
   'value.columns.exclude'='district'
);
```
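Because `district` is excluded from the value, it can only be read back through the row key. The query below is a sketch of doing so, assuming `rowkey` returns the key struct and supports the `->` field access shown elsewhere on this page; the exact syntax may differ from the row-functions reference.

```sql
-- Sketch: recover the excluded `district` field from the row key struct.
-- The rowkey invocation and field access are assumptions.
SELECT ts,
       currency,
       conversion_rate,
       rowkey()->district AS district
FROM rates;
```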

## Row Key in `CREATE AS SELECT` Statements

In addition to a DDL statement, you can define a new stream or changelog using a `CREATE AS SELECT` (CAS) statement in [create-stream-as](https://docs.deltastream.io/reference/sql-syntax/query/create-stream-as "mention") and [create-changelog-as](https://docs.deltastream.io/reference/sql-syntax/query/create-changelog-as "mention"), respectively. In a CAS statement with a [select](https://docs.deltastream.io/reference/sql-syntax/query/select "mention") that does not have a row key definition property set for the sink, the sink relation also won't have a row key, irrespective of whether the source relation(s) have a row key definition.

You can set the below properties as sink properties to define the row key for the new object created by the CAS statement:

* `key.format`
* `key.columns`
* `value.columns.exclude`

Note that when running CAS, you can specify the `key.format` for the new object (sink) in the sink’s `WITH` clause. If you have not defined a `key.format` for a sink that has key properties, the `value.format` is used as the `key.format`.

For example, the below CSAS statement creates a new stream, `pagevisits`, whose row key is in `avro` format and is generated using the value of the `userid` column.

```sql
CREATE STREAM
    pagevisits
WITH ('key.format'='avro', 'key.columns'='userid')
AS
    SELECT userid, pageid
    FROM pageviews
    WHERE userid != 'User_2';
```

The below query runs grouping and aggregation on `pageviews`. It uses a tumbling window to create a new changelog, `visits_rate`. The row key for `visits_rate` is defined using sink properties. Since no `key.format` is set, the `value.format` for `visits_rate` is used as the `key.format`.

```sql
CREATE CHANGELOG
    visits_rate WITH ('key.columns'='window_start,window_end,userid')
AS
    SELECT window_start, window_end, userid, count(pageid) AS page_count
    FROM TUMBLE(pageviews, size 5 second)
    GROUP BY window_start, window_end, userid;
```

In the next example, an interval join on two streams — `pageviews` and `users` — creates a new stream named `pvusers`. Each record in `pvusers` has a row key with 2 fields:

1. `userid`
2. `city`

Further, because `city` is listed in `value.columns.exclude`, it appears only in the row key; it is not included in the value columns.

```sql
CREATE STREAM
    pvusers WITH ('key.format'='json','key.columns'='userid,city','value.columns.exclude'='city')
AS
    SELECT u.registertime AS ts, 
           p.userid,
           p.pageid,
           u.contactinfo->city AS city
    FROM pageviews p JOIN "users" u WITHIN 5 minutes
    ON p.userid = u.userid;
```
