CREATE STORE
Syntax
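A general form of the statement, based on the arguments described below (a sketch; the exact quoting of parameter names and values may vary):

```sql
CREATE STORE store_name
WITH (store_parameter = value [, ...]);
```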
Description
DeltaStream processes streaming data that is stored in streaming stores such as Apache Kafka and Amazon Kinesis. The first step toward accessing this data is to connect to those stores. You do this with the CREATE STORE statement, which defines a new store with the connection details for a remote data source.
Note: Currently DeltaStream supports Kafka (Confluent Cloud, Amazon MSK, RedPanda, and more) and Amazon Kinesis. Support for additional streaming stores such as Google Pub/Sub and Apache Pulsar is coming soon.
Only a role with the CREATE_STORE privilege can create a store.
Arguments
store_name
Name of the store to define. If the name is case-sensitive, you must wrap it in double quotes; otherwise, the system uses the lowercase name.
WITH (store_parameter = value [, … ])
This clause specifies Store Parameters.
Store Parameters
type
Specifies the store type.
Required: Yes
Type: STORE_TYPE
Valid values: KAFKA, KINESIS, SNOWFLAKE, DATABRICKS, POSTGRESQL
access_region
Specifies the region of the store. To improve latency and reduce data transfer costs, this should match the cloud and region in which the physical store is running.
Required: Yes, unless specified in properties.file.
Type: String
Valid values: See LIST REGIONS
uris
List of comma-separated host:port URIs to connect to the store.
Required: Yes, unless specified in properties.file.
Type: String
tls.disabled
Specifies if the store should be accessed over TLS.
Required: No
Default value: TRUE
Type: Boolean
Valid values: TRUE or FALSE
tls.verify_server_hostname
Specifies if the server CNAME should be validated against the certificate.
Required: No
Default value: TRUE
Type: Boolean
Valid values: TRUE or FALSE
tls.ca_cert_file
Path to a CA certificate file in PEM format. Prefix the path with @ for the file to be uploaded to the server.
Required: No
Default value: Public CA chains
Type: String
Valid values: Path to an SSL certificate in PEM format
tls.cipher_suites
Comma-separated list of cipher suites to use when establishing a TLS connection.
Required: No
Default value: [] (all supported cipher suites are enabled)
Type: List
Valid values: Full cipher suite names describing algorithm content
tls.protocols
Comma-separated list of TLS protocol versions to use when establishing a TLS connection.
Required: No
Default value: TLSv1.2,TLSv1.1,TLSv1
Type: List
Valid values: TLS protocols with version
schema_registry.name
Name of a schema registry to associate with the store. You must first create a schema registry using the CREATE SCHEMA_REGISTRY DDL statement. Only one schema registry can be associated with a store.
Required: No
Default value: None
Type: String
Valid values: See LIST SCHEMA_REGISTRIES
properties.file
The file path to a .yaml file containing any store parameter. Prefix the path with @ for the file to be uploaded to the server.
Required: No
Default value: None
Type: String
Valid values: File path in the current user's filesystem
Kafka Specific Parameters
Parameters to be used if type is KAFKA:
kafka.sasl.hash_function
SASL hash function to use when authenticating with Apache Kafka brokers.
Required: No
Default value: NONE
Type: HASH_FUNCTION
Valid values: NONE, PLAIN, SHA256, SHA512, and AWS_MSK_IAM
kafka.sasl.username
Username to use when authenticating with Apache Kafka brokers.
Required: Yes, if kafka.sasl.hash_function is not NONE or AWS_MSK_IAM
Default value: None
Type: String
kafka.sasl.password
Password to use when authenticating with Apache Kafka brokers.
Required: Yes, if kafka.sasl.hash_function is not NONE or AWS_MSK_IAM
Default value: None
Type: String
kafka.msk.aws_region
AWS region to use when authenticating with MSK.
Required: Yes, if kafka.sasl.hash_function is AWS_MSK_IAM
Default value: None
Type: String
Example: us-east-1
kafka.msk.iam_role_arn
AWS IAM role ARN to use when authenticating with MSK.
Required: Yes, if kafka.sasl.hash_function is AWS_MSK_IAM
Default value: None
Type: String
Example: arn:aws:iam::123456789012:role/example-IAM-role
tls.client.cert_file
Path to a client certificate file in PEM format. Prefix the path with @ for the file to be uploaded to the server.
Required: Yes, if kafka.sasl.hash_function is SHA256 or SHA512
Default value: None
Type: String
Valid value: Path to an SSL certificate in PEM format
tls.client.key_file
Path to the client key file in PEM format. Prefix the path with @ for the file to be uploaded to the server.
Required: Yes, if kafka.sasl.hash_function is SHA256 or SHA512
Default value: None
Type: String
Valid value: Path to an SSL key in PEM format
Kinesis Specific Parameters
Parameters to be used if type is KINESIS:
kinesis.iam_role_arn
AWS IAM role ARN to use when authenticating with an Amazon Kinesis service.
Required: Yes, unless authenticating with the Amazon Kinesis Service using static AWS credentials
Default value: None
Type: String
Example: arn:aws:iam::123456789012:role/example-IAM-role
kinesis.access_key_id
AWS IAM access key to use when authenticating with an Amazon Kinesis service.
Required: Yes, if authenticating with the Amazon Kinesis Service using static AWS credentials
Default value: None
Type: String
kinesis.secret_access_key
AWS IAM secret access key to use when authenticating with an Amazon Kinesis service.
Required: Yes, if authenticating with the Amazon Kinesis Service using static AWS credentials
Default value: None
Type: String
Snowflake Specific Parameters
Parameters to be used if type is SNOWFLAKE:
snowflake.account_id
Snowflake account identifier assigned to the Snowflake account.
Required: Yes
Default value: None
Type: String
snowflake.cloud.region
Name of the Snowflake cloud region in which the account resources operate.
Required: Yes
Default value: None
Type: String
Valid values:
AWS us-east-1
AWS us-east-2
AWS us-west-1
AWS us-west-2
snowflake.role_name
Access control role to use for the store operations after connecting to Snowflake.
Required: Yes
Default value: None
Type: String
snowflake.username
User login name for the Snowflake account.
Required: Yes
Default value: None
Type: String
snowflake.warehouse_name
Warehouse name to use for queries and other store operations that require compute resources.
Required: Yes
Default value: None
Type: String
snowflake.client.key_file
Path to the Snowflake account's private key in PEM format. Prefix the path with @ for the file to be uploaded to the server.
Required: Yes
Default value: None
Type: String
snowflake.client.key_passphrase
Passphrase for decrypting the Snowflake account's private key.
Required: No
Default value: None
Type: String
Databricks Specific Parameters
Parameters to be used if type is DATABRICKS:
databricks.app_token
Databricks personal access token used when authenticating with a Databricks workspace.
Required: Yes
Default value: None
Type: String
databricks.warehouse_id
The identifier for a Databricks SQL warehouse belonging to a Databricks workspace. This warehouse is used to create and query tables in Databricks.
Required: Yes
Default value: None
Type: String
databricks.warehouse_port
The port for a Databricks SQL warehouse belonging to a Databricks workspace.
Required: No
Default value: 443
Type: Integer
aws.access_key_id
AWS access key ID used for writing data to S3.
Required: Yes
Default value: None
Type: String
aws.secret_access_key
AWS secret access key used for writing data to S3.
Required: Yes
Default value: None
Type: String
databricks.cloud.s3.bucket
The AWS S3 bucket that CREATE TABLE AS SELECT queries will write data to.
Required: Yes
Default value: None
Type: String
databricks.cloud.region
The cloud region that the databricks.cloud.s3.bucket belongs to.
Required: Yes
Default value: None
Type: String
Valid values:
AWS us-east-1
AWS us-east-2
AWS us-west-1
AWS us-west-2
PostgreSQL Specific Parameters
Parameters to be used if type is POSTGRESQL:
postgres.username
Username to connect to the database instance specified with the store's uris parameter.
Required: Yes
Default value: None
Type: String
postgres.password
Password to connect to the database instance for the user specified by the store's postgres.username parameter.
Required: Yes
Default value: None
Type: String
Examples
Create a Kafka store with credentials
The following creates a new Kafka store with name my_kafka_store:
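A minimal sketch, assuming placeholder broker URIs and SASL credentials (the exact quoting of parameter names and values may vary):

```sql
CREATE STORE my_kafka_store
WITH (
  'type' = KAFKA,
  'access_region' = "AWS us-east-1",
  'uris' = 'kafka.broker1.url:9092,kafka.broker2.url:9092',
  'kafka.sasl.hash_function' = PLAIN,
  'kafka.sasl.username' = 'ExampleUsername',
  'kafka.sasl.password' = 'ExamplePassword'
);
```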
Create an MSK store with IAM credentials
The following creates a new Kafka store with name my_kafka_store:
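A sketch using IAM authentication; the MSK bootstrap URI and IAM role ARN below are placeholders:

```sql
CREATE STORE my_kafka_store
WITH (
  'type' = KAFKA,
  'access_region' = "AWS us-east-1",
  'uris' = 'b-1.example-cluster.abcd12.c2.kafka.us-east-1.amazonaws.com:9098',
  'kafka.sasl.hash_function' = AWS_MSK_IAM,
  'kafka.msk.aws_region' = 'us-east-1',
  'kafka.msk.iam_role_arn' = 'arn:aws:iam::123456789012:role/example-IAM-role'
);
```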
Create a Kafka store with credentials from a file
The following creates a new Kafka store with name MyKafkaStore++:
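A sketch that loads the remaining store parameters from a YAML file; the file path is a placeholder, and the @ prefix uploads the file to the server as described above:

```sql
CREATE STORE "MyKafkaStore++"
WITH (
  'type' = KAFKA,
  'properties.file' = '@/path/to/kafka_store.yaml'
);
```

The referenced YAML file would carry the remaining parameters, such as uris, access_region, and the SASL credentials.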
Create a Kinesis store with IAM credentials
The following statement creates a new Kinesis store with name my_kinesis_store:
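A sketch with a placeholder Kinesis endpoint and IAM role ARN:

```sql
CREATE STORE my_kinesis_store
WITH (
  'type' = KINESIS,
  'access_region' = "AWS us-east-1",
  'uris' = 'https://kinesis.us-east-1.amazonaws.com',
  'kinesis.iam_role_arn' = 'arn:aws:iam::123456789012:role/example-IAM-role'
);
```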
Create a Kinesis store with static credentials
The following statement creates a new Kinesis store with name my_kinesis_store:
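A sketch using static AWS credentials; the endpoint, access key ID, and secret access key are placeholders:

```sql
CREATE STORE my_kinesis_store
WITH (
  'type' = KINESIS,
  'access_region' = "AWS us-east-1",
  'uris' = 'https://kinesis.us-east-1.amazonaws.com',
  'kinesis.access_key_id' = 'EXAMPLEACCESSKEYID',
  'kinesis.secret_access_key' = 'ExampleSecretAccessKey'
);
```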
Create a Kafka store with a schema registry
The following statement creates a new Kafka store with a schema registry named sr. Note that the store name is case-sensitive and thus has quotes around it:
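A sketch with a hypothetical case-sensitive store name and placeholder connection details; the schema registry sr must already exist (see CREATE SCHEMA_REGISTRY):

```sql
CREATE STORE "MyKafkaStoreWithSR"
WITH (
  'type' = KAFKA,
  'access_region' = "AWS us-east-1",
  'uris' = 'kafka.broker1.url:9092',
  'kafka.sasl.hash_function' = PLAIN,
  'kafka.sasl.username' = 'ExampleUsername',
  'kafka.sasl.password' = 'ExamplePassword',
  'schema_registry.name' = sr
);
```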
Create a Confluent Kafka store with credentials
The following creates a new Confluent Cloud Kafka store with the case-sensitive name ConfluentCloudKafkaStore:
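A sketch with a placeholder Confluent Cloud bootstrap server; the Confluent API key and secret are used as the SASL credentials:

```sql
CREATE STORE "ConfluentCloudKafkaStore"
WITH (
  'type' = KAFKA,
  'access_region' = "AWS us-east-1",
  'uris' = 'pkc-example.us-east-1.aws.confluent.cloud:9092',
  'kafka.sasl.hash_function' = PLAIN,
  'kafka.sasl.username' = 'ExampleConfluentApiKey',
  'kafka.sasl.password' = 'ExampleConfluentApiSecret'
);
```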
Create a Snowflake store
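The following statement creates a new Snowflake store with name my_snowflake_store. This is a sketch; the account identifier, URI, role, user, warehouse, and key file path are placeholders:

```sql
CREATE STORE my_snowflake_store
WITH (
  'type' = SNOWFLAKE,
  'access_region' = "AWS us-east-1",
  'uris' = 'https://exampleaccount.snowflakecomputing.com',
  'snowflake.account_id' = 'exampleaccount',
  'snowflake.cloud.region' = 'AWS us-east-1',
  'snowflake.role_name' = 'ACCOUNTADMIN',
  'snowflake.username' = 'EXAMPLE_USER',
  'snowflake.warehouse_name' = 'EXAMPLE_WAREHOUSE',
  'snowflake.client.key_file' = '@/path/to/snowflake_key.pem'
);
```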
Create a Snowflake store with client key passphrase
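The following is the same sketch with an encrypted private key; the passphrase value is a placeholder:

```sql
CREATE STORE my_snowflake_store
WITH (
  'type' = SNOWFLAKE,
  'access_region' = "AWS us-east-1",
  'uris' = 'https://exampleaccount.snowflakecomputing.com',
  'snowflake.account_id' = 'exampleaccount',
  'snowflake.cloud.region' = 'AWS us-east-1',
  'snowflake.role_name' = 'ACCOUNTADMIN',
  'snowflake.username' = 'EXAMPLE_USER',
  'snowflake.warehouse_name' = 'EXAMPLE_WAREHOUSE',
  'snowflake.client.key_file' = '@/path/to/snowflake_key.pem',
  'snowflake.client.key_passphrase' = 'ExamplePassphrase'
);
```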
Create a Databricks store
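The following statement creates a new Databricks store with name my_databricks_store. The workspace URI, access token, warehouse ID, AWS credentials, and S3 bucket below are placeholders:

```sql
CREATE STORE my_databricks_store
WITH (
  'type' = DATABRICKS,
  'access_region' = "AWS us-east-1",
  'uris' = 'https://dbc-example.cloud.databricks.com',
  'databricks.app_token' = 'ExamplePersonalAccessToken',
  'databricks.warehouse_id' = 'examplewarehouseid',
  'aws.access_key_id' = 'EXAMPLEACCESSKEYID',
  'aws.secret_access_key' = 'ExampleSecretAccessKey',
  'databricks.cloud.s3.bucket' = 'example-bucket',
  'databricks.cloud.region' = 'AWS us-east-1'
);
```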
Create a PostgreSQL store
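The following statement creates a new PostgreSQL store with name my_postgresql_store. The connection URI and credentials are placeholders:

```sql
CREATE STORE my_postgresql_store
WITH (
  'type' = POSTGRESQL,
  'access_region' = "AWS us-east-1",
  'uris' = 'postgresql://example-host.example.com:5432/exampledb',
  'postgres.username' = 'example_user',
  'postgres.password' = 'example_password'
);
```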