Working with Avro Serialized Data and Schema Registries

In streaming stores such as Apache Kafka and Amazon Kinesis, producers send data events as bytes that consumers must interpret. Popular serialization formats include JSON, Protocol Buffers (Protobuf), and Apache Avro, all of which DeltaStream supports. This tutorial focuses on Apache Avro and shows how to create and use a Schema Registry to access the Avro schemas needed to serialize and deserialize data.

Suppose you have a Store with Entities whose data records are serialized with Avro. Most likely, you also have a schema registry to manage the Avro schemas used to serialize and deserialize these data events. In DeltaStream, you can create a DeltaStream Schema Registry entity, which is a wrapper around your schema registry, and associate it with one or more Stores.

Create a Schema Registry

Two types of Schema Registry are currently supported: Confluent Cloud and Confluent Platform (more to come!). To create a DeltaStream Schema Registry, run the CREATE SCHEMA_REGISTRY command in either the CLI or the UI SQL page:

CREATE SCHEMA_REGISTRY "ConfluentCloudSR" WITH (
    'type' = CONFLUENT_CLOUD,
    'access_region' = "AWS us-east-2",
    'uris' = 'https://abcd-efghi.us-east-2.aws.confluent.cloud',
    'confluent_cloud.key' = 'fake_key',
    'confluent_cloud.secret' = 'fake_secret'
);

In the above example, we create a CONFLUENT_CLOUD type Schema Registry named ConfluentCloudSR in AWS us-east-2. The uris value is the URI shown for the Schema Registry in the Confluent Cloud dashboard. Optionally, the Schema Registry can also have a key pair for credentials, supplied either with the confluent_cloud.key and confluent_cloud.secret properties or with the properties.file property (see CREATE SCHEMA_REGISTRY for more details).

Once Schema Registries are defined, they can be listed:

demoDB.public/kafka_store# LIST SCHEMA_REGISTRIES;
        Name       |      Type      |                       URI                        |  Owner   |      Created at      |      Updated at
-------------------+----------------+--------------------------------------------------+----------+----------------------+-----------------------
  ConfluentCloudSR | ConfluentCloud | https://abcd-efghi.us-east-2.aws.confluent.cloud | sysadmin | 2022-08-22T16:38:02Z | 2022-08-22T16:38:02Z

Update Store with the Schema Registry

Now that we’ve created a Schema Registry, the next step is to associate it with the relevant Store. Note that at most one Schema Registry can be attached to a Store, but any number of Stores can use the same Schema Registry. You can describe a Store to see whether a Schema Registry is attached to it:

demoDB.public/kafka_store# DESCRIBE STORE kafka_store;
     Name     | Type  | Availability zone | Metadata |                     URIs                     | TLS | Verify host | TLS protocols | Chiphers | Additional egress URIs | Schema registry |  Owner   |      Created at      |      Updated at
--------------+-------+-------------------+----------+----------------------------------------------+-----+-------------+---------------+----------+------------------------+-----------------+----------+----------------------+-----------------------
  kafka_store | Kafka | AWS us-east-2     | {}       | abc-defjk.us-east-2.aws.confluent.cloud:9092 | ✓   |             |               |          |                        |                 | sysadmin | 2023-02-28T01:55:18Z | 2023-02-28T01:55:18Z

Now that we've determined that there is no Schema Registry attached to our Store, kafka_store, we can use the UPDATE STORE DDL to point our Store to the Schema Registry.

UPDATE STORE kafka_store WITH ( 'schema_registry.name' = ConfluentCloudSR );

Once a Schema Registry has been attached to a Store, the Store uses it to look up Schemas whenever a DeltaStream command or query needs to serialize or deserialize data in one of the Store’s Entities. To work with Avro-serialized data, the Schema Registry that contains the relevant Avro schemas must be attached to the Store that holds the data.
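To illustrate why the lookup is needed, the Python sketch below assumes the records were produced with Confluent's standard Avro serializers, which prefix each message with a magic byte (0) and a 4-byte big-endian schema ID instead of embedding the schema itself. The function name, the schema ID, and the example payload are hypothetical, and this is not a description of DeltaStream's internal implementation.

```python
import struct

MAGIC_BYTE = 0  # marker byte used by the Confluent wire format


def split_confluent_frame(payload: bytes):
    """Split a Confluent-framed message into (schema_id, avro_body)."""
    if len(payload) < 5:
        raise ValueError("payload too short for Confluent framing")
    # ">bI" = big-endian: 1 signed byte (magic) + 4-byte unsigned int (schema ID)
    magic, schema_id = struct.unpack(">bI", payload[:5])
    if magic != MAGIC_BYTE:
        raise ValueError(f"unexpected magic byte: {magic}")
    return schema_id, payload[5:]


# Hypothetical frame: magic byte 0, made-up schema ID 100042, then body bytes.
frame = struct.pack(">bI", 0, 100042) + b"\x0cUser_1"
schema_id, body = split_confluent_frame(frame)
print(schema_id, body)
```

The body cannot be decoded without the schema that the ID refers to, which is why a consumer needs access to the registry holding that schema.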

Avro-Enabled Printing and Queries

Now we can successfully run commands such as PRINT ENTITY or write queries with relations that have Avro data formats. Below we are printing the pageviews_avro Entity.

demoDB.public/kafka_store# PRINT ENTITY pageviews_avro;
{"userid":"User_1"} | {"viewtime":1660931394412,"userid":"User_1","pageid":"Page_22"}
{"userid":"User_6"} | {"viewtime":1660931395412,"userid":"User_6","pageid":"Page_32"}
{"userid":"User_1"} | {"viewtime":1660931396413,"userid":"User_1","pageid":"Page_96"}

With a Schema Registry set up, we can read and write Avro-formatted data. The query below converts the JSON Stream pageviews_json to a Stream called pageviews_converted_to_avro, which uses Avro for both its key and value formats. See CREATE STREAM AS SELECT for more details.

CREATE STREAM pageviews_converted_to_avro WITH (
  'value.format' = 'avro', 'key.format' = 'AVRO'
) AS 
SELECT * FROM pageviews_json;

When creating Relations using CREATE STREAM AS SELECT or CREATE CHANGELOG AS SELECT and specifying a key or value format such as Avro for the sink Relation, an Avro schema will be automatically generated and added to the Schema Registry attached to the Store. In the example above, two Schemas will be generated for the Entity pageviews_converted_to_avro — one for the key and one for the value. This way, these Schemas will be available if you ever need to consume from these Entities outside of DeltaStream.
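As a rough picture of what "one Schema for the key and one for the value" could look like, the Python sketch below builds two illustrative Avro schemas. Everything in it is an assumption: the field types are guessed from the pageviews_avro records printed earlier (assuming pageviews_json has the same shape), the record names are invented, and the subject names follow Confluent's default "<topic>-key" and "<topic>-value" convention. The schemas DeltaStream actually generates may differ, for example in naming or nullability.

```python
import json

topic = "pageviews_converted_to_avro"

# Hypothetical key schema: a record with a single string field.
key_schema = {
    "type": "record",
    "name": "PageviewsKey",  # illustrative name, not generated by DeltaStream
    "fields": [{"name": "userid", "type": "string"}],
}

# Hypothetical value schema, with types guessed from the printed records.
value_schema = {
    "type": "record",
    "name": "PageviewsValue",  # illustrative name, not generated by DeltaStream
    "fields": [
        {"name": "viewtime", "type": "long"},
        {"name": "userid", "type": "string"},
        {"name": "pageid", "type": "string"},
    ],
}

# Assumed subject naming: Confluent's default topic-name strategy.
subjects = {f"{topic}-key": key_schema, f"{topic}-value": value_schema}

for subject, schema in subjects.items():
    print(subject, json.dumps(schema))
```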

In the case of the PRINT ENTITY command, if an Entity in a Store has a Protocol Buffers Descriptor, the Descriptor is used for deserialization even if the Store has a Schema Registry. If the Entity does not have a Descriptor, the Store checks whether the Schema Registry contains a Schema for the Entity and, if so, uses it for deserialization. Finally, if the Entity doesn’t have a Descriptor and the Store doesn’t have a Schema Registry (or it has a Schema Registry, but there is no corresponding Schema in the Registry), DeltaStream attempts to deserialize the data in the Entity as JSON.
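The precedence described above can be sketched in Python as a simple fallback chain. This is only a model of the ordering, not DeltaStream's implementation; pick_decoder and the decoder arguments are hypothetical names introduced for illustration.

```python
import json
from typing import Callable, Optional, Tuple

# Hypothetical decoder type: takes raw bytes and returns a decoded record.
Decoder = Callable[[bytes], dict]


def pick_decoder(
    descriptor_decoder: Optional[Decoder],
    registry_decoder: Optional[Decoder],
) -> Tuple[str, Decoder]:
    """Return (label, decoder) following the precedence described above."""
    # 1. A Protocol Buffers Descriptor on the Entity takes priority.
    if descriptor_decoder is not None:
        return "descriptor", descriptor_decoder
    # 2. Otherwise, use a Schema found in the Store's Schema Registry.
    if registry_decoder is not None:
        return "schema_registry", registry_decoder
    # 3. No Descriptor and no usable Schema: try to read the data as JSON.
    return "json", lambda raw: json.loads(raw.decode("utf-8"))


# Neither a Descriptor nor a registry Schema is available, so JSON is tried.
label, decode = pick_decoder(None, None)
record = decode(b'{"userid":"User_1","pageid":"Page_22"}')
print(label, record)
```

Passing None for registry_decoder covers both cases from the text: a Store with no Schema Registry, and a Schema Registry that has no Schema for the Entity.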
