CREATE STORE

Syntax

CREATE STORE
    store_name
WITH (store_parameter = value [, ...]);

Description

DeltaStream processes streaming data stored in streaming stores such as Apache Kafka and Amazon Kinesis. The first step in accessing such data is to connect to the data store; this is done with the CREATE STORE statement, which defines a new Store with the connection details for a remote data source. DeltaStream currently supports Kafka (Confluent Cloud, Amazon MSK, Redpanda, etc.) and Amazon Kinesis as streaming stores, along with Snowflake, Databricks, and PostgreSQL. Support for other streaming stores such as Google Pub/Sub and Apache Pulsar is coming soon.

Stores can only be created by a Role with the CREATE_STORE privilege.

Arguments

store_name

Name of the Store to define. A case-sensitive name must be wrapped in double quotes; otherwise, the lowercased name is used.
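For illustration, the following sketch (with hypothetical broker URIs) creates two distinct Stores: the unquoted name is lowercased to my_kafka_store, while the quoted name keeps its casing as My_Kafka_Store:

CREATE STORE 
    My_Kafka_Store 
WITH ( 
    'type' = KAFKA, 
    'access_region' = "AWS us-east-1", 
    'uris' = 'kafka.broker1.url:9092'
);

CREATE STORE 
    "My_Kafka_Store" 
WITH ( 
    'type' = KAFKA, 
    'access_region' = "AWS us-east-1", 
    'uris' = 'kafka.broker1.url:9092'
);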

WITH (store_parameter = value [, …​ ])

This clause specifies Store Parameters.

Store Parameters


type

Specifies the Store type.

Required: Yes
Type: STORE_TYPE
Valid values: KAFKA, KINESIS, SNOWFLAKE, DATABRICKS, POSTGRESQL

access_region

Specifies the region of the Store. To improve latency and reduce data transfer costs, this should be the same cloud and region in which the physical Store runs.

Required: Yes, unless specified in properties.file
Type: String
Valid values: See LIST REGIONS

uris

Comma-separated list of host:port URIs to connect to the store.

Required: Yes, unless specified in properties.file
Type: String

tls.disabled

Specifies if the store should be accessed over TLS.

Required: No
Default value: TRUE
Type: Boolean
Valid values: TRUE or FALSE

tls.verify_server_hostname

Specifies if the server CNAME should be validated against the certificate.

Required: No
Default value: TRUE
Type: Boolean
Valid values: TRUE or FALSE

tls.ca_cert_file

Path to a CA certificate file in PEM format. Prefix the path with @ for the file to be uploaded to the server.

Required: No
Default value: Public CA chains
Type: String
Valid values: Path to an SSL certificate in PEM format

tls.cipher_suites

Comma-separated list of cipher suites to use when establishing a TLS connection.

Required: No
Default value: [] (all supported cipher suites are enabled)
Type: List
Valid values: Full cipher suite names describing algorithm content

tls.protocols

Comma-separated list of TLS protocol versions to use when establishing a TLS connection.

Required: No
Default value: TLSv1.2,TLSv1.1,TLSv1
Type: List
Valid values: TLS protocols with version
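As a minimal sketch of how the TLS parameters above combine (the broker URIs and certificate path are hypothetical), the following creates a Kafka store that pins a trusted CA and restricts the connection to TLSv1.2:

CREATE STORE 
    tls_kafka_store 
WITH ( 
    'type' = KAFKA, 
    'access_region' = "AWS us-east-1", 
    'uris' = 'kafka.broker1.url:9093,kafka.broker2.url:9093', 
    'tls.verify_server_hostname' = TRUE, 
    'tls.ca_cert_file' = '@/certs/us-east-1/self-signed-kafka-ca.crt', 
    'tls.protocols' = 'TLSv1.2'
);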

schema_registry.name

Name of a Schema Registry to associate with the store. A Schema Registry must first be created using the CREATE SCHEMA_REGISTRY DDL statement. Only one Schema Registry can be associated with a store.

Required: No
Default value: None
Type: String
Valid values: See LIST SCHEMA_REGISTRIES

properties.file

The file path to a YAML file containing any of the Store parameters. Prefix the path with @ for the file to be uploaded to the server.

Required: No
Default value: None
Type: String
Valid values: File path in the current user's filesystem

Kafka Specific Parameters

Parameters to be used if type is KAFKA:


kafka.sasl.hash_function

SASL hash function to use when authenticating with Apache Kafka brokers.

Required: No
Default value: NONE
Type: HASH_FUNCTION
Valid values: NONE, PLAIN, SHA256, SHA512, and AWS_MSK_IAM

kafka.sasl.username

Username to use when authenticating with Apache Kafka brokers.

Required: Yes, if kafka.sasl.hash_function is not NONE or AWS_MSK_IAM
Default value: None
Type: String

kafka.sasl.password

Password to use when authenticating with Apache Kafka brokers.

Required: Yes, if kafka.sasl.hash_function is not NONE or AWS_MSK_IAM
Default value: None
Type: String

kafka.msk.aws_region

AWS region to use when authenticating with MSK.

Required: Yes, if kafka.sasl.hash_function is AWS_MSK_IAM
Default value: None
Type: String
Example: us-east-1

kafka.msk.iam_role_arn

AWS IAM role ARN to use when authenticating with MSK.

Required: Yes, if kafka.sasl.hash_function is AWS_MSK_IAM
Default value: None
Type: String
Example: arn:aws:iam::123456789012:role/example-IAM-role

tls.client.cert_file

Path to a client certificate file in PEM format. Prefix the path with @ for the file to be uploaded to the server.

Required: Yes, if kafka.sasl.hash_function is SHA256 or SHA512
Default value: None
Type: String
Valid value: Path to an SSL certificate in PEM format

tls.client.key_file

Path to the client key file in PEM format. Prefix the path with @ for the file to be uploaded to the server.

Required: Yes, if kafka.sasl.hash_function is SHA256 or SHA512
Default value: None
Type: String
Valid value: Path to an SSL private key in PEM format
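The parameters above work together when a Kafka cluster requires SCRAM authentication along with client certificates. A minimal sketch, assuming hypothetical broker URIs, credentials, and certificate paths:

CREATE STORE 
    scram_kafka_store 
WITH ( 
    'type' = KAFKA, 
    'access_region' = "AWS us-east-1", 
    'uris' = 'kafka.broker1.url:9093,kafka.broker2.url:9093', 
    'kafka.sasl.hash_function' = SHA512, 
    'kafka.sasl.username' = 'scram_username', 
    'kafka.sasl.password' = 'scram_password', 
    'tls.client.cert_file' = '@/certs/us-east-1/client.crt', 
    'tls.client.key_file' = '@/certs/us-east-1/client.key'
);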

Kinesis Specific Parameters

Parameters to be used if type is KINESIS:


kinesis.iam_role_arn

AWS IAM role ARN to use when authenticating with an Amazon Kinesis service.

Required: Yes, unless authenticating with the Amazon Kinesis Service using static AWS credentials
Default value: None
Type: String
Example: arn:aws:iam::123456789012:role/example-IAM-role

kinesis.access_key_id

AWS IAM access key to use when authenticating with an Amazon Kinesis service.

Required: Yes, if authenticating with the Amazon Kinesis Service using static AWS credentials
Default value: None
Type: String

kinesis.secret_access_key

AWS IAM secret access key to use when authenticating with an Amazon Kinesis service.

Required: Yes, if authenticating with the Amazon Kinesis Service using static AWS credentials
Default value: None
Type: String

Snowflake Specific Parameters

Parameters to be used if type is SNOWFLAKE:


snowflake.account_id

Snowflake account identifier assigned to the Snowflake account.

Required: Yes
Default value: None
Type: String

snowflake.cloud.region

Snowflake cloud region name in which the account's resources operate.

Required: Yes
Default value: None
Type: String
Valid values:

  • AWS us-east-1

  • AWS us-east-2

  • AWS us-west-1

  • AWS us-west-2

snowflake.role_name

Access control role to use for the Store operations after connecting to Snowflake.

Required: Yes
Default value: None
Type: String

snowflake.username

User login name for the Snowflake account.

Required: Yes
Default value: None
Type: String

snowflake.warehouse_name

Warehouse name to use for queries and other store operations that require compute resources.

Required: Yes
Default value: None
Type: String

snowflake.client.key_file

Path to the Snowflake account's private key in PEM format. Prefix the path with @ for the file to be uploaded to the server.

Required: Yes
Default value: None
Type: String

snowflake.client.key_passphrase

Passphrase for decrypting the Snowflake account's private key.

Required: No
Default value: None
Type: String

Databricks Specific Parameters

Parameters to be used if type is DATABRICKS:


databricks.app_token

Databricks personal access token used when authenticating with a Databricks workspace.

Required: Yes
Default value: None
Type: String

databricks.warehouse_id

The identifier for a Databricks SQL Warehouse belonging to a Databricks workspace. This Warehouse will be used to create and query Tables in Databricks.

Required: Yes
Default value: None
Type: String

databricks.warehouse_port

The port for a Databricks SQL Warehouse belonging to a Databricks workspace.

Required: No
Default value: 443
Type: Integer

aws.access_key_id

AWS access key ID used for writing data to S3.

Required: Yes
Default value: None
Type: String

aws.secret_access_key

AWS secret access key used for writing data to S3.

Required: Yes
Default value: None
Type: String

databricks.cloud.s3.bucket

The AWS S3 bucket that CREATE TABLE AS SELECT queries will write data to.

Required: Yes
Default value: None
Type: String

databricks.cloud.region

The cloud region that the databricks.cloud.s3.bucket belongs to.

Required: Yes
Default value: None
Type: String
Valid values:

  • AWS us-east-1

  • AWS us-east-2

  • AWS us-west-1

  • AWS us-west-2

PostgreSQL Specific Parameters

Parameters to be used if type is POSTGRESQL:

postgres.username

Username to connect to the database instance specified with the store's uris parameter.

Required: Yes
Default value: None
Type: String

postgres.password

Password for the user specified in postgres.username, used to connect to the database instance.

Required: Yes
Default value: None
Type: String

Examples

Create a Kafka Store with credentials

The following creates a new Kafka store with name my_kafka_store:

CREATE STORE 
    my_kafka_store 
WITH ( 
    'type' = KAFKA, 
    'access_region' = "AWS us-east-1", 
    'uris' = 'kafka.broker1.url:9092,kafka.broker2.url:9092', 
    'tls.ca_cert_file' = '@/certs/us-east-1/self-signed-kafka-ca.crt'
);

Create an MSK Store with IAM credentials

The following creates a new Amazon MSK store with name msk_iam_store:

CREATE STORE 
    msk_iam_store 
WITH ( 
    'type' = KAFKA, 
    'access_region' = "AWS us-east-1", 
    'uris' = 'b-1.abc.abc.c10.kafka.us-east-1.amazonaws.com:9098,b-2.abc.abc.c10.kafka.us-east-1.amazonaws.com:9098,b-3.abc.abc.c10.kafka.us-east-1.amazonaws.com:9098', 
    'kafka.sasl.hash_function' = AWS_MSK_IAM,
    'kafka.msk.aws_region'='us-east-1',
    'kafka.msk.iam_role_arn'='arn:aws:iam::123456789012:role/example-IAM-role'
);

Create a Kafka Store with credentials from a file

The following creates a new Kafka store with name MyKafkaStore++:

CREATE STORE 
    "MyKafkaStore++" 
WITH ( 
    'type' = KAFKA,
    'properties.file' = '@/User/user1/deltastream/kafka_store/properties.yaml'
);
$ cat /User/user1/deltastream/kafka_store/properties.yaml
uris: "http://uri1,http://uri2"
access_region: "AWS us-east-1"
kafka.sasl.hash_function: PLAIN
kafka.sasl.username: "ABCDEFGH12345678"
kafka.sasl.password: "kafkasaslpassword"

Create a Kinesis Store with IAM credentials

The following statement creates a new Kinesis store with name my_kinesis_store:

CREATE STORE 
    my_kinesis_store 
WITH ( 
    'type' = KINESIS, 
    'access_region' = "AWS us-east-1", 
    'uris' = 'https://url.to.kinesis.aws:4566', 
    'kinesis.iam_role_arn' = 'arn:aws:iam::123456789012:role/example-IAM-role'
);

Create a Kinesis Store with static credentials

The following statement creates a new Kinesis store with name my_kinesis_store:

CREATE STORE 
    my_kinesis_store 
WITH ( 
    'type' = KINESIS, 
    'access_region' = "AWS us-east-1", 
    'uris' = 'https://url.to.kinesis.aws:4566', 
    'kinesis.access_key_id' = 'testkey', 
    'kinesis.secret_access_key' = 'testsecret'
);

Create a Kafka Store with a Schema Registry

The following statement creates a new Kafka store with a Schema Registry named sr. Note that the store name is case-sensitive and thus has quotes around it:

CREATE STORE
    "kafkaStoreWithSR"
WITH (
    'type' = KAFKA, 
    'access_region' = "AWS us-east-1", 
    'uris' = 'kafka.broker1.url:9092,kafka.broker2.url:9092', 
    'schema_registry.name' = sr
);

Create a Confluent Kafka Store with credentials

The following creates a new Confluent Cloud Kafka store with the case-sensitive name ConfluentCloudKafkaStore:

CREATE STORE "ConfluentCloudKafkaStore" 
WITH ( 
    'type' = KAFKA,
    'access_region' = "AWS us-east-1",
    'uris' = 'abc-12345.us-east-1.aws.confluent.cloud:9092',
    'kafka.sasl.hash_function' = PLAIN,
    'kafka.sasl.username' = 'credentials_username',
    'kafka.sasl.password' = 'credentials_password'
);

Create a Snowflake Store

CREATE STORE sf 
WITH ( 
    'type' = SNOWFLAKE,
    'access_region' = "AWS us-east-1",
    'uris' = 'https://my-account.snowflakecomputing.com',
    'snowflake.account_id' = 'my-account',
    'snowflake.role_name' = 'ACCOUNTADMIN',
    'snowflake.username' = 'STREAMING_USER',
    'snowflake.warehouse_name' = 'COMPUTE_WH',
    'snowflake.client.key_file' = '@/path/to/pk/my_account_rsa.p8'
);

Create a Snowflake Store with client key passphrase

CREATE STORE sf 
WITH ( 
    'type' = SNOWFLAKE,
    'access_region' = "AWS us-east-1",
    'uris' = 'https://my-account.snowflakecomputing.com',
    'snowflake.account_id' = 'my-account',
    'snowflake.role_name' = 'ACCOUNTADMIN',
    'snowflake.username' = 'STREAMING_USER',
    'snowflake.warehouse_name' = 'COMPUTE_WH',
    'snowflake.client.key_file' = '@/path/to/pk/my_account_rsa.p8',
    'properties.file' = '@/path/to/deltastream/snowflake_store/properties.yaml'
);
$ cat /path/to/deltastream/snowflake_store/properties.yaml
snowflake.client.key_passphrase: "my$account$$key$$$phrase"

Create a Databricks Store

CREATE STORE databricks_store WITH (
  'type' = DATABRICKS,
  'access_region' = "AWS us-east-1", 
  'uris' = 'https://dbc-12345678-1234.cloud.databricks.com', 
  'databricks.app_token' = 'dapiabcdefghijklmnopqrstuvw123456789', 
  'databricks.warehouse_id' = 'abcdefgh1234567', 
  'aws.access_key_id' = 'AWS_ACCESS_KEY', 
  'aws.secret_access_key' = 'AWS_SECRET_ACCESS_KEY', 
  'databricks.cloud.s3.bucket' = 'mybucket', 
  'databricks.cloud.region' = 'AWS us-west-2'
);

Create a PostgreSQL Store

CREATE STORE ps_store WITH (
  'type' = POSTGRESQL,
  'access_region' = "AWS us-east-1",
  'uris' = 'postgresql://mystore.com:5432/demo', 
  'postgres.username' = 'user',
  'postgres.password' = 'password'
);
