22 Haziran 2023 Perşembe

Debezium Connector transforms - route

Giriş
route dönüşümü (transform), değişen tablonun ismine göre kaydın yazılacağı topic ismini belirler.

Örnek
Şöyle yaparız
{
  "name": "pg_user_data-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "tasks.max": "1",
    "database.hostname": "postgres",
    "database.port": "5432",
    "database.user": "postgres",
    "database.password": "postgres",
    "database.server.id": "184055",
    "database.server.name": "dbserver2",
    "database.include": "user_data",
    "database.dbname": "user_data",
    "database.history.kafka.bootstrap.servers": "kafka:9092",
    "database.history.kafka.topic": "schema-changes.user_data",
    "transforms": "route",
    "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.route.regex": "([^.]+)\\.([^.]+)\\.([^.]+)",
    "transforms.route.replacement": "$3"
  }
}
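RegexRouter, noktalarla ayrılmış üç parçadan oluşan topic isminin yalnızca son parçasını ($3) kullanır. Örneğin dbserver2.public.orders isimli topic'e gidecek kayıtlar orders isimli topic'e yazılır.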

Örnek
Şöyle yaparız
"config": {
  "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
  "database.hostname": "localhost",
  "database.port": "5432",
  "database.user": "sample_user",
  "database.password": "sample_pass",
  "database.dbname": "sample_db",
  "database.server.name": "sample_servername",
  "table.include.list": "sample_schema.sample_table",
  "topic.prefix": "sample.topic.prefix",
  "plugin.name": "pgoutput",
  "slot.name": "debezium_slot",
  "schema.include.list": "sample_schema",
  "transforms": "unwrap,reroute_topic",
  
  "transforms.reroute_topic.type": "io.debezium.transforms.ByLogicalTableRouter",
  "transforms.reroute_topic.key.enforce.uniqueness": "false",
  "transforms.reroute_topic.topic.regex": "sample_reroute_source_topic",
  "transforms.reroute_topic.topic.replacement": "sample_reroute_target_topic",
  
  "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
  "transforms.unwrap.drop.tombstones": "false",
  
  "key.converter": "org.apache.kafka.connect.json.JsonConverter",
  "key.converter.schemas.enable": "false",
  "value.converter": "org.apache.kafka.connect.json.JsonConverter",
  "value.converter.schemas.enable": "false",
  "snapshot.mode": "initial",
  "decimal.format": "NUMERIC",
  "json.output.decimal.format": "NUMERIC",
  "decimal.handling.mode": "string"
}
Burada sample_schema.sample_table tablosu takip ediliyor. İsmi topic.regex değeriyle (sample_reroute_source_topic) eşleşen topic'e gidecek değişiklikler sample_reroute_target_topic isimli topic'e yazılıyor.



Hiç yorum yok:

Yorum Gönder