2 Aralık 2022 Cuma

Debezium Connector

Giriş
PostgreSQL'deki CDC kayıtlarını okur ve Kafka'daki bir topic'e yazılır.

1. Connector'a bir isim verilir
2. connector.class her zaman io.debezium.connector.postgresql.PostgresConnector olarak belirtilir. Bu connector veri tabanından okumak içindir

3. Veri tabanı bağlantısı bilgisi tanımlanır. Bu alanlar şöyle
database.hostname
database.port
database.user
database.password
database.dbname

3. database.server.name ile Kafka'da yaratılacak topic ismi için ön ek tanımlanır. 
4. table.include.list ile okunacak tablolar belirtilir veya schema.whitelist ile schema belirtilir.
5. snapshot.mode değeri always yapılabilir.

6. Eğer veri tabanı değişikliğini yani WAL çıktısını JSON yapmak istersek şöyle yaparız
"plugin.name": "wal2json",
"value.converter": "org.apache.kafka.connect.json.JsonConverter"

key.converter ve value.converter Alanları

Örnek
Elimizde şöyle bir JSON olsun
{
    "name": "warehouse-connector",
    "config": {
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        "plugin.name": "pgoutput",
        "database.hostname": "postgres",
        "database.port": "5432",
        "database.user": "docker",
        "database.password": "docker",
        "database.dbname": "profile_service_dev",
        "database.server.name": "postgres",
        "snapshot.mode": "always",
        "table.include.list": "public.warehouse"
    }
}
Şöyle yaparız
curl -i -X POST -H "Accept:application/json" \
  -H "Content-Type:application/json" \
  127.0.0.1:8083/connectors/ \
  --data "@connector.json"
 Veri tabanındaki değişiklikler "posgres.public.warehouse" isimli Kafka topic'te görülür. Şeklen şöyle


- connect_configsconnect_offsets topicleri debezium'un kendi topicleri. Bunlar alt çizgi kullanıyor çünkü config ve offset topic'leri debezium konfigürasyonunda belirtildi. 
- connect-status yine debezium topic ancak kendisi yarattı
Eğer warehouse tablosunun mesajlarına bakarsak şeklen şöyle


Örnek
Şöyle yaparız. Debezium 8083'e gönderiyoruz.
curl -X POST  http://localhost:8083/connectors/ \
  -H 'content-type: application/json' \
  -d '{
   "name": "student-outbox-connector",
   "config": {
      "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
      "tasks.max": "1",
      "database.hostname": "postgres",
      "database.port": "5432",
      "database.user": "postgres",
      "database.password": "postgres",
      "database.dbname": "user_DB",
      "database.server.name": "pg-outbox-server",
      "tombstones.on.delete": "false",
      "table.whitelist": "public.outbox",
      "transforms": "outbox",
      "transforms.outbox.type": "com.eresh.outbox.OutboxTransformer"
   }
}'

Örnek
Şöyle yaparız. retail.orders_info tablosundaki CDC kayıtlarını myserver.retail.orders_info isimli topic'e yazar. Topic ismi Debezium tarafından serverName.schemaName.tableName şeklinde yaratılır
{
    "name": "pg-orders-source",
    "config": {
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        "database.hostname": "localhost",
        "database.port": "5432",
        "database.user": "postgres",
        "database.password": "password",
        "database.dbname": "postgres",
        "database.server.name": "myserver",
        "plugin.name": "wal2json",
        "table.include.list": "retail.orders_info",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter"
    }
}

Örnek
Şöyle yaparız. Veri tabanındaki değişiklikler "postgres.public.shipments" isimli Kafka topic'te görülür
curl -H 'Content-Type: application/json' debezium:8083/connectors --data '
{
  "name": "shipments-connector",  
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector", 
    "plugin.name": "pgoutput",
    "database.hostname": "postgres", 
    "database.port": "5432", 
    "database.user": "postgresuser", 
    "database.password": "postgrespw", 
    "database.dbname" : "shipment_db", 
    "database.server.name": "postgres", 
    "table.include.list": "public.shipments" 
  }
}'
Örnek
Şöyle yaparız
{
  "name": "fulfillment-connector",  
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector", 
    "database.hostname": "192.168.99.100", 
    "database.port": "5432", 
    "database.user": "postgres", 
    "database.password": "postgres", 
    "database.dbname" : "postgres", 
    "database.server.name": "fulfillment", 
    "table.include.list": "public.inventory" 
  }
}
column.include.list Alanı
Örnek
Şöyle yaparız. debezium.event_store.mt_events isimli topic'e yazar
{
    "name": "postgres-debezium-connector",
    "config": {
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        "database.hostname": "localhost",
        "database.port": "5432",
        "database.user": "postgres",
        "database.password": "1234",
        "database.dbname": "MartenDB",
        "table.include.list": "event_store.mt_events",
        "column.include.list": "event_store.mt_events.data",
        "plugin.name": "pgoutput",
        "topic.prefix": "debezium",
        "slot.name": "debezium_replication_slot"
    }
}

transforms Alanı
Örnek
Şöyle yaparız
"table.include.list": "debezium_postgres_demo.outbox",


// It routes the events, to the topic based on the value written to the destination field 
// of the Outbox table:
// Debezium defaults the topic name to outbox.event followed by the value in 
// the route.by.field above, or the aggregatetype column if that is not set. 
// In this case as the item is written to the database with a destination value of item, 
// the topic name resolves to outbox.event.item.

"transforms": "outbox",
"transforms.outbox.type": "io.debezium.transforms.outbox.EventRouter",
"transforms.outbox.route.by.field": "destination",

// Events are transformed to use the value of the Outbox id field as the event key, 
// and to use the value of the payload field as the event payload
"transforms.outbox.table.field.event.key": "id",
"transforms.outbox.table.field.event.payload": "payload",



1 Aralık 2022 Perşembe

MERGE - Postgres 15 İle Geliyor

Giriş
Açıklaması şöyle
The MERGE command is one of the newest additions to the Postgres DB with the v15 release. It has been a standard feature on other SQL variants like SQL Server before. Take a look at the RFC on graphql-engine to see how the various similar operations are supported through the Hasura GraphQL API on SQL Server.

Merge statements are a combination of INSERT, UPDATE and DELETE.

Merge statement allows bringing in data from a source table into a target table.
Kaynak tabloyu, belirtilen tablo ile birleştirir. MERGE ve INSERT ... ON CONFLICT yani UPSERT farklı şeyler. Açıklaması şöyle
Note: MERGE is often (incorrectly) used interchangeably with the term UPSERT.
Açıklaması şöyle
What about MERGE?

SQL-standard MERGE actually has poorly defined concurrency semantics and is not suitable for upserting without locking a table first.

It's a really useful OLAP statement for data merging, but it's not actually a useful solution for concurrency-safe upsert. There's lots of advice to people using other DBMSes to use MERGE for upserts, but it's actually wrong.

Örnek
Şöyle yaparız
MERGE INTO customer_history c
  USING daily_orders d
  ON (c.customer_id = d.customer_id)

  WHEN MATCHED THEN
    UPDATE SET     -- Existing customer, update the order count and the timestamp of order.
      order_count = c.order_count + 1,
      last_order_id = d.order_id

  WHEN NOT MATCHED THEN       -- New entry, record it.
    INSERT (customer_id, last_order_id, order_center, order_count, last_order)
      VALUES (customer_id, d.order_id, d.order_center, 1, d.order_time);
Örnek
Şöyle yaparız
MERGE INNTO wines w
USING wine_stock_changes s
ON s.winename = w.winename
 WHEN NOT MATCHED AND s.stock_delta > 0 THEN
   INSERT VALUES(s.winename, s.stock_delta)
 WHEN MATCHED AND w.stock + s.stock_delta > 0 THEN
   UPDATE SET stock = w.stock + s.stock_delta
 WHEN MATCHED THEN
  DELETE;






20 Kasım 2022 Pazar

EVERY - Aggregate Metodu

Örnek
Şöyle yaparız
SELECT author_id, EVERY( title LIKE '%a')
FROM book
GROUP BY author_id


author_id every
--- ---
1 false
2 true


Analytic Functions / Window Functions - ROWS BETWEEN

Örnek
Soru şöyle
For example, if we want to sum up the revenue from all previous years till this year, we can use this window function!
Şöyle yaparız
SELECT year, SUM (revenue) 
OVER ( 
  ROWS BETWEEN 
    UNBOUNDED PRECEEDING 
    AND 
    CURRENT ROW
) AS running_sum
FROM revenue_table

8 Kasım 2022 Salı

Debezium Kullanımı İçin Hazırlık

Giriş

1. WAL Seviyesi
var/lib/postgresql/data/postgresql.conf dosyasındaki wal_level alanını değeri logical yapılır

2. Grup/Kullanıcı ve Publication + Slot Yaratma
Açıklaması şöyle
1. To stream changes your user needs the Replication attribute. Without it, you'll hit permission errors. 
2. After updating the parameters and granting the right permissions, the next steps are to create a publication, set up a replication slot, and start streaming changes.
Açıklaması şöyle
A publication defines which tables' changes you're publishing. 
Açıklaması şöyle
 A replication slot ensures changes stick around in the WAL until your consumer reads them.
Örnek
Şöyle yaparız. Burada önce Replication ö zelliğine sahip replication_role grubu yaratılıyor. Daha sonra bu rol bir kullanıcıya atanıyor 
CREATE ROLE replication_role WITH REPLICATION LOGIN;
CREATE USER replicator WITH PASSWORD 'your-secure-password';
GRANT replication_role TO replicator;
-- or: ALTER USER replicator REPLICATION;
Şöyle yaparızpg_recvlogical komutu hem publication hem de slot yaratabilir
# Let's create s publication for users table
CREATE PUBLICATION users_pub FOR TABLE users;

# Let's create a slot
pg_recvlogical -h $SERVER_NAME -U replicator -d postgres --slot users_slot 
--create-slot -P wal2json

# start streaming changes and see what they look like:
pg_recvlogical -h $SERVER_NAME -U replicator -d postgres --slot users_slot 
--start -o pretty-print=1 -f -
Örnek
Şöyle yaparız. Burada önce Replication özelliğine sahip cdcuser isimli bir kullanıcı yaratılıyor. Daha sonra replication_group isimli bir grup yaratılıyor. Daha sonra replication_group grubuna yetkiler grant ediliyor
CREATE USER cdcuser WITH PASSWORD 'cdcpassword' REPLICATION LOGIN;

CREATE ROLE replication_group WITH USER foouser, cdcuser;
GRANT CREATE ON DATABASE quant_core TO replication_group;
GRANT USAGE ON SCHEMA foo TO replication_group;
GRANT CREATE ON SCHEMA foo TO replication_group;

CREATE TABLE foo.cdc_heartbeat (
    heartbeat bit
);
ALTER TABLE foo.cdc_heartbeat OWNER TO cdcuser;



20 Ekim 2022 Perşembe

SQL Komutları

Giriş
SQL komutları şöyle gruplanabilir
1. DDL – Data Definition Language
2. DQL – Data Query Language
3. DML – Data Manipulation Language
4. DCL – Data Control Language

Şeklen şöyle



INFORMATION_SCHEMA.COLUMNS Sistem Tablosu

Giriş
Açıklaması şöyle
It's a part of the SQL-92 standard, and it's implemented by most major database engines (with the notable exception of Oracle).
data_type Sütunu
Örnek
Eğer OID tipindeki sütunları görmek istersek şöyle yaparız
SELECT * FROM information_schema.columns WHERE data_type = 'oid';
table_name Sütunu
Örnek
Şöyle yaparız
SELECT column_name, column_external_name, ordinal_position, is_nullable, data_type
FROM information_schema.columns WHERE table_name='...'