Using Multiple Stores in Queries

In DeltaStream, your streaming data resides in a Store. Apache Kafka and Amazon Kinesis are two examples of such stores. DeltaStream reads data from a streaming store, performs the desired computation, and writes the results back to the same store or, if you wish, to a different one.

This tutorial demonstrates how you can easily move data from one of your stores to another. It also shows how you can perform joins between data in different stores.

Before you begin, you should create stores and relations in DeltaStream, if you haven't already. To do so, please review these tutorials:

  • Creating Stores for Streaming Data

  • Creating Relations to Structure Raw Data

Setup

For this tutorial, you should have the following set up:

  • Two stores: a Kafka Store called MSK and a Kinesis Store called kinesis_store

  • A stream in kinesis_store called pageviews_kinesis

  • A stream in MSK called users_kafka

In the left-hand navigation, click Resources to display a list of stores. Here you'll find your Kafka and Kinesis stores.
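
The Kinesis Store has a Kinesis data stream called pageviews. In DeltaStream, use a CREATE STREAM query to create a Stream called pageviews_kinesis that is backed by the pageviews data stream. A minimal sketch follows; the column definitions and the JSON value format are illustrative assumptions, so adjust them to match your data:

```sql
-- Hypothetical column definitions; match these to your pageviews records.
CREATE STREAM pageviews_kinesis (
  viewtime BIGINT,
  userid VARCHAR,
  pageid VARCHAR
) WITH (
  'store' = 'kinesis_store',
  'topic' = 'pageviews',
  'value.format' = 'json'
);
```

The Kafka Store, MSK, has a Kafka topic called ds_users. Use a CREATE STREAM query to create a Stream called users_kafka that is backed by the ds_users Kafka topic. Again, the columns and format below are illustrative:

```sql
-- Hypothetical column definitions; match these to your ds_users records.
CREATE STREAM users_kafka (
  registertime BIGINT,
  userid VARCHAR,
  regionid VARCHAR,
  gender VARCHAR
) WITH (
  'store' = 'MSK',
  'topic' = 'ds_users',
  'value.format' = 'json'
);
```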

With your two stores established, each with an associated relation, you can write your queries.

Move Data from One Store to Another

To migrate data out of Kinesis and into Kafka, create a new stream that selects all the columns from an existing stream, and specify the store that backs the new stream.

In the setup here, there's a stream belonging to your Kinesis store called pageviews_kinesis. Use a CREATE STREAM AS SELECT (CSAS) query, shown below, to create a new stream in the Kafka store that is essentially a copy of the pageviews_kinesis stream. Label this new stream pageviews_kafka.
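
A sketch of that query follows, assuming the MSK store from the setup; the topic.partitions and topic.replicas values are illustrative:

```sql
-- 'store' names the Kafka store that backs the new stream.
-- topic.partitions and topic.replicas must be set explicitly here,
-- because the source is Kinesis-backed and they cannot be inferred.
CREATE STREAM pageviews_kafka
WITH (
  'store' = 'MSK',
  'topic.partitions' = 1,
  'topic.replicas' = 3
) AS
SELECT * FROM pageviews_kinesis;
```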

Note that the CSAS query above includes the store property in the WITH clause of the sink stream, with a value of MSK. A CSAS query creates a new stream; specifying this property tells the query which store should back that new stream. If you omit the store property, the query defaults to the session's current store. You must also include the topic.replicas and topic.partitions properties; these are necessary when creating a new Kafka topic. If the source in this query were a stream backed by a Kafka topic, DeltaStream would use the source's topic.replicas and topic.partitions values by default. But here you're reading from a Kinesis-backed stream, so DeltaStream cannot infer the values of these properties; you must set them explicitly in the query.

Navigate to Resources > MSK to see that a new Kafka topic, called pageviews_kafka, has been created and appears in the topic list. The streaming data for the newly created pageviews_kafka stream is stored in this topic. Click the topic to print the records flowing through. These records are a copy of the pageviews_kinesis records.

Join Data from Sources Belonging to Different Stores

In DeltaStream, once you define a Stream or Changelog, you can use it as a source for queries. The example below demonstrates how to join the pageviews_kinesis stream and the users_kafka stream. Simply use these streams as sources in your query; you need no additional specifications related to the stores that back them, because DeltaStream keeps that information as metadata with each stream. Behind the scenes, DeltaStream seamlessly reads from both stores, performs the join, and outputs the result to the sink stream. Again, use a CREATE STREAM AS SELECT query to create the output stream. Since this query joins two streams, it is an interval join that requires the WITHIN clause.
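
A sketch of that query follows; the projected columns, the join key userid, and the 5-minute window are illustrative assumptions:

```sql
-- Interval join between a Kinesis-backed and a Kafka-backed stream.
-- The WITHIN clause bounds how far apart in time matching records may be.
CREATE STREAM pageviews_enriched
WITH ('store' = 'kinesis_store') AS
SELECT
  p.userid AS userid,
  p.pageid,
  u.regionid,
  u.gender
FROM pageviews_kinesis p
JOIN users_kafka u
WITHIN 5 MINUTES
ON u.userid = p.userid;
```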

Note in the CSAS above that you specify the output store to be kinesis_store in the WITH clause, similar to what you did in the section above. But even though you're creating a new Kinesis stream, called pageviews_enriched, you don't need to provide the topic.shards property. DeltaStream infers the default value from the left-most source when possible. The sink stream and the left-most source are both backed by Kinesis streams, so pageviews_kinesis's topic.shards property is applied to pageviews_enriched.

Navigate to Resources > kinesis_store > Topics to see that there's a new Kinesis entity called pageviews_enriched. This is where the streaming data for the newly created pageviews_enriched stream is stored. Click that entity to print the records flowing through. These records result from the join between pageviews_kinesis and users_kafka.
