Working with Avro Serialized Data and Schema Registries

In streaming stores such as Apache Kafka and Amazon Kinesis, producers send data events as bytes that must be interpreted by the data consumers. The most popular formats for data serialization include JSON, ProtoBuf, and Apache Avro. DeltaStream supports all of these.

This tutorial focuses on Apache Avro. It shows how to create and use a Schema Registry to access Avro schemas that are necessary for data serialization/deserialization.

If you have a store with entities whose data records are serialized with Avro, it's likely you also have a schema registry to manage the Avro schemas used to serialize and deserialize these data events. To use it in DeltaStream, you create a DeltaStream Schema Registry entity -- a wrapper around your schema registry -- and associate it with one or more stores.

Create a Schema Registry

DeltaStream supports two types of schema registries (more are in development):

  1. Confluent Cloud

  2. Confluent Platform

To begin, create a schema registry in either the CLI or the UI SQL page. Use the CREATE SCHEMA_REGISTRY command to create a DeltaStream Schema Registry:

CREATE SCHEMA_REGISTRY "ConfluentCloudSR" WITH (
    'type' = CONFLUENT_CLOUD,
    'access_region' = "AWS us-east-2",
    'uris' = 'https://abcd-efghi.us-east-2.aws.confluent.cloud',
    'confluent_cloud.key' = 'fake_key',
    'confluent_cloud.secret' = 'fake_secret'
);

In the example above, you create a CONFLUENT_CLOUD type schema registry named ConfluentCloudSR in AWS us-east-2. The uris value provided here is the schema registry's URI from the Confluent Cloud dashboard. Optionally, this schema registry can also have one of the following:

  • a key pair for credentials, which can be supplied with the confluent_cloud.key and confluent_cloud.secret properties

  • the properties.file property, as sketched below (see CREATE SCHEMA_REGISTRY for more details)
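
For example, here's a minimal sketch of the properties.file variant. The file path is illustrative, and the file is assumed to hold the credential properties rather than inlining them in the statement (see CREATE SCHEMA_REGISTRY for the exact file format):

CREATE SCHEMA_REGISTRY "ConfluentCloudSR" WITH (
    'type' = CONFLUENT_CLOUD,
    'access_region' = "AWS us-east-2",
    'uris' = 'https://abcd-efghi.us-east-2.aws.confluent.cloud',
    'properties.file' = '/path/to/confluent_cloud.properties'
);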

When you have defined schema registries, you can list them:

demoDB.public/kafka_store# LIST SCHEMA_REGISTRIES;
        Name       |      Type      |                       URI                        |  Owner   |      Created at      |      Updated at
-------------------+----------------+--------------------------------------------------+----------+----------------------+-----------------------
  ConfluentCloudSR | ConfluentCloud | https://abcd-efghi.us-east-2.aws.confluent.cloud | sysadmin | 2022-08-22T16:38:02Z | 2022-08-22T16:38:02Z

Update Store with the Schema Registry

Next, associate that schema registry with the relevant store.

Note: You can attach only one schema registry to a store, but any number of stores can use the same schema registry.

You can describe a store to determine whether it has a schema registry attached:

demoDB.public/kafka_store# DESCRIBE STORE kafka_store;
     Name     | Type  | Availability zone | Metadata |                     URIs                     | TLS | Verify host | TLS protocols | Ciphers  | Additional egress URIs | Schema registry |  Owner   |      Created at      |      Updated at
--------------+-------+-------------------+----------+----------------------------------------------+-----+-------------+---------------+----------+------------------------+-----------------+----------+----------------------+-----------------------
  kafka_store | Kafka | AWS us-east-2     | {}       | abc-defjk.us-east-2.aws.confluent.cloud:9092 | ✓   |             |               |          |                        |                 | sysadmin | 2023-02-28T01:55:18Z | 2023-02-28T01:55:18Z

If you determine that no schema registry is attached to your store (kafka_store, above), use the UPDATE STORE DDL to point the store to the schema registry:

UPDATE STORE kafka_store WITH ( 'schema_registry.name' = ConfluentCloudSR );
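
Because any number of stores can share a registry, you can repeat this command for other stores. As a sketch, with a hypothetical second store named other_kafka_store:

UPDATE STORE other_kafka_store WITH ( 'schema_registry.name' = ConfluentCloudSR );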

When you attach a schema registry to a store, each time a DeltaStream command or query needs to serialize or deserialize data from one of the store's entities, the store uses that registry to look up schemas. When you work with Avro serialized data, DeltaStream requires that the schema registry containing the relevant Avro schemas be attached to the store that holds that data.
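
To confirm the association, you can describe the store again. A sketch of the expected output, abridged here to the relevant columns -- the previously empty Schema registry column now shows the registry name:

demoDB.public/kafka_store# DESCRIBE STORE kafka_store;
     Name     | Type  | ... | Schema registry  | ...
--------------+-------+-----+------------------+-----
  kafka_store | Kafka | ... | ConfluentCloudSR | ...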

Avro-Enabled Printing and Queries

Now you can successfully run commands such as PRINT ENTITY, or write queries over relations whose data is Avro formatted. Below we print the pageviews_avro entity.

demoDB.public/kafka_store# PRINT ENTITY pageviews_avro;
{"userid":"User_1"} | {"viewtime":1660931394412,"userid":"User_1","pageid":"Page_22"}
{"userid":"User_6"} | {"viewtime":1660931395412,"userid":"User_6","pageid":"Page_32"}
{"userid":"User_1"} | {"viewtime":1660931396413,"userid":"User_1","pageid":"Page_96"}

Once you have set up a schema registry, you can easily read or write Avro-formatted data. The query below converts the JSON stream pageviews_json into a stream named pageviews_converted_to_avro with Avro key and value formats. See CREATE STREAM AS SELECT for more details.

CREATE STREAM pageviews_converted_to_avro WITH (
  'value.format' = 'avro', 'key.format' = 'avro'
) AS
SELECT * FROM pageviews_json;

When you create relations using CREATE STREAM AS SELECT or CREATE CHANGELOG AS SELECT and specify a key or value format such as Avro for the sink relation, DeltaStream automatically generates an Avro schema and adds it to the schema registry attached to the store. In the example above, two schemas are generated for the entity pageviews_converted_to_avro -- one for the key and one for the value. These schemas thus remain available if you ever need to consume the entity outside of DeltaStream.
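
The same applies to CREATE CHANGELOG AS SELECT. The following is a sketch under the same setup; the changelog name pageview_counts and the column alias view_count are illustrative:

CREATE CHANGELOG pageview_counts WITH (
  'value.format' = 'avro', 'key.format' = 'avro'
) AS
SELECT userid, COUNT(*) AS view_count
FROM pageviews_json
GROUP BY userid;

As with the stream above, DeltaStream would generate key and value Avro schemas for pageview_counts and register them in the attached schema registry.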

Finally, if the entity doesn't have a descriptor and the store doesn't have a schema registry (or it has one, but the registry contains no corresponding schema), DeltaStream attempts to deserialize the data in the entity as JSON.
