2 Aralık 2022 Cuma

Debezium Connector

Giriş
PostgreSQL'deki CDC kayıtlarını okur ve Kafka'daki bir topic'e yazılır.

1. Connector'a bir isim verilir
2. connector.class her zaman io.debezium.connector.postgresql.PostgresConnector olarak belirtilir. Bu connector veri tabanından okumak içindir

3. Veri tabanı bağlantısı bilgisi tanımlanır. Bu alanlar şöyle
database.hostname
database.port
database.user
database.password
database.dbname

3. database.server.name ile Kafka'da yaratılacak topic ismi için ön ek tanımlanır. 
4. table.include.list ile okunacak tablolar belirtilir veya schema.whitelist ile schema belirtilir.
5. snapshot.mode değeri always yapılabilir.

6. Eğer veri tabanı değişikliğini yani WAL çıktısını JSON yapmak istersek şöyle yaparız
"plugin.name": "wal2json",
"value.converter": "org.apache.kafka.connect.json.JsonConverter"

key.converter ve value.converter Alanları

Örnek
Elimizde şöyle bir JSON olsun
{
    "name": "warehouse-connector",
    "config": {
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        "plugin.name": "pgoutput",
        "database.hostname": "postgres",
        "database.port": "5432",
        "database.user": "docker",
        "database.password": "docker",
        "database.dbname": "profile_service_dev",
        "database.server.name": "postgres",
        "snapshot.mode": "always",
        "table.include.list": "public.warehouse"
    }
}
Şöyle yaparız
curl -i -X POST -H "Accept:application/json" \
  -H "Content-Type:application/json" \
  127.0.0.1:8083/connectors/ \
  --data "@connector.json"
 Veri tabanındaki değişiklikler "posgres.public.warehouse" isimli Kafka topic'te görülür. Şeklen şöyle


- connect_configsconnect_offsets topicleri debezium'un kendi topicleri. Bunlar alt çizgi kullanıyor çünkü config ve offset topic'leri debezium konfigürasyonunda belirtildi. 
- connect-status yine debezium topic ancak kendisi yarattı
Eğer warehouse tablosunun mesajlarına bakarsak şeklen şöyle


Örnek
Şöyle yaparız. Debezium 8083'e gönderiyoruz.
curl -X POST  http://localhost:8083/connectors/ \
  -H 'content-type: application/json' \
  -d '{
   "name": "student-outbox-connector",
   "config": {
      "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
      "tasks.max": "1",
      "database.hostname": "postgres",
      "database.port": "5432",
      "database.user": "postgres",
      "database.password": "postgres",
      "database.dbname": "user_DB",
      "database.server.name": "pg-outbox-server",
      "tombstones.on.delete": "false",
      "table.whitelist": "public.outbox",
      "transforms": "outbox",
      "transforms.outbox.type": "com.eresh.outbox.OutboxTransformer"
   }
}'

Örnek
Şöyle yaparız. retail.orders_info tablosundaki CDC kayıtlarını myserver.retail.orders_info isimli topic'e yazar. Topic ismi Debezium tarafından serverName.schemaName.tableName şeklinde yaratılır
{
    "name": "pg-orders-source",
    "config": {
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        "database.hostname": "localhost",
        "database.port": "5432",
        "database.user": "postgres",
        "database.password": "password",
        "database.dbname": "postgres",
        "database.server.name": "myserver",
        "plugin.name": "wal2json",
        "table.include.list": "retail.orders_info",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter"
    }
}

Örnek
Şöyle yaparız. Veri tabanındaki değişiklikler "postgres.public.shipments" isimli Kafka topic'te görülür
curl -H 'Content-Type: application/json' debezium:8083/connectors --data '
{
  "name": "shipments-connector",  
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector", 
    "plugin.name": "pgoutput",
    "database.hostname": "postgres", 
    "database.port": "5432", 
    "database.user": "postgresuser", 
    "database.password": "postgrespw", 
    "database.dbname" : "shipment_db", 
    "database.server.name": "postgres", 
    "table.include.list": "public.shipments" 
  }
}'
Örnek
Şöyle yaparız
{
  "name": "fulfillment-connector",  
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector", 
    "database.hostname": "192.168.99.100", 
    "database.port": "5432", 
    "database.user": "postgres", 
    "database.password": "postgres", 
    "database.dbname" : "postgres", 
    "database.server.name": "fulfillment", 
    "table.include.list": "public.inventory" 
  }
}
column.include.list Alanı
Örnek
Şöyle yaparız. debezium.event_store.mt_events isimli topic'e yazar
{
    "name": "postgres-debezium-connector",
    "config": {
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        "database.hostname": "localhost",
        "database.port": "5432",
        "database.user": "postgres",
        "database.password": "1234",
        "database.dbname": "MartenDB",
        "table.include.list": "event_store.mt_events",
        "column.include.list": "event_store.mt_events.data",
        "plugin.name": "pgoutput",
        "topic.prefix": "debezium",
        "slot.name": "debezium_replication_slot"
    }
}

transforms Alanı
Örnek
Şöyle yaparız
"table.include.list": "debezium_postgres_demo.outbox",


// It routes the events, to the topic based on the value written to the destination field 
// of the Outbox table:
// Debezium defaults the topic name to outbox.event followed by the value in 
// the route.by.field above, or the aggregatetype column if that is not set. 
// In this case as the item is written to the database with a destination value of item, 
// the topic name resolves to outbox.event.item.

"transforms": "outbox",
"transforms.outbox.type": "io.debezium.transforms.outbox.EventRouter",
"transforms.outbox.route.by.field": "destination",

// Events are transformed to use the value of the Outbox id field as the event key, 
// and to use the value of the payload field as the event payload
"transforms.outbox.table.field.event.key": "id",
"transforms.outbox.table.field.event.payload": "payload",



Hiç yorum yok:

Yorum Gönder