CREATE TABLE AS SELECT
Syntax
CREATE TABLE table_name
[WITH (table_parameter = value [, ... ])]
AS select_statement;
Description
CREATE TABLE AS is a statement that:
Generates a DDL statement to create a new Table.
Launches a new query to write the results of the SELECT statement into the newly created table.
Arguments
table_name
This specifies the name of the new table. Optionally, use <database_name>.<schema_name> as a prefix to the name to create the relation in that namespace. If the name is case-sensitive, you must wrap it in double quotes; otherwise, the system uses the lowercase name.
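For instance, a minimal sketch (the database db1, the store sfstore and its Snowflake parameters, and the source stream pageviews are assumptions borrowed from the examples below) that creates a case-sensitive Table name under a specific namespace:
CREATE TABLE db1.public."PageViews" WITH (
    'store' = 'sfstore',
    'snowflake.db.name' = 'DELTA_STREAMING',
    'snowflake.schema.name' = 'PUBLIC'
) AS SELECT * FROM pageviews;
Without the double quotes, the Table would instead be created with the lowercase name pageviews.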
WITH (<table_parameter> = <value> [, … ])
Optionally, this clause specifies Table Parameters.
select_statement
This specifies the SELECT statement to run.
Table Parameters
store
The name of the store that hosts the entity for this table.
Required: No
Default value: The user's default Data Store.
Type: String
Valid values: See LIST STORES.
Snowflake-Specific Parameters
snowflake.db.name
The name of the Snowflake database that will host the Snowflake table.
Required: Yes
Default value: None
Type: String
Valid values: Database names available from LIST ENTITIES.
snowflake.schema.name
The name of the Snowflake schema that will host the Snowflake table.
Required: Yes
Default value: None
Type: String
Valid values: Schema names available from LIST ENTITIES under the database specified by snowflake.db.name.
snowflake.table.name
The name of the Snowflake table to use when creating the Table. If the table doesn't exist in the store, a table with this name is created in the corresponding store.
Required: No
Default value: table_name
Type: String
snowflake.buffer.millis
The amount of time, in milliseconds, to buffer events in the sink Table before writing the data into the corresponding Snowflake table. Note that the longer events are buffered, the larger the internal state of the query gets; depending on the volume and size of the events for the corresponding query, memory limitations may be reached. For an example, see the sketch below.
Required: No
Default value: 1000
Type: Long
Valid values: (0, ...]
Snowflake stores provide an at_least_once delivery guarantee when producing events into a sink Table, and an insert-only mode when writing to Snowflake tables.
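The following is a minimal sketch (the store sfstore, database DELTA_STREAMING, and source stream pageviews are assumptions carried over from the examples below) that overrides the target table name and widens the buffer window to 5 seconds:
CREATE TABLE pv_buffered WITH (
    'store' = 'sfstore',
    'snowflake.db.name' = 'DELTA_STREAMING',
    'snowflake.schema.name' = 'PUBLIC',
    'snowflake.table.name' = 'PAGEVIEWS_BUFFERED',
    'snowflake.buffer.millis' = '5000'
) AS SELECT * FROM pageviews;
Here events accumulate for up to 5000 milliseconds before each write into PAGEVIEWS_BUFFERED, trading ingest latency for fewer, larger writes.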
Databricks-Specific Parameters
databricks.catalog.name
The name of the Databricks catalog that will host the Databricks table.
Required: Yes
Default value: None
Type: String
Valid values: Catalog names available from LIST ENTITIES.
databricks.schema.name
The name of the Databricks schema that will host the Databricks table.
Required: Yes
Default value: None
Type: String
Valid values: Schema names available from LIST ENTITIES under the catalog specified by databricks.catalog.name.
databricks.table.name
The name of the Databricks table to use when creating the Table. A table with this name is created in the corresponding store; if that table already exists, an error is reported back to the user.
Required: No
Default value: table_name
Type: String
table.data.file.location
The S3 directory location for the Delta-formatted data to be written. The credentials for writing to S3 are given during store creation (see CREATE STORE). Note that the S3 bucket in the location specified by this parameter must match the databricks.cloud.s3.bucket property defined in the store.
Required: Yes
Default value: None
Type: String
Databricks stores provide an exactly_once delivery guarantee when producing events into a sink Table, and an insert-only mode when writing to Databricks tables.
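As a minimal sketch (the store databricks_store, catalog catalog1, schema schema1, and the S3 path are assumptions; compare the full example at the end of this page), omitting databricks.table.name lets the Databricks table name default to the new Table's name:
CREATE TABLE pageviews_delta WITH (
    'store' = 'databricks_store',
    'databricks.catalog.name' = 'catalog1',
    'databricks.schema.name' = 'schema1',
    'table.data.file.location' = 's3://mybucket/test/0/pageviews_delta'
) AS SELECT * FROM pageviews;
Because databricks.table.name is omitted, the resulting Databricks table is catalog1.schema1.pageviews_delta.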
Postgres-Specific Parameters
postgresql.db.name
Required: Yes
See the CDC source parameters for the parameter of the same name.
postgresql.schema.name
Required: Yes
See the CDC source parameters for the parameter of the same name.
postgresql.table.name
Required: No
See the CDC source parameters for the parameter of the same name.
Examples - Snowflake
Create a copy of a stream in a Snowflake table
The following creates a replica of the source Stream, pageviews, in the Snowflake Table, PV_TABLE:
CREATE TABLE "PV_TABLE" WITH (
'store' = 'sfstore',
'snowflake.db.name' = 'DELTA_STREAMING',
'snowflake.schema.name' = 'PUBLIC'
) AS SELECT * FROM pageviews;
Create a stream of changes for a changelog in a Snowflake table
The following CTAS query creates a new Snowflake Table to store incremental changes resulting from a grouping aggregation on the transactions Stream:
CREATE TABLE "CC_TYPE_USAGE" WITH (
'store' = 'sfstore',
'snowflake.db.name' = 'DELTA_STREAMING',
'snowflake.schema.name' = 'PUBLIC'
) AS SELECT
   cc_type AS "CREDIT_TYPE",
   MAX(tx_time) AS "LATEST_TRANSACTION_TS",
   COUNT(tx_id) AS "TRANSACTION_COUNT"
FROM transactions
GROUP BY cc_type;
This query stores all changes to the aggregate values for each value of the grouping column cc_type in the sink table CC_TYPE_USAGE.
Examples - PostgreSQL
Assumptions for the following 2 examples:
pageviews is a stream you have already defined.
ps_store is a PostgreSQL data store you have already created. For details and an example, see CREATE STORE.
Create a new table in a PostgreSQL data store - example 1
In this example, you create a new table, named pageviews, in the given PostgreSQL data store under the public schema. The table has 3 columns: viewtime, uid, and pageid. The query writes its results into this table as its sink.
CREATE TABLE pg_table WITH (
'store'='ps_store',
'postgresql.db.name'='demo',
'postgresql.schema.name'='public',
'postgresql.table.name'='pageviews') AS
SELECT viewtime, userid AS uid, pageid
FROM pageviews;
Create a new table in a PostgreSQL data store - example 2
This query creates a new table, named pagevisits, in the given PostgreSQL data store under the public schema. The table has 2 columns: userid and cnt. The query writes its results into this table.
CREATE TABLE pagevisits WITH (
'store'='ps_store',
'postgresql.db.name'='demo',
'postgresql.schema.name'='public') AS
SELECT userid, count(*) AS cnt
FROM pageviews
GROUP BY userid;
Examples - Databricks
Create a copy of a stream in a Databricks table
The following creates a replica of the source Stream, pageviews, in the Databricks Table, pageviews_db:
CREATE TABLE pageviews_db WITH (
'store' = 'databricks_store',
'databricks.catalog.name' = 'catalog1',
'databricks.schema.name' = 'schema1',
'databricks.table.name' = 'pageviews',
'table.data.file.location' = 's3://mybucket/test/0/pageviews'
) AS SELECT * FROM pageviews;
Upon issuing this query, a Databricks table is created at catalog1.schema1.pageviews that uses s3://mybucket/test/0/pageviews as its external location. This query writes the Delta-formatted Parquet files and updates the Delta log in that S3 location.