MongoDB
Description
Change Data Capture (CDC) enables you to ingest data changes from collections in a MongoDB Data Store and publish them to a sink. In a CDC pipeline, data changes in the source collection are captured as document-level operations (INSERT, UPDATE, DELETE) and then pushed to the downstream sink in real time.
CDC Pipeline Overview
Source Record Structure
DeltaStream uses the Flink CDC connector to capture changes in the source collections. The following JSON skeleton represents a given change:
{
"op": ...,
"ts_ms": ...,
"before": {...},
"after": {...},
"system": {...}
}op (String): Describes the document-level operation that caused a change in the source. It can be
insert,updateordelete.ts_ms (BigInt) : Shows the timestamp at which the change event is processed by the CDC pipeline, in the source.
before and after (BYTES or STRING): These specify the state of the document at which the change occurred, before and/or after the change, depending on the semantics of the operation.
system: Shows the metadata in the source about the operation and includes these fields:
db (String): The MongoDB database name containing the source collection.
collection (String): The MongoDB source collection name.
_id (String): The unique identifier of the document.
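For illustration only, an update to a document in a hypothetical pageviews collection of a report database might be represented by a record along these lines; all values are made up, and before/after are shown here as JSON-encoded strings:
{
  "op": "update",
  "ts_ms": 1717430400000,
  "before": "{\"_id\": \"66b2c1d4e5f6a7b8c9d0e1f2\", \"viewtime\": 10}",
  "after": "{\"_id\": \"66b2c1d4e5f6a7b8c9d0e1f2\", \"viewtime\": 11}",
  "source": {
    "db": "report",
    "collection": "pageviews",
    "_id": "66b2c1d4e5f6a7b8c9d0e1f2"
  }
}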
Requirements of a MongoDB Source Collection
Enabling Pre/Post images for a collection
To reliably capture before/after images for update and delete operations in a MongoDB collection, the collection must have the MongoDB feature changeStreamPreAndPostImages enabled. This is required for CDC pipelines that rely on Debezium to produce full change events.
Why is this setting required?
MongoDB Change Streams emit different kinds of events depending on the operation type. By default:
Insert events already provide the full document.
Update events only contain the change and do not include the full pre-update or post-update document image.
Delete events only contain the _id of the deleted document.
For many CDC use cases, such as stateful transformations or audit requirements, you need the complete before and/or after images. MongoDB exposes this capability only when changeStreamPreAndPostImages is enabled at the collection level.
Once enabled, MongoDB emits the full pre-update and/or post-update document image based on the operation type.
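For context only, once the option is enabled a consumer of the raw MongoDB change stream can request these images through standard change stream parameters. The collection name below is hypothetical, and the DeltaStream connector performs the equivalent step internally, so this snippet is purely illustrative:
// Hypothetical illustration: open a change stream on the pageviews collection,
// asking MongoDB to attach the post-update image (fullDocument) and the
// pre-change image (fullDocumentBeforeChange) to each event. The pre-change
// image is only populated when changeStreamPreAndPostImages is enabled.
const cursor = db.pageviews.watch([], {
  fullDocument: "updateLookup",
  fullDocumentBeforeChange: "whenAvailable"
});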
How to enable the setting?
You must enable this option in the MongoDB instance for each collection that is used as a CDC source. You can do this through the MongoDB command interface.
db.runCommand({
collMod: "your_collection_name",
changeStreamPreAndPostImages: { enabled: true }
});
To verify that the setting is enabled, run the command below and look for the changeStreamPreAndPostImages field in the response.
db.getCollectionInfos({ name: "your_collection_name" })
How to Create a CDC Pipeline
You create a CDC pipeline in 2 steps:
Define the CDC source
Define the sink and the query to write CDC changes into it
Step 1. Define DDL for the CDC source
Using CREATE STREAM, define a Stream backed by one or more MongoDB source collections. The properties below are used as CDC properties in the WITH clause of CREATE STREAM when creating a MongoDB CDC source:
mongodb.cdc.db.list
Comma-separated list of the MongoDB database names to capture changes from. It can be the name of a single MongoDB database, multiple database names separated by commas, or a regular expression that matches multiple databases.
Required: Yes
Type: String
Valid values: See LIST ENTITIES
mongodb.cdc.collection.list
Comma-separated list of the MongoDB collection names to capture changes from. Note that each collection name should follow the format <database>.<collection>. It can be the name of a single MongoDB collection, multiple collection names separated by commas, or a regular expression matching fully qualified collection identifiers.
Required: Yes
Type: String
Valid values: See LIST ENTITIES
mongodb.connection.option
The ampersand-separated connection options for the MongoDB connection. For more details, see the MongoDB connection string options documentation.
Required: No
Type: String
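As a hedged sketch of how these properties fit together, the following DDL captures changes from two collections in a hypothetical sales database and passes extra connection options; the stream name, schema, database, collections, and options are all assumptions for illustration:
CREATE STREAM orders_cdc (
  op VARCHAR,
  ts_ms BIGINT,
  `before` VARCHAR,
  `after` VARCHAR,
  `source` STRUCT<`db` VARCHAR, `collection` VARCHAR, `_id` VARCHAR>)
WITH (
  'store' = 'mongostore',
  'value.format' = 'json',
  'mongodb.cdc.db.list' = 'sales',
  'mongodb.cdc.collection.list' = 'sales.orders,sales.refunds',
  'mongodb.connection.option' = 'readPreference=secondaryPreferred&ssl=true');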
Step 2. Define the CDC Query
Using CREATE STREAM AS SELECT, define a Stream backed by the sink Streaming Entity and the query to insert CDC records into it.
Sink parameters
The CDC sink can write into an existing entity in the sink store or create a new entity. The parameters below are used in the WITH clause for the sink in CSAS to configure the desired behavior:
topic
Name of the Streaming Entity into which the data for the CDC sink is written. If the entity doesn't exist, the system creates an entity with this name in the corresponding store.
Required: No
Default value: Lowercase sink name
Type: String
topic.partitions
The number of partitions to use when creating the sink entity, if applicable. If the entity already exists, this value must equal the number of partitions in the existing entity.
Required: Yes, unless topic already exists
Type: Integer
Valid values: [1, ...]
topic.replicas
The number of replicas to use when creating the sink entity, if applicable. If the entity already exists, this value must equal the number of replicas in the existing entity.
Required: Yes, unless topic already exists
Type: Integer
Valid values: [1, ...]
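For instance, assuming a sink store named kafka_store and a CDC source stream named pageviews_cdc (the same hypothetical names used in the Example section below), a sink that writes into an explicitly named entity might look like this sketch:
CREATE STREAM pageviews_changes WITH (
  'store' = 'kafka_store',
  'topic' = 'pageviews_changes',
  'topic.partitions' = 1,
  'topic.replicas' = 1) AS
SELECT * FROM pageviews_cdc;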
Source parameters
The source stream in the CDC pipeline captures the changes from MongoDB collection(s) in the source Data Store. Here are the parameters you can specify in the WITH clause for the CDC source in the query:
mongodb.startup.option
Startup mode for the MongoDB CDC consumer. If not specified, the initial mode is used. A usage sketch follows this parameter list. Note that:
If the timestamp mode is used, the mongodb.startup.timestamp property is required (see below).
If the offset mode is used, the mongodb.startup.offset.file and mongodb.startup.offset.position properties are required (see below).
Required: No
Type: String
Valid values: initial, snapshot, earliest, latest, committed, timestamp, offset
mongodb.startup.timestamp
This property is required only if the startup option is set to timestamp to skip the initial snapshot phase and start reading oplog events from a specific timestamp.
Required: Yes when startup option is timestamp
Type: BigInt
mongodb.startup.offset.file
This property is required only if the startup option is set to offset to skip the initial snapshot phase and start reading oplog events from a specific offset. This property points to a file that contains serialized MongoDB resume tokens.
Required: Yes when startup option is offset
Type: String
mongodb.startup.offset.position
This property is required only if the startup option is set to offset to skip the initial snapshot phase and start reading oplog events from a specific offset. This property shows the line number (0-based) of the resume token in the offset file (see above) that should be used as the starting point.
Required: Yes when startup option is offset
Type: Integer
mongodb.cdc.cursor.timeout.disable
By default, the MongoDB CDC connector opens its cursor with no cursor timeout to prevent the server from automatically closing the cursor when it becomes idle, which is the recommended behavior for most long-running production CDC pipelines.
However, some MongoDB deployments disallow no-timeout cursors for operational or security reasons. In such cases, set this property to false to force the connector to use a standard cursor that is subject to MongoDB’s idle timeout rules.
Important Considerations
This can introduce interruptions or restarts in CDC pipelines that read from high-volume or large oplog/change-stream sources.
If the cursor times out, the connector must re-establish the stream using the last known resume token. In environments with large or fast-moving oplogs, this may result in increased latency, restarts, or even resume token invalidation.
Required: No
Type: String
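As a sketch of how these source parameters are supplied, assuming the pageviews_cdc source and kafka_store sink from the Example section below, a query that skips the snapshot phase and starts reading oplog events from a specific (hypothetical) timestamp might look like this:
CREATE STREAM cdc_sink_from_ts WITH (
  'store' = 'kafka_store',
  'topic.partitions' = 1,
  'topic.replicas' = 1) AS
SELECT * FROM pageviews_cdc
WITH (
  'mongodb.startup.option' = 'timestamp',
  'mongodb.startup.timestamp' = 1717430400000);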
Example
Create CDC pipeline capturing changes from a MongoDB collection
Assume a MongoDB Data Store is defined with the name mongostore, and it contains a collection named pageviews within the report database.
The following DDL statement creates a CDC Stream named pageviews_cdc, backed by that collection. This stream captures all changes (inserts, updates, and deletes) occurring in the pageviews collection:
CREATE STREAM pageviews_cdc(
op VARCHAR,
ts_ms BIGINT,
`before` VARCHAR,
`after` VARCHAR,
`source` STRUCT<`db` VARCHAR, `collection` VARCHAR, `_id` VARCHAR>)
WITH (
'store'='mongostore',
'value.format'='json',
'mongodb.cdc.db.list'='report',
'mongodb.cdc.collection.list'='report.pageviews');
The pageviews_cdc stream serves as the source of the CDC pipeline. Whenever an INSERT, DELETE, or UPDATE occurs in the underlying MongoDB collection, the CDC connector emits a corresponding change event into this stream. If the pipeline requires additional source properties, such as a specific startup mode, they can be supplied through the WITH clause on pageviews_cdc. To write these captured changes to an output stream, define a sink stream using a CREATE STREAM AS SELECT (CSAS) statement:
CREATE STREAM cdc_sink WITH (
'store' = 'kafka_store',
'topic.partitions' = 1,
'topic.replicas' = 1) AS
SELECT * FROM pageviews_cdc;

