CREATE STORE

Syntax

CREATE STORE
    store_name
WITH (store_parameter = value [, ...]);

Description

DeltaStream processes streaming data stored in streaming stores such as Apache Kafka and Amazon Kinesis. The first step toward accessing this data is to connect to such data stores. You do this using the CREATE STORE statement, which defines a new store with the connection details needed to reach a remote data store.

Note: Currently DeltaStream supports Kafka (Confluent Cloud, Amazon MSK, Redpanda, and more) and Amazon Kinesis. Support for additional streaming stores such as Google Pub/Sub and Apache Pulsar is coming soon.

Only a role with the CREATE_STORE privilege can create a store.

Arguments

store_name

Name of the store to define. If the name is case sensitive, you must wrap it in double quotes; otherwise the system uses the lowercase name.
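
For example, here is a minimal sketch of both forms (the broker URI is a placeholder): the first statement registers the store under the lowercase name my_kafka_store, while the quoted form preserves the case-sensitive name My_Kafka_Store:

CREATE STORE My_Kafka_Store    -- unquoted: stored as my_kafka_store
WITH (
    'type' = KAFKA,
    'access_region' = "AWS us-east-1",
    'uris' = 'kafka.broker1.url:9092'
);

CREATE STORE "My_Kafka_Store"  -- quoted: stored as My_Kafka_Store
WITH (
    'type' = KAFKA,
    'access_region' = "AWS us-east-1",
    'uris' = 'kafka.broker1.url:9092'
);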

WITH (store_parameter = value [, ...])

This clause specifies Store Parameters.

Store Parameters

type

Specifies the store type.

Required: Yes
Type: STORE_TYPE
Valid values: KAFKA, KINESIS, SNOWFLAKE, DATABRICKS, POSTGRESQL

access_region

Specifies the region of the store. To improve latency and reduce data transfer costs, this should be the cloud and region in which the physical store is running.

Required: Yes, unless specified in properties.file
Type: String
Valid values: See LIST REGIONS
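
To discover the valid values for this parameter, you can run the LIST REGIONS command referenced above:

LIST REGIONS;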

uris

List of comma-separated host:port URIs to connect to the store.

Required: Yes, unless specified in properties.file
Type: String

tls.disabled

Specifies if the store should be accessed over TLS.

Required: No
Default value: TRUE
Type: Boolean
Valid values: TRUE or FALSE

tls.verify_server_hostname

Specifies if the server CNAME should be validated against the certificate.

Required: No
Default value: TRUE
Type: Boolean
Valid values: TRUE or FALSE

tls.ca_cert_file

Path to a CA certificate file in PEM format. Prefix the path with @ to upload the file to the server.

Required: No
Default value: Public CA chains
Type: String
Valid values: Path to an SSL certificate in PEM format

tls.cipher_suites

Comma-separated list of cipher suites to use when establishing a TLS connection.

Required: No
Default value: [] (all supported cipher suites are enabled)
Type: List
Valid values: Full cipher suite names

tls.protocols

Comma-separated list of TLS protocol versions to use when establishing a TLS connection.

Required: No
Default value: TLSv1.2,TLSv1.1,TLSv1
Type: List
Valid values: TLS protocols with version
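
As a combined illustration, the following sketch creates a Kafka store that verifies the server hostname, pins the TLS protocol version, and supplies a private CA certificate; the broker URI and certificate path are placeholders, and the unquoted TRUE assumes boolean values follow the same unquoted style as the enum values above:

CREATE STORE
    tls_kafka_store
WITH (
    'type' = KAFKA,
    'access_region' = "AWS us-east-1",
    'uris' = 'kafka.broker1.url:9093',
    'tls.verify_server_hostname' = TRUE,
    'tls.protocols' = 'TLSv1.2',
    'tls.ca_cert_file' = '@/certs/private-ca.crt'
);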

schema_registry.name

Name of a schema registry to associate with the store. You must first create a schema registry using the CREATE SCHEMA_REGISTRY DDL statement. Only one schema registry can be associated with a store.

Required: No
Default value: None
Type: String
Valid values: See LIST SCHEMA_REGISTRIES

properties.file

Path to a YAML file containing any of the store parameters. Prefix the path with @ to upload the file to the server.

Required: No
Default value: None
Type: String
Valid values: File path in current user's filesystem

Kafka Specific Parameters

Parameters to be used if type is KAFKA:

kafka.sasl.hash_function

SASL hash function to use when authenticating with Apache Kafka brokers.

Required: No
Default value: NONE
Type: HASH_FUNCTION
Valid values: NONE, PLAIN, SHA256, SHA512, AWS_MSK_IAM

kafka.sasl.username

Username to use when authenticating with Apache Kafka brokers.

Required: Yes, if kafka.sasl.hash_function is not NONE or AWS_MSK_IAM
Default value: None
Type: String

kafka.sasl.password

Password to use when authenticating with Apache Kafka brokers.

Required: Yes, if kafka.sasl.hash_function is not NONE or AWS_MSK_IAM
Default value: None
Type: String

kafka.msk.aws_region

AWS region to use when authenticating with MSK.

Required: Yes, if kafka.sasl.hash_function is AWS_MSK_IAM
Default value: None
Type: String
Example: us-east-1

kafka.msk.iam_role_arn

AWS IAM role ARN to use when authenticating with MSK.

Required: Yes, if kafka.sasl.hash_function is AWS_MSK_IAM
Default value: None
Type: String
Example: arn:aws:iam::123456789012:role/example-IAM-role

tls.client.cert_file

Path to a client certificate file in PEM format. Prefix the path with @ to upload the file to the server.

Required: Yes, if kafka.sasl.hash_function is SHA256 or SHA512
Default value: None
Type: String
Valid value: Path to an SSL certificate in PEM format

tls.client.key_file

Path to the client key file in PEM format. Prefix the path with @ to upload the file to the server.

Required: Yes, if kafka.sasl.hash_function is SHA256 or SHA512
Default value: None
Type: String
Valid value: Path to an SSL private key in PEM format
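
The examples below do not cover SCRAM authentication, so here is a sketch combining these parameters for SHA512; the broker URIs, credentials, and certificate paths are placeholders:

CREATE STORE
    scram_kafka_store
WITH (
    'type' = KAFKA,
    'access_region' = "AWS us-east-1",
    'uris' = 'kafka.broker1.url:9092,kafka.broker2.url:9092',
    'kafka.sasl.hash_function' = SHA512,
    'kafka.sasl.username' = 'scram_username',
    'kafka.sasl.password' = 'scram_password',
    'tls.client.cert_file' = '@/certs/kafka-client.crt',
    'tls.client.key_file' = '@/certs/kafka-client.key'
);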

Kinesis Specific Parameters

Parameters to be used if type is KINESIS:

kinesis.iam_role_arn

AWS IAM role ARN to use when authenticating with an Amazon Kinesis service.

Required: Yes, unless authenticating with the Amazon Kinesis Service using static AWS credentials
Default value: None
Type: String
Example: arn:aws:iam::123456789012:role/example-IAM-role

kinesis.access_key_id

AWS IAM access key to use when authenticating with an Amazon Kinesis service.

Required: Yes, if authenticating with the Amazon Kinesis Service using static AWS credentials
Default value: None
Type: String

kinesis.secret_access_key

AWS IAM secret access key to use when authenticating with an Amazon Kinesis service.

Required: Yes, if authenticating with the Amazon Kinesis Service using static AWS credentials
Default value: None
Type: String

Snowflake Specific Parameters

Parameters to be used if type is SNOWFLAKE:

snowflake.account_id

Snowflake account identifier assigned to the Snowflake account.

Required: Yes
Default value: None
Type: String

snowflake.cloud.region

Name of the Snowflake cloud region in which the account resources operate.

Required: Yes
Default value: None
Type: String
Valid values:

  • AWS us-east-1

  • AWS us-east-2

  • AWS us-west-1

  • AWS us-west-2

snowflake.role_name

Access control role to use for the store operations after connecting to Snowflake.

Required: Yes
Default value: None
Type: String

snowflake.username

User login name for the Snowflake account.

Required: Yes
Default value: None
Type: String

snowflake.warehouse_name

Warehouse name to use for queries and other store operations that require compute resources.

Required: Yes
Default value: None
Type: String

snowflake.client.key_file

Path to the Snowflake account's private key in PEM format. Prefix the path with @ to upload the file to the server.

Required: Yes
Default value: None
Type: String

snowflake.client.key_passphrase

Passphrase for decrypting the Snowflake account's private key.

Required: No
Default value: None
Type: String

Databricks Specific Parameters

Parameters to be used if type is DATABRICKS:

databricks.app_token

Databricks personal access token used when authenticating with a Databricks workspace.

Required: Yes
Default value: None
Type: String

databricks.warehouse_id

The identifier for a Databricks SQL warehouse belonging to a Databricks workspace. This warehouse is used to create and query tables in Databricks.

Required: Yes
Default value: None
Type: String

databricks.warehouse_port

The port for a Databricks SQL warehouse belonging to a Databricks workspace.

Required: No
Default value: 443
Type: Integer

aws.access_key_id

AWS access key ID used for writing data to S3.

Required: Yes
Default value: None
Type: String

aws.secret_access_key

AWS secret access key used for writing data to S3.

Required: Yes
Default value: None
Type: String

databricks.cloud.s3.bucket

The AWS S3 bucket that CREATE TABLE AS SELECT queries will write data to.

Required: Yes
Default value: None
Type: String

databricks.cloud.region

The cloud region that the databricks.cloud.s3.bucket belongs to.

Required: Yes
Default value: None
Type: String

Valid values:

  • AWS us-east-1

  • AWS us-east-2

  • AWS us-west-1

  • AWS us-west-2

PostgreSQL Specific Parameters

Parameters to be used if type is POSTGRESQL:

postgres.username

Username to connect to the database instance specified with the store's uris parameter.

Required: Yes
Default value: None
Type: String

postgres.password

Password to connect to the database instance as the user specified by postgres.username.

Required: Yes
Default value: None
Type: String

Examples

Create a Kafka store with credentials

The following creates a new Kafka store with name my_kafka_store:

CREATE STORE 
    my_kafka_store 
WITH ( 
    'type' = KAFKA, 
    'access_region' = "AWS us-east-1", 
    'uris' = 'kafka.broker1.url:9092,kafka.broker2.url:9092', 
    'tls.ca_cert_file' = '@/certs/us-east-1/self-signed-kafka-ca.crt'
);

Create an MSK store with IAM credentials

The following creates a new Amazon MSK store with name msk_iam_store:

CREATE STORE 
    msk_iam_store 
WITH ( 
    'type' = KAFKA, 
    'access_region' = "AWS us-east-1", 
    'uris' = 'b-1.abc.abc.c10.kafka.us-east-1.amazonaws.com:9098,b-2.abc.abc.c10.kafka.us-east-1.amazonaws.com:9098,b-3.abc.abc.c10.kafka.us-east-1.amazonaws.com:9098', 
    'kafka.sasl.hash_function' = AWS_MSK_IAM,
    'kafka.msk.aws_region'='us-east-1',
    'kafka.msk.iam_role_arn'='arn:aws:iam::123456789012:role/example-IAM-role'
);

Create a Kafka store with credentials from a file

The following creates a new Kafka store with name MyKafkaStore++:

CREATE STORE 
    "MyKafkaStore++" 
WITH ( 
    'type' = KAFKA,
    'properties.file' = '@/User/user1/deltastream/kafka_store/properties.yaml'
);

The properties file contents:

$ cat /User/user1/deltastream/kafka_store/properties.yaml
uris: "http://uri1,http://uri2"
access_region: "AWS us-east-1"
kafka.sasl.hash_function: PLAIN
kafka.sasl.username: "ABCDEFGH12345678"
kafka.sasl.password: "kafkasaslpassword"

Create a Kinesis store with IAM credentials

The following statement creates a new Kinesis store with name my_kinesis_store:

CREATE STORE 
    my_kinesis_store 
WITH ( 
    'type' = KINESIS, 
    'access_region' = "AWS us-east-1", 
    'uris' = 'https://url.to.kinesis.aws:4566', 
    'kinesis.iam_role_arn' = 'arn:aws:iam::123456789012:role/example-IAM-role'
);

Create a Kinesis store with static credentials

The following statement creates a new Kinesis store with name my_kinesis_store:

CREATE STORE 
    my_kinesis_store 
WITH ( 
    'type' = KINESIS, 
    'access_region' = "AWS us-east-1", 
    'uris' = 'https://url.to.kinesis.aws:4566', 
    'kinesis.access_key_id' = 'testkey', 
    'kinesis.secret_access_key' = 'testsecret'
);

Create a Kafka store with a schema registry

The following statement creates a new Kafka store with a schema registry named sr. Note that the store name is case-sensitive and thus has quotes around it:

CREATE STORE
    "kafkaStoreWithSR"
WITH (
    'type' = KAFKA, 
    'access_region' = "AWS us-east-1", 
    'uris' = 'kafka.broker1.url:9092,kafka.broker2.url:9092', 
    'schema_registry.name' = sr
);

Create a Confluent Kafka store with credentials

The following creates a new Confluent Cloud Kafka store with the case-sensitive name ConfluentCloudKafkaStore:

CREATE STORE "ConfluentCloudKafkaStore" 
WITH ( 
    'type' = KAFKA,
    'access_region' = "AWS us-east-1",
    'uris' = 'abc-12345.us-east-1.aws.confluent.cloud:9092',
    'kafka.sasl.hash_function' = PLAIN,
    'kafka.sasl.username' = 'credentials_username',
    'kafka.sasl.password' = 'credentials_password'
);

Create a Snowflake store

CREATE STORE sf 
WITH ( 
    'type' = SNOWFLAKE,
    'access_region' = "AWS us-east-1",
    'uris' = 'https://my-account.snowflakecomputing.com',
    'snowflake.account_id' = 'my-account',
    'snowflake.role_name' = 'ACCOUNTADMIN',
    'snowflake.username' = 'STREAMING_USER',
    'snowflake.warehouse_name' = 'COMPUTE_WH',
    'snowflake.client.key_file' = '@/path/to/pk/my_account_rsa.p8'
);

Create a Snowflake store with client key passphrase

CREATE STORE sf 
WITH ( 
    'type' = SNOWFLAKE,
    'access_region' = "AWS us-east-1",
    'uris' = 'https://my-account.snowflakecomputing.com',
    'snowflake.account_id' = 'my-account',
    'snowflake.role_name' = 'ACCOUNTADMIN',
    'snowflake.username' = 'STREAMING_USER',
    'snowflake.warehouse_name' = 'COMPUTE_WH',
    'snowflake.client.key_file' = '@/path/to/pk/my_account_rsa.p8',
    'properties.file' = '@/path/to/deltastream/snowflake_store/properties.yaml'
);

The properties file contents:

$ cat /path/to/deltastream/snowflake_store/properties.yaml
snowflake.client.key_passphrase: "my$account$$key$$$phrase"

Create a Databricks store

CREATE STORE databricks_store WITH (
  'type' = DATABRICKS,
  'access_region' = "AWS us-east-1", 
  'uris' = 'https://dbc-12345678-1234.cloud.databricks.com', 
  'databricks.app_token' = 'dapiabcdefghijklmnopqrstuvw123456789', 
  'databricks.warehouse_id' = 'abcdefgh1234567', 
  'aws.access_key_id' = 'AWS_ACCESS_KEY', 
  'aws.secret_access_key' = 'AWS_SECRET_ACCESS_KEY', 
  'databricks.cloud.s3.bucket' = 'mybucket', 
  'databricks.cloud.region' = 'AWS us-west-2'
);

Create a PostgreSQL store

CREATE STORE ps_store WITH (
  'type' = POSTGRESQL,
  'access_region' = "AWS us-east-1",
  'uris' = 'postgresql://mystore.com:5432/demo', 
  'postgres.username' = 'user',
  'postgres.password' = 'password'
);
