Working with ProtoBuf Serialized Data and DeltaStream Descriptors

In streaming stores such as Apache Kafka and Amazon Kinesis, producers send data events as bytes that consumers of the data must interpret. The most popular formats for data serialization include JSON, ProtoBuf, and Apache Avro, all of which DeltaStream supports. In this tutorial, we will focus on ProtoBuf and how to create and use a Descriptor for data serialization/deserialization.
Suppose you have a Store with Topics whose data records are serialized with ProtoBuf. This means you also have ProtoBuf messages and file descriptors to serialize and deserialize these data events. In DeltaStream, you can create a DeltaStream Descriptor entity, which is a wrapper around your ProtoBuf file descriptor, and associate it with any Topics that require the ProtoBuf file descriptor for serialization/deserialization.

Create a Descriptor

When working with ProtoBuf, you first define a ProtoBuf message and then generate a ProtoBuf file descriptor from that message. This ProtoBuf file descriptor is then used by DeltaStream to generate any necessary code for serializing and deserializing data that conforms to the ProtoBuf message’s structure. Let’s assume for this tutorial that our ProtoBuf message, which lives in the file p.proto, looks like the following:
syntax = "proto3";

message Pageviews {
  int64 viewtime = 1;
  string userid = 2;
  string pageid = 3;
}
We can generate a ProtoBuf descriptor in the file pageviews_value.desc from this ProtoBuf message in the file p.proto (see ProtoBuf documentation for more details):
$ protoc --descriptor_set_out pageviews_value.desc p.proto
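To illustrate why a consumer needs the message definition to interpret serialized bytes, here is a minimal Python sketch of the ProtoBuf wire format. It is not how DeltaStream or the official ProtoBuf libraries are implemented. The sample record, the PAGEVIEWS_FIELDS mapping, and the function names are assumptions made for illustration. Without the schema, the bytes decode only to field numbers and raw values; the mapping from the Pageviews message supplies names and string types.

```python
# Field numbers and names copied from the Pageviews message above.
# This hand-written mapping is a stand-in for what a file descriptor provides.
PAGEVIEWS_FIELDS = {1: "viewtime", 2: "userid", 3: "pageid"}


def read_varint(buf: bytes, pos: int) -> tuple[int, int]:
    """Read a base-128 varint starting at pos; return (value, next position)."""
    result = 0
    shift = 0
    while True:
        byte = buf[pos]
        pos += 1
        result |= (byte & 0x7F) << shift
        if not byte & 0x80:  # a clear high bit marks the last byte
            return result, pos
        shift += 7


def decode_raw(buf: bytes) -> dict[int, object]:
    """Decode wire bytes into {field_number: value} with no schema knowledge.

    Only wire types 0 (varint) and 2 (length-delimited) are handled, which is
    all the Pageviews message needs. Negative int64 values are not handled.
    """
    fields = {}
    pos = 0
    while pos < len(buf):
        tag, pos = read_varint(buf, pos)
        field_number, wire_type = tag >> 3, tag & 0x07
        if wire_type == 0:
            value, pos = read_varint(buf, pos)
        elif wire_type == 2:
            length, pos = read_varint(buf, pos)
            value = buf[pos:pos + length]
            pos += length
        else:
            raise ValueError(f"unsupported wire type {wire_type}")
        fields[field_number] = value
    return fields


def decode_pageviews(buf: bytes) -> dict[str, object]:
    """Attach field names and decode strings using the schema mapping."""
    named = {}
    for number, value in decode_raw(buf).items():
        name = PAGEVIEWS_FIELDS[number]
        named[name] = value.decode("utf-8") if isinstance(value, bytes) else value
    return named


# A hypothetical serialized Pageviews record.
record = b"\x08\x01\x12\x06User_1\x1a\x06Page_1"
raw = decode_raw(record)            # keyed by field number only
named = decode_pageviews(record)    # keyed by field name, strings decoded
```

In practice the file descriptor generated by protoc carries this field mapping, so it does not need to be written by hand.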
Now we can create a DeltaStream Descriptor from this ProtoBuf file descriptor. In the CLI, this can be done using the CREATE DESCRIPTOR_SOURCE DDL. In the UI, we can add a descriptor with the following steps:
  1. From the left menu, click Descriptors, then select UPLOAD in the upper right corner of the interface.
  2. Choose the file containing your ProtoBuf file descriptor (pageviews_value.desc in our example). You can optionally name your descriptor before uploading it. Then press UPLOAD.
  3. Done! Now that we have successfully added a Descriptor, you can click on it to see the message names it contains (e.g. Pageviews in this example).

Update a Topic with the Descriptor

Now that we have created a Descriptor, we can associate it with any relevant Topics that need this Descriptor for serialization/deserialization. To do this in the CLI, see UPDATE TOPIC. In the UI, we can add Descriptors to Topics with the following steps:
  1. From the left menu, click Stores > Select the Store > Select the ProtoBuf Topic. In our example, we select the KafkaStore containing the ProtoBuf Topic pageviews_pb.
  2. Navigate to the DESCRIPTOR tab. Note that since this is a Kafka Store, which allows for keys, this page allows the user to assign a Key Descriptor and/or a Value Descriptor. For a Kinesis Store and other Stores that don’t allow for keys, there would only be a Value Descriptor option available.
  3. Select the dropdown menu, and choose the relevant Descriptors to assign to this Topic. In our example, we choose to assign the Pageviews Descriptor for the Value Descriptor and leave the Key Descriptor empty.
  4. Done! We’ve assigned our Descriptor to the relevant Topic, and now we can successfully run commands, such as PRINT TOPIC, and run queries with Relations using this Topic.

Queries with Descriptors and ProtoBuf

With Descriptors added to our Topic, we can now create a Relation for our Topic, specifying a key.format or value.format of PROTOBUF as shown in the DDL example below. See CREATE STREAM for more details.
CREATE STREAM "pageviewsPB" (viewtime BIGINT, userid VARCHAR, pageid VARCHAR)
WITH ('topic'='pageviews_pb', 'value.format'='PROTOBUF');
We can also create new Relations using CREATE STREAM AS SELECT or CREATE CHANGELOG AS SELECT, specifying PROTOBUF as the data format for the sink Relation. In the example below, we convert the JSON Stream pageviews_json to a Stream called pageviews_converted_to_proto that uses ProtoBuf for both its key and value formats.
CREATE STREAM pageviews_converted_to_proto WITH (
'value.format' = 'protobuf', 'key.format' = 'PROTOBUF'
) AS
SELECT * FROM pageviews_json;
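Conceptually, this query re-encodes each JSON record as ProtoBuf bytes. The Python sketch below shows that conversion for a single record value only (the key is not covered). It assumes pageviews_json has the same three columns as the Pageviews message. It is an illustration of the wire format, not DeltaStream's implementation, and the sample record, PAGEVIEWS_SCHEMA, and the function names are hypothetical.

```python
import json

# Assumed schema, taken from the Pageviews message:
# field name -> (field number, wire type). 0 = varint, 2 = length-delimited.
PAGEVIEWS_SCHEMA = {"viewtime": (1, 0), "userid": (2, 2), "pageid": (3, 2)}


def write_varint(value: int) -> bytes:
    """Encode a non-negative integer as a base-128 varint."""
    if value < 0:
        raise ValueError("negative values are not handled in this sketch")
    out = bytearray()
    while True:
        low = value & 0x7F
        value >>= 7
        if value:
            out.append(low | 0x80)  # more bytes follow
        else:
            out.append(low)         # last byte has the high bit clear
            return bytes(out)


def json_to_protobuf(json_text: str) -> bytes:
    """Re-encode one JSON pageview record as ProtoBuf wire bytes."""
    record = json.loads(json_text)
    out = bytearray()
    for name, (number, wire_type) in PAGEVIEWS_SCHEMA.items():
        if name not in record:
            continue  # absent fields are simply not written
        out += write_varint((number << 3) | wire_type)  # field tag
        if wire_type == 0:
            out += write_varint(record[name])
        else:
            data = record[name].encode("utf-8")
            out += write_varint(len(data)) + data
    return bytes(out)


# A hypothetical record from the JSON Stream.
json_record = '{"viewtime": 1, "userid": "User_1", "pageid": "Page_1"}'
proto_bytes = json_to_protobuf(json_record)
```

The ProtoBuf form drops the field names from every record and keeps only field numbers, which is why the sink Topic needs a Descriptor to be read back.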
When the sink Relation has a key or value format of PROTOBUF, the Descriptor for the sink Relation will automatically be created and assigned to the Topic. You can view your Descriptors in the left menu’s Descriptors tab or use the LIST DESCRIPTORS command in the CLI. If you want to use the Descriptor outside of DeltaStream, you can download the ProtoBuf descriptor by using the COPY DESCRIPTOR_SOURCE command.
One last quick note. In the case of the PRINT TOPIC command, if a Topic in a Store has a Descriptor, the Descriptor will be used for deserialization even if the Store has a Schema Registry. If the Topic does not have a Descriptor, the Store will check if the Schema Registry contains a Schema for the Topic and use it for deserialization. Finally, if the Topic doesn’t have a Descriptor and the Store doesn’t have a Schema Registry (or it has a Schema Registry, but there is no corresponding schema in the registry), DeltaStream will attempt to deserialize the data in the Topic as JSON.
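The precedence described above can be sketched as a small Python function. This only models the order of the checks and is not DeltaStream code. The function name, the Decoder type, and the returned labels are assumptions made for illustration.

```python
import json
from typing import Callable, Optional

# A decoder turns raw record bytes into a value. Hypothetical type alias.
Decoder = Callable[[bytes], object]


def choose_deserializer(
    descriptor: Optional[Decoder],
    registry_schema: Optional[Decoder],
) -> tuple[str, Decoder]:
    """Pick a decoder in the order described for PRINT TOPIC.

    descriptor: decoder backed by the Topic's Descriptor, or None.
    registry_schema: decoder backed by a Schema Registry schema for the
        Topic, or None if there is no registry or no matching schema.
    """
    if descriptor is not None:
        # A Descriptor wins even when a Schema Registry is present.
        return "descriptor", descriptor
    if registry_schema is not None:
        return "schema_registry", registry_schema
    # Last resort: attempt to read the record as JSON.
    return "json", lambda raw: json.loads(raw.decode("utf-8"))


# Example: a Topic with no Descriptor and no registry schema falls back to JSON.
source, decode = choose_deserializer(None, None)
value = decode(b'{"pageid": "Page_1"}')
```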