Row Key Definition

Define a Row Key in a DDL statement

Any stream or changelog can have a row key according to the messages in its entity. Row key definition in a DDL statement is optional and is currently available only for relations that are backed by a Kafka topic. If a row key is defined for a relation, the source Kafka messages' keys are used to set the row keys in the relation's records. The row key in a record can be accessed via the rowkey function listed in the Row Metadata Functions.
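
Once a row key is defined, a query can project it next to the value columns. The snippet below is an illustrative sketch only; it assumes the rowkey function is called with no arguments (see Row Metadata Functions for its exact signature) and uses the pageviews stream defined in the Examples section below:

SELECT rowkey() AS pv_key, userid, pageid
FROM pageviews;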

You can define the row key for a relation by adding certain properties to the relation’s DDL statement in the WITH clause:

  • key.format: This is the data format for the row key (that is, how key bytes are serialized) in the messages coming from the relation’s topic. Supported formats for a row key are:

    • primitive

    • json

    • protobuf

    • avro

  • key.type: This defines the structure of the row key’s fields:

    • For a row key with a primitive format, the key.type defines the data type of key values. Valid data types for a primitive row key are

      • SMALLINT

      • INTEGER

      • BIGINT

      • DOUBLE

      • VARCHAR

    • For a row key in the json, avro, or protobuf format, the key.type is a struct whose fields define the names and data types of the key’s fields.

  • key.descriptor.name: When you define a relation with a row key in the protobuf format, you must upload a descriptor for the key to DeltaStream prior to running the DDL statement. This descriptor must be associated with the relation’s entity. In the DDL statement, the key.descriptor.name parameter defines the name of the protobuf message descriptor in the descriptor source that should be used for reading and writing records’ row keys.

Examples

The following DDL statement defines a stream that has a primitive row key of the VARCHAR type:

CREATE STREAM pageviews (
   viewtime BIGINT,
   userid VARCHAR,
   pageid VARCHAR
)
WITH (
   'topic'='pageviews',
   'value.format'='json',
   'key.format'='primitive',
   'key.type'='VARCHAR'
);

The following DDL statement defines a stream that has a row key in the json format. The row key in each record can be accessed via the rowkey function. It is a struct with a single field named id of the INTEGER data type:

CREATE STREAM orders (
   order_time BIGINT,
   item_name VARCHAR,
   price BIGINT
)
WITH (
   'topic'='sales',
   'value.format'='json',
   'key.format'='json',
   'key.type'='STRUCT<id INTEGER>'
);

The following DDL statement defines a stream that has a row key in the protobuf format. The row key in each record has two fields:

  1. an INTEGER field named id

  2. a VARCHAR field named region

A protobuf descriptor for the row key is defined with the name customer_key_msg within the descriptor source of the relation’s topic.

CREATE STREAM customers (
   name VARCHAR,
   address VARCHAR,
   acct_balance INTEGER
)
WITH (
   'topic' = 'customers',
   'value.format' = 'protobuf',
   'key.format' = 'protobuf',
   'key.type' = 'STRUCT<id INTEGER, region VARCHAR>',
   'value.descriptor.name' = 'customer_value_msg',
   'key.descriptor.name' = 'customer_key_msg'
);
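
The examples above cover the primitive, json, and protobuf key formats. A row key in the avro format follows the same pattern; the sketch below is an illustration only (the payments stream and topic are hypothetical) and assumes the store already has a schema registry attached that can serve the avro key schema:

CREATE STREAM payments (
   amount BIGINT,
   currency VARCHAR
)
WITH (
   'topic'='payments',
   'value.format'='avro',
   'key.format'='avro',
   'key.type'='STRUCT<id INTEGER>'
);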

Row Key in CREATE AS SELECT Statements

In addition to a DDL statement, a new stream or changelog can be defined using a CREATE AS SELECT (CAS) statement, namely CREATE STREAM AS SELECT and CREATE CHANGELOG AS SELECT, respectively. The query in the CAS statement affects the row key definition of the new relation.

CAS with Simple SELECT

In a CAS statement whose query consists of only projection and filter operators (that is, SELECT and WHERE clauses, respectively), the row key definition for the new relation is the same as the row key definition (if any) of the source relation.

As an example, the below CSAS statement creates a new pagevisits stream by filtering some records from the pageviews stream defined above (see Examples). Given that pageviews records have row keys in the primitive format, pagevisits records have row keys with the same definition:

CREATE STREAM
    pagevisits
AS
    SELECT userid, pageid
    FROM pageviews
    WHERE userid != 'User_2';

When running a CAS statement, you can specify the key.format for the new relation (sink) in the sink’s WITH clause. For example, the below CSAS statement is similar to the above one, except that it changes the row key format of the new pagevisits stream to json, while the source relation pageviews keeps its row key in the primitive format:

CREATE STREAM
    pagevisits
WITH ('key.format'='json')
AS
    SELECT userid, pageid
    FROM pageviews
    WHERE userid != 'User_2';

CCAS with GROUP BY

If the SELECT clause in a CREATE CHANGELOG AS SELECT statement has a GROUP BY clause, the row key for the new relation consists of columns in the GROUP BY clause.

The key.format for the new relation is the same as a source relation’s key.format. Similar to the example above, you can change the key.format using the sink’s WITH clause.

For example, the below query runs grouping and aggregation on pageviews using a tumbling window to create a new changelog visits_rate. The row key for visits_rate has three fields:

  1. window_start

  2. window_end

  3. userid

as these are the columns referenced in the GROUP BY clause:

CREATE CHANGELOG
    visits_rate
AS
    SELECT window_start, window_end, userid, count(pageid) AS page_count
    FROM TUMBLE(pageviews, size 5 second)
    GROUP BY window_start, window_end, userid;
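
As an illustrative sketch (not part of the original examples), adding a WITH clause on the sink changes the row key format of visits_rate to json, mirroring the override shown earlier for CSAS:

CREATE CHANGELOG
    visits_rate
WITH ('key.format'='json')
AS
    SELECT window_start, window_end, userid, count(pageid) AS page_count
    FROM TUMBLE(pageviews, size 5 second)
    GROUP BY window_start, window_end, userid;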

The below query creates a new changelog region_stats by running a grouping and aggregation on the source stream users. Each record in region_stats has a row key with a single field named location:

CREATE CHANGELOG
    region_stats
AS
    SELECT contactinfo->city AS location, count(userid) AS usr_cnt
    FROM "users"
    WHERE interests[2] != 'Game'
    GROUP BY contactinfo->city;

CSAS with JOIN

If the SELECT clause in a CREATE STREAM AS SELECT statement has a JOIN clause, the row key for the new relation consists of the column from the left relation in the JOIN criteria.

The key.format for the new relation is the same as the left source relation’s key.format. Similar to the example above, you can change the key.format using the sink’s WITH clause.

For example, the below query runs an interval join on two streams, pageviews and users, to create a new stream named pvusers. Each record in pvusers has a row key with one field, userid, as userid is the join column from pageviews, the left side of the join, referenced in the join criteria p.userid = u.userid:

CREATE STREAM
    pvusers
AS
    SELECT p.userid, u.registertime
    FROM pageviews p JOIN "users" u WITHIN 5 minutes
    ON p.userid = u.userid
    WHERE p.userid != 'User_5';
