CREATE STORE

Syntax

CREATE STORE
    store_name
WITH (store_parameter = value [, ...]);

Description

DeltaStream processes streaming data that is stored in streaming stores such as Apache Kafka and Amazon Kinesis. To access such data, the first step is to connect to the data store. This is done using the CREATE STORE statement, which defines a new Store with the connection details needed to connect to a remote data source. Currently DeltaStream supports Kafka (Confluent Cloud, Amazon MSK, RedPanda, etc.) and Amazon Kinesis. Support for other streaming stores such as Google Pub/Sub and Apache Pulsar is coming soon.

Stores can only be created by a Role with the CREATE_STORE privilege.

Arguments

store_name

Name of the Store to define. For case-sensitive names, the name must be wrapped in double quotes; otherwise, the lowercased name will be used.

WITH (store_parameter = value [, ...])

This clause specifies Store Parameters.

Store Parameters

Kafka Specific Parameters

Parameters to be used if type is KAFKA:

Kinesis Specific Parameters

Parameters to be used if type is KINESIS:

Snowflake Specific Parameters

Parameters to be used if type is SNOWFLAKE:

Databricks Specific Parameters

Parameters to be used if type is DATABRICKS:

PostgreSQL Specific Parameters

Parameters to be used if type is POSTGRESQL:

Examples

Create a Kafka Store with credentials

The following creates a new Kafka store with name my_kafka_store:

CREATE STORE my_kafka_store WITH (
    'type' = KAFKA,
    'access_region' = "AWS us-east-1",
    'uris' = 'kafka.broker1.url:9092,kafka.broker2.url:9092',
    'tls.ca_cert_file' = '/certs/us-east-1/self-signed-kafka-ca.crt'
);

Create an MSK Store with IAM credentials

The following creates a new Amazon MSK Kafka store named msk_iam_store that authenticates with an IAM role:

CREATE STORE msk_iam_store WITH (
    'type' = KAFKA,
    'access_region' = "AWS us-east-1",
    'uris' = 'b-1.abc.abc.c10.kafka.us-east-1.amazonaws.com:9098,b-2.abc.abc.c10.kafka.us-east-1.amazonaws.com:9098,b-3.abc.abc.c10.kafka.us-east-1.amazonaws.com:9098',
    'kafka.sasl.hash_function' = AWS_MSK_IAM,
    'kafka.msk.aws_region' = 'us-east-1',
    'kafka.msk.iam_role_arn' = 'arn:aws:iam::123456789012:role/example-IAM-role'
);

Create a Kafka Store with credentials from a file

The following creates a new Kafka store with name MyKafkaStore++:

CREATE STORE "MyKafkaStore++" WITH (
    'type' = KAFKA,
    'properties.file' = '/User/user1/deltastream/kafka_store/properties.yaml'
);
$ cat /User/user1/deltastream/kafka_store/properties.yaml
uris: "http://uri1,http://uri2"
access_region: "AWS us-east-1"
kafka.sasl.hash_function: PLAIN
kafka.sasl.username: "ABCDEFGH12345678"
kafka.sasl.password: "kafkasaslpassword"

Create a Kinesis Store with IAM credentials

The following statement creates a new Kinesis store with name my_kinesis_store:

CREATE STORE my_kinesis_store WITH (
    'type' = KINESIS,
    'access_region' = "AWS us-east-1",
    'uris' = 'https://url.to.kinesis.aws:4566',
    'kinesis.iam_role_arn' = 'arn:aws:iam::123456789012:role/example-IAM-role'
);

Create a Kinesis Store with static credentials

The following statement creates a new Kinesis store with name my_kinesis_store:

CREATE STORE my_kinesis_store WITH (
    'type' = KINESIS,
    'access_region' = "AWS us-east-1",
    'uris' = 'https://url.to.kinesis.aws:4566',
    'kinesis.access_key_id' = 'testkey',
    'kinesis.secret_access_key' = 'testsecret'
);

Create a Kafka Store with a Schema Registry

The following statement creates a new Kafka store that uses the Schema Registry named sr. Note that the store name is case-sensitive and is therefore wrapped in double quotes:

CREATE STORE "kafkaStoreWithSR" WITH (
    'type' = KAFKA,
    'access_region' = "AWS us-east-1",
    'uris' = 'kafka.broker1.url:9092,kafka.broker2.url:9092',
    'schema_registry.name' = sr
);

Create a Confluent Kafka Store with credentials

The following creates a new Confluent Cloud Kafka store with the case-sensitive name ConfluentCloudKafkaStore:

CREATE STORE "ConfluentCloudKafkaStore" WITH (
    'type' = KAFKA,
    'access_region' = "AWS us-east-1",
    'uris' = 'abc-12345.us-east-1.aws.confluent.cloud:9092',
    'kafka.sasl.hash_function' = PLAIN,
    'kafka.sasl.username' = 'credentials_username',
    'kafka.sasl.password' = 'credentials_password'
);

Create a Snowflake Store

CREATE STORE sf WITH (
    'type' = SNOWFLAKE,
    'access_region' = "AWS us-east-1",
    'uris' = 'https://my-account.snowflakecomputing.com',
    'snowflake.account_id' = 'my-account',
    'snowflake.role_name' = 'ACCOUNTADMIN',
    'snowflake.username' = 'STREAMING_USER',
    'snowflake.warehouse_name' = 'COMPUTE_WH',
    'snowflake.client.key_file' = '/path/to/pk/my_account_rsa.p8'
);

Create a Snowflake Store with client key passphrase

CREATE STORE sf
WITH (
    'type' = SNOWFLAKE,
    'access_region' = "AWS us-east-1",
    'uris' = 'https://my-account.snowflakecomputing.com',
    'snowflake.account_id' = 'my-account',
    'snowflake.role_name' = 'ACCOUNTADMIN',
    'snowflake.username' = 'STREAMING_USER',
    'snowflake.warehouse_name' = 'COMPUTE_WH',
    'snowflake.client.key_file' = '/path/to/pk/my_account_rsa.p8',
    'properties.file' = '/path/to/deltastream/snowflake_store/properties.yaml'
);
$ cat /path/to/deltastream/snowflake_store/properties.yaml
snowflake.client.key_passphrase: "my$account$$key$$$phrase"

Create a Databricks Store

CREATE STORE databricks_store WITH (
    'type' = DATABRICKS,
    'access_region' = "AWS us-east-1",
    'uris' = 'https://dbc-12345678-1234.cloud.databricks.com',
    'databricks.app_token' = 'dapiabcdefghijklmnopqrstuvw123456789',
    'databricks.warehouse_id' = 'abcdefgh1234567',
    'aws.access_key_id' = 'AWS_ACCESS_KEY',
    'aws.secret_access_key' = 'AWS_SECRET_ACCESS_KEY',
    'databricks.cloud.s3.bucket' = 'mybucket',
    'databricks.cloud.region' = 'AWS us-west-2'
);

Create a PostgreSQL Store

CREATE STORE ps_store WITH (
    'type' = POSTGRESQL,
    'access_region' = "AWS us-east-1",
    'uris' = 'postgresql://mystore.com:5432/demo',
    'postgres.username' = 'user',
    'postgres.password' = 'password'
);

Last updated