Use Multiple Data Stores in Queries
In DeltaStream, your streaming data resides in a Data Store. Apache Kafka and Amazon Kinesis are two examples of such data stores. DeltaStream reads data from a streaming data store, performs the desired computation, and writes the results of the computation to the same data store (or to another, if you wish).
This article demonstrates how you can easily move data from one of your data stores to another. It also shows how you can perform joins between data in different data stores.
Before you begin, create data stores and DeltaStream objects in DeltaStream, if you haven't already. Set up the following:
Two data stores: a Kafka data store called kafka_store and a Kinesis data store called kinesis_store
A stream in kinesis_store called pageviews_kinesis
A stream in kafka_store called users_kafka
For details on setting these up, please see Create Data Stores for Streaming Data and Create DeltaStream Objects to Structure Raw Data.
In the lefthand navigation, click Resources to display a list of data stores. Here you'll find your Kafka and Kinesis stores.

With your two data stores established, each with an associated relation, you can write your queries. As a recap, here is how those relations are defined.

The Kinesis data store has a Kinesis data stream called pageviews. In DeltaStream, use a CREATE STREAM query to create a stream called pageviews_kinesis that is backed by the pageviews Kinesis stream. Likewise, the Kafka data store has a Kafka topic called ds_users; use a CREATE STREAM query to create a stream called users_kafka that is backed by the ds_users Kafka topic.
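Below is a minimal sketch of both definitions. The column names and the JSON value format are illustrative assumptions; the actual schemas of pageviews and ds_users aren't shown in this article.

    -- Define a stream over the Kinesis data stream "pageviews".
    CREATE STREAM pageviews_kinesis (
      viewtime BIGINT,
      userid VARCHAR,
      pageid VARCHAR
    ) WITH (
      'store' = 'kinesis_store',   -- the Kinesis data store
      'topic' = 'pageviews',       -- the existing Kinesis data stream
      'value.format' = 'json'
    );

    -- Define a stream over the Kafka topic "ds_users".
    CREATE STREAM users_kafka (
      registertime BIGINT,
      userid VARCHAR,
      regionid VARCHAR,
      gender VARCHAR
    ) WITH (
      'store' = 'kafka_store',     -- the Kafka data store
      'topic' = 'ds_users',        -- the existing Kafka topic
      'value.format' = 'json'
    );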
To migrate data out of Kinesis and into Kafka, create a new stream that selects all the columns from an existing stream, specifying the data store that backs the new stream.

In the setup here, there's a stream belonging to your Kinesis data store called pageviews_kinesis. Use a CREATE STREAM AS SELECT (CSAS) query to create a new stream in the Kafka data store that is essentially a copy of the pageviews_kinesis stream. Label this new stream pageviews_kafka.
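Here's a minimal sketch of such a CSAS query, assuming the names above; the partition and replica counts are illustrative values.

    CREATE STREAM pageviews_kafka WITH (
      'store' = 'kafka_store',    -- back the new stream with the Kafka data store
      'topic.partitions' = 1,     -- required: partition count for the new Kafka topic
      'topic.replicas' = 3        -- required: replication factor for the new Kafka topic
    ) AS
    SELECT * FROM pageviews_kinesis;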
Note the CSAS query above includes the store property in the WITH clause of the sink stream, with a value of kafka_store. A CSAS query creates a new stream; specifying this property informs the query about which data store should back this new stream. Leave the store property empty, and the query defaults to using the session's current data store. You must include the topic.replicas and topic.partitions properties; these are necessary when creating a new Kafka topic.
Tip: If the source in this query were a stream backed by a Kafka topic, DeltaStream would use the source's topic.replicas and topic.partitions values by default. But here you're reading from a Kinesis-backed stream, and DeltaStream cannot infer the value of these properties; you must set them explicitly in the query.
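For contrast, here's a hypothetical sketch in which the source is already Kafka-backed. The name pageviews_copy is illustrative; the point is that the topic properties can be omitted because DeltaStream inherits them from the source's topic.

    -- Hypothetical: the source (pageviews_kafka) is Kafka-backed, so
    -- topic.partitions and topic.replicas default to the source topic's values.
    CREATE STREAM pageviews_copy WITH (
      'store' = 'kafka_store'
    ) AS
    SELECT * FROM pageviews_kafka;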
Navigate to Resources > kafka_store to see that a new Kafka topic, called pageviews_kafka, has been created and displays in the topic list. The streaming data for the newly created pageviews_kafka stream is stored in this topic. Click the topic to print and see the records flowing through. These records are a copy of the pageviews_kinesis records.
In DeltaStream, when you define a stream, you can use it as a source for queries. The example below demonstrates how you can join the pageviews_kinesis stream and the users_kafka stream. Simply use these streams as sources in your query; you don't need any additional specifications related to the data stores that back them, as DeltaStream keeps this information as metadata with the stream. Behind the scenes, DeltaStream seamlessly reads from both data stores, performs the join, and then outputs the result to the sink stream. Again, use a CREATE STREAM AS SELECT query to create the output stream. Since this query joins two streams, it is an interval join that requires the WITHIN clause.
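The following is a sketch of such a join query, assuming the streams defined earlier; the projected columns and the one-hour join window are illustrative choices.

    CREATE STREAM pageviews_enriched WITH (
      'store' = 'kinesis_store'   -- back the sink stream with the Kinesis data store
    ) AS
    SELECT
      p.userid AS userid,
      p.pageid,
      u.regionid,
      u.gender
    FROM pageviews_kinesis p
    JOIN users_kafka u
    WITHIN 1 HOUR                 -- interval join: pair records at most one hour apart
    ON p.userid = u.userid;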
Note in the CSAS above that in the WITH clause you specify the output data store to be kinesis_store, similar to what you did in the first CREATE STREAM AS SELECT query. But even though you're creating a new Kinesis stream, called pageviews_enriched, you don't need to provide the topic.shards property. DeltaStream infers the default value from the left-most source when possible. Here, the sink stream and the left-most source are both backed by Kinesis streams, so pageviews_kinesis's topic.shards property is applied to pageviews_enriched.
Navigate to Resources > kinesis_store > Topics to see that there's a new Kinesis entity called pageviews_enriched. This is where the streaming data for the newly created pageviews_enriched stream is stored. Click that entity to print and see the records flowing through. These records result from the join between pageviews_kinesis and users_kafka.