Giriş
Şu iki satırı yazmak gerekir.
1. transforms": "unwrap"
2. "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
Böylece çıktıda before alanı hep null gelir. Yani şöyledir
{
"source": {
...
},
"before": null,
"after": {
"id": 1,
"name": "John Doe",
"age": 30
},
"op": "c",
"ts_ms": 1654316585000
}
By default, Debezium sends all events in an envelope that includes many pieces of information about the change captured. I’m only interested in reading the changed value here, so the command tells Kafka Connect to keep this information and discard the rest.
Örnek"name": "source-productcategory-connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"database.hostname": "your-host-ip",
"database.port": "5432",
"database.user": "user",
"database.password": "password",
"database.dbname": "AdventureWorks",
"plugin.name": "pgoutput",
"database.server.name": "source",
"key.converter.schemas.enable": "false",
"value.converter.schemas.enable": "false",
"transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"table.include.list": "public.factinternetsales_streaming",
"slot.name" : "dbz_sales_transaction_slot"
}
Örnek - add.fields
{
"name": "postgres-connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"database.hostname": "localhost",
"database.port": "5432",
"database.user": "postgres",
"topic.prefix": "postgres",
"database.password": "postgres",
"database.dbname" : "postgres",
"database.server.name": "postgres",
"slot.name": "debezium",
"plugin.name": "pgoutput",
"table.include.list": "public.dbz_test",
"transforms" : "unwrap",
"transforms.unwrap.type":"io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.add.fields":"op,table,lsn,source.ts_ms,db",
"transforms.unwrap.drop.tombstones":"true",
"transforms.unwrap.delete.handling.mode":"rewrite",
"drop.tombstones": "true"
}