The YugabyteDB gRPC Connector comes bundled with Single Message Transformers (SMTs). SMTs are applied to messages as they flow through Kafka Connect so that sinks understand the format in which data is sent. SMTs transform inbound messages after a source connector has produced them, but before they are written to Kafka. SMTs transform outbound messages before they are sent to a sink connector.

The following SMTs are bundled with the connector jar file available on GitHub releases:

  • YBExtractNewRecordState
  • PGCompatible

Example

For simplicity, only before and after fields of the payload of the message published by the connector are mentioned in the following examples. Any information pertaining to the record schema, if it is the same as the standard Debezium connector for PostgreSQL, is skipped.

Consider a table created using the following statement:

CREATE TABLE test (id INT PRIMARY KEY, name TEXT, aura INT);

The following DML statements will be used to demonstrate payload in case of individual replica identities:

-- statement 1
INSERT INTO test VALUES (1, 'Vaibhav', 9876);

-- statement 2
UPDATE test SET aura = 9999 WHERE id = 1;

-- statement 3
UPDATE test SET name = 'Vaibhav Kushwaha', aura = 10 WHERE id = 1;

-- statement 4
UPDATE test SET aura = NULL WHERE id = 1;

-- statement 5
DELETE FROM test WHERE id = 1;

PGCompatible

Transformer class: io.debezium.connector.yugabytedb.transforms.PGCompatible

By default, the YugabyteDB CDC service publishes events with a schema that only includes columns that have been modified. The source connector then sends the value as null for columns that are missing in the payload. Each column payload includes a set field that is used to signal if a column has been set to null because it wasn't present in the payload from YugabyteDB.

However, some sink connectors may not understand the preceding format. PGCompatible transforms the payload to a format that is compatible with the format of the standard change data events. Specifically, it transforms column schema and value to remove the set field and collapse the payload such that it only contains the data type schema and value.

PGCompatible differs from YBExtractNewRecordState by recursively modifying all the fields in a payload.

The following examples show what the payload would look like for each before image mode.

CHANGE

-- statement 1
"before":null,"after":{"id":1,"name":"Vaibhav","aura":9876}

-- statement 2
"before":null,"after":{"id":1,"name":null,"aura":9999}

-- statement 3
"before":null,"after":{"id":1,"name":"Vaibhav Kushwaha","aura":10}

-- statement 4
"before":null,"after":{"id":1,"name":null,"aura":null}

-- statement 5
"before":{"id":1,"name":null,"aura":null},"after":null

Note that for statement 2 and 4, the columns that were not updated as a part of the UPDATE statement are null in the output field.

FULL_ROW_NEW_IMAGE

-- statement 1
"before":null,"after":{"id":1,"name":"Vaibhav","aura":9876}

-- statement 2
"before":null,"after":{"id":1,"name":"Vaibhav","aura":9999}

-- statement 3
"before":null,"after":{"id":1,"name":"Vaibhav Kushwaha","aura":10}

-- statement 4
"before":null,"after":{"id":1,"name":"Vaibhav Kushwaha","aura":null}

-- statement 5
"before":{"id":1,"name":"Vaibhav Kushwaha","aura":null},"after":null

MODIFIED_COLUMNS_OLD_AND_NEW_IMAGES

-- statement 1
"before":null,"after":{"id":1,"name":"Vaibhav","aura":9876}

-- statement 2
"before":{"id":1,"name":null,"aura":9876},"after":{"id":1,"name":null,"aura":9999}

-- statement 3
"before":{"id":1,"name":"Vaibhav","aura":9999},"after":{"id":1,"name":"Vaibhav Kushwaha","aura":10}

-- statement 4
"before":{"id":1,"name":null,"aura":10},"after":{"id":1,"name":null,"aura":null}

-- statement 5
"before":{"id":1,"name":null,"aura":null},"after":null

ALL

-- statement 1
"before":null,"after":{"id":1,"name":"Vaibhav","aura":9876}

-- statement 2
"before":{"id":1,"name":"Vaibhav","aura":9876},"after":{"id":1,"name":"Vaibhav","aura":9999}

-- statement 3
"before":{"id":1,"name":"Vaibhav","aura":9999},"after":{"id":1,"name":"Vaibhav Kushwaha","aura":10}

-- statement 4
"before":{"id":1,"name":"Vaibhav Kushwaha","aura":10},"after":{"id":1,"name":"Vaibhav Kushwaha","aura":null}

-- statement 5
"before":{"id":1,"name":"Vaibhav Kushwaha","aura":null},"after":null