5 Haziran 2023 Pazartesi

Debezium Connector key.converter ve value.converter Alanları

Örnek - JsonConverter
Şöyle yaparız. Burada key ve value alanları Json olarak Kafka'ya yazılıyor
$ echo '
{
    "name": "arctype-connector",
    "config": {
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        "tasks.max": "1",
        "plugin.name": "wal2json",
        "database.hostname": "db",
        "database.port": "5432",
        "database.user": "postgres",
        "database.password": "arctype",
        "database.dbname": "postgres",
        "database.server.name": "ARCTYPE",
        "key.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "key.converter.schemas.enable": "false",
        "value.converter.schemas.enable": "false",
        "snapshot.mode": "always"
    }
}
' > debezium.json

$ curl -i -X POST \
         -H "Accept:application/json" \
         -H "Content-Type:application/json" \
         127.0.0.1:8083/connectors/ \ccySQL Connector
Örnek - JsonConverter
Şöyle yaparız. Burada key için StringConverter, value için AvroConverter kullanılıyor ve Kafka'ya yazılıyor
{
  "name": "postgres-source",
  "config": {"connector.class":"io.debezium.connector.postgresql.PostgresConnector",
    "tasks.max":"1",
    "database.hostname": "postgres",
    "database.port": "5432",
    "database.user": "postgres",
    "database.password": "postgres",
    "database.dbname" : "students",
    "database.server.name": "dbserver1",
    "database.whitelist": "students",
    "database.history.kafka.bootstrap.servers": "kafka:9092",
    "database.history.kafka.topic": "schema-changes.students",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schemas.enable": "false",
    "value.converter.schemas.enable": "true",
    "value.converter.schema.registry.url": "http://schema-registry:8081",
    "transforms": "unwrap",
    "transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope"
  }
}
transforms.unwrap.type için açıklama şöyle
The value of transforms.unwrap.type can be set to various predefined transformation types depending on the desired behavior. Some common values include:

- org.apache.kafka.connect.transforms.UnwrapFromEnvelope: This transformation is used to extract the payload from an envelope structure. It is often used when messages are wrapped in an envelope format, such as Apache Avro's envelope or Confluent's schema registry envelope.

- org.apache.kafka.connect.transforms.ExtractField$Key: This transformation extracts a specific field from the key of the message. It is useful when you want to perform further processing based on a specific key field.

- org.apache.kafka.connect.transforms.ExtractField$Value: This transformation extracts a specific field from the value of the message. It is useful when you want to extract a specific value field for further processing or downstream operations.

Please note that these are just a few examples, and the available transformation types may vary depending on the version of Kafka Connect and the connectors you are using.

When using transforms.unwrap.type, you will typically need to configure additional properties specific to the chosen transformation type, such as transforms.unwrap.field to specify the field to be extracted or manipulated.

Hiç yorum yok:

Yorum Gönder