INSERT INTO

Syntax

INSERT INTO
    relation_name
    select_statement
    [PARTITION BY partition_by_clause];

Description

For an existing Relation, INSERT INTO runs the given query (i.e., a SELECT statement) and adds its results to the sink Relation. The column list of the sink Relation and the SELECT column list of the query should be data type compatible. Moreover, the Relation type of the sink must match the Relation type of the query results. For example, the results of a query that uses grouping aggregation cannot be inserted into a Stream, because the result type of a query with GROUP BY is a Changelog.

INSERT INTO does not support MATERIALIZED VIEW as the sink Relation.

Arguments

relation_name

This specifies the name of the Relation to add results to. Relation names can be fully or partially qualified by specifying the database_name and/or schema_name in the format [<database_name>.<schema_name>.]<relation_name> (such as db1.public.pageviews). Otherwise, the current Database and Schema are used to identify the Relation. Case-sensitive names must be wrapped in double quotes; otherwise, the lowercase name is used.
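
As a rough illustration of the naming rules above, the sketch below writes the same kind of statement once with fully qualified names and once with a case-sensitive name. The Relation names pageviews2 and "PageViews Copy" are hypothetical, and db1.public is assumed to be an existing Database and Schema.

```sql
-- Fully qualified source and sink names (db1.public is an assumed Database and Schema).
INSERT INTO db1.public.pageviews2
SELECT * FROM db1.public.pageviews;

-- Case-sensitive sink name: double quotes preserve the exact spelling.
-- "PageViews Copy" is a hypothetical Relation used only for illustration.
INSERT INTO "PageViews Copy"
SELECT * FROM pageviews;
```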

select_statement

This clause specifies the SELECT statement to run; see SELECT for more information.

PARTITION BY partition_by_clause

Optionally, this clause sets the partition key of records according to their values for a given set of columns. The PARTITION BY clause defines a list of one or more columns (separated by commas) as partitioning columns. By default, the key for the sink's records has the same data format as the sink's value data format. To use a different key format, set the key.format Stream parameter. PARTITION BY is supported for CREATE STREAM AS SELECT and INSERT INTO queries where the sink is a Stream. Currently, PARTITION BY applies only to queries whose sink Stream is backed by a Kafka store.

When using PARTITION BY in an INSERT INTO query, the key type produced by the PARTITION BY clause must match the key type of the sink Stream.
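
The sketch below illustrates partitioning by more than one column. It assumes that a multi-column PARTITION BY produces a STRUCT key with one field per partitioning column, by analogy with the single-column case. The sink name pageviews_by_user_page and its topic are hypothetical, and pageviews is assumed to be an existing Stream with viewtime, userid, and pageid columns.

```sql
-- Hypothetical sink whose key.type lists both partitioning columns (assumed shape).
CREATE STREAM pageviews_by_user_page (viewtime BIGINT, userid VARCHAR, pageid VARCHAR)
    WITH ('topic'='pageviews_by_user_page', 'value.format'='JSON',
          'key.format'='JSON', 'key.type'='STRUCT<userid VARCHAR, pageid VARCHAR>');

-- Partition the sink's records by the combination of userid and pageid.
INSERT INTO pageviews_by_user_page
    SELECT * FROM pageviews
    PARTITION BY userid, pageid;
```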

Examples

INSERT INTO with SELECT *

The following copies all data from the source Relation and inserts it into a preexisting Relation.

INSERT INTO pageviews2 SELECT * FROM pageviews;

INSERT INTO with grouping and aggregation

The following runs a query that finds the average ViewTime in a 5-second window and inserts the results into the existing Relation "Aggr Pageviews2".

INSERT INTO
  "Aggr Pageviews2" 
SELECT 
  window_start, 
  window_end, 
  avg("ViewTime") AS "AvgTime", 
  "UserID", 
  "pageId" 
FROM TUMBLE("CaseSensitivePageviews", size 5 second) 
GROUP BY 
  window_start, 
  window_end, 
  "UserID", 
  "pageId";

Combine multiple queries’ results with INSERT INTO

INSERT INTO can be used to combine the results of multiple queries into a single sink Relation, as long as:

  • Every query has the same sink Relation type.

  • The SELECT column list in every query has the same number of columns, with compatible data types, in the same order.

For example, assume two Changelogs are created from the users Stream to collect stats on the total number of users in different cities in Europe and the U.S.

CREATE CHANGELOG users_eu
AS SELECT contactinfo->city AS city, count(userid) AS ucount
FROM users
WHERE regionid = 'EUROPE'
GROUP BY contactinfo->city;

CREATE CHANGELOG users_us
AS SELECT contactinfo->city AS city, count(userid) AS ucount
FROM users
WHERE regionid = 'US'
GROUP BY contactinfo->city;

Moreover, assume we want to keep track, in a single Relation, of cities in Europe or the U.S. with more than a thousand users. We can create a third Changelog, named total_users, with the DDL below and then use the two INSERT INTO statements that follow it to combine results from the above Changelogs and add them to the total_users Changelog:

CREATE CHANGELOG total_users (
   city VARCHAR,
   total_cnt BIGINT,
   PRIMARY KEY(city)
)
WITH (
   'topic'='total_users',
   'value.format'='json'
);

INSERT INTO total_users
SELECT * FROM users_eu
WHERE ucount > 1000;

INSERT INTO total_users
SELECT * FROM users_us
WHERE ucount > 1000;
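
Because SELECT * relies on the column order of the source, an explicit column list can make the mapping onto the sink's columns easier to read. The sketch below is intended to be equivalent to the users_eu statement above, on the assumption that SELECT columns are matched to sink columns by position, so that ucount lands in total_cnt.

```sql
-- city -> total_users.city, ucount -> total_users.total_cnt
-- (matched by position, as assumed above).
INSERT INTO total_users
SELECT city, ucount FROM users_eu
WHERE ucount > 1000;
```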

INSERT INTO with the PARTITION BY clause

The DDL statements below create two Streams, pageviews and keyed_pageviews. Notice that keyed_pageviews has the key.format and key.type Stream properties set, while pageviews does not. These DDLs are followed by an INSERT INTO query whose PARTITION BY clause sets the key for the sink Stream. For the query to be accepted, the key type generated by the PARTITION BY clause must match the key.type specified in the CREATE STREAM keyed_pageviews DDL, which it does in the example below.

CREATE STREAM pageviews (viewtime BIGINT, userid VARCHAR, pageid VARCHAR)
    WITH ('topic'='pageviews', 'value.format'='JSON');
CREATE STREAM keyed_pageviews (viewtime BIGINT, userid VARCHAR, pageid VARCHAR)
    WITH ('topic'='keyed_pageviews', 'value.format'='JSON', 'key.format'='JSON', 'key.type'='STRUCT<userid VARCHAR>');
-- PARTITION BY userid produces a STRUCT<userid VARCHAR> key, matching the key.type of keyed_pageviews
INSERT INTO keyed_pageviews
    SELECT * FROM pageviews
    PARTITION BY userid;

Given this input for pageviews:

demodb.public/kafka_store# PRINT ENTITY pageviews;
+--------+-----------------------------------------------------------------+
| key    | value                                                           |
+========+=================================================================+
| <null> | {"viewtime":1690327704650,"userid":"User_9","pageid":"Page_11"} |
+--------+-----------------------------------------------------------------+
| <null> | {"viewtime":1690327705651,"userid":"User_6","pageid":"Page_94"} |
+--------+-----------------------------------------------------------------+

We can expect the following output in keyed_pageviews:

demodb.public/kafka_store# PRINT ENTITY keyed_pageviews;
+---------------------+-----------------------------------------------------------------+
| key                 | value                                                           |
+=====================+=================================================================+
| {"userid":"User_9"} | {"viewtime":1690327704650,"userid":"User_9","pageid":"Page_11"} |
+---------------------+-----------------------------------------------------------------+
| {"userid":"User_6"} | {"viewtime":1690327705651,"userid":"User_6","pageid":"Page_94"} |
+---------------------+-----------------------------------------------------------------+
