Using Multiple Stores in Queries
In DeltaStream, a Store is where user streaming data resides. Apache Kafka and Amazon Kinesis are two instances of such Stores. DeltaStream reads data from streaming Stores, performs the desired computation, and writes the results of the computation to the same Store or another. In this tutorial, we will demonstrate how users with multiple Stores can easily move data from one Store to another Store and perform joins between data in different Stores.
Before beginning this tutorial, it is helpful to know how to create Stores and Relations; see the tutorials on those topics.
In this tutorial, we will assume you have the following setup:
- Two Stores: one Kafka Store called kafka_store and one Kinesis Store called kinesis_store
- A Stream in kinesis_store called pageviews_kinesis
- A Stream in kafka_store called users_kafka
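DeltaStream keeps the backing Store as metadata with each Stream, which is what lets one query read from several Stores. As a rough illustration only, the setup above can be modeled in Python as a small catalog. The Store and Stream classes, the catalog dictionary, and the stores_for_query helper are hypothetical and do not correspond to any DeltaStream API; the Kafka Store is called kafka_store here, the name the later queries use.

```python
from dataclasses import dataclass


# Hypothetical model of the tutorial setup; not a DeltaStream API.
@dataclass(frozen=True)
class Store:
    name: str
    kind: str  # "kafka" or "kinesis"


@dataclass(frozen=True)
class Stream:
    name: str
    store: Store  # the Store that backs this Stream, kept as metadata
    entity: str   # backing Kafka topic or Kinesis stream


kafka_store = Store("kafka_store", "kafka")
kinesis_store = Store("kinesis_store", "kinesis")

# Catalog of Relations, keyed by Stream name.
catalog = {
    s.name: s
    for s in (
        Stream("pageviews_kinesis", kinesis_store, "pageviews"),
        Stream("users_kafka", kafka_store, "ds_users"),
    )
}


def stores_for_query(*sources: str) -> set:
    """Return the names of the Stores a query over these Streams must read from."""
    return {catalog[name].store.name for name in sources}


print(sorted(stores_for_query("pageviews_kinesis", "users_kafka")))
# prints ['kafka_store', 'kinesis_store']
```

Because each Stream carries its Store, a query that names only the two Streams has enough information to locate both Stores.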
By navigating to the Stores tab on the left menu, you'll see that we have already created a Kafka Store and a Kinesis Store.

Kafka and Kinesis Stores.
The Kinesis Store has a Kinesis stream called pageviews. We use a CREATE STREAM query, as shown below, to create a Stream called pageviews_kinesis that is backed by the pageviews Kinesis stream.
Creating the pageviews_kinesis Stream.
The Kafka Store has a Kafka topic called ds_users. We use the CREATE STREAM query shown below to create a Stream called users_kafka that is backed by the ds_users Kafka topic.
Creating the users_kafka Stream.
Now that we have two Stores, each with an associated Relation, we can start writing queries.
Suppose we want to migrate data out of Kinesis and into Kafka. We can create a Stream that selects all the columns from an existing Stream and specify the Store that backs the new Stream. In our setup, the Kinesis Store has a Stream called pageviews_kinesis. Using a CREATE STREAM AS SELECT (CSAS) query, we can create a new Stream in the Kafka Store that is essentially a copy of the pageviews_kinesis Stream. We'll call this new Stream pageviews_kafka.
Creating a Kafka Stream that is a copy of a Kinesis Stream.
Notice that in the CSAS query above, we include the store property in the WITH clause of the sink Stream, with a value of kafka_store. Since a CSAS query creates a new Stream, this property tells the query which Store should back the new Stream. If the store property is left unset, the query defaults to the session's current Store. We are also required to include the topic.replicas and topic.partitions properties, because they are needed to create a new Kafka topic. If the source in this query were a Stream backed by a Kafka topic, the source's topic.replicas and topic.partitions values would be used by default. Since we are reading from a Kinesis-backed Stream, however, DeltaStream cannot infer these values, so they must be set explicitly in the query.
By navigating to Stores > kafka_store > Topics, we can see that a new Kafka topic called pageviews_kafka has been created. This is where the streaming data for the newly created pageviews_kafka Stream is stored. Clicking on that topic lets us print the records flowing through it. These records are a copy of the pageviews_kinesis records.
The new Stream copy.
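The defaulting rules for the store, topic.partitions, topic.replicas, and (for Kinesis sinks) topic.shards properties can be summarized in a small Python model. This is a hypothetical sketch of the behavior this tutorial describes, not DeltaStream's planner. The function name, the dictionary shapes, and the assumption that a property is inherited only when the sink and the left-most source use the same kind of Store are all illustrative, as are the property values.

```python
# Hypothetical model of the sink-property defaults described in this tutorial.
# Assumption: a property is inherited only when the sink and the left-most
# source are backed by the same kind of Store.
REQUIRED = {
    "kafka": ("topic.partitions", "topic.replicas"),
    "kinesis": ("topic.shards",),
}


def resolve_sink_properties(with_props, session_store, store_kinds, left_source):
    """Fill in a CSAS sink's properties.

    with_props:    properties written in the sink's WITH clause
    session_store: name of the session's current Store
    store_kinds:   maps Store name -> "kafka" or "kinesis"
    left_source:   (store_name, properties) of the left-most source Stream
    """
    props = dict(with_props)
    # No 'store' property: fall back to the session's current Store.
    props.setdefault("store", session_store)
    sink_kind = store_kinds[props["store"]]
    src_store, src_props = left_source
    for key in REQUIRED[sink_kind]:
        if key in props:
            continue  # set explicitly in the WITH clause
        if store_kinds[src_store] == sink_kind and key in src_props:
            props[key] = src_props[key]  # inherited from the left-most source
        else:
            raise ValueError(f"'{key}' must be set explicitly for a {sink_kind} sink")
    return props


kinds = {"kafka_store": "kafka", "kinesis_store": "kinesis"}
pageviews_kinesis = ("kinesis_store", {"topic.shards": 1})  # made-up shard count

# pageviews_kafka: Kafka sink, Kinesis source, so partitions and replicas are explicit.
print(resolve_sink_properties(
    {"store": "kafka_store", "topic.partitions": 1, "topic.replicas": 3},
    "kinesis_store", kinds, pageviews_kinesis))
# prints {'store': 'kafka_store', 'topic.partitions': 1, 'topic.replicas': 3}

# pageviews_enriched: Kinesis sink whose left-most source is Kinesis-backed.
print(resolve_sink_properties({"store": "kinesis_store"}, "kafka_store", kinds, pageviews_kinesis))
# prints {'store': 'kinesis_store', 'topic.shards': 1}
```

Dropping topic.partitions from the first call raises a ValueError, which mirrors why the Kafka copy of a Kinesis-backed Stream needs those properties spelled out.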
In DeltaStream, once a Stream or Changelog is defined, it can be used as a source for queries. Below, we'll demonstrate how to join the pageviews_kinesis Stream and the users_kafka Stream. We can use these Streams as sources in our query without specifying anything about the Stores that back them, because this information is kept as metadata with each Stream. Behind the scenes, DeltaStream reads from both Stores, performs the join, and writes the result to the sink Stream. We will once again use a CREATE STREAM AS SELECT query to create the output Stream. Since this query joins two Streams, it is an interval join (a stream-stream join), which requires a WITHIN clause.
Using a CREATE STREAM AS SELECT query.
Notice that in the CSAS query above, we specify the output Store to be kinesis_store in the WITH clause, as we did in the previous section. However, even though this query creates a new Kinesis stream to back pageviews_enriched, we don't need to provide the topic.shards property. When possible, the default value is inferred from the left-most source. The sink Stream and the left-most source, pageviews_kinesis, are both backed by Kinesis streams, so the topic.shards value of pageviews_kinesis is applied to pageviews_enriched.
By navigating to Stores > kinesis_store > Topics, we can see that a new Kinesis stream called pageviews_enriched has been created. This is where the streaming data for the newly created pageviews_enriched Stream is stored. Clicking on that topic lets us print the records flowing through it. These records are the result of the join between pageviews_kinesis and users_kafka.
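Because both sources are unbounded Streams, the WITHIN clause is what bounds the join: roughly speaking, a record from one side only pairs with records from the other side whose timestamps fall within the given interval, so old records can be discarded. The following Python sketch is a simplified, in-memory model of that idea, not DeltaStream's implementation. It assumes both sides arrive merged in non-decreasing timestamp order (a real engine must also cope with out-of-order data), and the keys, values, and 300-unit window are hypothetical.

```python
from collections import deque


def interval_join(events, within):
    """Toy interval (stream-stream) join.

    events: (ts, side, key, value) tuples in non-decreasing ts order,
            where side is "L" (left Stream) or "R" (right Stream).
    within: maximum allowed |ts_left - ts_right|, in the same unit as ts.
    Returns (key, left_value, right_value) for every matching pair.
    """
    buffers = {"L": deque(), "R": deque()}
    out = []
    for ts, side, key, value in events:
        other = "R" if side == "L" else "L"
        # Drop buffered records too old to match this or any later record.
        for buf in buffers.values():
            while buf and buf[0][0] < ts - within:
                buf.popleft()
        # Every remaining record on the other side is within the window.
        for _, o_key, o_value in buffers[other]:
            if o_key == key:
                left, right = (value, o_value) if side == "L" else (o_value, value)
                out.append((key, left, right))
        buffers[side].append((ts, key, value))
    return out


events = [
    (0, "L", "user_1", "page_1"),
    (10, "R", "user_1", "profile_1"),
    (400, "L", "user_1", "page_2"),
    (420, "R", "user_2", "profile_2"),
]
print(interval_join(events, within=300))
# prints [('user_1', 'page_1', 'profile_1')]
```

With a window of 300, page_2 at time 400 finds no match: profile_1 (time 10) is more than 300 units older and has already been evicted. Widening the window to 1000 would join page_2 with profile_1 as well.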