Configuring Deserialization Error Handling
Data quality is critical in streaming and stream processing applications, where a single bad record can cause multiple issues. Ideally, all records in a message broker are well formed. However, issues can occur, such as the accidental publishing of unrecognized payloads into a Kafka topic, or a memory error corrupting a record. Streaming applications that attempt to consume these records fail to deserialize them, which in turn usually causes the application itself to fail. This tutorial shows different ways you can configure your queries to handle such failures in DeltaStream.
To configure deserialization error handling, provide the `source.deserialization.error.handling` property in the WITH clause for a query's sources. Choose from three modes:
- `TERMINATE`
- `IGNORE`
- `IGNORE_AND_LOG`
The following sections detail each of these modes for a scenario in which a malformed record made its way into a Kafka topic, which will be the source for one of your DSQL queries.
Assume the following `pageviews` stream, defined by the DDL below:
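A minimal sketch of such a DDL, assuming a JSON-formatted topic named `pageviews`; the `userid` and `pageid` columns are illustrative, and `kafka_store` matches the store this tutorial later associates with `pageviews`:

```sql
-- Illustrative DDL: a pageviews stream over a JSON-formatted Kafka topic.
CREATE STREAM pageviews (
  viewtime BIGINT,
  userid VARCHAR,
  pageid VARCHAR
) WITH (
  'store' = 'kafka_store',
  'topic' = 'pageviews',
  'value.format' = 'json'
);
```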
The `pageviews` topic contains the following records:
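For illustration, suppose the topic holds three records like these; note that the second record's `viewtime` is a string, not a number:

```json
{"viewtime": 1690327704650, "userid": "User_1", "pageid": "Page_1"}
{"viewtime": "malformed_viewtime", "userid": "User_2", "pageid": "Page_2"}
{"viewtime": 1690327705650, "userid": "User_3", "pageid": "Page_3"}
```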
The `TERMINATE` mode is the default option, and the strictest. Any records the query fails to deserialize cause the query to terminate.
Here's a query using the `source.deserialization.error.handling` property default:
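A simple copy query sketch that leaves the property unset (the sink name `pageviews_copy` is illustrative):

```sql
-- No error handling property set, so TERMINATE applies by default.
CREATE STREAM pageviews_copy AS
  SELECT * FROM pageviews;
```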
Here's a query explicitly setting the `source.deserialization.error.handling` property to `TERMINATE`:
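A sketch of the same query with the mode spelled out, attaching the property to the source in the FROM clause:

```sql
CREATE STREAM pageviews_copy AS
  SELECT *
  FROM pageviews
  WITH ('source.deserialization.error.handling' = 'TERMINATE');
```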
The output of these queries contains the following records:
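Given the illustrative input above, only the first record is emitted before the query fails on the second:

```json
{"viewtime": 1690327704650, "userid": "User_1", "pageid": "Page_1"}
```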
Note that only one of the three input records shows up in the output. As the first record is well-formed JSON and matches the value types specified in the `pageviews` DDL, the query successfully deserializes that record and produces the output.
The second record, however, has an unexpected value for the `viewtime` column: the string `"malformed_viewtime"`, where the query expects a value it can deserialize as `BIGINT`. This causes a deserialization error for the source. Under the `TERMINATE` error handling mode, this error causes the entire query to fail and halts any further progress.
Use the `IGNORE` mode to instruct queries to continue when they encounter deserialization errors. With the `IGNORE` mode, the system skips any records from a query's source that fail to deserialize.
Here's a query setting the `source.deserialization.error.handling` property to `IGNORE`:
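A sketch, reusing the illustrative `pageviews_copy` sink:

```sql
CREATE STREAM pageviews_copy AS
  SELECT *
  FROM pageviews
  WITH ('source.deserialization.error.handling' = 'IGNORE');
```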
The output of this query contains the following records:
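With the sample input above, the malformed second record is simply skipped:

```json
{"viewtime": 1690327704650, "userid": "User_1", "pageid": "Page_1"}
{"viewtime": 1690327705650, "userid": "User_3", "pageid": "Page_3"}
```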
Two of the three input records display in the output. As discussed in the previous section, the first and third records are well-formed JSON records that the query successfully deserializes; the second record fails deserialization. Because the source uses the `IGNORE` error handling mode, the second record is dropped and does not display in the output. Unlike with the `TERMINATE` mode, the query doesn't fail; it continues to process any additional records in the source, which is why the third record displays in the output.
The `IGNORE_AND_LOG` mode is similar to the `IGNORE` mode in that it skips any records the query fails to deserialize. However, in this mode the skipped records are also logged. Logging these records is useful if you want to know which payloads were problematic, so you can replay or otherwise act on the skipped records.
The error log for this mode is specified by the required `source.deserialization.error.log.topic` property and the optional `source.deserialization.error.log.store` property, plus any store-specific properties.
DeltaStream uses the error log topic if that topic already exists. If the topic does not exist, DeltaStream creates it according to the additional store-specific properties listed below. If you do not specify the error log store, then by default DeltaStream uses the store to which the source relation belongs.
The additional properties for Kafka:

- `source.deserialization.error.log.topic.partitions`
- `source.deserialization.error.log.topic.replicas`

The additional property for Kinesis:

- `source.deserialization.error.log.topic.shards`
This subsection focuses on how `IGNORE_AND_LOG` behaves for a query whose source relation is backed by a Kafka topic. You can reuse your `pageviews` stream in this example, as it is associated with the Kafka store named `kafka_store`.
Here's a query setting the `source.deserialization.error.handling` property to `IGNORE_AND_LOG`:
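A sketch that names the error log topic `pageviews_copy_errlog` (the topic inspected below); the partition and replica counts are illustrative:

```sql
CREATE STREAM pageviews_copy AS
  SELECT *
  FROM pageviews
  WITH (
    'source.deserialization.error.handling' = 'IGNORE_AND_LOG',
    'source.deserialization.error.log.topic' = 'pageviews_copy_errlog',
    -- Used only if DeltaStream needs to create the error log topic:
    'source.deserialization.error.log.topic.partitions' = '1',
    'source.deserialization.error.log.topic.replicas' = '1'
  );
```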
Because this query doesn't set the `source.deserialization.error.log.store` property to specify a particular store for the error log topic, the default applies: the error log uses the same store as the `pageviews` stream -- in this case, `kafka_store`.
The output of this query contains the following records:
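As with `IGNORE`, only the well-formed records flow through:

```json
{"viewtime": 1690327704650, "userid": "User_1", "pageid": "Page_1"}
{"viewtime": 1690327705650, "userid": "User_3", "pageid": "Page_3"}
```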
The error log topic `pageviews_copy_errlog` contains the following records:
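An entry shaped roughly like the following; the field names and values are illustrative, matching the metadata described below, and the base64 string is truncated for readability:

```json
{
  "store": "kafka_store",
  "topic": "pageviews",
  "partition": 0,
  "offset": 1,
  "timestamp": 1690327704660,
  "value": "eyJ2aWV3dGltZSI6..."
}
```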
As in the `IGNORE` case, only the first and third input records display in the output. The second record failed deserialization and is skipped in the output, but information about this record displays in the error log. This information includes common metadata such as the original record's partition, offset, topic, store, and timestamp.
Note that the `value` field has been encoded with base64. Since the query could not deserialize the original record, the Kafka record's entire value byte array is encoded using base64, and that string is included in the JSON error log payload. In this example, the original Kafka record doesn't contain a key; if it did, the key would also be encoded using base64 and included in a similar `key` field in the JSON payload.
To validate that the `value` in the error log is what you expect, manually decode the field:
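For example, with the `base64` utility available on most systems (substitute the full string from your error log entry; the value shown here is truncated):

```sh
echo 'eyJ2aWV3dGltZSI6...' | base64 --decode
# With the full string, this prints the original malformed payload:
# {"viewtime": "malformed_viewtime", "userid": "User_2", "pageid": "Page_2"}
```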
This subsection focuses on how `IGNORE_AND_LOG` behaves for a query whose source relation is backed by a Kinesis stream. Consider a new `pageviews_kinesis` stream created by the following DDL:
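A sketch mirroring the Kafka DDL; the store name `kinesis_store` and the column list are illustrative:

```sql
-- Illustrative DDL: the same pageviews schema over a Kinesis stream.
CREATE STREAM pageviews_kinesis (
  viewtime BIGINT,
  userid VARCHAR,
  pageid VARCHAR
) WITH (
  'store' = 'kinesis_store',
  'topic' = 'pageviews',
  'value.format' = 'json'
);
```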
Here's a query setting the `source.deserialization.error.handling` property to `IGNORE_AND_LOG`:
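A sketch with an illustrative sink name and shard count:

```sql
CREATE STREAM pageviews_kinesis_copy AS
  SELECT *
  FROM pageviews_kinesis
  WITH (
    'source.deserialization.error.handling' = 'IGNORE_AND_LOG',
    'source.deserialization.error.log.store' = 'alt_kinesis_store',
    'source.deserialization.error.log.topic' = 'pageviews_copy_errlog',
    -- Used only if DeltaStream needs to create the error log stream:
    'source.deserialization.error.log.topic.shards' = '1'
  );
```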
Here you set the `source.deserialization.error.log.store` property to `alt_kinesis_store` to specify a particular store for the error log topic.
The output of this query contains the following records:
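Again, only the well-formed records appear:

```json
{"viewtime": 1690327704650, "userid": "User_1", "pageid": "Page_1"}
{"viewtime": 1690327705650, "userid": "User_3", "pageid": "Page_3"}
```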
The error log topic `pageviews_copy_errlog` contains the following records:
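An entry shaped roughly as follows, with illustrative field names and values matching the metadata listed below; the base64 string is again truncated:

```json
{
  "store": "kinesis_store",
  "shardId": "shardId-000000000000",
  "partitionKey": "User_2",
  "sequenceNumber": "49545115243490985018280067714973144582180062593244200961",
  "timestamp": 1690327704660,
  "data": "eyJ2aWV3dGltZSI6..."
}
```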
As with the `IGNORE` case, only the first and third input records display in the output. The second record failed deserialization and is skipped in the output, but information about that record appears in the error log. This includes common metadata such as the original record's partition key, sequence number, shard ID, store, and timestamp.
Note that the `data` field has been encoded with base64. Since the query could not deserialize the original record, the Kinesis record's entire data byte array is encoded using base64, and that string is included in the JSON error log payload.
To validate that the `data` in the error log is what you expect, manually decode the field:
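The same approach as in the Kafka example, substituting the full string from your error log entry:

```sh
echo 'eyJ2aWV3dGltZSI6...' | base64 --decode
# With the full string, this prints the original record's data bytes.
```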