
Working with Avro Serialized Data and Schema Registries



In streaming data stores such as Apache Kafka and Amazon Kinesis, producers send data events as bytes that must be interpreted by the data consumers. The most popular formats for data serialization include JSON, ProtoBuf, and Apache Avro. DeltaStream supports all of these.

This article focuses on Apache Avro. It shows how to create and use a Schema Registry to access the Avro schemas necessary for data serialization and deserialization.

If you have a Data Store with entities whose data records are serialized with Avro, it's likely you also have a schema registry to manage the Avro schemas used to serialize and deserialize these data events. To work with such data in DeltaStream, you create a DeltaStream Schema Registry -- a wrapper around your schema registry -- and associate it with one or many data stores.

Create a Schema Registry

DeltaStream supports two types of schema registries, with more in development:

  1. Confluent Cloud

  2. Confluent Platform

To begin, create a schema registry in either the CLI or the UI SQL page. In the CLI, use the CREATE SCHEMA_REGISTRY command to create a DeltaStream Schema Registry:

CREATE SCHEMA_REGISTRY "ConfluentCloudSR" WITH (
    'type' = CONFLUENT_CLOUD,
    'uris' = 'https://abcd-efghi.us-east-2.aws.confluent.cloud',
    'confluent_cloud.key' = 'fake_key',
    'confluent_cloud.secret' = 'fake_secret'
);

In the above example, you're creating a CONFLUENT_CLOUD type schema registry named ConfluentCloudSR in AWS us-east-2. The uris value is the schema registry URI from the Confluent Cloud dashboard. Optionally, this schema registry can also have one of the following:

  • a key pair for credentials, which can be supplied with the confluent_cloud.key and confluent_cloud.secret properties

  • the properties.file property, as sketched below (see CREATE SCHEMA_REGISTRY for more details)
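
For example, here is a minimal sketch that supplies the registry credentials through a properties file instead of an inline key pair. The file path is hypothetical, and the expected file contents depend on your setup; see CREATE SCHEMA_REGISTRY for the exact semantics:

CREATE SCHEMA_REGISTRY "ConfluentCloudSR" WITH (
    'type' = CONFLUENT_CLOUD,
    'uris' = 'https://abcd-efghi.us-east-2.aws.confluent.cloud',
    'properties.file' = '/path/to/confluent_cloud.properties'  -- hypothetical path
);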

When you have defined schema registries, you can list them:

demoDB.public/kafka_store# LIST SCHEMA_REGISTRIES;
+------------------+----------------+--------+--------------------------------------------------+----------+-----------------------------------+-----------------------------------+----------------------+
|       Name       |      Type      |  State |                       Uris                       |   Owner  |             Created At            |             Updated At            |         Path         |
+------------------+----------------+--------+--------------------------------------------------+----------+-----------------------------------+-----------------------------------+----------------------+
| ConfluentCloudSR | ConfluentCloud | ready  | https://abcd-efghi.us-east-2.aws.confluent.cloud | sysadmin | 2025-05-02 16:25:27.771 +0000 UTC | 2025-05-02 16:25:27.771 +0000 UTC | ["ConfluentCloudSR"] |
+------------------+----------------+--------+--------------------------------------------------+----------+-----------------------------------+-----------------------------------+----------------------+

Update Data Store with the Schema Registry

Next, associate that schema registry with the relevant data store.

Note: You can attach only one schema registry to a data store, but any number of data stores can use the same schema registry.

You can describe a data store to determine whether it has a schema registry attached:

demoDB.public/kafka_store# DESCRIBE STORE kafka_store;
+-------------+-------------------------------------------------+----------+--------------+------------------+------------------+-----------------+
|  Properties |                      Uri                        |  Details |  Tls Enabled |  Verify Hostname |  Schema Registry |       Path      |
+-------------+-------------------------------------------------+----------+--------------+------------------+------------------+-----------------+
| {}          | abcd.edghijk.kafka.us-east-1.amazonaws.com:9196 | {}       | true         | false            | <null>           | ["kafka_store"] |
+-------------+-------------------------------------------------+----------+--------------+------------------+------------------+-----------------+

If you determine there is no schema registry attached to your data store (kafka_store, above), you can use the UPDATE STORE DDL to point your data store to the schema registry:

UPDATE STORE kafka_store WITH ( 'schema_registry.name' = ConfluentCloudSR );
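
To confirm the association, describe the data store again; the Schema Registry column, which was <null> earlier, should now reference ConfluentCloudSR:

DESCRIBE STORE kafka_store;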

When a schema registry is attached to a data store, each time a DeltaStream command or query must serialize or deserialize data from one of the data store's entities, the data store uses the registry to look up schemas. When you work with Avro serialized data, DeltaStream requires that you attach the schema registry containing the relevant Avro schemas to the data store that holds the data.

Avro-Enabled Printing and Queries

Now you can successfully run commands such as PRINT ENTITY or write queries against DeltaStream objects that use Avro data formats. Below we print the pageviews_avro entity:

demoDB.public/kafka_store# PRINT ENTITY pageviews_avro;
+----------------------+----------------------------------------------------------------------+
| key                  | value                                                                |
+======================+======================================================================+
| {"userid": "User_1"} | {"viewtime": 1746203009403, "userid": "User_1", "pageid": "Page_64"} |
+----------------------+----------------------------------------------------------------------+
| {"userid": "User_4"} | {"viewtime": 1746203010404, "userid": "User_4", "pageid": "Page_45"} |
+----------------------+----------------------------------------------------------------------+
| {"userid": "User_2"} | {"viewtime": 1746203011404, "userid": "User_2", "pageid": "Page_23"} |
+----------------------+----------------------------------------------------------------------+
| {"userid": "User_9"} | {"viewtime": 1746203012404, "userid": "User_9", "pageid": "Page_48"} |
+----------------------+----------------------------------------------------------------------+
| {"userid": "User_7"} | {"viewtime": 1746203013405, "userid": "User_7", "pageid": "Page_60"} |

With a schema registry set up, you can easily read and write Avro-formatted data. The query below converts the JSON stream pageviews_json into a new stream named pageviews_converted_to_avro with Avro key and value formats. See CREATE STREAM AS SELECT for more details.

CREATE STREAM pageviews_converted_to_avro
WITH (
  'key.format' = 'avro',
  'value.format' = 'avro'
) AS SELECT *
FROM pageviews_json;
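
As a quick sanity check, you can read the new stream back interactively; any SELECT over the stream works here:

SELECT * FROM pageviews_converted_to_avro;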

When you create DeltaStream objects using CREATE STREAM AS SELECT or CREATE CHANGELOG AS SELECT and specify a key or value format such as Avro for the sink object, DeltaStream automatically generates an Avro schema and adds it to the schema registry attached to the data store. In the example above, two schemas are generated for the entity pageviews_converted_to_avro -- one for the key and one for the value. These schemas remain available if you ever need to consume these entities outside of DeltaStream.

In the case of the PRINT ENTITY command, if an entity in a data store has a descriptor, DeltaStream uses that descriptor for deserialization even if the data store has a schema registry. If the entity does not have a descriptor, the data store verifies that the schema registry contains a schema for the entity and then uses it for deserialization.

Finally, if the entity doesn't have a descriptor and the data store doesn't have a schema registry (or it has one, but the registry contains no corresponding schema), DeltaStream tries to deserialize the data in the entity as JSON.
