# MongoDB

## Description

Change Data Capture (CDC) enables you to ingest data changes from collections in a MongoDB [store](https://docs.deltastream.io/overview/core-concepts/store "mention") and publish them to a sink. In a CDC pipeline, changes in the source collection are captured as document-level operations (`INSERT`, `DELETE`, `UPDATE`) and pushed to the downstream sink in real time.

## CDC Pipeline Overview

### Source Record Structure

DeltaStream uses the Flink CDC connector to capture changes in the source collections. Each change event can be represented by the following JSON skeleton:

```
{
 "op": ...,
 "ts_ms": ...,
 "before": {...},
 "after": {...},
 "system": {...}
}
```

* **op** (String): The document-level operation that caused the change in the source: `insert`, `update`, or `delete`.
* **ts\_ms** (BigInt): The timestamp at which the change event was processed by the CDC pipeline in the source.
* **before** and **after**: The state of the document before and/or after the change, depending on the semantics of the operation. When these columns are defined as `BYTES` or `STRING`, the entire document is captured as raw bytes or as a string, exactly as produced by the CDC library. If all documents share the same schema, meaning they have identical field names and data types, you can instead define the `before` and `after` columns as a `STRUCT` whose fields mirror the document's structure.
* **system**: Metadata about the operation in the source, with the following fields:
  * **db** (String): The MongoDB database name containing the source collection.
  * **collection** (String): The MongoDB source collection name.
  * **\_id** (String): The unique identifier of the document.
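The following minimal JavaScript sketch illustrates how a downstream consumer might interpret these fields by replaying change events into an in-memory `Map` keyed by `system._id`. It assumes that `before` and `after` have already been deserialized into objects (or are `null` when absent) and that pre/post images are enabled, so update events carry a full `after` image. The `applyChange` function and the sample documents are hypothetical.

```javascript
// Hypothetical consumer: replays CDC change events into an in-memory Map
// keyed by the document's _id, taken from the `system` metadata.
function applyChange(state, event) {
  const id = event.system._id;
  switch (event.op) {
    case "insert":
    case "update":
      // Assumes `after` holds the full post-change document image
      // (for updates this requires changeStreamPreAndPostImages).
      state.set(id, event.after);
      break;
    case "delete":
      // A delete has no post-change image, so the document is dropped.
      state.delete(id);
      break;
    default:
      throw new Error(`Unexpected op: ${event.op}`);
  }
  return state;
}

// Example: an insert followed by an update of the same document.
const meta = { db: "report", collection: "pageviews", _id: "a1" };
const state = new Map();
applyChange(state, {
  op: "insert", ts_ms: 1, before: null,
  after: { _id: "a1", page: "/home", views: 1 }, system: meta,
});
applyChange(state, {
  op: "update", ts_ms: 2,
  before: { _id: "a1", page: "/home", views: 1 },
  after: { _id: "a1", page: "/home", views: 2 }, system: meta,
});
console.log(state.get("a1").views);
```

Treating `insert` and `update` identically works here only because each event carries the full document; without post-images the consumer would have to merge partial update information instead.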

{% hint style="info" %}
When `before` and `after` are defined as `BYTES` or `STRING` in the CDC source DDL, the MongoDB CDC source captures the before/after states of MongoDB documents as full serialized documents for insert, update, and delete operations. These values are not partial or projected structures: they contain the complete document image as emitted by MongoDB change streams, serialized in the Extended JSON format used by the MongoDB connector and Debezium.

Any further processing, such as parsing the Extended JSON or extracting individual fields, is the responsibility of downstream consumers. Applications consuming the CDC stream should deserialize these values as needed to access specific key–value pairs in MongoDB documents or to apply custom transformations.
{% endhint %}
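For illustration, the following minimal JavaScript sketch parses a serialized `after` value and unwraps a few common Extended JSON type wrappers (`$oid`, `$date`, `$numberLong`, `$numberInt`, `$numberDouble`). It is not a complete Extended JSON implementation, and the sample document is hypothetical. For production use, the `EJSON.parse` function from the official `bson` npm package covers the full specification.

```javascript
// Minimal sketch: converts a few common MongoDB Extended JSON wrappers into
// native JavaScript values while parsing a CDC `before`/`after` string.
// Not a complete Extended JSON implementation.
function parseExtendedJson(text) {
  // JSON.parse calls the reviver bottom-up, so nested wrappers such as
  // {"$date": {"$numberLong": "..."}} are unwrapped from the inside out.
  return JSON.parse(text, (key, value) => {
    if (value === null || typeof value !== "object" || Array.isArray(value)) {
      return value;
    }
    const keys = Object.keys(value);
    if (keys.length !== 1) return value;
    const [k] = keys;
    const v = value[k];
    switch (k) {
      case "$oid":
        return v; // keep the ObjectId as its hex string
      case "$numberLong":
        return BigInt(v); // 64-bit integers may exceed Number precision
      case "$numberInt":
        return Number.parseInt(v, 10);
      case "$numberDouble":
        return Number(v); // also handles "Infinity", "-Infinity", "NaN"
      case "$date":
        // Relaxed form: ISO-8601 string. Canonical form: {"$numberLong": ...},
        // which the inner reviver call has already turned into a BigInt.
        return new Date(typeof v === "bigint" ? Number(v) : v);
      default:
        return value;
    }
  });
}

// Hypothetical `after` value for a document in a pageviews collection.
const after =
  '{"_id":{"$oid":"65a1f0c2e4b0a1b2c3d4e5f6"},"page":"/home",' +
  '"views":{"$numberLong":"42"},' +
  '"seenAt":{"$date":{"$numberLong":"1700000000000"}}}';
const doc = parseExtendedJson(after);
console.log(doc._id, doc.views, doc.seenAt.toISOString());
```

Representing `$numberLong` as a `BigInt` avoids silently losing precision for 64-bit values above `Number.MAX_SAFE_INTEGER`, at the cost of requiring `BigInt`-aware arithmetic downstream.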

### Requirements of a MongoDB Source Collection

#### Enabling Pre/Post images for a collection

To reliably capture **before/after images** for update and delete operations in a MongoDB collection, the collection must have the MongoDB feature **`changeStreamPreAndPostImages`** enabled. This is required for CDC pipelines that rely on [Debezium](https://debezium.io/) to produce full change events.

**Why is this setting required?**

MongoDB Change Streams emit different kinds of events depending on the operation type.\
By default:

* Insert events already provide the full document.
* Update events contain only the changed fields and do not include the full pre-update or post-update document image.
* Delete events only contain the `_id` of the deleted document.

For many CDC use cases, such as stateful transformations or audit requirements, you need the complete before and/or after images. MongoDB exposes this capability only when **`changeStreamPreAndPostImages`** is enabled at the collection level.

Once enabled, MongoDB emits the full pre-update and/or post-update document image based on the operation type.

**How to enable the setting?**

You must enable this option on each MongoDB collection that is used as a CDC source. You can do this through the [MongoDB command interface](https://www.mongodb.com/resources/basics/command-line-tools#mongodb-shell).

```js
db.runCommand({
  collMod: "your_collection_name",
  changeStreamPreAndPostImages: { enabled: true }
});
```

To verify that the setting is enabled, run the following command and look for the `changeStreamPreAndPostImages` field under `options` in the response.

```javascript
db.getCollectionInfos({ name: "your_collection_name" })
```
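To check the setting programmatically, for example in a script run with `mongosh`, you could inspect the array returned by `db.getCollectionInfos()`. The helper below is a hypothetical sketch: it assumes the setting appears under each entry's `options` field, and here it is exercised against a hand-written sample rather than a live server.

```javascript
// Hypothetical helper: returns true if the named collection, as described by
// the array returned from db.getCollectionInfos(), has pre/post images enabled.
function hasPreAndPostImages(collectionInfos, name) {
  const info = collectionInfos.find((c) => c.name === name);
  if (!info) {
    throw new Error(`Collection not found: ${name}`);
  }
  const setting = (info.options || {}).changeStreamPreAndPostImages;
  return Boolean(setting && setting.enabled);
}

// In mongosh you would pass the live result instead, e.g.:
//   hasPreAndPostImages(db.getCollectionInfos({ name: "pageviews" }), "pageviews")
const sample = [{
  name: "pageviews",
  type: "collection",
  options: { changeStreamPreAndPostImages: { enabled: true } },
}];
console.log(hasPreAndPostImages(sample, "pageviews"));
```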

## How to Create a CDC Pipeline

You create a CDC pipeline in 2 steps:

1. Define the CDC source
2. Define the sink and the query to write CDC changes into it

### Step 1. Define DDL for the CDC source

Using [create-stream](https://docs.deltastream.io/reference/sql-syntax/ddl/create-stream "mention"), define a [#stream](https://docs.deltastream.io/overview/core-concepts/databases#stream "mention") backed by one or more MongoDB source collections. Use the following properties in the `WITH` clause of [create-stream](https://docs.deltastream.io/reference/sql-syntax/ddl/create-stream "mention") when creating a MongoDB CDC source:

| Parameter Name                | Description                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                        |
| ----------------------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------ |
| `mongodb.cdc.db.list`         | <p>Comma-separated list of MongoDB database names to capture changes from. The value can be a single database name, multiple database names separated by <code>,</code>, or a regular expression that matches multiple databases.</p><p><br><strong>Required:</strong> Yes<br><strong>Type:</strong> String<br><strong>Valid values:</strong> See <a data-mention href="../../command/list-entities">list-entities</a></p>                                                                                                            |
| `mongodb.cdc.collection.list` | <p>Comma-separated list of MongoDB collection names to capture changes from. Each collection name must be fully qualified in the format <code>\<database>.\<collection></code>. The value can be a single collection name, multiple collection names separated by <code>,</code>, or a regular expression that matches fully qualified collection identifiers.<br><br><strong>Required:</strong> Yes<br><strong>Type:</strong> String<br><strong>Valid values:</strong> See <a data-mention href="../../command/list-entities">list-entities</a></p> |
| `mongodb.connection.option`   | <p>Ampersand-separated connection options for the MongoDB connection. For details, see the <a href="https://www.mongodb.com/docs/manual/reference/connection-string/#std-label-connections-connection-options">MongoDB connection string options</a>.</p><p><br><strong>Required:</strong> No<br><strong>Type:</strong> String</p>                                                                                                                                                                                                                                                        |
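As an illustration of the naming rules above, the hypothetical helper below builds the `WITH` clause properties for plain (non-regex) database and collection names, qualifying each collection as `<database>.<collection>`. It is only a sketch: it covers `store` and the two list properties, and it does not escape quotes inside values.

```javascript
// Hypothetical helper: builds MongoDB CDC source properties from a map of
// plain (non-regex) database names to collection names.
function buildCdcProps(store, collectionsByDb) {
  const dbs = Object.keys(collectionsByDb);
  // Every collection entry must be fully qualified as <database>.<collection>.
  const collections = dbs.flatMap((db) =>
    collectionsByDb[db].map((coll) => `${db}.${coll}`));
  return {
    "store": store,
    "mongodb.cdc.db.list": dbs.join(","),
    "mongodb.cdc.collection.list": collections.join(","),
  };
}

// Renders the properties as the body of a SQL WITH clause (no quote escaping).
function toWithClause(props) {
  return Object.entries(props)
    .map(([key, value]) => `  '${key}'='${value}'`)
    .join(",\n");
}

const props = buildCdcProps("mongostore", { report: ["pageviews", "clicks"] });
console.log(toWithClause(props));
```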

### Step 2. Define the CDC Query

Using [create-stream-as](https://docs.deltastream.io/reference/sql-syntax/query/create-stream-as "mention"), define a [#stream](https://docs.deltastream.io/overview/core-concepts/databases#stream "mention") backed by the sink [#streaming-entity](https://docs.deltastream.io/overview/core-concepts/store#streaming-entity "mention") and the query to insert CDC records into it.

#### Sink parameters

The CDC sink can write into an existing entity in the sink store or create a new one. Use the following parameters in the `WITH` clause for the sink in the CSAS statement to control this behavior:

| Parameter Name     | Description                                                                                                                                                                                                                                                                                                                                                                                                                  |
| ------------------ | ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| `topic`            | <p>Name of the <a data-mention href="../../../../../overview/core-concepts/store#streaming-entity">#streaming-entity</a> into which the data for the CDC sink is written. If the entity doesn't exist, the system creates an entity with this name in the corresponding <code>store</code>.<br><br><strong>Required:</strong> No<br><strong>Default value:</strong> Lowercase sink name<br><strong>Type:</strong> String</p> |
| `topic.partitions` | <p>The number of partitions to use when creating the sink entity, if applicable. If the entity already exists, this value must equal the number of partitions in the existing entity.<br></p><p><strong>Required:</strong> Yes, unless topic already exists<br><strong>Type:</strong> Integer<br><strong>Valid values:</strong> \[1, ...]</p>                                                                                |
| `topic.replicas`   | <p>The number of replicas to use when creating the sink entity, if applicable. If the entity already exists, this value must equal the number of replicas in the existing entity.<br></p><p><strong>Required:</strong> Yes, unless topic already exists<br><strong>Type:</strong> Integer<br><strong>Valid values:</strong> \[1, ...]</p>                                                                                    |
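The rules in this table can be summarized as a small pre-flight check. This is a hypothetical JavaScript sketch, not DeltaStream's actual validation logic; `existingTopics` stands in for whatever metadata you have about entities already present in the sink store.

```javascript
// Hypothetical pre-flight check mirroring the sink parameter rules above.
// existingTopics maps an entity name to { partitions, replicas }.
function resolveSinkTopic(sinkName, props, existingTopics) {
  // `topic` defaults to the lowercase sink name.
  const topic = props["topic"] ?? sinkName.toLowerCase();
  const partitions = props["topic.partitions"];
  const replicas = props["topic.replicas"];
  const existing = existingTopics[topic];

  if (existing) {
    // For an existing entity, any given counts must match it exactly.
    if (partitions !== undefined && partitions !== existing.partitions) {
      throw new Error(`topic.partitions must be ${existing.partitions}`);
    }
    if (replicas !== undefined && replicas !== existing.replicas) {
      throw new Error(`topic.replicas must be ${existing.replicas}`);
    }
    return { topic, create: false, ...existing };
  }

  // Creating a new entity requires both counts, each an integer >= 1.
  for (const [name, value] of [
    ["topic.partitions", partitions],
    ["topic.replicas", replicas],
  ]) {
    if (!Number.isInteger(value) || value < 1) {
      throw new Error(`${name} is required (integer >= 1) to create ${topic}`);
    }
  }
  return { topic, create: true, partitions, replicas };
}

const plan = resolveSinkTopic(
  "cdc_sink", { "topic.partitions": 1, "topic.replicas": 1 }, {});
console.log(plan);
```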

#### Source parameters

The source stream in the CDC pipeline captures changes from MongoDB collection(s) in the source [store](https://docs.deltastream.io/overview/core-concepts/store "mention"). You can specify the following parameters in the `WITH` clause for the CDC source in the query:

| Parameter Name                       | Description                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                    |
| ------------------------------------ | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------ |
| `scan.startup.option`                | <p>Startup mode for MongoDB CDC consumer. Note that If the <code>timestamp</code> mode is used, <code>mongodb.cdc.startup.timestamp</code> property is required (see below).</p><p><br><strong>Required:</strong> No<br><strong>Default value:</strong> <code>initial</code><br><strong>Type:</strong> String<br><strong>Valid values:</strong> <code>initial</code> , <code>latest-offset</code> , <code>timestamp</code></p>                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                 |
| `mongodb.cdc.startup.timestamp`      | <p>This property is required only if the <code>scan.startup.option</code> property is set to <code>timestamp</code> to skip the initial snapshot phase and start reading oplog events from a specific timestamp.</p><p><br><strong>Required:</strong> Yes when <code>scan.startup.option</code> is <code>timestamp</code><br><strong>Type:</strong> BigInt</p>                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                 |
| `mongodb.cdc.cursor.timeout.disable` | <p>By default, the MongoDB CDC connector opens its cursor with no cursor timeout to prevent the server from automatically closing the cursor when it becomes idle, which is the recommended behavior for most long-running production CDC pipelines.</p><p>However, some MongoDB deployments disallow no-timeout cursors for operational or security reasons. In such cases, set this property to <strong><code>false</code></strong> to force the connector to use a standard cursor that is subject to MongoDB’s idle timeout rules.<br></p><p><strong>Important Considerations</strong></p><ul><li>This can introduce interruptions or restarts in CDC pipelines that read from high-volume or large oplog/change-stream sources.</li><li>If the cursor times out, the connector must re-establish the stream using the last known resume token.<br>In environments with large or fast-moving oplogs, this may result in increased latency, restarts, or even resume token invalidation.<br></li></ul><p><strong>Required:</strong> No<br><strong>Type:</strong> String</p> |
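The interaction between `scan.startup.option` and `mongodb.cdc.startup.timestamp` can be expressed as the following hypothetical check. It mirrors only the rules stated in the table; it does not interpret the timestamp value or assume a unit for it.

```javascript
// Hypothetical validation of the startup-related source properties.
const STARTUP_MODES = ["initial", "latest-offset", "timestamp"];

function validateStartup(props) {
  // `initial` is the documented default startup mode.
  const mode = props["scan.startup.option"] ?? "initial";
  if (!STARTUP_MODES.includes(mode)) {
    throw new Error(`Invalid scan.startup.option: ${mode}`);
  }
  const ts = props["mongodb.cdc.startup.timestamp"];
  // The timestamp property is mandatory only in `timestamp` mode.
  if (mode === "timestamp" && ts === undefined) {
    throw new Error(
      "mongodb.cdc.startup.timestamp is required when scan.startup.option is timestamp");
  }
  return mode === "timestamp" ? { mode, timestamp: ts } : { mode };
}

console.log(validateStartup({}).mode);
```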

#### Passthrough Source parameters

Passthrough CDC properties are supported for the MongoDB CDC source stream. Their names must use the `cdc.` prefix, and their values must be strings. They are listed in the table below:

<table><thead><tr><th width="414.2314453125">Parameter Name (with prefix)</th><th>Description</th></tr></thead><tbody><tr><td><code>cdc.split.size</code></td><td>The number of rows in each split of the collection snapshot. During the initial snapshot, captured collections are divided into multiple splits that are read in batches.<br><br><strong>Required:</strong> No<br><strong>Type:</strong> String</td></tr><tr><td><code>cdc.fetch.size</code></td><td>The maximum fetch size per poll when reading the snapshot.<br><br><strong>Required:</strong> No<br><strong>Type:</strong> String</td></tr><tr><td><code>cdc.scan.incremental.close-idle-reader.enabled</code></td><td><p>Whether to close idle readers at the end of the snapshot phase.</p><p><br><strong>Required:</strong> No</p><p><strong>Default value:</strong> <code>'false'</code><br><strong>Type:</strong> String</p></td></tr><tr><td><code>cdc.heartbeat.interval.ms</code></td><td><p>The interval, in milliseconds, at which heartbeat events are sent to track the latest change offsets.</p><p><br><strong>Required:</strong> No<br><strong>Type:</strong> String</p></td></tr><tr><td><code>cdc.scan.newly-added-table.enabled</code></td><td>Enables automatic discovery of new collections when resuming a CDC pipeline. When set to <code>'true'</code>, any collections that match the configured collection list patterns but were not part of the previous run are detected automatically. When set to <code>'false'</code> (default), the pipeline continues reading only the collections that were part of the original state and ignores any newly created matching collections.<br><br><strong>Required:</strong> No<br><strong>Default value:</strong> <code>'false'</code><br><strong>Type:</strong> String</td></tr></tbody></table>
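Because passthrough properties need the `cdc.` prefix and string values, a client that assembles them programmatically might normalize them with a helper like the hypothetical one below. The option values shown are illustrative only, not recommendations.

```javascript
// Hypothetical helper: adds the `cdc.` prefix where missing and converts
// every value to a string, as required for passthrough properties.
function toPassthrough(options) {
  return Object.fromEntries(
    Object.entries(options).map(([key, value]) => [
      key.startsWith("cdc.") ? key : `cdc.${key}`,
      String(value),
    ]),
  );
}

const passthrough = toPassthrough({
  "heartbeat.interval.ms": 30000,
  "scan.newly-added-table.enabled": true,
});
console.log(passthrough);
```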

## Example

**Create CDC pipeline capturing changes from a MongoDB collection**

Assume a MongoDB [store](https://docs.deltastream.io/overview/core-concepts/store "mention") named `mongostore` is defined, and it contains a collection named `pageviews` in the `report` database.

The following DDL statement creates a CDC stream named `pageviews_cdc`, backed by that collection. This stream captures all changes (inserts, updates, and deletes) occurring in the `pageviews` collection:

```sql
CREATE STREAM pageviews_cdc(
  op VARCHAR,
  ts_ms BIGINT,
  `before` VARCHAR, 
  `after`  VARCHAR, 
  `source` STRUCT<`db` VARCHAR, `collection` VARCHAR, `_id` VARCHAR>)
WITH (
  'store'='mongostore', 
  'value.format'='json',
  'mongodb.cdc.db.list'='report',
  'mongodb.cdc.collection.list'='report.pageviews');
```

The `pageviews_cdc` stream serves as the source of the CDC pipeline. Whenever an `INSERT`, `DELETE`, or `UPDATE` occurs in the underlying MongoDB collection, the CDC connector emits a corresponding change event into this stream. If the pipeline requires additional source properties, such as a specific startup mode, they can be supplied through the `WITH` clause on `pageviews_cdc`. To write these captured changes to an output stream, define a sink stream using a **CREATE STREAM AS SELECT (CSAS)** statement:

```sql
CREATE STREAM cdc_sink WITH (
  'store' = 'kafka_store',
  'topic.partitions' = 1,
  'topic.replicas' = 1) AS 
SELECT * FROM pageviews_cdc;
```
