Oracle

Description

Change Data Capture (CDC) enables you to ingest data changes from tables in an Oracle Data Store and publish them to a sink. In a CDC pipeline, changes to the source relational table are captured as row-level operations (INSERT, DELETE, UPDATE), identified by the table's PRIMARY KEY, and then pushed to the downstream sink in real time.

Note: Oracle stores support CDC flows. Changes can be captured from Oracle tables and streamed into DeltaStream for real-time processing.

How a CDC Pipeline Works

Source Record Structure

DeltaStream uses Debezium to capture changes in the source relational table via Oracle LogMiner. When you define a CDC source backed by a table in an ORACLE store, its records follow this JSON skeleton:

{
 "op": ...,
 "ts_ms": ...,
 "before": {...},
 "after": {...},
 "source": {...}
}
  • op: Describes the row-level operation that caused a change in the source. It can be c for CREATE, u for UPDATE, d for DELETE or r for READ.

  • ts_ms: The time at which the CDC pipeline processed the change event, in milliseconds since the Unix epoch.

  • before and after: The state of the changed row before and after the change. Which of the two is populated depends on the operation; for example, an insert has no before state and a delete has no after state.

  • source: Shows the metadata in the source about the operation. Some of its included fields are:

    • db (String): The Oracle database name containing the source table.

    • schema (String): The Oracle schema name containing the source table.

    • table (String): The Oracle source table name.

    • scn (BigInt): The Oracle System Change Number (SCN) of the change.
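
The record structure above can be illustrated with a short Python sketch that reads one change event and picks out the relevant row state. This is only an illustration of the envelope, not part of DeltaStream: the function name summarize_change and all sample values (the timestamp, SCN, and row contents) are hypothetical.

```python
import json
from typing import Any, Dict

# Single-letter operation codes described in the field list above.
OP_NAMES = {"c": "CREATE", "u": "UPDATE", "d": "DELETE", "r": "READ"}


def summarize_change(raw: str) -> Dict[str, Any]:
    """Parse one CDC record and return a flat summary (hypothetical helper)."""
    event = json.loads(raw)
    op = event["op"]
    if op not in OP_NAMES:
        raise ValueError(f"unknown op code: {op!r}")
    source = event.get("source") or {}
    # A DELETE only carries the row state in "before"; other operations
    # carry the current row state in "after".
    row = event.get("before") if op == "d" else event.get("after")
    return {
        "operation": OP_NAMES[op],
        "ts_ms": event["ts_ms"],
        "table": f'{source.get("schema")}.{source.get("table")}',
        "scn": source.get("scn"),
        "row": row,
    }


# Hypothetical sample event; values are made up for illustration.
sample = json.dumps({
    "op": "u",
    "ts_ms": 1700000000000,
    "before": {"VIEWTIME": 1, "USERID": "User_1", "PAGEID": "Page_10"},
    "after": {"VIEWTIME": 1, "USERID": "User_1", "PAGEID": "Page_20"},
    "source": {"db": "FREEPDB1", "schema": "HR", "table": "PAGEVIEWS", "scn": 123456},
})

summary = summarize_change(sample)
```

For the sample update event, the summary reports the operation as UPDATE and the row as the after state.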

Requirements for an Oracle Source Store

DeltaStream uses Oracle LogMiner to stream changes from the source relational table.

  • When you create the source Oracle Data Store:

    • the Oracle instance must be running in ARCHIVELOG mode, which is required for LogMiner to access historical redo data.

    • supplemental logging must be enabled at the database level (at minimum, minimal supplemental logging). For tables captured by CDC, enable supplemental logging for all columns to ensure complete before/after images.

    • the username you provide must have sufficient privileges to use Oracle LogMiner and read change events from the source tables (e.g. CREATE SESSION, SELECT on source tables, LOGMINING, SELECT ANY TRANSACTION, etc.).

    • the URI must use the oracle:// scheme and include exactly one host and port (e.g. oracle://oracle-host:1521).

    • oracle.service_name, oracle.username, and oracle.password are required.
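
The URI and property requirements in the list above can be checked before creating a store. The Python sketch below is a minimal pre-flight check under stated assumptions: check_store_settings is a hypothetical helper, not a DeltaStream API, and it only mirrors the rules listed here (the oracle:// scheme, exactly one host and port, and the three required properties).

```python
from typing import Dict, List
from urllib.parse import urlsplit

# Properties listed as required for an Oracle store.
REQUIRED_PROPERTIES = ("oracle.service_name", "oracle.username", "oracle.password")


def check_store_settings(uri: str, properties: Dict[str, str]) -> List[str]:
    """Return a list of problems; an empty list means the settings look valid."""
    problems: List[str] = []
    if "," in uri:
        # A comma indicates several host:port pairs, which is not allowed.
        problems.append("URI must contain exactly one host and port")
    else:
        parts = urlsplit(uri)
        try:
            port = parts.port  # raises ValueError for a malformed port
        except ValueError:
            port = None
        if parts.scheme != "oracle":
            problems.append("URI must use the oracle:// scheme")
        if not parts.hostname or port is None:
            problems.append("URI must include a host and a port")
    for name in REQUIRED_PROPERTIES:
        if not properties.get(name):
            problems.append(f"missing required property: {name}")
    return problems
```

A call such as check_store_settings("oracle://oracle-host:1521", props) returns an empty list when props contains all three required properties.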

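To verify that ARCHIVELOG mode is enabled in the Oracle instance, connect as SYSDBA and run SELECT LOG_MODE FROM V$DATABASE; the result should be ARCHIVELOG.

To enable minimal supplemental logging at the database level, run ALTER DATABASE ADD SUPPLEMENTAL LOG DATA;

To enable supplemental logging for all columns on a specific table, run ALTER TABLE <schema>.<table> ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS; using the schema and name of the table you want to capture.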

Note: For more information on Oracle LogMiner and supplemental logging, see the Oracle Database documentation.

How to Create a CDC Pipeline

You create a CDC pipeline in two steps:

  1. Define the CDC source

  2. Define the sink and the query to write CDC changes into it

Step 1. Define DDL for the CDC source

Using CREATE STREAM, define a Stream backed by the source relational table. The parameters below are used as CDC parameters in the WITH clause in CREATE STREAM:

Parameter Name
Description

oracle.cdc.table.list

Comma-separated list of fully qualified source tables to capture using CDC. Each entry must be specified as schema_name.table_name. If a table name contains special characters (such as a dot) or uppercase letters, enclose it in double quotes, for example HR."orders.v1". Regex patterns are supported to match multiple tables dynamically (for example, HR.ORDERS_.* matches all tables in the HR schema with names starting with ORDERS_).

Note: This property can be specified either in the DDL when defining the CDC source, or as a source property at query time. If it is provided in both places, the value specified at query time takes precedence.

Required: No
Type: String
Valid values: See LIST ENTITIES

value.format

Format of the CDC record coming from the source.

Required: Yes
Type: String
Valid values: JSON

store

Name of the ORACLE store that hosts the relational table backing the CDC source.

Required: Yes
Type: String
Valid values: See LIST STORES
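
The precedence rule and the regex example for oracle.cdc.table.list described above can be illustrated in Python. This is a sketch under two assumptions that the text does not confirm: that a pattern must match the whole schema_name.table_name string, and that Python's re module behaves like the connector's matcher for this simple pattern. The function names and candidate table names are hypothetical.

```python
import re
from typing import List, Optional


def effective_table_list(ddl_value: Optional[str], query_value: Optional[str]) -> Optional[str]:
    """Pick the table list in effect: a query-time value overrides the DDL value."""
    return query_value if query_value is not None else ddl_value


def matching_tables(pattern: str, candidates: List[str]) -> List[str]:
    """Return the candidates fully matched by the pattern (assumed semantics)."""
    compiled = re.compile(pattern)
    return [name for name in candidates if compiled.fullmatch(name)]


# Hypothetical fully qualified table names.
tables = ["HR.ORDERS_2023", "HR.ORDERS_ARCHIVE", "HR.CUSTOMERS", "STAFF.ORDERS_2023"]

chosen = effective_table_list("HR.CUSTOMERS", "HR.ORDERS_.*")
matched = matching_tables(chosen, tables)
```

Here the query-time pattern HR.ORDERS_.* wins over the DDL value, and only the two HR tables whose names start with ORDERS_ are selected.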

Step 2. Define the CREATE STREAM AS SELECT (CSAS) for CDC Sink Stream and Query

Using CREATE STREAM AS SELECT, define a Stream backed by the sink Streaming Entity and the query to insert CDC records into it.

Sink parameters

The CDC sink can write into an existing entity in the sink store or create a new entity. Use the parameters below in the WITH clause for the sink in CSAS to get the desired behavior:

Parameter Name
Description

topic

Name of the Streaming Entity into which the data for the CDC sink is written. If the entity doesn't exist, the system creates an entity with this name in the corresponding store.

Required: No
Default value: Lowercase sink name
Type: String

topic.partitions

The number of partitions to use when creating the sink entity, if applicable. If the entity already exists, this value must equal the number of partitions in the existing entity.

Required: Yes, unless topic already exists
Type: Integer
Valid values: [1, ...]

topic.replicas

The number of replicas to use when creating the sink entity, if applicable. If the entity already exists, this value must equal the number of replicas in the existing entity.

Required: Yes, unless topic already exists
Type: Integer
Valid values: [1, ...]

Source parameters

The source stream in the CDC pipeline captures the changes from a relational table in the source ORACLE Data Store. Here are the parameters you can specify in the WITH clause for the source in CSAS:

Parameter Name
Description

scan.startup.option

Startup mode of the Oracle CDC consumer.

Required: No
Default value: initial
Type: String
Valid values: initial, latest-offset

oracle.cdc.table.list

Comma-separated list of fully qualified source tables to capture using CDC. Each entry must be specified as schema_name.table_name. If a table name contains special characters (such as a dot) or uppercase letters, enclose it in double quotes, for example HR."orders.v1". Regex patterns are supported to match multiple tables dynamically (for example, HR.ORDERS_.* matches all tables in the HR schema with names starting with ORDERS_).

Note: This property can be specified either in the DDL when defining the CDC source, or as a source property at query time. If it is provided in both places, the value specified at query time takes precedence.

Required: No
Type: String
Valid values: See LIST ENTITIES

Passthrough Source parameters

Passthrough CDC properties

Passthrough CDC properties are supported for the CDC source stream. These properties must be specified using the cdc. prefix in their names and must be assigned a string value. They are listed in the table below. Refer to the CDC connector options for more details.

Parameter Name (with prefix)
Description

cdc.split.size

The split size, in rows, of the table snapshot. Captured tables are divided into multiple splits when the table snapshot is read.

Required: No
Default value: '1000'
Type: String

cdc.fetch.size

The maximum fetch size per poll when reading the table snapshot.

Required: No
Default value: '1024'
Type: String

cdc.scan.incremental.close-idle-reader.enabled

Whether to close idle readers at the end of the snapshot phase.

Required: No
Default value: 'false'
Type: String

cdc.chunk-key.even-distribution.factor.lower-bound

The lower bound of the chunk key distribution factor. The distribution factor is used to determine whether the table is evenly distributed. When the data distribution is even, table chunks are computed with the even-distribution calculation optimization; when it is uneven, a query is run to determine the splits. The distribution factor is calculated as (MAX(id) - MIN(id) + 1) / rowCount.

Required: No
Default value: '0.05d'
Type: String

cdc.chunk-key.even-distribution.factor.upper-bound

The upper bound of the chunk key distribution factor. The distribution factor is used to determine whether the table is evenly distributed. When the data distribution is even, table chunks are computed with the even-distribution calculation optimization; when it is uneven, a query is run to determine the splits. The distribution factor is calculated as (MAX(id) - MIN(id) + 1) / rowCount.

Required: No
Default value: '1000.0d'
Type: String

cdc.connect.pool.size

The connection pool size.

Required: No
Default value: '30'
Type: String

cdc.connect.max-retries

The maximum number of times the connector retries establishing a connection to the database server.

Required: No
Default value: '3'
Type: String

cdc.connect.timeout

The maximum time that the connector should wait after trying to connect to the Oracle database server before timing out.

Required: No
Default value: '30s'
Type: String

cdc.scan.newly-added-table.enabled

Enables automatic discovery of new tables when resuming an Oracle CDC pipeline. When set to true, any tables that match the configured table list patterns, but were not part of the previous run, are automatically detected. When set to false (default), the pipeline only continues reading the tables that were part of the original state and ignores any newly created matching tables. This option has an effect only when resuming a CDC pipeline from a previous run's state and does not change the behavior of a fresh initial run.

Required: No
Default value: 'false'
Type: String
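
The distribution factor formula given for the two cdc.chunk-key.even-distribution.factor bounds can be worked through in a few lines of Python. This is a sketch of the arithmetic only: the function names are hypothetical, the default bounds are the values 0.05 and 1000.0 from the table above, and treating both bounds as inclusive is an assumption.

```python
def distribution_factor(min_id: int, max_id: int, row_count: int) -> float:
    """Compute (MAX(id) - MIN(id) + 1) / rowCount for a chunk key column."""
    if row_count <= 0:
        raise ValueError("row_count must be positive")
    return (max_id - min_id + 1) / row_count


def is_evenly_distributed(factor: float, lower: float = 0.05, upper: float = 1000.0) -> bool:
    """Check the factor against the lower and upper bounds (assumed inclusive)."""
    return lower <= factor <= upper


# Dense keys 1..1000 over 1000 rows: the factor is exactly 1.0.
dense = distribution_factor(1, 1000, 1000)

# Sparse keys spanning 1..10,000,000 over only 100 rows: the factor is 100000.0.
sparse = distribution_factor(1, 10_000_000, 100)
```

With the default bounds, the dense table counts as evenly distributed while the sparse one does not, so the sparse table would be split using queries.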

Passthrough Debezium properties

The CDC source also supports passthrough Debezium properties. These properties must be specified using the cdc.debezium. prefix. For additional details on Debezium passthrough options, refer to Debezium's Oracle Connector properties.

Supported Data Types in an Oracle CDC Pipeline

  • NUMBER(p, s <= 0), p - s < 3 → TINYINT
  • NUMBER(p, s <= 0), p - s < 5 → SMALLINT
  • NUMBER(p, s <= 0), p - s < 10 → INT
  • NUMBER(p, s <= 0), p - s < 19 → BIGINT
  • NUMBER(p, s <= 0), 19 <= p - s <= 38 → DECIMAL(p - s, 0)
  • NUMBER(p, s > 0) → DECIMAL(p, s)
  • NUMBER(p, s <= 0), p - s > 38 → STRING
  • FLOAT, BINARY_FLOAT → FLOAT
  • DOUBLE PRECISION, BINARY_DOUBLE → DOUBLE
  • NUMBER(1) → BOOLEAN
  • DATE, TIMESTAMP [(p)] → TIMESTAMP [(p)] [WITHOUT TIMEZONE]
  • TIMESTAMP [(p)] WITH TIME ZONE → TIMESTAMP [(p)] WITH TIME ZONE
  • TIMESTAMP [(p)] WITH LOCAL TIME ZONE → TIMESTAMP_LTZ [(p)]
  • CHAR(n), NCHAR(n), NVARCHAR2(n), VARCHAR(n), VARCHAR2(n), CLOB, NCLOB, XMLType, SYS.XMLTYPE → STRING
  • BLOB, ROWID → BYTES
  • INTERVAL DAY TO SECOND, INTERVAL YEAR TO MONTH → BIGINT

Attention: Oracle data types that are not listed in the table above are not supported by the DeltaStream Flink CDC Connector.
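
The NUMBER rows of the mapping above depend on precision p and scale s, which is easier to follow as code. The Python sketch below encodes only those NUMBER(p, s) rows as listed; the function name is hypothetical, and the separate NUMBER(1) to BOOLEAN row is deliberately not modeled.

```python
def map_oracle_number(p: int, s: int) -> str:
    """Map an Oracle NUMBER(p, s) to the type listed in the table above."""
    if s > 0:
        # Fractional digits are preserved as a decimal.
        return f"DECIMAL({p}, {s})"
    digits = p - s  # number of integer digits when the scale is zero or negative
    if digits < 3:
        return "TINYINT"
    if digits < 5:
        return "SMALLINT"
    if digits < 10:
        return "INT"
    if digits < 19:
        return "BIGINT"
    if digits <= 38:
        return f"DECIMAL({digits}, 0)"
    # Too wide for a decimal, so the value is carried as a string.
    return "STRING"
```

For example, NUMBER(9, 0) maps to INT, NUMBER(10, 2) maps to DECIMAL(10, 2), and NUMBER(38, -5) has 43 integer digits and therefore maps to STRING.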

Example

Create CDC pipeline capturing changes from an Oracle table

Assume an ORACLE Data Store is defined with the name oracle_store, and that it has a table named PAGEVIEWS under the schema HR in the database accessed via the service FREEPDB1. Each row in PAGEVIEWS has three columns (VIEWTIME, USERID, and PAGEID) and records when a given page was visited by a specific user.

First, create the Oracle store:

Create CDC pipeline capturing changes from multiple tables

Now consider a scenario similar to the previous example, except that besides the PAGEVIEWS table there is another table called USERS under the STAFF schema within the same Oracle database in the oracle_store store.

Suppose you want to create a CDC pipeline to capture changes for multiple tables. First, define a stream as the CDC source. When capturing changes from multiple tables with different schemas, the before and after fields are defined as BYTES for flexibility:

The cdc_source stream acts as the CDC source in your pipeline. Whenever an INSERT, DELETE, or UPDATE happens on either the PAGEVIEWS or USERS table in the source database, a corresponding change event is captured and written into this stream.

Now, use the following CSAS statement to define a downstream stream as the sink and run a query to write these changes into it. Each output record includes the operation type and timestamp (op and ts_ms), along with the source schema and table name, which indicate where the changes came from.

You can use these fields to filter or route changes for each table in subsequent queries that read from cdc_sink.

Run one query per table to write the captured changes into their own dedicated Kafka topic.

You can verify the per-table streams are receiving the expected CDC events using PRINT ENTITY:

Now run a DDL per topic to define a table-specific CDC source for the table.

You could write further queries on the above source relations, again defined per table on dedicated topics, to process and write results into the sink of your choice, such as a Snowflake table.
