# Row Metadata Functions

## Description

DeltaStream’s row metadata functions give access to information about a record beyond its value columns. Each record in a relation is created from a message read from the relation’s source [entity](https://docs.deltastream.io/overview/core-concepts/store#entity "mention"). DeltaStream extracts the following information from each message and makes it available for the corresponding record:

* **Row timestamp**: Each record has an associated timestamp that is set based on its original source message’s timestamp. The row timestamp value is of the `BIGINT` data type.
* **Row key**: If a [relation key definition](https://docs.deltastream.io/reference/sql-syntax/relation-key-definition "mention") is provided for a source [stream](https://docs.deltastream.io/overview/core-concepts/databases#_stream "mention") or [changelog](https://docs.deltastream.io/overview/core-concepts/databases#_changelog "mention") backed by a Kafka topic, its records have row keys. For a given record, the row key is set from the key of its source Kafka message, using the `key.format` and `key.type` parameters in the source relation’s definition. See [relation key definition](https://docs.deltastream.io/reference/sql-syntax/relation-key-definition "mention") for more details.
* **Row metadata**: Each record has a number of associated metadata fields. The names and data types of these fields are store-dependent, and their values are extracted from the record’s original source message. The table below lists them for each supported store type. For each record, the row metadata is available as a `STRUCT` instance that encapsulates all metadata fields for that record.

| Store Type | Row Metadata Items                                                                                                                                                                                                                                                                                                                                                                                       |
| ---------- | -------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| Kafka      | <ul><li><code>topic (VARCHAR)</code>: Entity’s name for the record’s relation.</li><li><code>partition (INTEGER)</code>: Identifier of the Kafka partition that the record is stored in.</li><li><code>offset (BIGINT)</code>: Offset of the record in its Kafka partition.</li><li><code>timestamp\_type (VARCHAR)</code>: Type of the record timestamp, assigned by Kafka.</li></ul> |
| Kinesis    | <ul><li><code>stream (VARCHAR)</code>: Name of the Kinesis stream that stores the record.</li><li><code>partition\_key (VARCHAR)</code>: Kinesis partition key for the record.</li><li><code>shard\_id (VARCHAR)</code>: Identifier of the Kinesis shard that the record is stored in.</li><li><code>sequence\_number (VARCHAR)</code>: Sequence number of the record in its Kinesis shard.</li></ul> |

There are three row metadata functions available to access the above information about a record:

| Function             | Description                                                                                                                                                                                                                                         |
| -------------------- | --------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| `record_timestamp()` | Returns the value of the row timestamp for each record.                                                                                                                                                                                             |
| `record_key()`       | Returns the row key for each record if a [relation key definition](https://docs.deltastream.io/reference/sql-syntax/relation-key-definition "mention") is provided for the source relation and the relation is backed by a Kafka topic. Otherwise, it returns `NULL`. |
| `record_metadata()`  | Returns a value of the `STRUCT` data type that contains the row metadata items for each record.                                                                                                                                                     |

Row metadata functions are called by name, in the same way as the [built-in functions](https://docs.deltastream.io/reference/sql-syntax/query/functions/built-in-functions "mention"). They can appear in the `SELECT`, `WHERE`, and `GROUP BY` clauses of a [SELECT](https://docs.deltastream.io/reference/sql-syntax/query/select "mention") statement. A row metadata function can be called with or without an argument, depending on the query:

* If the `FROM` clause of the query refers to only one relation, a row metadata function call does not need an argument.
* If the `FROM` clause of the query refers to more than one relation (for example, in a `JOIN`), a row metadata function call needs the alias or name of the relation it refers to as its argument. Because the metadata described above is available for the records of every relation in the query, the argument resolves the ambiguity by identifying which source relation the function call refers to.

## Examples

In the following examples, assume two streams named `pageviews` and `users` have been created with the DDL statements below, both on topics stored in Kafka stores. Note that the `users` definition includes a row key. Each record in the `users` stream therefore has a row key, and according to the key definition, its value is a `STRUCT` with a single field named `userid` of the `VARCHAR` data type.

```sql
CREATE STREAM pageviews (
  viewtime BIGINT,
  userid VARCHAR,
  pageid VARCHAR
) WITH (
  'topic' = 'pageviews',
  'value.format' = 'json'
);
```

```sql
CREATE STREAM "users" (
  registertime BIGINT,
  userid VARCHAR,
  regionid VARCHAR,
  gender VARCHAR,
  interests ARRAY<VARCHAR>,
  contactinfo STRUCT<phone VARCHAR, city VARCHAR, "state" VARCHAR, zipcode VARCHAR>
 ) WITH (
   'topic'='users',
   'value.format'='json',
   'key.type'='STRUCT<userid VARCHAR>',
   'key.format'='json'
 );
```

#### Row metadata function calls with select and filter

In the example below, the query uses the `record_timestamp` and `record_metadata` row metadata functions to access each record’s row timestamp and row metadata. Because row metadata is of the `STRUCT` data type, the query uses the `->` operator to access individual metadata items inside this `STRUCT`. The row metadata function calls in the `SELECT` clause extract this extra information from each record and add it as value columns to the query’s result. The row metadata function call in the `WHERE` clause filters the input records by their partition values.

```sql
SELECT userid,
       record_timestamp() AS row_ts,
       record_metadata()->partition AS row_partition,
       record_metadata()->offset AS row_offset
FROM pageviews
WHERE record_metadata()->partition = 0;
```
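
The same pattern should carry over to a relation backed by a Kinesis store, using the Kinesis metadata items listed in the table above. The following is a minimal sketch under assumptions: the stream name `clicks_kinesis`, its `userid` column, and the partition key value `'user_1'` are hypothetical and are not defined elsewhere on this page.

```sql
-- Hypothetical stream `clicks_kinesis`, assumed to be defined on a Kinesis store.
-- shard_id, sequence_number, and partition_key are the Kinesis metadata items
-- from the table in the Description section.
SELECT userid,
       record_metadata()->shard_id AS row_shard,
       record_metadata()->sequence_number AS row_seq
FROM clicks_kinesis
WHERE record_metadata()->partition_key = 'user_1';
```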

#### Row metadata function call to access the row key

The `users` stream has a [row key](https://docs.deltastream.io/reference/sql-syntax/relation-key-definition) (see the DDL statement above), so the query below uses the `record_key` row metadata function to extract the value of this key from each record. Given that the row key for `users` is a `STRUCT` with one field named `userid`, you can use the `record_key()->userid` expression in the `SELECT` clause to extract the value of this field from the key’s `STRUCT`. Further, since the `record_metadata` row metadata function returns a `STRUCT`, the data type of the `meta` column in the query result is also a `STRUCT`. This struct contains all the row metadata items extracted from the source message for each record.

```sql
SELECT contactinfo->phone AS phone,
       record_timestamp() AS row_ts,
       record_key()->userid AS uid,
       record_metadata() AS meta
FROM "users";
```
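
By contrast, `pageviews` is defined without a row key, so according to the function table above `record_key()` is expected to return `NULL` for its records. A minimal sketch of that case, with illustrative column aliases:

```sql
-- `pageviews` has no key definition in its DDL, so row_key is expected
-- to be NULL for every record, per the description of record_key().
SELECT userid,
       record_key() AS row_key,
       record_timestamp() AS row_ts
FROM pageviews;
```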

#### Row metadata function call with grouping

The query below creates a new [changelog](https://docs.deltastream.io/overview/core-concepts/databases#_changelog "mention") called `userslogs`. To do this, it groups and aggregates the records of the `users` stream over a hopping window. Note that row metadata function calls are used in both the `SELECT` and `GROUP BY` clauses. In the `SELECT` clause, the query counts the row offsets in each group; each group is formed from a hopping window’s start and end times, along with the value of the `userid` field extracted from the row key.

```sql
CREATE CHANGELOG userslogs AS
SELECT window_start,
       window_end,
       record_key()->userid AS uid,
       count(record_metadata()->offset) AS offset_cnt,
       count(contactinfo->zipcode) AS zip_cnt
FROM HOP("users", size 6 seconds, advance by 3 seconds)
GROUP BY window_start, window_end, record_key()->userid;
```
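
Grouping can also be done on a row metadata item rather than on the row key. The sketch below counts `pageviews` records per Kafka partition. It assumes that a non-windowed `GROUP BY` suits the use case, and the `row_partition` and `view_cnt` aliases are illustrative.

```sql
-- Groups pageviews records by the Kafka partition they were read from
-- and counts the pageid values in each group.
SELECT record_metadata()->partition AS row_partition,
       count(pageid) AS view_cnt
FROM pageviews
GROUP BY record_metadata()->partition;
```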

#### Row metadata function call with join

The query below runs an interval join between the `pageviews` and `users` streams. It uses row metadata function calls to access the row timestamp, row key, and row metadata of records from both streams. Because the query refers to two relations, each row metadata function call requires the name or alias of the relation it refers to as its argument. The argument can be the alias defined for that relation in the `FROM` clause (`p` for `pageviews` and `u` for `users` in this query), or it can be the name of the relation. A relation name can be specified alone (for example, `pageviews`), or as a fully or partially qualified name that includes the `database_name` and/or `schema_name`, in the format `[<database_name>.<schema_name>.]<relation_name>` (for example, `db1.public.pageviews`).

```sql
SELECT p.userid AS pid,
       record_key(u)->userid AS u_key_uid,
       record_timestamp(p) AS pv_time,
       record_timestamp(u) AS u_time,
       record_metadata(pageviews)->offset AS p_offset,
       record_metadata("users")->offset AS u_offset
FROM pageviews p JOIN "users" u
WITHIN 1 minute
ON u.userid = p.userid;
```
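
The qualified-name form of the argument can be sketched as follows. This assumes both streams live in a database named `db1` and a schema named `public`, which are the illustrative names used in the paragraph above and not something the DDL statements on this page establish.

```sql
-- Assumes the relations are db1.public.pageviews and db1.public."users".
-- The function arguments use the fully qualified relation name instead of an alias.
SELECT p.userid AS pid,
       record_timestamp(db1.public.pageviews) AS pv_time,
       record_metadata(db1.public.pageviews)->offset AS p_offset
FROM db1.public.pageviews p JOIN db1.public."users" u
WITHIN 1 minute
ON u.userid = p.userid;
```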
