Introduction
The following two lines need to be added to the connector configuration.
"transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
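With this transform, the output record contains only the new row state, that is, the contents of the after field; the rest of the envelope, including before, is dropped. For comparison, the default envelope for an insert, where before is null, looks like this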
{"source": { ... }, "before": null, "after": { "id": 1, "name": "John Doe", "age": 30 }, "op": "c", "ts_ms": 1654316585000 }
The explanation is as follows
By default, Debezium sends all events in an envelope that includes many pieces of information about the change captured. I’m only interested in reading the changed value here, so the command tells Kafka Connect to keep this information and discard the rest.
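To make the effect concrete, here is a minimal Python sketch that imitates what the transform does to the envelope shown earlier. It is not Debezium code: the `unwrap` function is a hypothetical stand-in, and it only models the default case of keeping the `after` state (delete events and tombstones are not modeled).

```python
import json

def unwrap(envelope):
    """Hypothetical stand-in for ExtractNewRecordState: keep only the 'after' state."""
    return envelope.get("after")

# The insert event from the envelope example, with the source block left empty.
event = {
    "source": {},
    "before": None,
    "after": {"id": 1, "name": "John Doe", "age": 30},
    "op": "c",
    "ts_ms": 1654316585000,
}

flattened = unwrap(event)
print(json.dumps(flattened))
```

The consumer then sees only the row itself, without `source`, `before`, `op`, or `ts_ms`.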
Example
We do it like this
{
  "name": "source-productcategory-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "your-host-ip",
    "database.port": "5432",
    "database.user": "user",
    "database.password": "password",
    "database.dbname": "AdventureWorks",
    "plugin.name": "pgoutput",
    "database.server.name": "source",
    "key.converter.schemas.enable": "false",
    "value.converter.schemas.enable": "false",
    "transforms": "unwrap",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "table.include.list": "public.factinternetsales_streaming",
    "slot.name": "dbz_sales_transaction_slot"
  }
}
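A configuration like this is typically submitted to Kafka Connect through its REST API. The sketch below builds the POST request in Python without sending it. The address `http://localhost:8083/connectors` and the helper name `build_request` are assumptions for illustration, and only a few of the settings are repeated here.

```python
import json
import urllib.request

# Assumption: the Kafka Connect REST API is reachable at this address.
CONNECT_URL = "http://localhost:8083/connectors"

def build_request(connector, url=CONNECT_URL):
    """Build (but do not send) the POST request that registers a connector."""
    body = json.dumps(connector).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )

connector = {
    "name": "source-productcategory-connector",
    "config": {
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        "transforms": "unwrap",
        "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
        # The remaining settings from the example configuration go here.
    },
}

request = build_request(connector)
# To actually submit it against a running Connect worker:
# urllib.request.urlopen(request)
```

Keeping the request construction separate from the network call makes it easy to inspect the payload before anything is registered.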
Example - add.fields
We do it like this
{
  "name": "postgres-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "localhost",
    "database.port": "5432",
    "database.user": "postgres",
    "topic.prefix": "postgres",
    "database.password": "postgres",
    "database.dbname": "postgres",
    "database.server.name": "postgres",
    "slot.name": "debezium",
    "plugin.name": "pgoutput",
    "table.include.list": "public.dbz_test",
    "transforms": "unwrap",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "transforms.unwrap.add.fields": "op,table,lsn,source.ts_ms,db",
    "transforms.unwrap.drop.tombstones": "true",
    "transforms.unwrap.delete.handling.mode": "rewrite",
    "drop.tombstones": "true"
  }
}
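The following Python sketch approximates the shape of the flattened record that these add.fields and delete.handling.mode settings produce. It is an illustration, not Debezium's implementation: the function `unwrap_with_metadata` is hypothetical, the values in the `source` block are made up, and the lookup rule for unqualified names (envelope first, then `source`) is a simplifying assumption. The `__` prefix, the `__source_ts_ms` naming, and the string-valued `__deleted` field follow my understanding of the Debezium documentation and should be checked against the version in use.

```python
def unwrap_with_metadata(envelope, add_fields, prefix="__"):
    """Rough imitation of add.fields plus delete.handling.mode=rewrite."""
    is_delete = envelope["op"] == "d"
    # In rewrite mode a delete keeps the old row, since 'after' is null.
    row = dict(envelope["before"] if is_delete else envelope["after"])
    for name in add_fields:
        if "." in name:
            # Qualified name such as source.ts_ms becomes __source_ts_ms.
            struct, field = name.split(".", 1)
            row[f"{prefix}{struct}_{field}"] = envelope[struct][field]
        elif name in envelope:
            # Envelope-level field such as op.
            row[prefix + name] = envelope[name]
        else:
            # Assumption: other unqualified names come from the source block.
            row[prefix + name] = envelope["source"][name]
    row["__deleted"] = "true" if is_delete else "false"
    return row

# Illustrative event; the source values are invented for this sketch.
event = {
    "source": {"db": "postgres", "table": "dbz_test", "lsn": 24023128, "ts_ms": 1654316584000},
    "before": None,
    "after": {"id": 1, "name": "John Doe", "age": 30},
    "op": "c",
    "ts_ms": 1654316585000,
}

fields = "op,table,lsn,source.ts_ms,db".split(",")
result = unwrap_with_metadata(event, fields)
print(result)
```

The row columns stay at the top level, and the metadata sits next to them under prefixed names so that it cannot collide with column names.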