Row Metadata Functions


Description

DeltaStream’s row metadata functions are used to access extra information about a record, besides its value columns. Each record in a relation is created from a message read from the relation’s source entity. DeltaStream extracts some information from each message and makes the following available for each corresponding record:

  • Row timestamp: Each record has an associated timestamp that is set based on its original source message’s timestamp. The row timestamp value is of the BIGINT data type.

  • Row key: If a Row Key Definition is provided for a source Stream or Changelog backed by a Kafka topic, then its records have row keys. For a given record, the row key is set according to its source Kafka message’s key. The key.format and key.type parameters in the source relation’s definition are used for this purpose. See Row Key Definition for more details.

  • Row metadata: Each record has a number of associated metadata fields. The names and data types of the metadata fields are store-dependent, and they are extracted from the record’s original source message. The table below lists them for each supported store type. For each record, the row metadata is available as a STRUCT instance that encapsulates all metadata fields for that record.

Store Type
Row Metadata Items

Kafka

  • topic (VARCHAR): Name of the entity (Kafka topic) backing the record’s relation.

  • partition (INTEGER): Identifier of the Kafka partition that the record is stored in.

  • offset (BIGINT): Offset of the record in its Kafka partition.

  • timestamp_type (VARCHAR): Type of the record timestamp, assigned by Kafka.

Kinesis

  • stream (VARCHAR): Name of the Kinesis stream that stores the record.

  • partition_key (VARCHAR): Kinesis partition key for the record.

  • shard_id (VARCHAR): Identifier of the Kinesis shard that the record is stored in.

  • sequence_number (VARCHAR): Sequence number of the record in its Kinesis shard.

There are three row metadata functions available to access the above information about a record:

Function
Description

rowtime()

Returns the value of the row timestamp for each record.

rowkey()

Returns the row key for the record, if a Row Key Definition is provided for the source relation backed by a Kafka topic. Otherwise, it returns NULL.

rowmeta()

Returns a value of the STRUCT data type that contains the row metadata items for each record.

Row metadata functions can be used similarly to the Built-in Functions, by simply calling their names. They can appear in the SELECT, WHERE, and GROUP BY clauses of a SELECT statement. A row metadata function can be called with or without an argument, depending on the query:

  • If the FROM clause of the query refers to only one relation, a row metadata function call does not need an argument.

  • If the FROM clause of the query refers to more than one relation (for example, in a JOIN), then a row metadata function call needs the alias or name of the Relation it refers to as its argument. This resolves the ambiguity in the function call: when multiple Relations are referred to in a query, the metadata described above is available for the records of each Relation, and the Relation name or alias passed as the argument clarifies which source Relation the function call refers to, as sketched below.
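
A minimal sketch of the two call styles, using the pageviews and users streams defined in the Examples section below:

-- One relation in FROM: no argument is needed
SELECT userid, rowtime() AS row_ts
FROM pageviews;

-- Multiple relations in FROM: pass the alias (or name) of the target Relation
SELECT rowtime(p) AS pv_ts, rowtime(u) AS u_ts
FROM pageviews p JOIN users u
WITHIN 1 minute
ON u.userid = p.userid;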

Examples

In the following examples, assume two streams named pageviews and users have been created using the below DDL statements, both backed by topics in Kafka stores. Note that the users definition includes a row key. Therefore, each record in the users Stream has a row key and, according to the key definition, its value is a STRUCT with a single field called userid of the VARCHAR data type.

CREATE STREAM pageviews (
  viewtime BIGINT,
  userid VARCHAR,
  pageid VARCHAR
) WITH (
  'topic' = 'pageviews',
  'value.format' = 'json'
);
CREATE STREAM "users" (
  registertime BIGINT,
  userid VARCHAR,
  regionid VARCHAR,
  gender VARCHAR,
  interests ARRAY<VARCHAR>,
  contactinfo STRUCT<phone VARCHAR, city VARCHAR, "state" VARCHAR, zipcode VARCHAR>
 ) WITH (
   'topic'='users',
   'value.format'='json',
   'key.type'='STRUCT<userid VARCHAR>',
   'key.format'='json'
 );

Row metadata function calls with select and filter

In the below example, the query uses the rowtime and rowmeta row metadata functions to access each record’s row timestamp and row metadata. Moreover, since row metadata is of the STRUCT data type, the query uses the -> operator to access the row metadata items inside this STRUCT. Note that the row metadata function calls in the SELECT clause extract the extra information from each record and add it as value columns to the query’s result. The row metadata function call in the WHERE clause filters the input records based on their partition values.

SELECT userid,
       rowtime() AS row_ts,
       rowmeta()->partition AS row_partition,
       rowmeta()->offset AS row_offset
FROM pageviews
WHERE rowmeta()->partition = 0;
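
The examples on this page use Kafka-backed streams, but the same pattern applies to the Kinesis metadata items listed in the table above. A minimal sketch, assuming a hypothetical pageviews_kinesis Stream with a userid column, backed by a Kinesis store:

SELECT userid,
       rowmeta()->shard_id AS row_shard,
       rowmeta()->sequence_number AS row_seq
FROM pageviews_kinesis;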

Row metadata function call to access the row key

The users Stream has a row key (see the DDL statement above). Therefore, the below query uses the rowkey row metadata function to extract the value of this key from each record. Given that the row key for users is a STRUCT with one field called userid, the rowkey()->userid expression in the SELECT clause extracts the value of this field inside the key’s STRUCT. Moreover, since the rowmeta row metadata function returns a STRUCT, the data type of the meta column in the query’s result is a STRUCT that contains all the row metadata items, extracted from the source Relation, for each record.

SELECT contactinfo->phone AS phone,
       rowtime() AS row_ts,
       rowkey()->userid AS uid,
       rowmeta() as meta
FROM "users";

Row metadata function call with grouping

The below query creates a new Changelog called userslogs by running GROUP BY and aggregation on the users Stream using a hopping window. Note that row metadata function calls are used in both the SELECT and GROUP BY clauses. In the SELECT clause, the query counts the rows’ offsets in each group, while each group is formed according to a hopping window’s start and end times, along with the value of the userid field extracted from the row key.

CREATE CHANGELOG userslogs AS
SELECT window_start,
       window_end,
       rowkey()->userid AS uid,
       count(rowmeta()->offset) AS offset_cnt,
       count(contactinfo->zipcode) AS zip_cnt
FROM HOP(users, size 6 second, advance by 3 seconds)
GROUP BY window_start, window_end, rowkey()->userid;

Row metadata function call with join

The below query runs an interval join between the pageviews and users streams. It uses row metadata function calls to access the row timestamp, row key, and row metadata in the records from both streams. Given that two Relations are referred to in this query, each row metadata function call requires the name or alias of the Relation it refers to as its argument. Note that the argument value can be the alias defined for that Relation in the FROM clause (p for pageviews and u for users in this query), or it can be the name of the Relation. A Relation name can be specified alone (e.g. pageviews), or it can be fully or partially qualified by specifying the database_name and/or schema_name in the format [<database_name>.<schema_name>.]<relation_name> (e.g. db1.public.pageviews).

SELECT p.userid AS pid, 
       rowkey(u)->userid AS u_key_uid,
       rowtime(p) AS pv_time,
       rowtime(u) AS u_time,
       rowmeta(pageviews)->offset AS p_offset,
       rowmeta(users)->offset AS u_offset
FROM pageviews p JOIN users u
WITHIN 1 minute
ON u.userid = p.userid;
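
The function argument can also be a qualified Relation name, per the format above. A minimal sketch, assuming the pageviews Stream lives in the db1 database and public schema:

SELECT p.userid AS pid,
       rowmeta(db1.public.pageviews)->offset AS p_offset,
       rowtime(u) AS u_time
FROM pageviews p JOIN users u
WITHIN 1 minute
ON u.userid = p.userid;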
