CREATE STORE
Syntax
CREATE STORE store_name
WITH (store_parameter = value [, ...]);
Description
DeltaStream processes streaming data stored in streaming stores such as Apache Kafka and Amazon Kinesis. The first step toward accessing this data is to connect to such data stores. You do this using the CREATE STORE statement.
Only a role with the CREATE_STORE privilege can create a store.
Arguments
store_name
Name of the store to define. If the name is case sensitive, wrap it in double quotes; otherwise the system lowercases the name (see the sketch after this section).
WITH (store_parameter = value [, … ])
This clause specifies Store Parameters.
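As a minimal sketch (hypothetical broker URI; the type and uris parameters are described under Store Parameters below), the first statement creates a store whose name is stored as store1, while the quoted second statement preserves the case-sensitive name Store1:
CREATE STORE Store1 WITH ('type' = KAFKA, 'uris' = 'kafka.broker1.url:9092');
CREATE STORE "Store1" WITH ('type' = KAFKA, 'uris' = 'kafka.broker1.url:9092');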
Store Parameters
type
Specifies the store type.
Required: Yes
Type: STORE_TYPE
Valid values: KAFKA, KINESIS, SNOWFLAKE, DATABRICKS, POSTGRESQL, CLICKHOUSE, S3, ICEBERG_GLUE, ICEBERG_REST
uris
List of comma-separated host:port URIs to connect to the store.
Required: Yes, unless specified in properties.file.
Type: String
tls.disabled
Specifies whether TLS should be disabled when accessing the store.
Required: No
Default value: FALSE
Type: Boolean
Valid values: TRUE or FALSE
tls.verify_server_hostname
Specifies if the server CNAME should be validated against the certificate.
Required: No
Default value: TRUE
Type: Boolean
Valid values: TRUE or FALSE
tls.ca_cert_file
Path to a CA certificate file in PEM format. Prefix the path with @ for the file to be uploaded to the server.
Required: No
Default value: Public CA chains
Type: String
Valid values: Path to an SSL certificate in PEM format
tls.cipher_suites
Comma-separated list of cipher suites to use when establishing a TLS connection.
Required: No
Default value: [] (all supported cipher suites are enabled)
Type: List
Valid values: Full cipher suite names describing algorithm content
tls.protocols
Comma-separated list of TLS protocol versions to use when establishing a TLS connection. A combined TLS sketch appears at the end of this section.
Required: No
Default value: TLSv1.2,TLSv1.1,TLSv1
Type: List
Valid values: TLS protocols with version
schema_registry.name
Name of a schema registry to associate with the store. You must first create a schema registry using the CREATE SCHEMA_REGISTRY DDL statement. Only one schema registry can be associated with a store.
Required: No
Default value: None
Type: String
Valid values: See LIST SCHEMA_REGISTRIES
properties.file
The file path to a .yaml file containing any store parameter. Prefix the path with @ for the file to be uploaded to the server. See "Create a Kafka store with credentials from a file" in Examples below.
Required: No
Default value: None
Type: String
Valid values: File path in current user's filesystem
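The TLS parameters above combine as in this minimal sketch (hypothetical broker URIs, certificate path, and cipher suite; the bare TRUE boolean value is an assumption modeled on the bare KAFKA and PLAIN identifiers used in the Examples section):
CREATE STORE tls_kafka_store
WITH (
'type' = KAFKA,
'uris' = 'kafka.broker1.url:9093,kafka.broker2.url:9093',
'tls.ca_cert_file' = '@/certs/self-signed-kafka-ca.crt',
'tls.verify_server_hostname' = TRUE,
'tls.protocols' = 'TLSv1.2',
'tls.cipher_suites' = 'TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256'
);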
ClickHouse-Specific Parameters
Parameters to be used if type is CLICKHOUSE:
clickhouse.username
Username to connect to the database instance specified with the store's uris parameter.
Required: Yes
Default value: None
Type: String
clickhouse.password
Password to connect to the database instance as the user given by the clickhouse.username parameter.
Required: Yes
Default value: None
Type: String
Databricks-Specific Parameters
Parameters to be used if type is DATABRICKS:
databricks.app_token
Databricks personal access token used when authenticating with a Databricks workspace.
Required: Yes
Default value: None
Type: String
databricks.warehouse_id
The identifier for a Databricks SQL warehouse belonging to a Databricks workspace. This warehouse is used to create and query tables in Databricks.
Required: Yes
Default value: None
Type: String
databricks.warehouse_port
The port for a Databricks SQL warehouse belonging to a Databricks workspace.
Required: No
Default value: 443
Type: Integer
aws.access_key_id
AWS access key ID used for writing data to S3.
Required: Yes
Default value: None
Type: String
aws.secret_access_key
AWS secret access key used for writing data to S3.
Required: Yes
Default value: None
Type: String
databricks.cloud.s3.bucket
The AWS S3 bucket that CREATE TABLE AS SELECT queries will write data to.
Required: Yes
Default value: None
Type: String
databricks.cloud.region
The cloud region that the databricks.cloud.s3.bucket belongs to.
Required: Yes
Default value: None
Type: String
Valid values:
AWS us-east-1
AWS us-east-2
AWS us-west-1
AWS us-west-2
Iceberg REST Catalog-Specific Parameters
Parameters to be used if type is ICEBERG_REST:
iceberg.rest.client_id
Client ID used when authenticating with the Iceberg REST catalog.
Required: Yes
Default value: None
Type: String
iceberg.rest.client_secret
Client secret used when authenticating with the Iceberg REST catalog.
Required: Yes
Default value: None
Type: String
iceberg.rest.client_scope
Client scope used when authenticating with the Iceberg REST catalog.
Required: Yes
Default value: None
Type: String
iceberg.catalog.id
Name of the Iceberg catalog.
Required: Yes
Default value: None
Type: String
Kafka-Specific Parameters
Parameters to be used if type is KAFKA:
kafka.sasl.hash_function
SASL hash function to use when authenticating with Apache Kafka brokers.
Required: No
Default value: NONE
Type: HASH_FUNCTION
Valid values: NONE, PLAIN, SHA256, SHA512, and AWS_MSK_IAM
kafka.sasl.username
Username to use when authenticating with Apache Kafka brokers.
Required: Yes, if kafka.sasl.hash_function is not NONE or AWS_MSK_IAM
Default value: None
Type: String
kafka.sasl.password
Password to use when authenticating with Apache Kafka brokers.
Required: Yes, if kafka.sasl.hash_function is not NONE or AWS_MSK_IAM
Default value: None
Type: String
kafka.msk.aws_region
AWS region to use when authenticating with MSK.
Required: Yes, if kafka.sasl.hash_function is AWS_MSK_IAM
Default value: None
Type: String
Example: us-east-1
kafka.msk.iam_role_arn
AWS IAM role ARN to use when authenticating with MSK.
Required: Yes, if kafka.sasl.hash_function is AWS_MSK_IAM
Default value: None
Type: String
Example: arn:aws:iam::123456789012:role/example-IAM-role
tls.client.cert_file
Path to a client certificate file in PEM format. Prefix the path with @ for the file to be uploaded to the server.
Required: Yes, if kafka.sasl.hash_function is SHA256 or SHA512
Default value: None
Type: String
Valid value: Path to an SSL certificate in PEM format
tls.client.key_file
Path to the client key file in PEM format. Prefix the path with @ for the file to be uploaded to the server.
Required: Yes, if kafka.sasl.hash_function is SHA256 or SHA512
Default value: None
Type: String
Valid value: Path to an SSL private key in PEM format
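As a minimal sketch (hypothetical broker URIs, credentials, and certificate paths), a Kafka store authenticating with SCRAM SHA-512 and uploading the client TLS files might look like this:
CREATE STORE scram_kafka_store
WITH (
'type' = KAFKA,
'uris' = 'kafka.broker1.url:9092,kafka.broker2.url:9092',
'kafka.sasl.hash_function' = SHA512,
'kafka.sasl.username' = 'scram_username',
'kafka.sasl.password' = 'scram_password',
'tls.client.cert_file' = '@/certs/client.crt',
'tls.client.key_file' = '@/certs/client.key'
);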
Kinesis-Specific Parameters
Parameters to be used if type is KINESIS:
kinesis.iam_role_arn
AWS IAM role ARN to use when authenticating with an Amazon Kinesis service.
Required: Yes, unless authenticating with the Amazon Kinesis Service using static AWS credentials
Default value: None
Type: String
Example: arn:aws:iam::123456789012:role/example-IAM-role
kinesis.access_key_id
AWS IAM access key to use when authenticating with an Amazon Kinesis service.
Required: Yes, if authenticating with the Amazon Kinesis Service using static AWS credentials
Default value: None
Type: String
kinesis.secret_access_key
AWS IAM secret access key to use when authenticating with an Amazon Kinesis service.
Required: Yes, if authenticating with the Amazon Kinesis Service using static AWS credentials
Default value: None
Type: String
PostgreSQL-Specific Parameters
Parameters to be used if type is POSTGRESQL:
postgres.username
Username to connect to the database instance specified with the store's uris parameter.
Required: Yes
Default value: None
Type: String
postgres.password
Password to connect to the database instance as the user given by the postgres.username parameter.
Required: Yes
Default value: None
Type: String
To secure the PostgreSQL connection, DeltaStream requires you to set tls.verify_server_hostname to TRUE and tls.disabled to FALSE, as in the sketch below.
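A minimal sketch (hypothetical host and credentials; the bare TRUE/FALSE boolean values are an assumption modeled on the bare identifiers used in the Examples section) of a PostgreSQL store with these TLS settings made explicit:
CREATE STORE secure_ps_store WITH (
'type' = POSTGRESQL,
'uris' = 'postgresql://mystore.com:5432/demo',
'postgres.username' = 'user',
'postgres.password' = 'password',
'tls.disabled' = FALSE,
'tls.verify_server_hostname' = TRUE
);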
S3-Specific Parameters
Parameters to be used if type is S3:
aws.iam_role_arn
AWS IAM role ARN to use when authenticating with S3.
Required: No
Default value: None
Type: String
aws.iam_external_id
IAM External ID.
Required: Yes, if aws.iam_role_arn is specified
Default value: None
Type: String
aws.access_key_id
AWS access key ID to use when authenticating with S3.
Required: Yes, if authenticating using static AWS credentials
Default value: None
Type: String
aws.secret_access_key
AWS secret access key to use when authenticating with S3.
Required: Yes, if authenticating using static AWS credentials
Default value: None
Type: String
Snowflake-Specific Parameters
Parameters to be used if type is SNOWFLAKE:
snowflake.account_id
Snowflake account identifier assigned to the Snowflake account.
Required: Yes
Default value: None
Type: String
snowflake.cloud.region
Name of the Snowflake cloud region in which the account resources operate.
Required: Yes
Default value: None
Type: String
Valid values:
AWS us-east-1
AWS us-east-2
AWS us-west-1
AWS us-west-2
snowflake.role_name
Access control role to use for store operations after connecting to Snowflake.
Required: Yes
Default value: None
Type: String
snowflake.username
User login name for the Snowflake account.
Required: Yes
Default value: None
Type: String
snowflake.warehouse_name
Warehouse name to use for queries and other store operations that require compute resources.
Required: Yes
Default value: None
Type: String
snowflake.client.key_file
Path to the Snowflake account's private key in PEM format. Prefix the path with @ for the file to be uploaded to the server.
Required: Yes
Default value: None
Type: String
snowflake.client.key_passphrase
Passphrase for decrypting the Snowflake account's private key.
Required: No
Default value: None
Type: String
Examples
Create a Kafka store with credentials
The following creates a new Kafka store named my_kafka_store:
CREATE STORE my_kafka_store
WITH (
'type' = KAFKA,
'uris' = 'kafka.broker1.url:9092,kafka.broker2.url:9092',
'tls.ca_cert_file' = '@/certs/us-east-1/self-signed-kafka-ca.crt'
);
Create an MSK store with IAM credentials
The following creates a new Amazon MSK store named msk_iam_store:
CREATE STORE msk_iam_store
WITH (
'type' = KAFKA,
'uris' = 'b-1.abc.abc.c10.kafka.us-east-1.amazonaws.com:9098,b-2.abc.abc.c10.kafka.us-east-1.amazonaws.com:9098,b-3.abc.abc.c10.kafka.us-east-1.amazonaws.com:9098',
'kafka.sasl.hash_function' = AWS_MSK_IAM,
'kafka.msk.aws_region'='us-east-1',
'kafka.msk.iam_role_arn'='arn:aws:iam::123456789012:role/example-IAM-role'
);
Create a Kafka store with credentials from a file
The following creates a new Kafka store named MyKafkaStore++:
CREATE STORE "MyKafkaStore++"
WITH (
'type' = KAFKA,
'properties.file' = '@/User/user1/deltastream/kafka_store/properties.yaml'
);
$ cat /User/user1/deltastream/kafka_store/properties.yaml
uris: "http://uri1,http://uri2"
kafka.sasl.hash_function: PLAIN
kafka.sasl.username: "ABCDEFGH12345678"
kafka.sasl.password: "kafkasaslpassword"
Create a Kinesis store with IAM credentials
The following statement creates a new Kinesis store named my_kinesis_store:
CREATE STORE my_kinesis_store
WITH (
'type' = KINESIS,
'uris' = 'https://url.to.kinesis.aws:4566',
'kinesis.iam_role_arn' = 'arn:aws:iam::123456789012:role/example-IAM-role'
);
Create a Kinesis store with static credentials
The following statement creates a new Kinesis store named my_kinesis_store:
CREATE STORE my_kinesis_store
WITH (
'type' = KINESIS,
'uris' = 'https://url.to.kinesis.aws:4566',
'kinesis.access_key_id' = 'testkey',
'kinesis.secret_access_key' = 'testsecret'
);
Create a Kafka store with a schema registry
The following statement creates a new Kafka store with a schema registry named sr. Note that the store name is case sensitive and thus is wrapped in double quotes:
CREATE STORE "kafkaStoreWithSR"
WITH (
'type' = KAFKA,
'uris' = 'kafka.broker1.url:9092,kafka.broker2.url:9092',
'schema_registry.name' = sr
);
Create a Confluent Kafka store with credentials
The following creates a new Confluent Cloud Kafka store with the case-sensitive name ConfluentCloudKafkaStore:
CREATE STORE "ConfluentCloudKafkaStore"
WITH (
'type' = KAFKA,
'uris' = 'abc-12345.us-east-1.aws.confluent.cloud:9092',
'kafka.sasl.hash_function' = PLAIN,
'kafka.sasl.username' = 'credentials_username',
'kafka.sasl.password' = 'credentials_password'
);
Create a Snowflake store
CREATE STORE sf
WITH (
'type' = SNOWFLAKE,
'uris' = 'https://my-account.snowflakecomputing.com',
'snowflake.account_id' = 'my-account',
'snowflake.role_name' = 'ACCOUNTADMIN',
'snowflake.username' = 'STREAMING_USER',
'snowflake.warehouse_name' = 'COMPUTE_WH',
'snowflake.client.key_file' = '@/path/to/pk/my_account_rsa.p8'
);
Create a Snowflake store with client key passphrase
CREATE STORE sf
WITH (
'type' = SNOWFLAKE,
'uris' = 'https://my-account.snowflakecomputing.com',
'snowflake.account_id' = 'my-account',
'snowflake.role_name' = 'ACCOUNTADMIN',
'snowflake.username' = 'STREAMING_USER',
'snowflake.warehouse_name' = 'COMPUTE_WH',
'snowflake.client.key_file' = '@/path/to/pk/my_account_rsa.p8',
'properties.file' = '@/path/to/deltastream/snowflake_store/properties.yaml'
);
$ cat /path/to/deltastream/snowflake_store/properties.yaml
snowflake.client.key_passphrase: "my$account$$key$$$phrase"
Create a Databricks store
CREATE STORE databricks_store WITH (
'type' = DATABRICKS,
'uris' = 'https://dbc-12345678-1234.cloud.databricks.com',
'databricks.app_token' = 'dapiabcdefghijklmnopqrstuvw123456789',
'databricks.warehouse_id' = 'abcdefgh1234567',
'aws.access_key_id' = 'AWS_ACCESS_KEY',
'aws.secret_access_key' = 'AWS_SECRET_ACCESS_KEY',
'databricks.cloud.s3.bucket' = 'mybucket',
'databricks.cloud.region' = 'AWS us-west-2'
);
Create a PostgreSQL store
CREATE STORE ps_store WITH (
'type' = POSTGRESQL,
'uris' = 'postgresql://mystore.com:5432/demo',
'postgres.username' = 'user',
'postgres.password' = 'password'
);
Create a ClickHouse store
CREATE STORE ch WITH (
'type' = CLICKHOUSE,
'uris' = 'jdbc:clickhouse://uri:8443',
'clickhouse.username' = 'my_user',
'clickhouse.password' = 'my_pass'
);
Create an Iceberg AWS Glue Catalog Store with IAM credentials
CREATE STORE iceberg_glue_store WITH (
'type' = ICEBERG_GLUE,
'uris' = 'https://mybucket.s3.amazonaws.com/',
'aws.iam_role_arn'='arn:aws:iam::123456789012:role/example-IAM-role',
'aws.iam_external_id'='EXTERNAL ID',
'aws.region'='us-east-1',
'iceberg.warehouse.default_path'='s3://bucket/path',
'iceberg.catalog.id'='mycatalog'
);
Create an Iceberg REST Catalog Store using Snowflake OpenCatalog (Apache Polaris)
CREATE STORE iceberg_rest_store WITH (
'type' = ICEBERG_REST,
'uris' = 'https://abcd-xyz.snowflakecomputing.com/polaris/api/catalog',
'iceberg.catalog.id' = 'my-catalog',
'iceberg.rest.client_id' = 'CLIENT_ID',
'iceberg.rest.client_secret' = 'CLIENT_SECRET',
'iceberg.rest.client_scope' = 'SCOPE'
);
Create an S3 store
CREATE STORE s3_store WITH (
'type' = S3,
'uris' = 'https://mybucket.s3.amazonaws.com/',
'aws.access_key_id'='AWS_ACCESS_KEY',
'aws.secret_access_key'='AWS_SECRET_ACCESS_KEY'
);
Create an S3 Store with IAM credentials
CREATE STORE s3_store WITH (
'type' = S3,
'uris' = 'https://mybucket.s3.amazonaws.com/',
'aws.iam_role_arn'='arn:aws:iam::123456789012:role/example-IAM-role',
'aws.iam_external_id'='EXTERNAL ID'
);
Notes
Parameters marked Required: Yes must be specified, either directly in the WITH clause or in the file referenced by properties.file.