CREATE STORE

Syntax

CREATE STORE
store_name
WITH (store_parameter = value [, ...]);

Description

DeltaStream processes streaming data stored in streaming stores such as Apache Kafka and Amazon Kinesis. To access such data, the first step is to connect to the data store; this is done with the CREATE STORE statement, which defines a new Store carrying the connection details for a remote data source. DeltaStream currently supports Kafka (Confluent Cloud, Amazon MSK, Redpanda, etc.) and Amazon Kinesis as streaming stores, along with Snowflake, Databricks, and PostgreSQL (see the type parameter below). Support for other streaming stores, such as Google Pub/Sub and Apache Pulsar, is coming soon.

Arguments

store_name

Name of the Store to define. For case-sensitive names, the name must be wrapped in double quotes; otherwise, the lowercased name is used (see the example at the end of this section).

WITH (store_parameter = value [, ...])

This clause specifies Store parameters; see Store Parameters for more information.
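
For illustration, the following sketch shows how identifier quoting affects the Store name; the region and broker URI are placeholder values rather than a real deployment. The first statement creates a Store whose name is lowercased to my_store, while the second preserves the case-sensitive name My_Store:
CREATE STORE My_Store
WITH (
'type' = KAFKA,
'access_region' = "AWS us-east-1",
'uris' = 'kafka.broker1.url:9092'
);
CREATE STORE "My_Store"
WITH (
'type' = KAFKA,
'access_region' = "AWS us-east-1",
'uris' = 'kafka.broker1.url:9092'
);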

Store Parameters

Parameter Name
Description
type
Required. Specifies the Store type.
Type: STORE_TYPE
Valid values: KAFKA, KINESIS, SNOWFLAKE, DATABRICKS, POSTGRESQL
access_region
Required, unless specified in properties.file. Specifies the region of the Store. To improve latency and reduce data transfer costs, this should be the same cloud and region in which the physical Store runs.
Type: String
Valid values: See LIST REGIONS
uris
Required, unless specified in properties.file. Comma-separated list of host:port URIs to connect to the Store.
Type: String
tls.disabled
Optional. Specifies if the store should be accessed over TLS.
Default value: TRUE
Type: Boolean
Valid values: TRUE or FALSE
tls.verify_server_hostname
Optional. Specifies if the server CNAME should be validated against the certificate.
Default value: TRUE
Type: Boolean
Valid values: TRUE or FALSE
tls.ca_cert_file
Optional. Path to a CA certificate file in PEM format.
Default value: Public CA chains
Type: String
Valid values: Path to an SSL certificate in PEM format
tls.cipher_suites
Optional. Comma-separated list of cipher suites to use when establishing a TLS connection.
Default value: [] (all supported cipher suites are enabled)
Type: List
Valid values: Full cipher suite names describing algorithm content
tls.protocols
Optional. Comma-separated list of TLS protocol versions to use when establishing a TLS connection.
Default value: TLSv1.2,TLSv1.1,TLSv1
Type: List
Valid values: TLS protocols with version
schema_registry.name
Optional. Name of a Schema Registry to associate with the Store. A Schema Registry must first be created using the CREATE SCHEMA_REGISTRY DDL statement. Only one Schema Registry can be associated with a Store.
Default value: None
Type: String
Valid values: See LIST SCHEMA_REGISTRIES
properties.file
Optional. The file path to a YAML file containing any Store parameter.
Default value: None
Type: String
Valid values: File path in the current user's filesystem
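
As an illustrative sketch of the TLS-related parameters above, the following creates a Kafka Store that pins a private CA certificate, keeps server hostname verification on, and restricts the TLS protocol and cipher suite. The broker URIs, certificate path, and cipher suite here are placeholders, not recommended values:
CREATE STORE secure_kafka_store
WITH (
'type' = KAFKA,
'access_region' = "AWS us-east-1",
'uris' = 'kafka.broker1.url:9093,kafka.broker2.url:9093',
'tls.verify_server_hostname' = TRUE,
'tls.ca_cert_file' = '/certs/private-ca.pem',
'tls.protocols' = 'TLSv1.2',
'tls.cipher_suites' = 'TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384'
);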

Kafka Specific Parameters

Parameters to be used if type is KAFKA:
Parameter Name
Description
kafka.sasl.hash_function
Optional. SASL hash function to use when authenticating with Apache Kafka brokers.
Default value: NONE
Type: HASH_FUNCTION
Valid values: NONE, PLAIN, SHA256, and SHA512
kafka.sasl.username
Required if kafka.sasl.hash_function is not NONE; optional otherwise. Username to use when authenticating with Apache Kafka brokers.
Default value: None
Type: String
kafka.sasl.password
Required if kafka.sasl.hash_function is not NONE; optional otherwise. Password to use when authenticating with Apache Kafka brokers.
Default value: None
Type: String
tls.client.cert_file
Required if kafka.sasl.hash_function is SHA256 or SHA512; optional otherwise. Path to a client certificate file in PEM format.
Default value: None
Type: String
Valid values: Path to an SSL certificate in PEM format
tls.client.key_file
Required if kafka.sasl.hash_function is SHA256 or SHA512; optional otherwise. Path to the client key file in PEM format.
Default value: None
Type: String
Valid values: Path to an SSL key in PEM format
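
As a hedged sketch combining the SASL and client TLS parameters above, the following creates a Kafka Store that authenticates with SCRAM-SHA-512 and presents a client certificate and key. The URIs, credentials, and file paths are placeholders:
CREATE STORE scram_kafka_store
WITH (
'type' = KAFKA,
'access_region' = "AWS us-east-1",
'uris' = 'kafka.broker1.url:9092,kafka.broker2.url:9092',
'kafka.sasl.hash_function' = SHA512,
'kafka.sasl.username' = 'scram_username',
'kafka.sasl.password' = 'scram_password',
'tls.client.cert_file' = '/certs/client-cert.pem',
'tls.client.key_file' = '/certs/client-key.pem'
);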

Kinesis Specific Parameters

Parameters to be used if type is KINESIS:
Parameter Name
Description
kinesis.access_key_id
AWS IAM access key ID to use when authenticating with the Amazon Kinesis service.
Required: Only when static AWS credentials are used for authenticating with the Amazon Kinesis service
Default value: None
Type: String
kinesis.secret_access_key
AWS IAM secret access key to use when authenticating with the Amazon Kinesis service.
Required: Only when static AWS credentials are used for authenticating with the Amazon Kinesis service
Default value: None
Type: String

Snowflake Specific Parameters

Parameters to be used if type is SNOWFLAKE:
Parameter Name
Description
snowflake.account_id
Snowflake account identifier assigned to the Snowflake account.
Required: Yes
Default value: None
Type: String
snowflake.cloud.region
Snowflake cloud region name where the account resources operate.
Required: Yes
Default value: None
Type: String
Valid values:
  • AWS us-east-1
  • AWS us-east-2
  • AWS us-west-1
  • AWS us-west-2
snowflake.role_name
Access control role to use for Store operations after connecting to Snowflake.
Required: Yes
Default value: None
Type: String
snowflake.username
User login name for the Snowflake account.
Required: Yes
Default value: None
Type: String
snowflake.warehouse_name
Warehouse name to use for queries and other Store operations that require compute resources.
Required: Yes
Default value: None
Type: String
snowflake.client.key_file
Path to the Snowflake account's private key in PEM format.
Required: Yes
Default value: None
Type: String
snowflake.client.key_passphrase
Passphrase for decrypting the Snowflake account's private key.
Required: No
Default value: None
Type: String

Databricks Specific Parameters

Parameters to be used if type is DATABRICKS:
Parameter Name
Description
databricks.app_token
Databricks personal access token used when authenticating with a Databricks workspace.
Required: Yes
Default value: None
Type: String
databricks.warehouse_id
The identifier for a Databricks SQL Warehouse belonging to a Databricks workspace. This Warehouse will be used to create and query Tables in Databricks.
Required: Yes
Default value: None
Type: String
aws.access_key_id
AWS access key ID used for writing data to S3.
Required: Yes
Default value: None
Type: String
aws.secret_access_key
AWS secret access key used for writing data to S3.
Required: Yes
Default value: None
Type: String
databricks.cloud.s3.bucket
The AWS S3 bucket that CREATE TABLE AS SELECT queries will write data to.
Required: Yes
Default value: None
Type: String
databricks.cloud.region
The cloud region that the databricks.cloud.s3.bucket belongs to.
Required: Yes
Default value: None
Type: String
Valid values:
  • AWS us-east-1
  • AWS us-east-2
  • AWS us-west-1
  • AWS us-west-2

PostgreSQL Specific Parameters

Parameters to be used if type is POSTGRESQL:
Parameter Name
Description
postgres.username
Username to connect to the database instance specified by the Store's uris parameter.
Required: Yes
Default value: None
Type: String
postgres.password
Password for the postgres.username user when connecting to the database instance.
Required: Yes
Default value: None
Type: String

Examples

Create a Kafka Store

The following creates a new Kafka store named my_kafka_store:
CREATE STORE
my_kafka_store
WITH (
'type' = KAFKA,
'access_region' = "AWS us-east-1",
'uris' = 'kafka.broker1.url:9092,kafka.broker2.url:9092',
'tls.ca_cert_file' = '/certs/us-east-1/self-signed-kafka-ca.crt'
);

Create a Kafka Store with credentials from a file

The following creates a new Kafka store named MyKafkaStore++, reading its connection and credential parameters from a properties file. Because the name is case-sensitive and contains special characters, it is wrapped in double quotes:
CREATE STORE
"MyKafkaStore++"
WITH (
'type' = KAFKA,
'properties.file' = '/User/user1/deltastream/kafka_store/properties.yaml'
);
$ cat /User/user1/deltastream/kafka_store/properties.yaml
uris: "http://uri1,http://uri2"
access_region: "AWS us-east-1"
kafka.sasl.hash_function: PLAIN
kafka.sasl.username: "ABCDEFGH12345678"
kafka.sasl.password: "kafkasaslpassword"

Create a Kinesis Store with credentials

The following statement creates a new Kinesis store with name my_kinesis_store:
CREATE STORE
my_kinesis_store
WITH (
'type' = KINESIS,
'access_region' = "AWS us-east-1",
'uris' = 'https://url.to.kinesis.aws:4566',
'kinesis.access_key_id' = 'testkey',
'kinesis.secret_access_key' = 'testsecret'
);

Create a Kafka Store with a Schema Registry

The following statement creates a new Kafka store with a Schema Registry named sr. Note that the store name is case-sensitive and thus has quotes around it:
CREATE STORE
"kafkaStoreWithSR"
WITH (
'type' = KAFKA,
'access_region' = "AWS us-east-1",
'uris' = 'kafka.broker1.url:9092,kafka.broker2.url:9092',
'schema_registry.name' = sr
);

Create a Confluent Kafka Store with credentials

The following creates a new Confluent Cloud Kafka store with the case-sensitive name ConfluentCloudKafkaStore:
CREATE STORE "ConfluentCloudKafkaStore"
WITH (
'type' = KAFKA,
'access_region' = "AWS us-east-1",
'uris' = 'abc-12345.us-east-1.aws.confluent.cloud:9092',
'kafka.sasl.hash_function' = PLAIN,
'kafka.sasl.username' = 'credentials_username',
'kafka.sasl.password' = 'credentials_password'
);

Create a Snowflake Store

CREATE STORE sf
WITH (
'type' = SNOWFLAKE,
'access_region' = "AWS us-east-1",
'uris' = 'https://my-account.snowflakecomputing.com',
'snowflake.account_id' = 'my-account',
'snowflake.role_name' = 'ACCOUNTADMIN',
'snowflake.username' = 'STREAMING_USER',
'snowflake.warehouse_name' = 'COMPUTE_WH',
'snowflake.client.key_file' = '/path/to/pk/my_account_rsa.p8'
);

Create a Snowflake Store with client key passphrase

CREATE STORE sf
WITH (
'type' = SNOWFLAKE,
'access_region' = "AWS us-east-1",
'uris' = 'https://my-account.snowflakecomputing.com',
'snowflake.account_id' = 'my-account',
'snowflake.role_name' = 'ACCOUNTADMIN',
'snowflake.username' = 'STREAMING_USER',
'snowflake.warehouse_name' = 'COMPUTE_WH',
'snowflake.client.key_file' = '/path/to/pk/my_account_rsa.p8',
'properties.file' = '/path/to/deltastream/snowflake_store/properties.yaml'
);
$ cat /path/to/deltastream/snowflake_store/properties.yaml
snowflake.client.key_passphrase: "my$account$$key$$$phrase"

Create a Databricks Store

CREATE STORE databricks_store WITH (
'type' = DATABRICKS,
'access_region' = "AWS us-east-1",
'uris' = 'https://dbc-12345678-1234.cloud.databricks.com',
'databricks.app_token' = 'dapiabcdefghijklmnopqrstuvw123456789',
'databricks.warehouse_id' = 'abcdefgh1234567',
'aws.access_key_id' = 'AWS_ACCESS_KEY',
'aws.secret_access_key' = 'AWS_SECRET_ACCESS_KEY',
'databricks.cloud.s3.bucket' = 'mybucket',
'databricks.cloud.region' = 'AWS us-west-2'
);

Create a PostgreSQL Store

CREATE STORE ps_store WITH (
'type' = POSTGRESQL,
'access_region' = "AWS us-east-1",
'uris' = 'postgresql://mystore.com:5432/demo',
'postgres.username' = 'user',
'postgres.password' = 'password'
);