Configuring Deserialization Error Handling

Data quality is critical in streaming and stream processing applications. A single bad record can cause a cascade of issues. Ideally, every record in a message broker is well formed. In practice, however, problems occur, such as an unrecognized payload accidentally published to a Kafka topic, or a record corrupted by a memory error. A streaming application that attempts to consume such records fails to deserialize them, which in turn usually causes the application to fail. This tutorial shows different ways you can configure your queries to handle such failures in DeltaStream.

To configure deserialization error handling, set the source.deserialization.error.handling property in the WITH clause for a query's sources. Choose from three modes (documented at #_source_properties):

  • TERMINATE

  • IGNORE

  • IGNORE_AND_LOG

The following sections detail each of these modes for a scenario in which a malformed record made its way into a Kafka topic, which will be the source for one of your DSQL queries.

Assume a pageviews stream defined by the following DDL:

CREATE STREAM pageviews (
    viewtime BIGINT,
    userid VARCHAR,
    pageid VARCHAR
) WITH ('store'='kafka_store', 'topic'='pageviews', 'value.format'='JSON');

The pageviews topic contains the following records:

{ "viewtime":1660931394412, "userid":"User_1", "pageid":"Page_22" }
{ "viewtime":"malformed_viewtime", "userid":"User_6", "pageid":"Page_32" }
{ "viewtime":1660931396413, "userid":"User_2", "pageid":"Page_96" }

TERMINATE Mode

The TERMINATE mode is the default option, and the strictest. Any records the query fails to deserialize cause the query to terminate.

Here's a query that uses the default value of the source.deserialization.error.handling property:

CREATE STREAM pageviews_copy AS
SELECT * FROM pageviews;

Here's a query explicitly setting the source.deserialization.error.handling property to TERMINATE:

CREATE STREAM pageviews_copy AS
SELECT * FROM pageviews
WITH ('source.deserialization.error.handling'='TERMINATE');

The output of these queries contains the following record:

{ "viewtime":1660931394412, "userid":"User_1", "pageid":"Page_22" }

Note that only one of the three input records shows up in the output. Because the first record is well-formed JSON and matches the column types specified in the pageviews DDL, the query deserializes it successfully and writes it to the output.

The second record, however, has an unexpected value for the viewtime column: the string "malformed_viewtime" where the query expects to deserialize a BIGINT. This causes a deserialization error for the source. Under the TERMINATE error handling mode, this error causes the entire query to fail and halts any further progress.

IGNORE Mode

Use the IGNORE mode to instruct queries to continue when they encounter deserialization errors. In this mode, the system skips any records from a query's source that fail to deserialize.

Here's a query setting the source.deserialization.error.handling property to IGNORE:

CREATE STREAM pageviews_copy AS
SELECT * FROM pageviews
WITH ('source.deserialization.error.handling'='IGNORE');

The output of this query contains the following records:

{ "viewtime":1660931394412, "userid":"User_1", "pageid":"Page_22" }
{ "viewtime":1660931396413, "userid":"User_2", "pageid":"Page_96" }

Two of the three input records appear in the output. As discussed in the previous section, the first and third records are well-formed JSON records that the query deserializes successfully. The second record fails deserialization; because the source uses the IGNORE error handling mode, that record is dropped and does not appear in the output. Unlike with the TERMINATE mode, the query doesn't fail, and it continues to process any additional records in the source.

IGNORE_AND_LOG Mode

The IGNORE_AND_LOG mode is similar to the IGNORE mode in that it skips any records the query fails to deserialize. In this mode, however, the skipped records are also logged. Logging these records is useful if you want to know which payloads were problematic, so you can replay or otherwise act upon any skipped records.

The error log for this mode is specified by the required source.deserialization.error.log.topic and optional source.deserialization.error.log.store properties, plus any store-specific properties.

DeltaStream uses the error log topic if that topic already exists. If the topic does not exist, DeltaStream creates it according to the additional store-specific properties listed below. If you do not specify the error log store, by default DeltaStream uses the store to which the source relation belongs.

The additional properties for Kafka:

  • source.deserialization.error.log.topic.partitions

  • source.deserialization.error.log.topic.replicas

The additional properties for Kinesis:

  • source.deserialization.error.log.topic.shards

Kafka source relation

This subsection focuses on how IGNORE_AND_LOG behaves for a query with the source relation backed by a Kafka topic. You can reuse your pageviews stream in this example, as it is associated with the Kafka store named kafka_store.

Here's a query setting the source.deserialization.error.handling property to IGNORE_AND_LOG:

CREATE STREAM pageviews_copy AS
SELECT * FROM pageviews
WITH (
    'source.deserialization.error.handling'='IGNORE_AND_LOG',
    'source.deserialization.error.log.topic'='pageviews_copy_errlog',
    'source.deserialization.error.log.topic.partitions'='1',
    'source.deserialization.error.log.topic.replicas'='3'
);

If you don't set the source.deserialization.error.log.store property to specify a particular store for the error log topic, the default is to use the same store as the pageviews stream; in this case, kafka_store.

The output of this query contains the following records:

{ "viewtime":1660931394412, "userid":"User_1", "pageid":"Page_22" }
{ "viewtime":1660931396413, "userid":"User_2", "pageid":"Page_96" }

The error log topic pageviews_copy_errlog contains the following record:

{
  "partition": 0,
  "offset": 1,
  "topic": "pageviews",
  "store": "store_test",
  // "key" field would also be here if the source record contained a key
  "value": "eyAidmlld3RpbWUiOiJtYWxmb3JtZWRfdmlld3RpbWUiLCAidXNlcmlkIjoiVXNlcl82IiwgInBhZ2VpZCI6IlBhZ2VfMzIiIH0=",
  "timestamp": 1693341135576
}

As in the IGNORE case, only the first and third input records appear in the output. The second record fails deserialization and is skipped in the output, but information about this record appears in the error log. This information includes common metadata such as the original record's partition, offset, topic, store, and timestamp.

Note that the value field has been encoded with base64. Since the query could not deserialize the original record, the Kafka record's entire value byte array is encoded using base64 and that string is included in the JSON error log payload. In this example, the original Kafka record doesn't contain a key; if it did, it would also be encoded using base64 and there would be a similar key field in the JSON payload.

To validate that the value in the error log is what you expect, manually decode the field:

$ echo "eyAidmlld3RpbWUiOiJtYWxmb3JtZWRfdmlld3RpbWUiLCAidXNlcmlkIjoiVXNlcl82IiwgInBhZ2VpZCI6IlBhZ2VfMzIiIH0=" | base64 -d | jq .
{
  "viewtime": "malformed_viewtime",
  "userid": "User_6",
  "pageid": "Page_32"
}
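
Rather than decoding payloads by hand, you can also recover skipped records programmatically, for example to replay them after fixing the data. The following is a minimal sketch using the confluent-kafka Python client; the broker address and consumer group name are assumptions for illustration.

import base64
import json

from confluent_kafka import Consumer

consumer = Consumer({
    "bootstrap.servers": "localhost:9092",  # assumed broker address
    "group.id": "errlog_inspector",         # hypothetical consumer group
    "auto.offset.reset": "earliest",
})
consumer.subscribe(["pageviews_copy_errlog"])

while True:
    msg = consumer.poll(timeout=5.0)
    if msg is None:
        break  # no more error log records within the timeout
    if msg.error():
        raise RuntimeError(msg.error())
    entry = json.loads(msg.value())              # each error log record is JSON
    original = base64.b64decode(entry["value"])  # recover the original record bytes
    print(f"skipped {entry['topic']}[{entry['partition']}]@{entry['offset']}: {original!r}")

consumer.close()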

Kinesis source relation

This subsection focuses on how IGNORE_AND_LOG behaves for a query with the source relation backed by a Kinesis stream. Consider a new pageviews_kinesis stream created by the following DDL:

CREATE STREAM pageviews_kinesis (
    viewtime BIGINT,
    userid VARCHAR,
    pageid VARCHAR
) WITH ('store'='kinesis_store', 'topic'='pageviews', 'value.format'='JSON');

Here's a query setting the source.deserialization.error.handling property to IGNORE_AND_LOG:

CREATE STREAM pageviews_copy AS
SELECT * FROM pageviews_kinesis
WITH (
    'source.deserialization.error.handling'='IGNORE_AND_LOG',
    'source.deserialization.error.log.store'='alt_kinesis_store',
    'source.deserialization.error.log.topic'='pageviews_copy_errlog',
    'source.deserialization.error.log.topic.shards'='1'
);

Here you set the source.deserialization.error.log.store property to alt_kinesis_store to specify a particular store for the error log topic.

The output of this query contains the following records:

{ "viewtime":1660931394412, "userid":"User_1", "pageid":"Page_22" }
{ "viewtime":1660931396413, "userid":"User_2", "pageid":"Page_96" }

The error log topic pageviews_copy_errlog contains the following record:

{
  "partitionKey": "some_partition_key",
  "seqNum": "49644062644797676009557574763794547223012948382276648962",
  "shardId": "shardId-000000000000",
  "store": "store_test",
  "stream": "pageviews_kinesis",
  "data": "eyAidmlld3RpbWUiOiJtYWxmb3JtZWRfdmlld3RpbWUiLCAidXNlcmlkIjoiVXNlcl82IiwgInBhZ2VpZCI6IlBhZ2VfMzIiIH0=",
  "timestamp": 1693341135576
}

As with the IGNORE case, only the first and third input records appear in the output. The second record fails deserialization and is skipped in the output, but information about that record appears in the error log. This includes common metadata such as the original record's partition key, sequence number, shard ID, store, and timestamp.

Note that the data field has been encoded with base64. Since the query could not deserialize the original record, the Kinesis stream's entire data byte array is encoded using base64 and that string is included in the JSON error log payload.

To validate that the data in the error log is what you expect, manually decode the field:

$ echo "eyAidmlld3RpbWUiOiJtYWxmb3JtZWRfdmlld3RpbWUiLCAidXNlcmlkIjoiVXNlcl82IiwgInBhZ2VpZCI6IlBhZ2VfMzIiIH0=" | base64 -d | jq .
{
  "viewtime": "malformed_viewtime",
  "userid": "User_6",
  "pageid": "Page_32"
}
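
The same programmatic approach works for a Kinesis error log. The following is a minimal sketch using boto3; it assumes AWS credentials and region are already configured, and that the error log stream has the single shard configured above.

import base64
import json

import boto3

kinesis = boto3.client("kinesis")  # assumes credentials/region are configured

# Start reading the single shard from the beginning of the stream.
iterator = kinesis.get_shard_iterator(
    StreamName="pageviews_copy_errlog",
    ShardId="shardId-000000000000",
    ShardIteratorType="TRIM_HORIZON",
)["ShardIterator"]

for record in kinesis.get_records(ShardIterator=iterator)["Records"]:
    entry = json.loads(record["Data"])          # each error log record is JSON
    original = base64.b64decode(entry["data"])  # recover the original record bytes
    print(f"skipped {entry['shardId']} seq {entry['seqNum']}: {original!r}")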
