CREATE STORE

```sql
CREATE STORE store_name
WITH (store_parameter = value [, ...]);
```
DeltaStream processes streaming data that is stored in streaming stores such as Apache Kafka and Amazon Kinesis. To access such data, the first step is to connect to those data stores. This is done with the CREATE STORE statement, which defines a new Store with the connection details for a remote data source. Currently DeltaStream supports Kafka (Confluent Cloud, Amazon MSK, Redpanda, etc.) and Amazon Kinesis. Support for other streaming stores such as Google Pub/Sub and Apache Pulsar is coming soon.

store_name
Name of the Store to define. For case-sensitive names, the name must be wrapped in double quotes; otherwise, the lowercased name is used.
| Parameter Name | Description |
|---|---|
| `type` | Required. Specifies the Store type.<br>Type: `STORE_TYPE`<br>Valid values: `KAFKA`, `KINESIS`, `SNOWFLAKE`, `DATABRICKS`, `POSTGRESQL` |
| `access_region` | Required, unless specified in `properties.file`. Specifies the region of the Store. To improve latency and reduce data transfer costs, the region should be the same cloud and region in which the physical Store is running.<br>Type: String<br>Valid values: See LIST REGIONS |
| `uris` | Required, unless specified in `properties.file`. List of comma-separated `host:port` URIs to connect to the store.<br>Type: String |
| `tls.disabled` | Optional. Specifies if the store should be accessed over TLS.<br>Default value: `TRUE`<br>Type: Boolean<br>Valid values: `TRUE` or `FALSE` |
| `tls.verify_server_hostname` | Optional. Specifies if the server CNAME should be validated against the certificate.<br>Default value: `TRUE`<br>Type: Boolean<br>Valid values: `TRUE` or `FALSE` |
| `tls.ca_cert_file` | Optional. Path to a CA certificate file in PEM format.<br>Default value: Public CA chains<br>Type: String<br>Valid values: Path to an SSL certificate in PEM format |
| `tls.cipher_suites` | Optional. Comma-separated list of cipher suites to use when establishing a TLS connection.<br>Default value: `[]` (all supported cipher suites are enabled)<br>Type: List<br>Valid values: Full cipher suite names describing algorithm content |
| `tls.protocols` | Optional. Comma-separated list of TLS protocol versions to use when establishing a TLS connection.<br>Default value: `TLSv1.2,TLSv1.1,TLSv1`<br>Type: List<br>Valid values: TLS protocols with version |
| `schema_registry.name` | Optional. Name of a Schema Registry to associate with the store. A Schema Registry must first be created using the CREATE SCHEMA_REGISTRY DDL statement. Only one Schema Registry can be associated with a store.<br>Default value: None<br>Type: String<br>Valid values: See LIST SCHEMA_REGISTRIES |
| `properties.file` | Optional. The file path to a YAML file containing any store parameter.<br>Default value: None<br>Type: String<br>Valid values: File path in the current user's filesystem |
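The TLS tuning parameters above are not exercised by the examples later on this page. The following is a minimal sketch of how they might be combined when connecting to a Kafka cluster that presents a self-signed certificate; the store name, broker URIs, certificate path, protocol, and cipher suite are placeholder assumptions rather than values taken from the official examples:

```sql
-- Hypothetical TLS-tuned Kafka store; all URIs, paths, and
-- protocol/cipher choices below are placeholder values.
CREATE STORE tls_tuned_kafka_store
WITH (
  'type' = KAFKA,
  'access_region' = "AWS us-east-1",
  'uris' = 'kafka.broker1.url:9093,kafka.broker2.url:9093',
  'tls.ca_cert_file' = '/certs/self-signed-kafka-ca.crt',       -- trust a self-signed CA chain
  'tls.protocols' = 'TLSv1.2',                                  -- restrict to one protocol version
  'tls.cipher_suites' = 'TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384' -- allow a single cipher suite
);
```

Restricting `tls.protocols` and `tls.cipher_suites` is only needed when the brokers require it; by default all supported cipher suites are enabled.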
Parameters to be used if `type` is `KAFKA`:

| Parameter Name | Description |
|---|---|
| `kafka.sasl.hash_function` | Optional. SASL hash function to use when authenticating with Apache Kafka brokers.<br>Default value: `NONE`<br>Type: `HASH_FUNCTION`<br>Valid values: `NONE`, `PLAIN`, `SHA256`, and `SHA512` |
| `kafka.sasl.username` | Required if `kafka.sasl.hash_function` is not `NONE`, optional otherwise. Username to use when authenticating with Apache Kafka brokers.<br>Default value: None<br>Type: String |
| `kafka.sasl.password` | Required if `kafka.sasl.hash_function` is not `NONE`, optional otherwise. Password to use when authenticating with Apache Kafka brokers.<br>Default value: None<br>Type: String |
| `tls.client.cert_file` | Required if `kafka.sasl.hash_function` is `SHA256` or `SHA512`, optional otherwise. Path to a client certificate file in PEM format.<br>Default value: None<br>Type: String<br>Valid values: Path to an SSL certificate in PEM format |
| `tls.client.key_file` | Required if `kafka.sasl.hash_function` is `SHA256` or `SHA512`, optional otherwise. Path to the client key file in PEM format.<br>Default value: None<br>Type: String<br>Valid values: Path to an SSL key in PEM format |
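The Kafka examples later on this page all use the `PLAIN` hash function, so the client certificate requirement above never comes into play there. As a hedged sketch of the `SHA512` case, a store definition would look roughly like the following; the credentials, broker URIs, and file paths are placeholders:

```sql
-- Hypothetical SASL/SHA512 Kafka store; usernames, passwords,
-- URIs, and file paths below are placeholder values.
CREATE STORE scram_kafka_store
WITH (
  'type' = KAFKA,
  'access_region' = "AWS us-east-1",
  'uris' = 'kafka.broker1.url:9092,kafka.broker2.url:9092',
  'kafka.sasl.hash_function' = SHA512,
  'kafka.sasl.username' = 'scram_user',
  'kafka.sasl.password' = 'scram_password',
  'tls.client.cert_file' = '/certs/client.crt', -- required for SHA256/SHA512
  'tls.client.key_file' = '/certs/client.key'   -- required for SHA256/SHA512
);
```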
Parameters to be used if `type` is `KINESIS`:

| Parameter Name | Description |
|---|---|
| `kinesis.access_key_id` | AWS IAM access key to use when authenticating with an Amazon Kinesis service.<br>Required: If static AWS credentials are required for authenticating with the Amazon Kinesis service<br>Default value: None<br>Type: String |
| `kinesis.secret_access_key` | AWS IAM secret access key to use when authenticating with an Amazon Kinesis service.<br>Required: If static AWS credentials are required for authenticating with the Amazon Kinesis service<br>Default value: None<br>Type: String |
Parameters to be used if `type` is `SNOWFLAKE`:

| Parameter Name | Description |
|---|---|
| `snowflake.account_id` | Snowflake account identifier assigned to the Snowflake account.<br>Required: Yes<br>Default value: None<br>Type: String |
| `snowflake.cloud.region` | Snowflake cloud region name in which the account resources operate.<br>Required: Yes<br>Default value: None<br>Type: String |
| `snowflake.role_name` | Access control role to use for the Store operations after connecting to Snowflake.<br>Required: Yes<br>Default value: None<br>Type: String |
| `snowflake.username` | User login name for the Snowflake account.<br>Required: Yes<br>Default value: None<br>Type: String |
| `snowflake.warehouse_name` | Warehouse name to use for queries and other store operations that require compute resources.<br>Required: Yes<br>Default value: None<br>Type: String |
| `snowflake.client.key_file` | Path to the Snowflake account's private key in PEM format.<br>Required: Yes<br>Default value: None<br>Type: String |
| `snowflake.client.key_passphrase` | Passphrase for decrypting the Snowflake account's private key.<br>Required: No<br>Default value: None<br>Type: String |
Parameters to be used if `type` is `DATABRICKS`:

| Parameter Name | Description |
|---|---|
| `databricks.app_token` | Databricks personal access token used when authenticating with a Databricks workspace.<br>Required: Yes<br>Default value: None<br>Type: String |
| `databricks.warehouse_id` | The identifier for a Databricks SQL Warehouse belonging to a Databricks workspace. This Warehouse will be used to create and query Tables in Databricks.<br>Required: Yes<br>Default value: None<br>Type: String |
| `aws.access_key_id` | AWS access key ID used for writing data to S3.<br>Required: Yes<br>Default value: None<br>Type: String |
| `aws.secret_access_key` | AWS secret access key used for writing data to S3.<br>Required: Yes<br>Default value: None<br>Type: String |
| `databricks.cloud.s3.bucket` | The AWS S3 bucket that CREATE TABLE AS SELECT queries will write data to.<br>Required: Yes<br>Default value: None<br>Type: String |
| `databricks.cloud.region` | The cloud region that the `databricks.cloud.s3.bucket` belongs to.<br>Required: Yes<br>Default value: None<br>Type: String |
Parameters to be used if `type` is `POSTGRESQL`:

| Parameter Name | Description |
|---|---|
| `postgres.username` | Username to connect to the database instance specified with the store's `uris` parameter.<br>Required: Yes<br>Default value: None<br>Type: String |
| `postgres.password` | Password to use with the store's `postgres.username` when connecting to the database instance.<br>Required: Yes<br>Default value: None<br>Type: String |
The following creates a new Kafka store with the name my_kafka_store:

```sql
CREATE STORE my_kafka_store
WITH (
  'type' = KAFKA,
  'access_region' = "AWS us-east-1",
  'uris' = 'kafka.broker1.url:9092,kafka.broker2.url:9092',
  'tls.ca_cert_file' = '/certs/us-east-1/self-signed-kafka-ca.crt'
);
```
The following creates a new Kafka store with the case-sensitive name MyKafkaStore++, with the connection details supplied through a properties file:

```sql
CREATE STORE "MyKafkaStore++"
WITH (
  'type' = KAFKA,
  'properties.file' = '/User/user1/deltastream/kafka_store/properties.yaml'
);
```

```
$ cat /User/user1/deltastream/kafka_store/properties.yaml
uris: "http://uri1,http://uri2"
access_region: "AWS us-east-1"
kafka.sasl.hash_function: PLAIN
kafka.sasl.username: "ABCDEFGH12345678"
kafka.sasl.password: "kafkasaslpassword"
```
The following statement creates a new Kinesis store with the name my_kinesis_store:

```sql
CREATE STORE my_kinesis_store
WITH (
  'type' = KINESIS,
  'access_region' = "AWS us-east-1",
  'uris' = 'https://url.to.kinesis.aws:4566',
  'kinesis.access_key_id' = 'testkey',
  'kinesis.secret_access_key' = 'testsecret'
);
```
The following statement creates a new Kafka store with a Schema Registry named sr. Note that the store name is case-sensitive and thus has quotes around it:

```sql
CREATE STORE "kafkaStoreWithSR"
WITH (
  'type' = KAFKA,
  'access_region' = "AWS us-east-1",
  'uris' = 'kafka.broker1.url:9092,kafka.broker2.url:9092',
  'schema_registry.name' = sr
);
```
The following creates a new Confluent Cloud Kafka store with the case-sensitive name ConfluentCloudKafkaStore:

```sql
CREATE STORE "ConfluentCloudKafkaStore"
WITH (
  'type' = KAFKA,
  'access_region' = "AWS us-east-1",
  'uris' = 'abc-12345.us-east-1.aws.confluent.cloud:9092',
  'kafka.sasl.hash_function' = PLAIN,
  'kafka.sasl.username' = 'credentials_username',
  'kafka.sasl.password' = 'credentials_password'
);
```
The following creates a new Snowflake store with the name sf:

```sql
CREATE STORE sf
WITH (
  'type' = SNOWFLAKE,
  'access_region' = "AWS us-east-1",
  'uris' = 'https://my-account.snowflakecomputing.com',
  'snowflake.account_id' = 'my-account',
  'snowflake.role_name' = 'ACCOUNTADMIN',
  'snowflake.username' = 'STREAMING_USER',
  'snowflake.warehouse_name' = 'COMPUTE_WH',
  'snowflake.client.key_file' = '/path/to/pk/my_account_rsa.p8'
);
```
The following creates the same Snowflake store, providing the private key passphrase through a properties file:

```sql
CREATE STORE sf
WITH (
  'type' = SNOWFLAKE,
  'access_region' = "AWS us-east-1",
  'uris' = 'https://my-account.snowflakecomputing.com',
  'snowflake.account_id' = 'my-account',
  'snowflake.role_name' = 'ACCOUNTADMIN',
  'snowflake.username' = 'STREAMING_USER',
  'snowflake.warehouse_name' = 'COMPUTE_WH',
  'snowflake.client.key_file' = '/path/to/pk/my_account_rsa.p8',
  'properties.file' = '/path/to/deltastream/snowflake_store/properties.yaml'
);
```

```
$ cat /path/to/deltastream/snowflake_store/properties.yaml
snowflake.client.key_passphrase: "my$account$$key$$$phrase"
```
The following creates a new Databricks store with the name databricks_store:

```sql
CREATE STORE databricks_store WITH (
  'type' = DATABRICKS,
  'access_region' = "AWS us-east-1",
  'uris' = 'https://dbc-12345678-1234.cloud.databricks.com',
  'databricks.app_token' = 'dapiabcdefghijklmnopqrstuvw123456789',
  'databricks.warehouse_id' = 'abcdefgh1234567',
  'aws.access_key_id' = 'AWS_ACCESS_KEY',
  'aws.secret_access_key' = 'AWS_SECRET_ACCESS_KEY',
  'databricks.cloud.s3.bucket' = 'mybucket',
  'databricks.cloud.region' = 'AWS us-west-2'
);
```
The following creates a new PostgreSQL store with the name ps_store:

```sql
CREATE STORE ps_store WITH (
  'type' = POSTGRESQL,
  'access_region' = "AWS us-east-1",
  'uris' = 'postgresql://mystore.com:5432/demo',
  'postgres.username' = 'user',
  'postgres.password' = 'password'
);
```