Working with ProtoBuf Serialized Data and DeltaStream Descriptors

In streaming data stores such as Apache Kafka and Amazon Kinesis, producers send data events as bytes that consumers of the data must interpret. The most popular formats for data serialization include JSON, ProtoBuf, and Apache Avro, and DeltaStream supports all of these. This article focuses on ProtoBuf and how to create and use a Descriptor for data serialization/deserialization.

Suppose you have a data store whose entities hold records serialized with ProtoBuf. You therefore also have the ProtoBuf messages and file descriptors needed to serialize and deserialize these records. In DeltaStream, you can create a DeltaStream Descriptor, which is a wrapper around your ProtoBuf file descriptor. You can then associate it with any entity in a data store that requires that ProtoBuf file descriptor for serialization and deserialization.

Create a descriptor

When working with ProtoBuf, you first define a ProtoBuf message and then generate a ProtoBuf file descriptor from that message. DeltaStream then uses this ProtoBuf file descriptor to generate any code necessary for serializing and deserializing data that conforms to the ProtoBuf message structure.

In this example, the ProtoBuf message, which lives in the file p.proto, resembles the following:

syntax = "proto3";

message Pageviews {
  int64 viewtime = 1;
  string userid = 2;
  string pageid = 3;
}

Use protoc to generate a ProtoBuf file descriptor from p.proto and write it to the file pageviews_value.desc (see the ProtoBuf documentation for more details):

$ protoc --descriptor_set_out pageviews_value.desc p.proto

Now create a DeltaStream Descriptor from this ProtoBuf file descriptor. In the CLI you can do this using the CREATE DESCRIPTOR_SOURCE DDL. In the UI, follow these steps to add a descriptor:

  1. In the left-hand navigation, click Resources. When the Resources page displays, click Descriptor Sources, and then click + Add Descriptor Source.

  2. Choose the file containing your ProtoBuf file descriptor (pageviews_value.desc in this example). When prompted, name your descriptor (this example uses pageviews-descriptor), and then click UPLOAD.

  3. Click the descriptor to view the message names it contains (in this example, Pageviews).
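The .desc file that protoc writes is a serialized FileDescriptorSet, as defined in google/protobuf/descriptor.proto. The message names shown in step 3 are read from that structure. The following hypothetical Python sketch extracts top-level message names from a descriptor set using a minimal wire-format parser. It is not how DeltaStream implements this view, it ignores nested messages, and it assumes well-formed input. In real code, the protobuf library's `google.protobuf.descriptor_pb2.FileDescriptorSet` type does this parsing for you.

```python
from typing import Iterator, List, Tuple


def _read_varint(buf: bytes, pos: int) -> Tuple[int, int]:
    """Decode a base-128 varint starting at pos; return (value, next position)."""
    result = shift = 0
    while True:
        byte = buf[pos]
        pos += 1
        result |= (byte & 0x7F) << shift
        if not byte & 0x80:
            return result, pos
        shift += 7


def _fields(buf: bytes) -> Iterator[Tuple[int, int, object]]:
    """Yield (field_number, wire_type, value) for each field in a message."""
    pos = 0
    while pos < len(buf):
        key, pos = _read_varint(buf, pos)
        number, wire_type = key >> 3, key & 0x7
        if wire_type == 0:  # varint
            value, pos = _read_varint(buf, pos)
        elif wire_type == 1:  # fixed64
            value, pos = buf[pos:pos + 8], pos + 8
        elif wire_type == 2:  # length-delimited: strings and sub-messages
            length, pos = _read_varint(buf, pos)
            value, pos = buf[pos:pos + length], pos + length
        elif wire_type == 5:  # fixed32
            value, pos = buf[pos:pos + 4], pos + 4
        else:
            raise ValueError(f"unsupported wire type {wire_type}")
        yield number, wire_type, value


def message_names(descriptor_set: bytes) -> List[str]:
    """Return top-level message names, qualified by package when one is set."""
    names: List[str] = []
    for number, _, file_proto in _fields(descriptor_set):
        if number != 1:  # FileDescriptorSet.file = 1
            continue
        package, messages = "", []
        for f_number, _, f_value in _fields(file_proto):
            if f_number == 2:  # FileDescriptorProto.package = 2
                package = f_value.decode("utf-8")
            elif f_number == 4:  # FileDescriptorProto.message_type = 4
                for m_number, _, m_value in _fields(f_value):
                    if m_number == 1:  # DescriptorProto.name = 1
                        messages.append(m_value.decode("utf-8"))
        names.extend(f"{package}.{m}" if package else m for m in messages)
    return names
```

Because p.proto declares no package, passing the bytes of pageviews_value.desc to `message_names` should return `["Pageviews"]`.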

Update an Entity with the Descriptor

Now you can associate your descriptor with any entity that needs it for serialization/deserialization. To do this in the CLI, see UPDATE ENTITY. In the UI, follow these steps:

  1. In the left-hand navigation, click Resources to display the Resources page.

  2. Click the data store you want. When the data store page displays, click the topic you want. In this example, we selected the ProtoBuf entity pageviews_key_descriptor in the KafkaStore data store.

  3. Because this is a Kafka data store, its entities can have keys, so you can assign a Key Descriptor, a Value Descriptor, or both. For data stores that don't support keys, such as Kinesis data stores, you can add only a Value Descriptor.

  4. Click + Add Descriptors, and from the menus that appear, select the descriptors to assign to this entity. In this example, the Pageviews value descriptor is assigned and the Key Descriptor is left empty.

  5. That's it. You've assigned your descriptor to the relevant entity; now you can use this entity to run commands (such as PRINT ENTITY) and queries with DeltaStream objects.

For more information, see Serializing with Protobuf.

Queries with Descriptors and ProtoBuf

With descriptors added, you can create a DeltaStream object whose key.format or value.format is PROTOBUF, as in the following DDL example. See CREATE STREAM for more details.

CREATE STREAM "pageviewsPB" (viewtime BIGINT, userid VARCHAR, pageid VARCHAR)
    WITH ('topic'='pageviews_pb', 'value.format'='PROTOBUF');
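The columns in this DDL mirror the fields of the Pageviews message. Each record value in pageviews_pb therefore carries viewtime, userid, and pageid encoded as ProtoBuf fields 1, 2, and 3. The following minimal Python sketch decodes one such value into a row keyed by column name. It hard-codes the Pageviews layout in a hypothetical `PAGEVIEWS_FIELDS` table instead of reading it from a descriptor, and it applies proto3 defaults to absent fields. It illustrates the idea and is not DeltaStream's implementation.

```python
from typing import Dict, Tuple, Union

# Hard-coded layout of the Pageviews message from p.proto:
# field number -> (column name, expected wire type). A real decoder would
# take this information from the descriptor instead.
PAGEVIEWS_FIELDS = {1: ("viewtime", 0), 2: ("userid", 2), 3: ("pageid", 2)}


def _read_varint(buf: bytes, pos: int) -> Tuple[int, int]:
    """Decode a base-128 varint starting at pos; return (value, next position)."""
    result = shift = 0
    while True:
        byte = buf[pos]
        pos += 1
        result |= (byte & 0x7F) << shift
        if not byte & 0x80:
            return result, pos
        shift += 7


def decode_pageview(buf: bytes) -> Dict[str, Union[int, str]]:
    """Decode a serialized Pageviews value into a row keyed by column name."""
    # proto3 defaults apply to any field that is absent from the bytes.
    row: Dict[str, Union[int, str]] = {"viewtime": 0, "userid": "", "pageid": ""}
    pos = 0
    while pos < len(buf):
        key, pos = _read_varint(buf, pos)
        number, wire_type = key >> 3, key & 0x7
        if wire_type == 0:  # varint
            value, pos = _read_varint(buf, pos)
        elif wire_type == 2:  # length-delimited
            length, pos = _read_varint(buf, pos)
            value, pos = buf[pos:pos + length], pos + length
        elif wire_type in (1, 5):  # fixed64 / fixed32: skip the payload
            pos += 8 if wire_type == 1 else 4
            continue
        else:
            raise ValueError(f"unsupported wire type {wire_type}")
        if number not in PAGEVIEWS_FIELDS:
            continue  # fields not in the layout are skipped in this sketch
        name, expected_type = PAGEVIEWS_FIELDS[number]
        if wire_type != expected_type:
            raise ValueError(f"field {number} has wire type {wire_type}")
        if name == "viewtime":
            # int64 values are varints of the 64-bit two's-complement form.
            row[name] = value - (1 << 64) if value >= 1 << 63 else value
        else:
            row[name] = value.decode("utf-8")
    return row
```

The decoder skips unknown field numbers instead of failing. This lets a consumer tolerate producers that add fields to the message.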

You can also create new objects with CREATE STREAM AS SELECT or CREATE CHANGELOG AS SELECT, specifying PROTOBUF as the data format for the sink object. The following example converts the JSON stream pageviews_json into a new stream, pageviews_converted_to_proto, whose key and value formats are both ProtoBuf.

CREATE STREAM pageviews_converted_to_proto WITH (
  'value.format' = 'PROTOBUF', 'key.format' = 'PROTOBUF'
) AS
SELECT * FROM pageviews_json;

When the sink object has a key or value format of PROTOBUF, DeltaStream automatically creates the descriptor for the sink object and assigns it to the sink's entity. You can view your descriptors in the Descriptors tab or with the LIST DESCRIPTORS command in the CLI. To use the descriptor outside of DeltaStream, download the ProtoBuf descriptor with the COPY DESCRIPTOR_SOURCE command.
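The following purely illustrative Python sketch shows what an automatically created descriptor might describe. It assumes a simple column-to-field mapping inferred from the pageviews example: BIGINT maps to int64, VARCHAR maps to string, and field numbers follow column order. It renders the corresponding proto3 message definition. The function `proto_for_columns` and the `SQL_TO_PROTO` mapping are hypothetical and are not DeltaStream's generation rules. To inspect the descriptor DeltaStream actually generates, download it with COPY DESCRIPTOR_SOURCE.

```python
from typing import List, Tuple

# Hypothetical SQL-to-ProtoBuf type mapping, inferred from the Pageviews example
# (BIGINT <-> int64, VARCHAR <-> string); other SQL types are not covered.
SQL_TO_PROTO = {"BIGINT": "int64", "VARCHAR": "string"}


def proto_for_columns(message: str, columns: List[Tuple[str, str]]) -> str:
    """Render a proto3 message whose field numbers follow column order."""
    lines = ['syntax = "proto3";', "", f"message {message} {{"]
    for number, (name, sql_type) in enumerate(columns, start=1):
        proto_type = SQL_TO_PROTO[sql_type.upper()]  # KeyError for unmapped types
        lines.append(f"  {proto_type} {name} = {number};")
    lines.append("}")
    return "\n".join(lines) + "\n"


if __name__ == "__main__":
    print(proto_for_columns(
        "Pageviews",
        [("viewtime", "BIGINT"), ("userid", "VARCHAR"), ("pageid", "VARCHAR")],
    ))
```

With the pageviews columns, this sketch reproduces the Pageviews message from p.proto.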

Finally, the PRINT ENTITY command decides how to deserialize an entity's data as follows:

  • If an entity in a data store has a descriptor, the descriptor is used for deserialization, even if the data store has a schema registry.

  • If the entity does not have a descriptor, DeltaStream checks whether the data store's schema registry contains a schema for the entity and, if it does, uses that schema for deserialization.

  • If the entity doesn't have a descriptor and the data store has no schema registry (or has one that contains no schema for the entity), DeltaStream attempts to deserialize the entity's data as JSON.
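The precedence these rules describe can be summarized as a small decision function. The following Python sketch is a hypothetical model of the rules as stated. The `Entity` and `DataStore` classes, their fields, and the returned labels are illustrative only and are not DeltaStream code.

```python
from dataclasses import dataclass
from typing import Dict, Optional


@dataclass
class Entity:
    name: str
    descriptor: Optional[str] = None  # hypothetical: assigned descriptor, if any


@dataclass
class DataStore:
    # Hypothetical: entity name -> registered schema; None means no schema registry.
    schema_registry: Optional[Dict[str, str]] = None


def choose_deserializer(entity: Entity, store: DataStore) -> str:
    """Return which source PRINT ENTITY would use, following the rules above."""
    # 1. A descriptor on the entity wins, even if a schema registry exists.
    if entity.descriptor is not None:
        return f"descriptor:{entity.descriptor}"
    # 2. Otherwise, use the registry's schema for this entity, if there is one.
    if store.schema_registry is not None and entity.name in store.schema_registry:
        return f"schema-registry:{store.schema_registry[entity.name]}"
    # 3. No descriptor and no usable schema: fall back to JSON.
    return "json"
```

For example, an entity that has both a descriptor and a registered schema resolves to its descriptor. An entity with neither falls through to JSON.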
