Using Multiple Stores in Queries
In DeltaStream, a Store is where user streaming data resides. Apache Kafka and Amazon Kinesis are two instances of such Stores. DeltaStream reads data from streaming Stores, performs the desired computation, and writes the results of the computation to the same Store or another. In this tutorial, we will demonstrate how users with multiple Stores can easily move data from one Store to another Store and perform joins between data in different Stores.
Before beginning this tutorial, it is helpful to know how to create Stores and Relations. The DeltaStream tutorials on those topics cover both.
In this tutorial, we will assume you have the following setup:
- Two Stores: one Kafka Store called kafka_store and one Kinesis Store called kinesis_store
- A Stream in kinesis_store called pageviews_kinesis
- A Stream in kafka_store called users_kafka
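If you want to reproduce this setup, the two Relations could be declared with DDL along the following lines. This is only a sketch: the column names and types, the underlying Kinesis stream and Kafka topic names (pageviews and users), and the JSON value format are all assumptions, so substitute whatever your Stores actually contain.

```sql
-- Hypothetical schema: a Stream over a Kinesis data stream in kinesis_store.
-- Column names/types and the 'pageviews' stream name are assumptions.
CREATE STREAM pageviews_kinesis (
  viewtime BIGINT,
  userid VARCHAR,
  pageid VARCHAR
) WITH (
  'store' = 'kinesis_store',
  'topic' = 'pageviews',
  'value.format' = 'json'
);

-- Hypothetical schema: a Stream over a Kafka topic in kafka_store.
-- Column names/types and the 'users' topic name are assumptions.
CREATE STREAM users_kafka (
  registertime BIGINT,
  userid VARCHAR,
  regionid VARCHAR,
  gender VARCHAR
) WITH (
  'store' = 'kafka_store',
  'topic' = 'users',
  'value.format' = 'json'
);
```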
By navigating to the Stores tab on the left menu, you'll see that we have already created a Kafka Store and a Kinesis Store.
Figure: Kafka and Kinesis Stores.
Now that our two Stores are set up, each with an associated Relation, we can move on to writing queries.
Suppose we are trying to migrate data out of Kinesis and into Kafka. We can create a Stream that selects all the columns from an existing Stream and specify the Store that backs the new Stream. In our setup, we have a Stream belonging to our Kinesis Store called pageviews_kinesis. Using a CREATE STREAM AS SELECT (CSAS) query, we can create a new Stream in the Kafka Store that is essentially a copy of the pageviews_kinesis Stream. We'll call this new Stream pageviews_kafka.
Figure: Creating a Kafka Stream that is a copy of a Kinesis Stream.
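A CSAS query of this shape might look like the sketch below. The store property and the Stream names come from this tutorial; the topic.partitions and topic.replicas values are placeholders, so pick numbers that suit your Kafka cluster.

```sql
-- Copy every record from the Kinesis-backed Stream into a new
-- Kafka-backed Stream named pageviews_kafka.
CREATE STREAM pageviews_kafka WITH (
  'store' = 'kafka_store',    -- back the new Stream with the Kafka Store
  'topic.partitions' = 1,     -- placeholder: partition count for the new topic
  'topic.replicas' = 3        -- placeholder: replication factor for the new topic
) AS SELECT * FROM pageviews_kinesis;
```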
Notice in the CSAS query above that we include the store property in the WITH clause of the sink Stream, with a value of kafka_store. Since a CSAS query creates a new Stream, this property tells the query which Store should back the new Stream. If the store property is omitted, the query defaults to the session's current Store. We are also required to include the topic.partitions and topic.replicas properties, as they are necessary when creating a new Kafka topic. If the source in this query were a Stream backed by a Kafka topic, the source topic's topic.partitions and topic.replicas values would be used by default. But since we are reading from a Kinesis-backed Stream, DeltaStream cannot infer these values, and they must be set explicitly in the query.
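The two defaults described above can be sketched as follows. The Stream names pageviews_kafka_2 and users_kafka_copy are hypothetical, and the USE STORE statement is assumed to be how the session's current Store is set in your DeltaStream version.

```sql
-- Case 1: no 'store' property, so the sink is created in the session's
-- current Store. The source is Kinesis-backed, so the Kafka topic
-- properties must still be given explicitly (placeholder values).
USE STORE kafka_store;
CREATE STREAM pageviews_kafka_2 WITH (
  'topic.partitions' = 1,
  'topic.replicas' = 3
) AS SELECT * FROM pageviews_kinesis;

-- Case 2: the source is Kafka-backed, so topic.partitions and
-- topic.replicas default to the values of the source's topic.
CREATE STREAM users_kafka_copy WITH (
  'store' = 'kafka_store'
) AS SELECT * FROM users_kafka;
```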
By navigating to Stores > kafka_store > Topics, we will see that a new Kafka topic called pageviews_kafka has been created. This is where the streaming data for the newly created pageviews_kafka Stream is stored. By clicking on that Topic, we can print and see the records flowing through. These records are a copy of the records in the pageviews_kinesis Stream.
Figure: The new Stream copy.
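Besides browsing the Topic in the UI, you could also watch the new Stream from the SQL editor with an interactive query. A minimal sketch, assuming interactive SELECT queries are enabled in your workspace:

```sql
-- Continuously return records as they arrive in the new Kafka-backed Stream.
SELECT * FROM pageviews_kafka;
```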
In DeltaStream, once a Stream or Changelog is defined, it can be used as a source for queries. Below we'll demonstrate how to join the pageviews_kinesis Stream and the users_kafka Stream. We can use these Streams as sources in our query without specifying anything about the Stores that back them, because this information is kept as metadata with each Stream. Behind the scenes, DeltaStream reads from both Stores, performs the join, and writes the result to the sink Stream. We will once again use a CREATE STREAM AS SELECT query to create the output Stream. Since this query joins two Streams, it is a stream-stream interval join, which requires a WITHIN clause to bound how far apart in time matching records can be. See the interval join section of the CREATE STREAM AS SELECT documentation for details.
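A join of this kind might be written as in the sketch below. The column names (userid, pageid, gender, regionid) and the 5-minute WITHIN window are assumptions for illustration; adjust them to your own schemas and to how far apart in time matching events can arrive.

```sql
-- Enrich Kinesis pageviews with Kafka user data and write the result
-- to a new Kinesis-backed Stream.
CREATE STREAM pageviews_enriched WITH (
  'store' = 'kinesis_store'   -- sink is backed by the Kinesis Store
) AS SELECT
  p.userid,
  p.pageid,
  u.gender,
  u.regionid
FROM pageviews_kinesis p
JOIN users_kafka u
  WITHIN 5 MINUTES            -- interval bound required for stream-stream joins
  ON u.userid = p.userid;
```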
Notice in the CSAS above that we specify the output Store to be kinesis_store in the WITH clause, similar to what we did in the previous section. However, even though we are creating a new Kinesis Stream, pageviews_enriched, we don't need to provide the topic.shards property. Its default value is inferred from the left-most source when possible. The sink Stream and the left-most source, pageviews_kinesis, are both backed by Kinesis Streams, so the topic.shards value of pageviews_kinesis will be applied to pageviews_enriched.
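If the inferred shard count does not fit the joined output, the topic.shards property can presumably be set explicitly in the WITH clause to override the default. In this sketch the Stream name pageviews_enriched_2, the column names, and the shard count of 2 are all placeholders.

```sql
-- Join pageviews_kinesis with users_kafka, but give the new Kinesis
-- stream an explicit shard count instead of inheriting it from the
-- left-most source.
CREATE STREAM pageviews_enriched_2 WITH (
  'store' = 'kinesis_store',
  'topic.shards' = 2          -- placeholder shard count
) AS SELECT
  p.userid,
  p.pageid,
  u.gender,
  u.regionid
FROM pageviews_kinesis p
JOIN users_kafka u
  WITHIN 5 MINUTES
  ON u.userid = p.userid;
```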
By navigating to Stores > kinesis_store > Topics, we will see that a new Kinesis Stream called pageviews_enriched has been created. This is where the streaming data for the newly created pageviews_enriched Stream is stored. By clicking on that Topic, we can print and see the records flowing through. These records are the result of the join between pageviews_kinesis and users_kafka.