Using Multiple Stores in Queries
In DeltaStream, your streaming data resides in a Store. Apache Kafka and Amazon Kinesis are two examples of such stores. DeltaStream reads data from a streaming store, performs the desired computation, and writes the results of the computation to the same store (or to another, if you wish).
This tutorial demonstrates how you can easily move data from one of your stores to another. It also shows how you can perform joins between data in different stores.
Before you begin, you should create stores and relations in DeltaStream, if you haven't already. For this tutorial, have the following set up:

- Two stores: a Kafka store called kafka_store and a Kinesis store called kinesis_store
- A stream in kinesis_store called pageviews_kinesis
- A stream in kafka_store called users_kafka

(The CREATE STREAM statements that define these two source streams appear at the end of this tutorial.)
In the lefthand navigation, click Resources to display a list of stores. Here you'll find your Kafka and Kinesis stores.
With your two stores established, each with an associated relation, you can write your queries.
To migrate data out of Kinesis and into Kafka, you can easily create a new stream that selects all the columns from an existing stream. Then you can specify the store that backs the new stream.
In the setup here, there's a stream belonging to your Kinesis store called pageviews_kinesis. Use a CREATE STREAM AS SELECT (CSAS) query to create a new stream in the Kafka store that is essentially a copy of the pageviews_kinesis stream. Label this new stream pageviews_kafka.
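A minimal sketch of such a query appears below. The partition and replica counts are illustrative assumptions; adjust them to match your setup:

```sql
CREATE STREAM pageviews_kafka WITH (
  'store' = 'kafka_store',   -- back the new stream with the Kafka store
  'topic.partitions' = 1,    -- illustrative value; required when creating a new Kafka topic
  'topic.replicas' = 2       -- illustrative value; required when creating a new Kafka topic
) AS
SELECT * FROM pageviews_kinesis;
```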
Note the CSAS query above includes the store property in the WITH clause of the sink stream, with a value of kafka_store. A CSAS query creates a new stream; specifying this property tells the query which store should back the new stream. If you omit the store property, the query defaults to using the session's current store. You must also include the topic.replicas and topic.partitions properties, which are necessary when creating a new Kafka topic. If the source in this query were a stream backed by a Kafka topic, DeltaStream would use the source's topic.replicas and topic.partitions values by default. But here you're reading from a Kinesis-backed stream, and DeltaStream cannot infer the values of these properties; you must set them explicitly in the query.
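For contrast, here is a hypothetical variant: if the session's current store happened to be kafka_store, you could omit the store property entirely and the sink would land there by default. The stream name below is illustrative:

```sql
-- Assumes the session's current store is kafka_store
CREATE STREAM pageviews_kafka_default WITH (
  'topic.partitions' = 1,    -- still required: the source is Kinesis-backed
  'topic.replicas' = 2
) AS
SELECT * FROM pageviews_kinesis;
```

The rest of this tutorial uses the pageviews_kafka stream created by the previous query.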
Navigate to Resources > kafka_store to see that a new Kafka topic, called pageviews_kafka, has been created and displays in the topic list. The streaming data for the newly-created pageviews_kafka stream is stored in this topic. Click the topic to print and see the records flowing through. These records are a copy of the pageviews_kinesis records.
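The second query joins the pageviews_kinesis stream with the users_kafka stream and writes the enriched result back to Kinesis, again via a CSAS query. A sketch follows; the column names assume the typical pageviews/users demo schemas, and the 5-minute window is an arbitrary choice:

```sql
CREATE STREAM pageviews_enriched WITH (
  'store' = 'kinesis_store'  -- back the new stream with the Kinesis store
) AS
SELECT
  p.userid,
  p.pageid,
  u.regionid
FROM pageviews_kinesis p
JOIN users_kafka u
WITHIN 5 MINUTES             -- joining two streams requires a WITHIN clause
ON p.userid = u.userid;
```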
Note in the CSAS above that you specify the output store to be kinesis_store in the WITH clause, similar to what you did in the section above. But even though you're creating a new Kinesis stream, called pageviews_enriched, you don't need to provide the topic.shards property. DeltaStream infers the default value from the left-most source when possible. The sink stream and the left-most source are both backed by Kinesis streams, so DeltaStream applies the topic.shards value from pageviews_kinesis to pageviews_enriched.
Navigate to Resources > kinesis_store > Topics to see that there's a new Kinesis entity called pageviews_enriched. This is where the streaming data for the newly-created pageviews_enriched stream is stored. Click that entity to print and see the records flowing through. These records result from the join between pageviews_kinesis and users_kafka.
For reference, here's how the two source streams for this tutorial were defined. The Kinesis store has a Kinesis data stream called pageviews. In DeltaStream, use a CREATE STREAM query, as shown below, to create a stream called pageviews_kinesis that is backed by the pageviews Kinesis data stream.
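A sketch of that definition, assuming the standard pageviews demo columns (viewtime, userid, pageid) and JSON-formatted values:

```sql
CREATE STREAM pageviews_kinesis (
  viewtime BIGINT,
  userid VARCHAR,
  pageid VARCHAR
) WITH (
  'store' = 'kinesis_store', -- the store that holds the physical data
  'topic' = 'pageviews',     -- the Kinesis data stream to read from
  'value.format' = 'json'   -- assumed record format
);
```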
Similarly, the Kafka store has a Kafka topic called ds_users. In DeltaStream, use a CREATE STREAM query, as shown below, to create a stream called users_kafka that is backed by the ds_users Kafka topic.
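Again a sketch; the column list here follows a typical users demo schema and is an assumption:

```sql
CREATE STREAM users_kafka (
  registertime BIGINT,
  userid VARCHAR,
  regionid VARCHAR,
  gender VARCHAR
) WITH (
  'store' = 'kafka_store',   -- the store that holds the physical data
  'topic' = 'ds_users',      -- the Kafka topic to read from
  'value.format' = 'json'   -- assumed record format
);
```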
In DeltaStream, when you define a Stream or a Changelog, you can use it as a source for queries. The join query earlier in this tutorial demonstrates this with the pageviews_kinesis and users_kafka streams. Simply use these streams as sources in your query; you do not need any additional specifications related to the stores that back them. DeltaStream keeps this information as metadata with each stream. Behind the scenes, DeltaStream seamlessly reads from both stores, performs the join, and then outputs the result to the sink stream. Again, a CREATE STREAM AS SELECT query creates the output stream. Since the query joins two streams, it is an interval join that requires the WITHIN clause.