# Working with Avro Serialized Data and Schema Registries

In streaming data stores such as [Apache Kafka](https://kafka.apache.org/) and [Amazon Kinesis](https://aws.amazon.com/kinesis/), producers send data events as bytes that the data consumers must interpret. Popular formats for data serialization include [JSON](https://www.json.org/json-en.html), [Protocol Buffers (Protobuf)](https://protobuf.dev/), and [Apache Avro](https://avro.apache.org/docs/). DeltaStream supports all of these.

This article focuses on [Apache Avro](https://avro.apache.org/docs/). It shows how to create and use a [schema registry](/overview/core-concepts/store.md#schema-registry) to access the [Avro schemas](https://avro.apache.org/docs/#schemas) needed to serialize and deserialize data.

If you have a [Data Store](/overview/core-concepts/store.md) with **entities** whose data records are serialized with Avro, you likely also have a schema registry that manages the Avro schemas used to serialize and deserialize those records. To use it in DeltaStream, create a DeltaStream [schema registry](/overview/core-concepts/store.md#schema-registry) object, which is a wrapper around your existing schema registry, and associate it with one or more data stores.

## Create a Schema Registry

DeltaStream supports two types of schema registries (more are in development):

1. Confluent Cloud
2. Confluent Platform

To begin, create a schema registry in either the CLI or the UI SQL page. In the CLI, use the [CREATE SCHEMA\_REGISTRY](/reference/sql-syntax/ddl/create-schema_registry.md) command to create a DeltaStream [schema registry](/overview/core-concepts/store.md#schema-registry):

```sql
CREATE SCHEMA_REGISTRY "ConfluentCloudSR" WITH (
    'type' = CONFLUENT_CLOUD,
    'uris' = 'https://abcd-efghi.us-east-2.aws.confluent.cloud',
    'confluent_cloud.key' = 'fake_key',
    'confluent_cloud.secret' = 'fake_secret'
);
```

The example above creates a `CONFLUENT_CLOUD` schema registry named `ConfluentCloudSR` in AWS `us-east-2`. The `uris` value is the schema registry endpoint shown in the Confluent Cloud dashboard. Optionally, you can also provide one of the following:

* a key pair for credentials, which can be supplied with the `confluent_cloud.key` and `confluent_cloud.secret` properties
* the `properties.file` property (see [CREATE SCHEMA\_REGISTRY](/reference/sql-syntax/ddl/create-schema_registry.md) for more details)
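
Because the `uris` value and key pair are the same ones your schema registry already exposes over its REST API, you may want to sanity-check them before running `CREATE SCHEMA_REGISTRY`. The following Python sketch (standard library only) builds a `GET /subjects` request against a Confluent Schema Registry, assuming Confluent Cloud's convention of sending the API key and secret as HTTP Basic credentials. The URI and credentials are the placeholder values from the example above, and `build_list_subjects_request` is a hypothetical helper, not part of DeltaStream.

```python
import base64
import urllib.request

# Placeholder values copied from the CREATE SCHEMA_REGISTRY example; replace with your own.
SR_URI = "https://abcd-efghi.us-east-2.aws.confluent.cloud"
SR_KEY = "fake_key"
SR_SECRET = "fake_secret"


def build_list_subjects_request(uri: str, key: str, secret: str) -> urllib.request.Request:
    """Hypothetical helper: build GET <uri>/subjects with HTTP Basic auth.

    Assumes the registry accepts the API key/secret pair as Basic credentials,
    as Confluent Cloud Schema Registry does.
    """
    token = base64.b64encode(f"{key}:{secret}".encode("utf-8")).decode("ascii")
    return urllib.request.Request(
        url=f"{uri.rstrip('/')}/subjects",
        headers={
            "Authorization": f"Basic {token}",
            "Accept": "application/vnd.schemaregistry.v1+json",
        },
        method="GET",
    )


request = build_list_subjects_request(SR_URI, SR_KEY, SR_SECRET)
# To send it (requires a real endpoint and valid credentials):
#   import json
#   with urllib.request.urlopen(request, timeout=10) as resp:
#       print(json.load(resp))  # JSON array of subject names
```

A `200` response containing a JSON array of subjects suggests the URI and credentials are usable; a `401` points to a wrong key or secret.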

After you define schema registries, you can list them with `LIST SCHEMA_REGISTRIES`:

```sh
demoDB.public/kafka_store# LIST SCHEMA_REGISTRIES;
+------------------+----------------+--------+--------------------------------------------------+----------+-----------------------------------+-----------------------------------+----------------------+
|       Name       |      Type      |  State |                       Uris                       |   Owner  |             Created At            |             Updated At            |         Path         |
+------------------+----------------+--------+--------------------------------------------------+----------+-----------------------------------+-----------------------------------+----------------------+
| ConfluentCloudSR | ConfluentCloud | ready  | https://abcd-efghi.us-east-2.aws.confluent.cloud | sysadmin | 2025-05-02 16:25:27.771 +0000 UTC | 2025-05-02 16:25:27.771 +0000 UTC | ["ConfluentCloudSR"] |
+------------------+----------------+--------+--------------------------------------------------+----------+-----------------------------------+-----------------------------------+----------------------+
```

## Update Data Store with the Schema Registry

Next, associate that schema registry with the relevant data store.

{% hint style="info" %}
**Note** A data store can have at most one schema registry attached, but any number of data stores can use the same schema registry.
{% endhint %}

You can describe a data store to determine whether it has a schema registry attached:

```sh
demoDB.public/kafka_store# DESCRIBE STORE kafka_store;
+-------------+-------------------------------------------------+----------+--------------+------------------+------------------+-----------------+
|  Properties |                      Uri                        |  Details |  Tls Enabled |  Verify Hostname |  Schema Registry |       Path      |
+-------------+-------------------------------------------------+----------+--------------+------------------+------------------+-----------------+
| {}          | abcd.edghijk.kafka.us-east-1.amazonaws.com:9196 | {}       | true         | false            | <null>           | ["kafka_store"] |
+-------------+-------------------------------------------------+----------+--------------+------------------+------------------+-----------------+
```

In the output above, the `Schema Registry` column is `<null>`, so `kafka_store` has no schema registry attached. Use the [UPDATE STORE](/reference/sql-syntax/ddl/update-store.md) DDL to point the data store to the schema registry:

```sql
UPDATE STORE kafka_store WITH ( 'schema_registry.name' = ConfluentCloudSR );
```

After you attach a schema registry to a data store, DeltaStream uses that registry to look up schemas whenever a command or query needs to serialize or deserialize data in one of the data store's entities. To work with Avro-serialized data, you must attach a schema registry that contains the relevant Avro schemas to the data store that holds that data.
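
This page does not describe how the lookup works internally. As a general illustration, records written by Confluent's serializers use the Confluent wire format: one magic byte (`0x00`), a 4-byte big-endian schema ID, then the Avro-encoded record. A consumer reads the ID and fetches the matching schema from the registry's `/schemas/ids/{id}` endpoint. The Python sketch below assumes that framing; it shows the general mechanism, not DeltaStream's implementation, and `split_confluent_frame` and `schema_lookup_url` are hypothetical helpers.

```python
import struct
from typing import Tuple

MAGIC_BYTE = 0  # first byte of every Confluent-framed message


def split_confluent_frame(message: bytes) -> Tuple[int, bytes]:
    """Split a Confluent-framed message into (schema_id, avro_payload).

    Layout: 1 magic byte (0x00), then a 4-byte big-endian schema ID,
    then the Avro binary-encoded record.
    """
    if len(message) < 5:
        raise ValueError("message too short for the Confluent wire format")
    magic, schema_id = struct.unpack(">BI", message[:5])
    if magic != MAGIC_BYTE:
        raise ValueError(f"unexpected magic byte: {magic}")
    return schema_id, message[5:]


def schema_lookup_url(registry_uri: str, schema_id: int) -> str:
    """Registry REST endpoint that returns the schema registered under schema_id."""
    return f"{registry_uri.rstrip('/')}/schemas/ids/{schema_id}"


# Example: a frame carrying schema ID 42 followed by dummy (not real Avro) bytes.
schema_id, payload = split_confluent_frame(b"\x00\x00\x00\x00\x2a" + b"avro-bytes")
url = schema_lookup_url("https://abcd-efghi.us-east-2.aws.confluent.cloud", schema_id)
```

The payload bytes are only meaningful once paired with the fetched schema, which is why Avro data cannot be decoded without access to the registry.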

## Avro-Enabled Printing and Queries

Now you can run commands such as [PRINT ENTITY](/reference/sql-syntax/command/print-entity.md) and write queries against DeltaStream objects that use the Avro data format. The following example prints the `pageviews_avro` entity:

```sh
demoDB.public/kafka_store# PRINT ENTITY pageviews_avro;
+----------------------+----------------------------------------------------------------------+
| key                  | value                                                                |
+======================+======================================================================+
| {"userid": "User_1"} | {"viewtime": 1746203009403, "userid": "User_1", "pageid": "Page_64"} |
+----------------------+----------------------------------------------------------------------+
| {"userid": "User_4"} | {"viewtime": 1746203010404, "userid": "User_4", "pageid": "Page_45"} |
+----------------------+----------------------------------------------------------------------+
| {"userid": "User_2"} | {"viewtime": 1746203011404, "userid": "User_2", "pageid": "Page_23"} |
+----------------------+----------------------------------------------------------------------+
| {"userid": "User_9"} | {"viewtime": 1746203012404, "userid": "User_9", "pageid": "Page_48"} |
+----------------------+----------------------------------------------------------------------+
| {"userid": "User_7"} | {"viewtime": 1746203013405, "userid": "User_7", "pageid": "Page_60"} |
+----------------------+----------------------------------------------------------------------+
```

With a schema registry in place, you can read and write Avro-formatted data. The following query converts the JSON stream `pageviews_json` into a new stream, `pageviews_converted_to_avro`, whose key and value are both Avro-formatted. See [CREATE STREAM AS SELECT](/reference/sql-syntax/query/create-stream-as.md) for more details.

```sql
CREATE STREAM pageviews_converted_to_avro
WITH (
  'key.format' = 'avro',
  'value.format' = 'avro'
) AS SELECT *
FROM pageviews_json;
```

When you create DeltaStream objects using [CREATE STREAM AS SELECT](/reference/sql-syntax/query/create-stream-as.md) or [CREATE CHANGELOG AS SELECT](/reference/sql-syntax/query/create-changelog-as.md) and specify a key or value format such as Avro for the sink object, DeltaStream automatically generates an Avro schema and adds it to the schema registry attached to the data store. In the example above, DeltaStream generates two schemas for the entity `pageviews_converted_to_avro`, one for the key and one for the value. These schemas are then available if you need to consume from these entities outside of DeltaStream.
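
To make the generated schemas concrete, the following Python sketch derives a flat Avro record schema from one JSON record shaped like the `pageviews_avro` output above, and computes the subject names the key and value schemas would be registered under if the registry uses Confluent's default `TopicNameStrategy` (`<topic>-key` and `<topic>-value`). Everything here is an assumption for illustration: DeltaStream's actual generator, record names, type choices (for example `long` versus `int`, or nullable unions), and subject strategy may differ, the backing topic is assumed to share the stream's name, and `infer_record_schema` and `subject_names` are hypothetical helpers.

```python
import json
from typing import Any, Dict

# Assumed mapping from JSON-decoded Python types to Avro primitive types.
# Lookup is by exact type, so bool maps to "boolean" rather than "long".
_AVRO_TYPES = {bool: "boolean", int: "long", float: "double", str: "string"}


def infer_record_schema(name: str, record: Dict[str, Any]) -> Dict[str, Any]:
    """Hypothetical helper: build a flat Avro record schema from one JSON object."""
    fields = []
    for field_name, value in record.items():
        avro_type = _AVRO_TYPES.get(type(value))
        if avro_type is None:
            raise TypeError(f"unsupported JSON type for field {field_name!r}")
        fields.append({"name": field_name, "type": avro_type})
    return {"type": "record", "name": name, "fields": fields}


def subject_names(topic: str) -> Dict[str, str]:
    """Key/value subjects under Confluent's default TopicNameStrategy."""
    return {"key": f"{topic}-key", "value": f"{topic}-value"}


value = json.loads('{"viewtime": 1746203009403, "userid": "User_1", "pageid": "Page_64"}')
schema = infer_record_schema("pageviews_value", value)
subjects = subject_names("pageviews_converted_to_avro")
print(json.dumps(schema, indent=2))
print(subjects)
```

An external consumer could then look up the latest value schema through the registry's `/subjects/<subject>/versions/latest` endpoint, using whichever subject name the registry actually holds.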

In the case of the `PRINT ENTITY` command, if an entity in a data store has a [descriptor](/reference/sql-syntax/data-format-serialization.md#protocol-buffers-and-descriptors), DeltaStream uses the descriptor for deserialization even if the data store has a schema registry. If the entity does not have a descriptor, DeltaStream checks whether the schema registry attached to the data store contains a schema for the entity and, if so, uses that schema for deserialization.

Finally, if the entity doesn't have a descriptor and the data store doesn't have a schema registry (or it has one, but the registry holds no corresponding schema), DeltaStream tries to deserialize the data in the entity as JSON.
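
The `PRINT ENTITY` resolution order described above can be summarized as a small decision function. This Python sketch only models the documented rules; `choose_decoder`, its boolean inputs, and the `Decoder` enum are hypothetical names, not a DeltaStream API.

```python
from enum import Enum


class Decoder(Enum):
    DESCRIPTOR = "descriptor"
    SCHEMA_REGISTRY = "schema_registry"
    JSON = "json"


def choose_decoder(has_descriptor: bool, has_registry: bool, registry_has_schema: bool) -> Decoder:
    """Model of the documented PRINT ENTITY deserialization order (hypothetical helper)."""
    # 1. A descriptor on the entity wins, even if a schema registry is attached.
    if has_descriptor:
        return Decoder.DESCRIPTOR
    # 2. Otherwise use the attached registry, but only if it holds a schema for the entity.
    if has_registry and registry_has_schema:
        return Decoder.SCHEMA_REGISTRY
    # 3. No descriptor and no usable schema: fall back to JSON.
    return Decoder.JSON
```

For example, `choose_decoder(False, True, False)` returns `Decoder.JSON`: an attached registry without a matching schema behaves the same as having no registry at all.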


