Using Multiple Stores in Queries

In DeltaStream, a Store is where user streaming data resides. Apache Kafka and Amazon Kinesis are two examples of such Stores. DeltaStream reads data from streaming Stores, performs the desired computation, and writes the results of the computation back to the same Store or to a different one. In this tutorial, we will demonstrate how users with multiple Stores can easily move data from one Store to another and perform joins between data in different Stores.
Before beginning this tutorial, it is helpful to know how to create Stores and Relations.


In this tutorial, we will assume you have the following setup:
  • Two Stores—one Kafka Store called kafka_store and one Kinesis Store called kinesis_store
  • A Stream in kinesis_store called pageviews_kinesis
  • A Stream in kafka_store called users_kafka
By navigating to the Stores tab on the left menu, you'll see that we have already created a Kafka Store and a Kinesis Store.
Kafka and Kinesis Stores.
The Kinesis Store has a Kinesis stream called pageviews. We use a CREATE STREAM query, as shown below, to create a Stream called pageviews_kinesis that is backed by the pageviews Kinesis stream.
Creating the pageviews_kinesis Stream.
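The query might look like the following sketch. The column names and value format here are illustrative assumptions, not taken from the actual pageviews data:

```sql
-- Hypothetical sketch: columns and value format are assumed for illustration.
CREATE STREAM pageviews_kinesis (
  viewtime BIGINT,
  userid VARCHAR,
  pageid VARCHAR
) WITH (
  'store' = 'kinesis_store',   -- the Kinesis Store that backs this Stream
  'topic' = 'pageviews',       -- the existing Kinesis stream
  'value.format' = 'JSON'
);
```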
The Kafka Store has a Kafka topic called ds_users. We use a CREATE STREAM query, shown below, to create a Stream called users_kafka that is backed by the ds_users Kafka topic.
Creating the users_kafka Stream.
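As a sketch, this query could look like the following. Again, the column names and value format are assumptions for illustration:

```sql
-- Hypothetical sketch: columns and value format are assumed for illustration.
CREATE STREAM users_kafka (
  registertime BIGINT,
  userid VARCHAR,
  regionid VARCHAR
) WITH (
  'store' = 'kafka_store',     -- the Kafka Store that backs this Stream
  'topic' = 'ds_users',        -- the existing Kafka topic
  'value.format' = 'JSON'
);
```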
Now that we have established our two Stores, each with an associated Relation, we can move on to writing queries.

Move Data from One Store to Another

Suppose we are trying to migrate data out of Kinesis and into Kafka. We can easily create a new Stream that selects all the columns from an existing Stream and specify the Store that backs the new Stream. In our setup, we have a Stream belonging to our Kinesis Store called pageviews_kinesis. Using a CREATE STREAM AS SELECT query (CSAS), we can create a new Stream in the Kafka Store that is essentially a copy of the pageviews_kinesis Stream. We’ll call this new Stream pageviews_kafka.
Creating a Kafka Stream that is a copy of a Kinesis Stream.
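Such a CSAS query could be sketched as follows; the replica and partition counts are illustrative values, not requirements:

```sql
-- Sketch of the copy query. The store, topic.replicas, and topic.partitions
-- properties follow the surrounding text; the counts are illustrative.
CREATE STREAM pageviews_kafka
WITH (
  'store' = 'kafka_store',
  'topic.replicas' = 3,
  'topic.partitions' = 1
) AS SELECT * FROM pageviews_kinesis;
```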
Notice in the CSAS query above that we include the store property in the WITH clause of the sink Stream, with a value of kafka_store. Since a CSAS query creates a new Stream, this property tells the query which Store should back the new Stream. If the store property is omitted, the query defaults to the session’s current Store. We are also required to include the topic.replicas and topic.partitions properties, as they are necessary when creating a new Kafka topic. If the source in this query were a Stream backed by a Kafka topic, then the source’s topic.replicas and topic.partitions values would be used by default. But since we are reading from a Kinesis-backed Stream, DeltaStream cannot infer the values of these properties, and they must be set explicitly in the query.
By navigating to Stores > kafka_store > Topics, we will see that a new Kafka topic has been created called pageviews_kafka. This is where the streaming data for the newly created pageviews_kafka Stream is stored. By clicking on that Topic, we can print and see the records flowing through. These records are a copy of the pageviews_kinesis records.
The new Stream copy.

Join Data from Sources Belonging to Different Stores

In DeltaStream, once a Stream or Changelog is defined, it can be used as a source for queries. Below we’ll demonstrate how we can join the pageviews_kinesis Stream and the users_kafka Stream. We can simply use these Streams as sources in our query without any additional specifications related to the Stores that back them; this information is kept as metadata with each Stream. Behind the scenes, DeltaStream will seamlessly read from both Stores, perform the join, and then output the result to the sink Stream. We will once again use a CREATE STREAM AS SELECT query to create the output Stream. Since this query joins two Streams, it is a stream-stream interval join and requires the WITHIN clause.
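A sketch of this interval join is shown below. The selected columns and the join window are assumptions for illustration:

```sql
-- Sketch of the stream-stream interval join. Column names and the
-- 5-minute window are illustrative assumptions.
CREATE STREAM pageviews_enriched
WITH ('store' = 'kinesis_store')
AS SELECT
  p.userid,
  p.pageid,
  u.regionid
FROM pageviews_kinesis p
JOIN users_kafka u
WITHIN 5 MINUTES
ON p.userid = u.userid;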
Notice in the CSAS above that we specify the output Store to be kinesis_store in the WITH clause, similar to what we did in the section above. However, even though we are creating a new Kinesis Stream, pageviews_enriched, we notably don’t need to provide the topic.shards property. When possible, its default value is inferred from the left-most source. The sink Stream and the left-most source are both backed by Kinesis streams, so pageviews_kinesis’s topic.shards value will be applied to pageviews_enriched.
By navigating to Stores > kinesis_store > Topics, we will see that a new Kinesis stream has been created called pageviews_enriched. This is where the streaming data for the newly created pageviews_enriched Stream is stored. By clicking on that Topic, we can print and see the records flowing through. These records are the result of the join between pageviews_kinesis and users_kafka.