Using Multiple Stores in Queries

In DeltaStream, your streaming data resides in a Store. Apache Kafka and Amazon Kinesis are two examples of such stores. DeltaStream reads data from a streaming store, performs the desired computation, and writes the results of the computation to the same store (or to another, if you wish).

This tutorial demonstrates how you can easily move data from one of your stores to another. It also shows how you can perform joins between data in different stores.

Before you begin, you should create stores and relations in DeltaStream, if you haven't already. To do so, please review these tutorials:

Setup

For this tutorial, have the following set up:

  • Two stores: a Kafka Store called MSK and a Kinesis Store called kinesis_store

  • A stream in kinesis_store called pageviews_kinesis

  • A stream in MSK called users_kafka

In the left-hand navigation, click Resources to display a list of stores. Here you'll find your Kafka and Kinesis stores.

Kafka and Kinesis Stores.

The Kinesis Store has a Kinesis data stream called pageviews. In DeltaStream, use a CREATE STREAM query, as shown below, to create a Stream called pageviews_kinesis that is backed by the pageviews Kinesis data stream.

Creating the pageviews_kinesis Stream.
Kinesis Stream Results
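If the screenshot is unavailable, the DDL has roughly the following shape. This is a sketch: the column names (viewtime, userid, pageid) and the value.format are assumptions, not from this tutorial; adjust them to match the data in your pageviews Kinesis stream.

```sql
-- Sketch: create a DeltaStream Stream backed by the existing
-- "pageviews" Kinesis data stream in kinesis_store.
-- Column names and value.format are assumptions.
CREATE STREAM pageviews_kinesis (
  viewtime BIGINT,
  userid   VARCHAR,
  pageid   VARCHAR
) WITH (
  'store' = 'kinesis_store',
  'topic' = 'pageviews',
  'value.format' = 'JSON'
);
```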

The Kafka Store has a Kafka topic called ds_users. In DeltaStream, use a CREATE STREAM query, as shown below, to create a Stream called users_kafka that is backed by the ds_users Kafka topic.

Creating the users_kafka Stream.
Kafka Stream Results
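In case the screenshot doesn't render, the DDL looks roughly like this sketch. The column names and value.format are assumptions; adjust them to match the records in your ds_users topic.

```sql
-- Sketch: create a DeltaStream Stream backed by the existing
-- "ds_users" Kafka topic in the MSK store (the store name from Setup).
-- Column names and value.format are assumptions.
CREATE STREAM users_kafka (
  registertime BIGINT,
  userid       VARCHAR,
  gender       VARCHAR,
  interests    ARRAY<VARCHAR>
) WITH (
  'store' = 'MSK',
  'topic' = 'ds_users',
  'value.format' = 'JSON'
);
```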

With your two stores established, each with an associated relation, you can write your queries.

Move Data from One Store to Another

To migrate data out of Kinesis and into Kafka, you can easily create a new stream that selects all the columns from an existing stream. Then you can specify the store that backs the new stream.

In the setup here, there's a stream belonging to your Kinesis store called pageviews_kinesis. Use a CREATE STREAM AS SELECT query (CSAS) to create a new stream that is essentially a copy of the pageviews_kinesis stream in the Kafka store. Label this new stream pageviews_kafka.

Creating a Kafka Stream that is a Copy of a Kinesis Stream.
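If the screenshot is unavailable, the CSAS query has roughly this shape. The store value and the partition and replica counts shown here are placeholder assumptions; set them to match your own Kafka store.

```sql
-- Sketch: copy the Kinesis-backed stream into a new Kafka-backed stream.
-- 'store', 'topic.partitions', and 'topic.replicas' values are
-- placeholder assumptions.
CREATE STREAM pageviews_kafka WITH (
  'store' = 'kafka_store',
  'topic.partitions' = 1,
  'topic.replicas' = 3
) AS
SELECT * FROM pageviews_kinesis;
```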

Note that the CSAS query above includes the store property in the WITH clause of the sink stream, with a value of kafka_store. A CSAS query creates a new stream; specifying this property tells the query which store should back that new stream. If you omit the store property, the query defaults to the session's current store. You must also include the topic.replicas and topic.partitions properties, which are required when creating a new Kafka topic. If the source in this query were a stream backed by a Kafka topic, DeltaStream would use the source's topic.replicas and topic.partitions values by default. Here, however, you're reading from a Kinesis-backed stream, so DeltaStream cannot infer the values of these properties; you must set them explicitly in the query.

Navigate to Resources > Kafka Store to see that a new Kafka topic, called pageviews_kafka, has been created and appears in the topic list. The streaming data for the newly created pageviews_kafka stream is stored in this topic. Click the topic to print the records flowing through it. These records are a copy of the pageviews_kinesis records.

The New Stream Copy.

Join Data from Sources Belonging to Different Stores

Because both streams are relations in DeltaStream, you can join the Kinesis-backed pageviews_kinesis stream with the Kafka-backed users_kafka stream in a single CSAS query, writing the enriched result to a new stream called pageviews_enriched.

Using a CREATE STREAM AS SELECT Query.
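If the screenshot is unavailable, the join query has roughly this shape. This is a sketch: the projected column names and the join window size are assumptions, not from this tutorial.

```sql
-- Sketch: join a Kinesis-backed stream with a Kafka-backed stream,
-- writing the result back to the Kinesis store.
-- Column names and the 1-hour interval join window are assumptions.
CREATE STREAM pageviews_enriched WITH (
  'store' = 'kinesis_store'
) AS
SELECT
  p.userid,
  p.pageid,
  u.gender,
  u.interests
FROM pageviews_kinesis p
JOIN users_kafka u
  WITHIN 1 HOUR
  ON p.userid = u.userid;
```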

Note in the CSAS above that you specify the output store as kinesis_store in the WITH clause, similar to what you did in the section above. But even though you're creating a new Kinesis stream, called pageviews_enriched, you don't need to provide the topic.shards property. DeltaStream infers the default value from the left-most source when possible. Since the sink stream and the left-most source are both backed by Kinesis streams, pageviews_kinesis's topic.shards value is applied to pageviews_enriched.

Navigate to Resources > kinesis_store > Topics to see that there's a new Kinesis entity called pageviews_enriched. This is where the streaming data for the newly created pageviews_enriched stream is stored. Click that entity to print the records flowing through it. These records result from the join between pageviews_kinesis and users_kafka.
