Configuring Deserialization Error Handling
With streaming and stream processing applications, data quality is critical, and a single bad record can cause a number of issues. In the ideal case, all records in a message broker are well formed. However, issues can occur, such as an unrecognized payload being accidentally published into a Kafka topic or a memory error corrupting a record. Streaming applications that attempt to consume these records will fail to deserialize them, which in most cases causes the application to fail. In this tutorial, we'll show the different ways users can configure their queries to handle such failures in DeltaStream.
Configuring deserialization error handling is done by providing the source.deserialization.error.handling property in the WITH clause for a query's sources. There are three modes to choose from (see #_source_properties for their documentation):
TERMINATE
IGNORE
IGNORE_AND_LOG
In the following sections, we'll cover each of these modes in detail for the scenario where a malformed record has made its way into a Kafka topic, which will be the source for one of our DSQL queries.
Assume we have the following pageviews Stream defined by the DDL below:
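A minimal sketch of such a DDL; the viewtime BIGINT column is referenced later in this tutorial, while the userid and pageid columns and the topic properties are illustrative assumptions:

```sql
CREATE STREAM pageviews (
  viewtime BIGINT,
  userid VARCHAR,
  pageid VARCHAR
) WITH (
  'topic' = 'pageviews',
  'value.format' = 'JSON'
);
```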
The pageviews topic contains the following records:
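Illustrative payloads; the exact values are assumed, but the key point is that the second record's viewtime is the string "malformed_viewtime" rather than a BIGINT:

```json
{"viewtime": 1690327704650, "userid": "User_9", "pageid": "Page_11"}
{"viewtime": "malformed_viewtime", "userid": "User_6", "pageid": "Page_94"}
{"viewtime": 1690327708377, "userid": "User_5", "pageid": "Page_16"}
```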
TERMINATE
The TERMINATE mode is the default option and the strictest. With this mode, any records that the query fails to deserialize will cause the query to terminate.
Query using the source.deserialization.error.handling property default:
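A minimal sketch, assuming a simple copy query into an illustrative sink Stream named pageviews_copy; with no error handling property set, TERMINATE applies:

```sql
CREATE STREAM pageviews_copy AS
SELECT * FROM pageviews;
```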
Query explicitly setting the source.deserialization.error.handling property to TERMINATE:
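The same sketch with the property set explicitly in the source's WITH clause:

```sql
CREATE STREAM pageviews_copy AS
SELECT * FROM pageviews
WITH ('source.deserialization.error.handling' = 'TERMINATE');
```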
The output of these queries will contain the following records:
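Given the illustrative input records above, only the first record would appear:

```json
{"viewtime": 1690327704650, "userid": "User_9", "pageid": "Page_11"}
```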
Note that only one of the three input records shows up in the output. Since the first record is well-formed JSON and matches the value types specified in the pageviews DDL, the query successfully deserializes that record and produces the output. The second record, however, has an unexpected value for the viewtime column: the string "malformed_viewtime" where the query expects to deserialize that field as BIGINT. This causes a deserialization error for the source, and since we chose the TERMINATE error handling mode, this error causes the entire query to fail and stop any further progress.
IGNORE
The IGNORE mode can be used to instruct queries to continue if they encounter deserialization errors. Any records that fail to be deserialized from a query's source with IGNORE mode will simply be skipped.
Query setting the source.deserialization.error.handling property to IGNORE:
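A sketch, again copying into the illustrative pageviews_copy Stream:

```sql
CREATE STREAM pageviews_copy AS
SELECT * FROM pageviews
WITH ('source.deserialization.error.handling' = 'IGNORE');
```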
The output of this query will contain the following records:
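With the illustrative input records above, the first and third records would appear:

```json
{"viewtime": 1690327704650, "userid": "User_9", "pageid": "Page_11"}
{"viewtime": 1690327708377, "userid": "User_5", "pageid": "Page_16"}
```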
Two of the three input records show up in the output. As explained in the previous section, the first and third records are well-formed JSON records that the query successfully deserializes. The second record fails deserialization. Since the source is using the IGNORE error handling mode, the second record is dropped and doesn't appear in the output. The query meanwhile continues processing records, hence the third record appearing in the output. Note that unlike the TERMINATE mode, the query doesn't fail and continues to process any additional records that appear in the source.
IGNORE_AND_LOG
The IGNORE_AND_LOG mode is similar to the IGNORE mode in that it will skip any records that the query fails to deserialize, but in this mode the skipped records are also logged. Logging these records can be useful for users who want to know which payloads were problematic and potentially replay or act upon any skipped records.
The error log for this mode is specified by the required source.deserialization.error.log.topic and optional source.deserialization.error.log.store properties, as well as store-specific properties. If the error log topic already exists, then it will be used. If the topic does not exist, then it will be created according to the additional store-specific properties listed below. If the error log store is not specified, then the store to which the source Relation belongs will be used by default.
The additional properties for Kafka:
source.deserialization.error.log.topic.partitions
source.deserialization.error.log.topic.replicas
The additional properties for Kinesis:
source.deserialization.error.log.topic.shards
Kafka source Relation
This subsection will focus on how IGNORE_AND_LOG behaves for a query whose source Relation is backed by a Kafka topic. We can reuse our pageviews Stream in this example, as it is associated with the Kafka Store named kafka_store.
Query setting the source.deserialization.error.handling property to IGNORE_AND_LOG:
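A sketch; the pageviews_copy sink is illustrative, and the required error log topic property points at pageviews_copy_errlog, the topic referenced below:

```sql
CREATE STREAM pageviews_copy AS
SELECT * FROM pageviews
WITH (
  'source.deserialization.error.handling' = 'IGNORE_AND_LOG',
  'source.deserialization.error.log.topic' = 'pageviews_copy_errlog'
);
```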
Since we are not specifying a particular store for the error log topic to use by setting the source.deserialization.error.log.store property, we default to using the same store as the pageviews Stream, kafka_store.
The output of this query will contain the following records:
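As in the IGNORE case, the first and third illustrative records would appear:

```json
{"viewtime": 1690327704650, "userid": "User_9", "pageid": "Page_11"}
{"viewtime": 1690327708377, "userid": "User_5", "pageid": "Page_16"}
```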
The error log topic pageviews_copy_errlog will contain the following records:
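An illustrative error log record; the exact field names and metadata values here are assumptions, chosen to match the metadata the text describes, with value holding the base64 encoding of the malformed payload:

```json
{
  "topic": "pageviews",
  "partition": 0,
  "offset": 1,
  "store": "kafka_store",
  "timestamp": 1690327706300,
  "value": "eyJ2aWV3dGltZSI6ICJtYWxmb3JtZWRfdmlld3RpbWUiLCAidXNlcmlkIjogIlVzZXJfNiIsICJwYWdlaWQiOiAiUGFnZV85NCJ9"
}
```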
Similarly to the IGNORE case, only the first and third input records show up in the output. The second record failed deserialization and is skipped in the output, but information about it appears in the error log. Common metadata such as the original record's partition, offset, topic, store, and timestamp are included. Note that the value field has been encoded with base64. Since the query could not deserialize the original record, the Kafka record's entire value byte array is encoded using base64, and that String is included in the JSON error log payload. In this example, the original Kafka record doesn't contain a key, but if it did, the key would also be encoded using base64 and included in a similar key field in the JSON payload.
We can validate that the value in the error log is what we expect by manually decoding the field:
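For example, decoding the illustrative value above with the base64 CLI:

```sh
$ echo 'eyJ2aWV3dGltZSI6ICJtYWxmb3JtZWRfdmlld3RpbWUiLCAidXNlcmlkIjogIlVzZXJfNiIsICJwYWdlaWQiOiAiUGFnZV85NCJ9' | base64 -d
{"viewtime": "malformed_viewtime", "userid": "User_6", "pageid": "Page_94"}
```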
Kinesis source Relation
This subsection will focus on how IGNORE_AND_LOG behaves for a query whose source Relation is backed by a Kinesis stream. Consider a new pageviews_kinesis Stream created by the following DDL:
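A sketch of such a DDL, assuming the same illustrative columns as before and a hypothetical Kinesis Store named kinesis_store:

```sql
CREATE STREAM pageviews_kinesis (
  viewtime BIGINT,
  userid VARCHAR,
  pageid VARCHAR
) WITH (
  'store' = 'kinesis_store',
  'topic' = 'pageviews',
  'value.format' = 'JSON'
);
```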
Query setting the source.deserialization.error.handling property to IGNORE_AND_LOG:
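A sketch; the pageviews_copy sink is again illustrative, while alt_kinesis_store and pageviews_copy_errlog come from the text below:

```sql
CREATE STREAM pageviews_copy AS
SELECT * FROM pageviews_kinesis
WITH (
  'source.deserialization.error.handling' = 'IGNORE_AND_LOG',
  'source.deserialization.error.log.store' = 'alt_kinesis_store',
  'source.deserialization.error.log.topic' = 'pageviews_copy_errlog'
);
```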
Notice that here we are specifying a particular store for the error log topic to use by setting the source.deserialization.error.log.store property to alt_kinesis_store.
The output of this query will contain the following records:
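As before, the first and third illustrative records would appear:

```json
{"viewtime": 1690327704650, "userid": "User_9", "pageid": "Page_11"}
{"viewtime": 1690327708377, "userid": "User_5", "pageid": "Page_16"}
```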
The error log topic pageviews_copy_errlog will contain the following records:
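An illustrative error log record; the field names and metadata values below are assumptions, chosen to match the metadata the text describes (partition key, sequence number, shard ID, store, timestamp), with data holding the base64-encoded malformed payload:

```json
{
  "shard_id": "shardId-000000000000",
  "partition_key": "User_6",
  "sequence_number": "49645113551234567890123456789012345678901234567890123456",
  "store": "kinesis_store",
  "timestamp": 1690327706300,
  "data": "eyJ2aWV3dGltZSI6ICJtYWxmb3JtZWRfdmlld3RpbWUiLCAidXNlcmlkIjogIlVzZXJfNiIsICJwYWdlaWQiOiAiUGFnZV85NCJ9"
}
```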
Similarly to the IGNORE case, only the first and third input records show up in the output. The second record failed deserialization and is skipped in the output, but information about it appears in the error log. Common metadata such as the original record's partition key, sequence number, shard ID, store, and timestamp are included. Note that the data field has been encoded with base64. Since the query could not deserialize the original record, the Kinesis record's entire data byte array is encoded using base64, and that String is included in the JSON error log payload.
We can validate that the data in the error log is what we expect by manually decoding the field:
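As before, decoding the illustrative data above with the base64 CLI:

```sh
$ echo 'eyJ2aWV3dGltZSI6ICJtYWxmb3JtZWRfdmlld3RpbWUiLCAidXNlcmlkIjogIlVzZXJfNiIsICJwYWdlaWQiOiAiUGFnZV85NCJ9' | base64 -d
{"viewtime": "malformed_viewtime", "userid": "User_6", "pageid": "Page_94"}
```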