7 Haziran 2023 Çarşamba

Debezium Connector transforms - unwrap ExtractNewRecordState - Sadece Değişen Alan Gelir

Giriş
Şu iki satırı yazmak gerekir.
1. transforms": "unwrap"
2. "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
Böylece çıktıda  before alanı hep null gelir. Yani şöyledir
{
"source": { ... }, "before": null, "after": { "id": 1, "name": "John Doe", "age": 30 }, "op": "c", "ts_ms": 1654316585000 }
Açıklaması şöyle
By default, Debezium sends all events in an envelope that includes many pieces of information about the change captured. I’m only interested in reading the changed value here, so the command tells Kafka Connect to keep this information and discard the rest.

Örnek
Şöyle yaparız
"name": "source-productcategory-connector",
    "config": {
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        "database.hostname": "your-host-ip",
        "database.port": "5432",
        "database.user": "user",
        "database.password": "password",
        "database.dbname": "AdventureWorks",
        "plugin.name": "pgoutput",
        "database.server.name": "source",
        "key.converter.schemas.enable": "false",
        "value.converter.schemas.enable": "false",
        "transforms": "unwrap",
        "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "key.converter": "org.apache.kafka.connect.json.JsonConverter",
        "table.include.list": "public.factinternetsales_streaming",
        "slot.name" : "dbz_sales_transaction_slot"
    }
Örnek - add.fields
Şöyle yaparız
{
  "name": "postgres-connector",  
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector", 
    "database.hostname": "localhost", 
    "database.port": "5432", 
    "database.user": "postgres",
    "topic.prefix": "postgres", 
    "database.password": "postgres", 
    "database.dbname" : "postgres", 
    "database.server.name": "postgres",
    "slot.name": "debezium",
    "plugin.name": "pgoutput",
    "table.include.list": "public.dbz_test",
    "transforms" : "unwrap",
    "transforms.unwrap.type":"io.debezium.transforms.ExtractNewRecordState",
    "transforms.unwrap.add.fields":"op,table,lsn,source.ts_ms,db",
    "transforms.unwrap.drop.tombstones":"true",
    "transforms.unwrap.delete.handling.mode":"rewrite",
    "drop.tombstones": "true"
  }


Hiç yorum yok:

Yorum Gönder