INSERT INTO

Syntax

INSERT INTO
    relation_name
    select_statement
    [PARTITION BY partition_by_clause];

Description

For an existing Relation, INSERT INTO runs the given query (that is, a SELECT statement) and adds its results to the sink Relation. The list of columns of the sink Relation and the SELECT columns list in the query must be data type compatible. Moreover, the Relation type of the sink Relation must match the Relation type of the query results. For example, the results of a query that uses grouping and aggregation cannot be inserted into a Stream, because the result type for a query with GROUP BY is a Changelog.

INSERT INTO does not support MATERIALIZED VIEW as the sink Relation.
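
As an illustration of the Relation type constraint, the following sketch (the pageviews_counts Changelog is hypothetical; a pageviews Stream like the one defined later on this page is assumed) inserts the results of a GROUP BY query into a Changelog sink, which matches the Changelog result type of the aggregation:

CREATE CHANGELOG pageviews_counts (
   pageid VARCHAR,
   view_cnt BIGINT,
   PRIMARY KEY(pageid)
)
WITH (
   'topic'='pageviews_counts',
   'value.format'='json'
);

-- Valid: a GROUP BY query produces Changelog results, and the sink is a Changelog
INSERT INTO pageviews_counts
SELECT pageid, count(userid) AS view_cnt
FROM pageviews
GROUP BY pageid;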

Arguments

relation_name

This specifies the name of the Relation to which results are added. Relation names can be fully or partially qualified by including the database_name and/or schema_name, in the format [<database_name>.<schema_name>.]<relation_name> (for example, db1.public.pageviews). Otherwise, the current Database and Schema are used to identify the Relation. Case-sensitive names must be wrapped in double quotes; otherwise, the lowercase version of the name is used.
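
For illustration, assuming the sink Relation lives in db1.public and that a case-sensitive Relation named "PageViews2" exists (both names are hypothetical), it can be referenced in any of the following ways:

-- Fully qualified name: Database, Schema, and Relation
INSERT INTO db1.public.pageviews2 SELECT * FROM pageviews;

-- Unqualified name: resolved against the current Database and Schema
INSERT INTO pageviews2 SELECT * FROM pageviews;

-- Case-sensitive name: wrapped in double quotes
INSERT INTO "PageViews2" SELECT * FROM pageviews;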

select_statement

This clause specifies the SELECT statement to run; see SELECT for more information.

PARTITION BY partition_by_clause

Optionally, this clause sets the partition key of records according to their values in a given set of columns. The PARTITION BY clause defines a list of one or more columns (separated by commas) as partitioning columns. By default, the key for the sink's records has the same data format as the sink's value data format; to use a different key format, set the key.format Stream parameter. PARTITION BY is supported for CREATE STREAM AS SELECT and INSERT INTO queries whose sink is a Stream. Currently, PARTITION BY only applies to queries whose sink Stream is backed by a Kafka Store.

When using PARTITION BY in an INSERT INTO query, the key type produced by the PARTITION BY clause must match the key type in the sink's output Stream.
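
For example, partitioning by more than one column produces a composite key, so the sink Stream's key.type must be a matching STRUCT. The sketch below is hypothetical (the Stream names and the two-column key are assumptions), following the same pattern as the single-column example at the end of this page:

-- Hypothetical sink whose key.type matches a two-column partition key
CREATE STREAM pageviews_by_user_page (viewtime BIGINT, userid VARCHAR, pageid VARCHAR)
    WITH ('topic'='pageviews_by_user_page', 'value.format'='JSON', 'key.format'='JSON', 'key.type'='STRUCT<userid VARCHAR, pageid VARCHAR>');

INSERT INTO pageviews_by_user_page
    SELECT * FROM pageviews
    PARTITION BY userid, pageid;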

Examples

INSERT INTO with SELECT *

The following copies all data from the source Relation and inserts it into a preexisting Relation.

INSERT INTO pageviews2 SELECT * FROM pageviews;

INSERT INTO with grouping and aggregation

The following runs a query that computes the average ViewTime over a 5-second tumbling window and inserts the results into the preexisting Relation "Aggr Pageviews2".

INSERT INTO
  "Aggr Pageviews2" 
SELECT 
  window_start, 
  window_end, 
  avg("ViewTime") AS "AvgTime", 
  "UserID", 
  "pageId" 
FROM TUMBLE("CaseSensitivePageviews", size 5 second) 
GROUP BY 
  window_start, 
  window_end, 
  "UserID", 
  "pageId";

Combine multiple queries’ results with INSERT INTO

INSERT INTO can be used to combine the results of multiple queries into a single sink Relation, as long as:

  • Every query has the same sink Relation type.

  • The SELECT columns list in every query has the same number of columns, with similar data types, in the same order.

For example, assume two Changelogs are created from the users Stream to collect stats on the total number of users in different cities in Europe and the U.S.

CREATE CHANGELOG users_eu
AS SELECT contactinfo->city AS city, count(userid) AS ucount
FROM users
WHERE regionid = 'EUROPE'
GROUP BY contactinfo->city;

CREATE CHANGELOG users_us
AS SELECT contactinfo->city AS city, count(userid) AS ucount
FROM users
WHERE regionid = 'US'
GROUP BY contactinfo->city;

Now assume we want to track, in a single Relation, the cities in Europe or the U.S. that have more than a thousand users. We can create a third Changelog, named total_users, with the DDL below, then use the following two INSERT INTO statements to combine results from the above Changelogs and add them to the total_users Changelog:

CREATE CHANGELOG total_users (
   city VARCHAR,
   total_cnt BIGINT,
   PRIMARY KEY(city)
)
WITH (
   'topic'='total_users',
   'value.format'='json'
);
INSERT INTO total_users
SELECT * FROM users_eu
WHERE ucount > 1000;
INSERT INTO total_users
SELECT * FROM users_us
WHERE ucount > 1000;

INSERT INTO with the PARTITION BY clause

The DDL statements below create two Streams, pageviews and keyed_pageviews. Notice that keyed_pageviews has the key.format and key.type Stream properties set, while pageviews doesn't. These DDLs are followed by an INSERT INTO query with a PARTITION BY clause that sets the key for the sink Stream. For the query to be accepted, the key type generated by the PARTITION BY clause must match the key.type specified in the CREATE STREAM keyed_pageviews DDL, as it does in the example below.

CREATE STREAM pageviews (viewtime BIGINT, userid VARCHAR, pageid VARCHAR)
    WITH ('topic'='pageviews', 'value.format'='JSON');
CREATE STREAM keyed_pageviews (viewtime BIGINT, userid VARCHAR, pageid VARCHAR)
    WITH ('topic'='keyed_pageviews', 'value.format'='JSON', 'key.format'='JSON', 'key.type'='STRUCT<userid VARCHAR>');
-- If key.format were not set for keyed_pageviews, it would default to its value.format (JSON)
INSERT INTO keyed_pageviews
    SELECT * FROM pageviews
    PARTITION BY userid;

Given this input for pageviews:

demodb.public/kafka_store# PRINT ENTITY pageviews;
+--------+-----------------------------------------------------------------+
| key    | value                                                           |
+========+=================================================================+
| <null> | {"viewtime":1690327704650,"userid":"User_9","pageid":"Page_11"} |
+--------+-----------------------------------------------------------------+
| <null> | {"viewtime":1690327705651,"userid":"User_6","pageid":"Page_94"} |
+--------+-----------------------------------------------------------------+

We can expect the following output in keyed_pageviews:

demodb.public/kafka_store# PRINT ENTITY keyed_pageviews;
+---------------------+-----------------------------------------------------------------+
| key                 | value                                                           |
+=====================+=================================================================+
| {"userid":"User_9"} | {"viewtime":1690327704650,"userid":"User_9","pageid":"Page_11"} |
+---------------------+-----------------------------------------------------------------+
| {"userid":"User_6"} | {"viewtime":1690327705651,"userid":"User_6","pageid":"Page_94"} |
+---------------------+-----------------------------------------------------------------+