Use Multiple Data Stores in Queries

In DeltaStream, your streaming data resides in a Data Store. Apache Kafka and Amazon Kinesis are two examples of such data stores. DeltaStream reads data from a streaming data store, performs the desired computation, and writes the results of the computation to the same data store (or to another, if you wish).

This article demonstrates how you can easily move data from one of your data stores to another. It also shows how you can perform joins between data in different data stores.

Before you begin

If you haven't already, set up the following data stores and DeltaStream objects:

  • Two data stores: a Kafka data store called MSK and a Kinesis data store called kinesis_store

  • A stream in kinesis_store called pageviews_kinesis

  • A stream in MSK called users_kafka

For details on setting these up, please see Create Data Stores for Streaming Data and Create DeltaStream Objects to Structure Raw Data.

Working with Multiple Data Stores

  1. In the left-hand navigation, click Resources to display a list of data stores. Here you'll find your Kafka and Kinesis stores.

With your two data stores established, each with an associated relation, you can write your queries.

Move Data from One Data Store to Another

To migrate data out of Kinesis and into Kafka, create a new stream that selects all the columns from an existing stream. Then you can specify the data store that backs the new stream.

In the setup here, there's a stream belonging to your Kinesis data store called pageviews_kinesis. Use a CREATE STREAM AS SELECT query (CSAS) to create a new stream that is essentially a copy of the pageviews_kinesis stream in the Kafka data store. Label this new stream pageviews_kafka.

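A minimal sketch of such a CSAS query, assuming the Kafka data store is named kafka_store (matching the note below); the partition and replica counts are illustrative:

```sql
CREATE STREAM pageviews_kafka WITH (
  'store' = 'kafka_store',     -- Kafka data store that backs the new stream
  'topic.partitions' = 1,      -- required here: no Kafka source to infer from
  'topic.replicas' = 3         -- required here: no Kafka source to infer from
) AS
SELECT * FROM pageviews_kinesis;
```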
Note that the CSAS query above includes the store property in the WITH clause of the sink stream, with a value of kafka_store. A CSAS query creates a new stream; specifying this property tells the query which data store should back that new stream. If you omit the store property, the query defaults to the session's current data store. You must also include the topic.replicas and topic.partitions properties; these are necessary when creating a new Kafka topic.

Tip If the source in this query were a stream backed by a Kafka topic, DeltaStream would use the source's topic.replicas and topic.partitions values by default. But here you're reading from a Kinesis-backed stream, so DeltaStream cannot infer these values; you must set them explicitly in the query.

Navigate to Resources > Kafka Store to see that a new Kafka topic, called pageviews_kafka, has been created and appears in the topic list. The streaming data for the newly created pageviews_kafka stream is stored in this topic. Click the topic to print and see the records flowing through. These records are a copy of the pageviews_kinesis records.

Join Data from Sources Belonging to Different Data Stores

The Kinesis data store has a Kinesis data stream called pageviews. In DeltaStream, use a CREATE STREAM query, as shown below, to create a stream called pageviews_kinesis that is backed by the pageviews Kinesis stream.

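A minimal sketch of that DDL; the column list and value format are illustrative assumptions:

```sql
CREATE STREAM pageviews_kinesis (
  viewtime BIGINT,
  userid   VARCHAR,
  pageid   VARCHAR
) WITH (
  'store' = 'kinesis_store',   -- Kinesis data store from the setup
  'topic' = 'pageviews',       -- existing Kinesis data stream backing this stream
  'value.format' = 'JSON'      -- assumed serialization format
);
```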
The Kafka data store has a Kafka topic called ds_users. In DeltaStream, use a CREATE STREAM query, as shown below, to create a stream called users_kafka that is backed by the ds_users Kafka topic.

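A similar sketch for the Kafka-backed stream, assuming the store name MSK from the setup list; again, the columns and value format are illustrative:

```sql
CREATE STREAM users_kafka (
  registertime BIGINT,
  userid       VARCHAR,
  regionid     VARCHAR,
  gender       VARCHAR
) WITH (
  'store' = 'MSK',             -- Kafka data store from the setup
  'topic' = 'ds_users',        -- existing Kafka topic backing this stream
  'value.format' = 'JSON'      -- assumed serialization format
);
```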
In DeltaStream, once you define a stream or changelog, you can use it as a source for queries. The example below demonstrates how you can join the pageviews_kinesis stream and the users_kafka stream. Simply use these streams as sources in your query; you need no additional specifications related to the data stores that back them, because DeltaStream keeps that information as metadata with each stream. Behind the scenes, DeltaStream seamlessly reads from both data stores, performs the join, and outputs the result to the sink stream. Again, use a CREATE STREAM AS SELECT query to create the output stream. Since this query joins two streams, it is an interval join and requires the WITHIN clause.

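A minimal sketch of the join query; the projected columns and the one-hour join window are illustrative:

```sql
CREATE STREAM pageviews_enriched WITH (
  'store' = 'kinesis_store'    -- output data store; topic.shards is inferred
) AS
SELECT
  p.userid AS userid,
  p.pageid,
  u.regionid,
  u.gender
FROM pageviews_kinesis p
JOIN users_kafka u
  WITHIN 1 HOUR                -- interval join window over event time
  ON p.userid = u.userid;
```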
Note in the CSAS above that in the WITH clause you specify the output data store to be kinesis_store, similar to what you did in the section above. But even though you're creating a new Kinesis stream, called pageviews_enriched, you don't need to provide the topic.shards property. DeltaStream infers the default value from the left-most source when possible. Here the sink stream and the left-most source are both backed by Kinesis streams, so pageviews_kinesis's topic.shards property is applied to pageviews_enriched.

Navigate to Resources > Kinesis_store > Topics to see that there's a new Kinesis entity called pageviews_enriched. This is where the streaming data for the newly created pageviews_enriched stream is stored. Click that entity to print and see the records flowing through. These records result from the join between pageviews_kinesis and users_kafka.
