CREATE ENTITY

Syntax

CREATE ENTITY fully_qualified_entity_name
[IN STORE store_name]
[WITH (entity_parameter = value [, ...])];

Description

This command creates a new entity supported by a Store. Use these entities to host Relations created through DDL or Query.

To list the entities created by this command, use LIST ENTITIES.
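For instance, a typical flow is to create the entity and then define a Relation over it. Below is a minimal sketch, assuming a Kafka store and a JSON-encoded topic; the column list and parameter values are illustrative:

-- Create the backing topic in the current store
CREATE ENTITY pageviews;

-- Define a Stream hosted by that entity
CREATE STREAM pageviews (
  viewtime BIGINT,
  userid VARCHAR,
  pageid VARCHAR)
WITH ('topic' = 'pageviews', 'value.format' = 'json');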

Arguments

fully_qualified_entity_name

The full name of the entity to create.

IN STORE store_name

Optionally, this creates the entity in the specified store. If the store name is case sensitive, you must wrap it in double quotes; otherwise the system uses the lowercase name.
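For example, a minimal sketch of the quoting rule, using hypothetical store names:

-- Resolved as the lowercase store name my_kafka_store
CREATE ENTITY pageviews IN STORE my_kafka_store;

-- Double quotes preserve the case-sensitive store name MyKafkaStore
CREATE ENTITY pageviews IN STORE "MyKafkaStore";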

WITH (entity_parameter = value [, …​])

This clause specifies Entity Parameters.

Entity Parameters

Parameter Name
Description

key.descriptor

A qualified descriptor name used to decode a record's key, if applicable. Reset the descriptor by setting it to NULL.

Required: No Default value: None Type: String Valid values: See LIST DESCRIPTORS.

value.descriptor

A qualified descriptor name used to decode a record's value. Reset the descriptor by setting it to NULL.

Required: No Default value: None Type: String Valid values: See LIST DESCRIPTORS.
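The NULL reset applies when you change the descriptors of an existing entity. A hypothetical sketch, paired with UPDATE ENTITY and illustrative descriptor names:

-- Attach a value descriptor at creation time
CREATE ENTITY pv WITH ('value.descriptor' = pb_value."Pageviews");

-- Later, clear the descriptor by setting it to NULL
UPDATE ENTITY pv WITH ('value.descriptor' = NULL);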

Kafka-Specific Entity Parameters

Parameters to be used if the associated Store is type KAFKA:

Parameter Name
Description

topic.partitions

The number of partitions to use when creating the entity.

Required: No Default value: 1 Type: Integer Valid values: [1,…]

topic.replicas

The number of replicas to use when creating the entity.

Required: No Default value: 1 Type: Integer Valid values: [1,…]

kafka.topic.*

A configuration specific to the topic being created, such as kafka.topic.retention.ms.

Required: No Default value: None Type: String Valid values: Kafka topic configurations specific to the underlying store type; see Kafka Entity Configuration for Confluent Platform.

Kinesis-Specific Entity Parameters

Parameters to be used if the associated Store is type KINESIS:

Parameter Name
Description

kinesis.shards

The number of shards to use when creating the entity.

Required: No Default value: 1 Type: Integer Valid values: [1,…]

Examples

Create a new Kafka topic with defaults

The following creates an entity called pv using the default parameters in your default store:

demodb.public/demostore# CREATE ENTITY pv;
+--------+-------+----------+------------------------------------------+
|  Type  |  Name |  Command |  Summary                                 |
+========+=======+==========+==========================================+
| entity | pv    | CREATE   | entity "pv" was successfully created in  |
|        |       |          | store "demostore"                        |
+--------+-------+----------+------------------------------------------+
demodb.public/demostore# LIST ENTITIES;
+--------------+----------+
|  Name        |  Is Leaf |
+==============+==========+
| pv_pb        | true     |
+--------------+----------+
| pv           | true     |
+--------------+----------+

Create a new Kafka topic with passthrough configuration for retention

CREATE ENTITY customers WITH (
  'store' = 'kafka_store', 
  'topic.partitions' = 1, 
  'topic.replicas' = 2, 
  'kafka.topic.retention.ms' = '172800000');

Create a new Kafka topic with additional topic configuration

The following creates an entity called pv_compact. It also overrides the default partitions, replicas, and cleanup.policy configuration of the Kafka topic in your default store:

demodb.public/demostore# CREATE ENTITY pv_compact WITH ( 'topic.partitions' = 2, 'topic.replicas' = 1, 'kafka.topic.cleanup.policy' = 'compact');
+--------+------------+----------+------------------------------------------+
|  Type  |  Name      |  Command |  Summary                                 |
+========+============+==========+==========================================+
| entity | pv_compact | CREATE   | entity "pv_compact" was successfully     |
|        |            |          | created in store "demostore"             |
+--------+------------+----------+------------------------------------------+
demodb.public/demostore# DESCRIBE ENTITY pv_compact;
+------------+-------------+-----------+-----------------+-------------------+------------------------------------------+
|  Name      |  Partitions |  Replicas |  Key Descriptor |  Value Descriptor |  Configs                                 |
+============+=============+===========+=================+===================+==========================================+
| pv_compact | 2           | 1         | <null>          | <null>            | { "cleanup.policy": "compact" }          |
+------------+-------------+-----------+-----------------+-------------------+------------------------------------------+

Create a new Kafka topic with key and value ProtoBuf Descriptors

The following creates an entity called pv_pb in your default store. It also sets the key and value descriptors necessary for serializing its records:

demodb.public/demostore# LIST DESCRIPTORS;
+-----------+--------------+----------+----------+-------------------------------+-------------------------------+
|  Name     |  Source Name |  Type    |  Owner   |  Created At                   |  Updated At                   |
+===========+==============+==========+==========+===============================+===============================+
| Pageviews | pb_value     | protobuf | sysadmin | 2024-07-16 18:30:39 +0000 UTC | 2024-07-16 18:30:39 +0000 UTC |
+-----------+--------------+----------+----------+-------------------------------+-------------------------------+
| Pageviews | pb_key       | protobuf | sysadmin | 2024-07-16 18:30:31 +0000 UTC | 2024-07-16 18:30:31 +0000 UTC |
| Key       |              |          |          |                               |                               |
+-----------+--------------+----------+----------+-------------------------------+-------------------------------+
demodb.public/demostore# CREATE ENTITY pv_pb WITH ( 'key.descriptor' = pb_key."PageviewsKey", 'value.descriptor' = pb_value."Pageviews");
+--------+-------+----------+------------------------------------------+
|  Type  |  Name |  Command |  Summary                                 |
+========+=======+==========+==========================================+
| entity | pv_pb | CREATE   | entity "pv_pb" was successfully created  |
|        |       |          | in store "demostore"                     |
+--------+-------+----------+------------------------------------------+
demodb.public/demostore# DESCRIBE ENTITY pv_pb;
+-------+-------------+-----------+---------------------+--------------------+-----------------------------------+
|  Name |  Partitions |  Replicas |  Key Descriptor     |  Value Descriptor  |  Configs                          |
+=======+=============+===========+=====================+====================+===================================+
| pv_pb | 1           | 1         | pb_key.PageviewsKey | pb_value.Pageviews | { "segment.bytes": "1073741824" } |
+-------+-------------+-----------+---------------------+--------------------+-----------------------------------+

Create a new entity in Kinesis Store with Kinesis parameters

The following creates an entity called pv_kinesis in the store named kinesis_store with 3 shards:

demodb.public/demostore# CREATE ENTITY pv_kinesis IN STORE kinesis_store WITH ('kinesis.shards' = 3);
+--------+------------+----------+-----------------------------------------------+
|  Type  |  Name      |  Command |  Summary                                      |
+========+============+==========+===============================================+
| entity | pv_kinesis | CREATE   | entity "pv_kinesis" was successfully created  |
|        |            |          | in store "kinesis_store"                      |
+--------+------------+----------+-----------------------------------------------+

Create a Snowflake database

db.public/sfstore# CREATE ENTITY "DELTA_STREAMING";
+------------+-----------------+------------+------------------------------------------+
|  Type      |  Name           |  Command   |  Summary                                 |
+============+=================+============+==========================================+
| entity     | DELTA_STREAMING | CREATE     | entity DELTA_STREAMING was successfully  |
|            |                 |            | created in store sfstore                 |
+------------+-----------------+------------+------------------------------------------+
db.public/sfstore# LIST ENTITIES;
+-----------------+------------+
|  Name           |  Is Leaf   |
+=================+============+
| DELTA_STREAMING | false      |
+-----------------+------------+
| FLINK_STREAMING | false      |
+-----------------+------------+

Create a Snowflake schema in a database

In this example, you create a new schema within the existing DELTA_STREAMING Snowflake database:

db.public/sfstore# CREATE ENTITY "DELTA_STREAMING"."MY_STREAMING_SCHEMA";
+------------+-------------------------------------+------------+------------------------------------------+
|  Type      |  Name                               |  Command   |  Summary                                 |
+============+=====================================+============+==========================================+
| entity     | DELTA_STREAMING.MY_STREAMING_SCHEMA | CREATE     | entity                                   |
|            |                                     |            | DELTA_STREAMING.MY_STREAMING_SCHEMA was  |
|            |                                     |            | successfully created in store            |
|            |                                     |            | sfstore                                  |
+------------+-------------------------------------+------------+------------------------------------------+
db.public/sfstore# LIST ENTITIES IN "DELTA_STREAMING";
+---------------------+------------+
|  Name               |  Is Leaf   |
+=====================+============+
| MY_STREAMING_SCHEMA | false      |
+---------------------+------------+
| PUBLIC              | false      |
+---------------------+------------+              

Create a Databricks catalog

demodb.public/databricks_store# CREATE ENTITY cat1;
+------------+------------+------------+------------------------------------------+
|  Type      |  Name      |  Command   |  Summary                                 |
+============+============+============+==========================================+
| entity     | cat1       | CREATE     | entity cat1 was successfully created in  |
|            |            |            | store databricks_store                   |
+------------+------------+------------+------------------------------------------+
demodb.public/databricks_store# LIST ENTITIES;
+----------------+------------+
|  Name          |  Is Leaf   |
+================+============+
| cat1           | false      |
+----------------+------------+
| system         | false      |
+----------------+------------+

Create a Databricks schema in a catalog

In this example, you create a new schema within the existing cat1 Databricks catalog:

demodb.public/databricks_store# CREATE ENTITY cat1.schema1;
+------------+--------------+------------+------------------------------------------+
|  Type      |  Name        |  Command   |  Summary                                 |
+============+==============+============+==========================================+
| entity     | cat1.schema1 | CREATE     | entity cat1.schema1 was successfully     |
|            |              |            | created in store databricks_store        |
+------------+--------------+------------+------------------------------------------+
demodb.public/databricks_store# LIST ENTITIES IN cat1;
+--------------------+------------+
|  Name              |  Is Leaf   |
+====================+============+
| default            | false      |
+--------------------+------------+
| information_schema | false      |
+--------------------+------------+
| schema1            | false      |
+--------------------+------------+
