31 Mayıs 2023 Çarşamba

LATERAL

Giriş
PostgreSQL 9.3 yani 2013 yılından beri var. Açıklaması şöyle
The LATERAL keyword appears after a FROM clause in queries with multiple table references.

JOIN
Açıklaması şöyle
... in the case of table joins, the LATERAL keyword must appear after our JOINclause. The same position holds true for all other forms of table joins.
Örnek
Şu SQL hata verir
SELECT name, email, s.ip_addresses
FROM users u
LEFT JOIN (
 SELECT user_id, string_agg(ip_address, ',') "ip_addresses"
 FROM site_visits
 WHERE country in ('China')
 and user_id = u.id
 GROUP BY user_id
) s
ON u.id = s.user_id
Hata şöyle
ERROR:  invalid reference to FROM-clause entry for table "u" LINE 9:  and user_id = u.id                        
HINT:  There is an entry for table "u", but it cannot be referenced from this part of the query. SQL state: 42P01 Character: 184
Şöyle yaparız
SELECT name, email, s.ip_addresses
FROM users u
LEFT JOIN LATERAL (
 SELECT user_id, string_agg(ip_address, ',') "ip_addresses"
 FROM site_visits
 WHERE country in ('China')
 and user_id = u.id
 GROUP BY user_id
) s
ON u.id = s.user_id
Daha Okunaklı Sorgu İçin
Örnek
Şöyle yaparız
select u.*, last_visited 
from users u,
lateral(
 select concat(date, ' ', time) "last_visited" from site_visits 
 where user_id = u.id
 order by id desc 
 limit 1
) last_visited
where u.email = 'myuryichev2@guardian.co.uk'

18 Mayıs 2023 Perşembe

RisingWave - Streaming Database

Giriş
Aslında hem Stream Processing hem de Streaming Database bileşimidir

PostgreSQL uyumlu SQL kullanır. Açıklaması şöyle
Apache Flink, as a big-data system, has faced criticism regarding its ease-of-use and cost efficiency. Observing this, some vendors are building their own stream processing systems to offer an improved experience in the cloud. RisingWave, for example, is creating a distributed SQL streaming database from scratch, aiming to provide a PostgreSQL-like stream processing experience with a cloud-native architecture.

Stream processing is poised to be the primary battleground in the realm of data streaming. The dominance of Flink as the de-facto standard remains uncertain, as emerging alternatives such as RisingWave and Materialize show potential for capturing larger market shares. Additionally, the competitive strategies employed by Flink-based vendors in their quest for superiority are of great interest to us.

CREATE SOURCE
Örnek
Şöyle yaparız
// the mapping for the driver's Kafka topic
CREATE SOURCE driver_data (
    driver_id BIGINT,
    location VARCHAR,
    speed BIGINT,
) WITH (
    connector = 'kafka',
    topic = 'driver_topic',
    properties.bootstrap.server = 'message_queue:29092',
    scan.startup.mode = 'earliest'
) ROW FORMAT JSON;
CREATE TABLE
Örnek
Şöyle yaparız
CREATE TABLE users (_id JSONB PRIMARY KEY, payload JSONB) WITH (
    connector = 'kafka',
    kafka.topic = 'dbserver1.random_data.users',
    kafka.brokers = 'message_queue:29092',
    kafka.scan.startup.mode = 'earliest'
) ROW FORMAT DEBEZIUM_MONGO_JSON;

CREATE MATERIALIZED VIEW
Örnek - Join Kullanan
Şöyle yaparız.San Francisco'da en çok müşteri taşıyan sürücüleri görüntüler
CREATE MATERIALIZED VIEW most_active_drivers AS
SELECT drivers.driver_id, drivers.location, drivers.rating, 
  COUNT(rides.ride_id) as total_rides
FROM drivers
JOIN rides
ON drivers.driver_id = rides.driver_id
WHERE drivers.location = 'San Francisco'
GROUP BY drivers.driver_id, drivers.location, drivers.rating
ORDER BY total_rides DESC
Örnek
Şöyle yaparız
CREATE MATERIALIZED VIEW page_visits_mv AS
SELECT page_id,
       COUNT(*) AS total_visits,
       COUNT(DISTINCT user_id) AS unique_visitors,
       MAX(timestamp) AS last_visit_time
FROM website_visits
GROUP BY page_id;
Örnek
Şöyle yaparız
CREATE TABLE users (_id JSONB PRIMARY KEY, payload JSONB) WITH (
    connector = 'kafka',
    kafka.topic = 'dbserver1.random_data.users',
    kafka.brokers = 'message_queue:29092',
    kafka.scan.startup.mode = 'earliest'
) ROW FORMAT DEBEZIUM_MONGO_JSON;

CREATE MATERIALIZED VIEW normalized_users AS
SELECT
    payload ->> 'name' as name,
    payload ->> 'email' as email,
    payload ->> 'address' as address
FROM
    users;

SELECT * FROM normalized_users LIMIT 10;
TUMBLE
Örnek
Şöyle yaparız
CCREATE SOURCE live_stream_metrics (
    client_ip VARCHAR,
    user_agent VARCHAR,
    user_id VARCHAR,
    -- The live room.
    room_id VARCHAR,
    -- Sent bits per second.
    video_bps BIGINT,
    -- Sent frames per second. Typically 30 fps.
    video_fps BIGINT,
    -- Round-trip time (in ms). 200ms is recommended.
    video_rtt BIGINT,
    -- Lost packets per second.
    video_lost_pps BIGINT,
    -- How long was the longest freeze (in ms).
    video_longest_freeze_duration BIGINT,
    -- Total freeze duration.
    video_total_freeze_duration BIGINT,
    report_timestamp TIMESTAMPTZ,
    country VARCHAR
) WITH (
    connector = 'kafka',
    topic = 'live_stream_metrics',
    properties.bootstrap.server = 'message_queue:29092',
    scan.startup.mode = 'earliest'
) ROW FORMAT JSON;

-- A real-time dashboard of the total UV.
CREATE MATERIALIZED VIEW total_user_visit_1min AS
SELECT
    window_start AS report_ts,
    COUNT(DISTINCT user_id) AS uv
FROM
    TUMBLE(
        live_stream_metrics,
        report_timestamp,
        INTERVAL '1' MINUTE
    )
GROUP BY window_start;

SELECT * FROM total_user_visit_1min LIMIT 1;



DENSE_RANK + OVER (ORDER BY ...) - Gruplar ve Her Gruba Sırayla Numara Verir

Giriş
Açıklaması şöyle
In SQL, DENSE_RANK() is a window function that assigns a unique rank to each row within a partition, with no gaps in the ranking values. When used in conjunction with the OVER (ORDER BY ...) clause, it allows you to define the order in which the rows are ranked.

Here's an explanation of each component:

DENSE_RANK(): It calculates the rank of each row within a partition. The rank value represents the position of a row relative to the other rows in the same partition. Unlike RANK(), DENSE_RANK() does not leave gaps in the ranking sequence, meaning rows with equal values are assigned the same rank, and the next rank is incremented by 1.

OVER (ORDER BY ...): This clause defines the ordering of rows within the partition for the ranking calculation. You specify one or more columns by which you want to order the rows. The rows will be ranked based on the specified order.
Kısaca  DENSE_RANK + OVER (ORDER BY ...) ile belirtilen sütuna göre sıralama yapılır. Aynı değere sahip satırlara aynı gruptadır. Her gruba 1'den başlayan bir sayı verilir.

Örnek - SQL query to fetch a second-highest salary
Elimizde şöyle bir tablo olsun, en düşük ikinci maaşa sahip çalışanı bulmak isteyelim
+------+-----------+--------+
| # id | emp_name  | salary |
+------+-----------+--------+
|    1 | anil      |   1000 |
|    2 | ankit     |   1500 |
|    3 | bhavna    |   1100 |
|    4 | christian |   1400 |
|    5 | daniel    |   1400 |
|    6 | hardik    |   1500 |
|    7 | isha      |   1100 |
|    8 | ruchi     |   1000 |
+------+-----------+--------+
Şöyle yaparız
select * from (
  select dense_rank() over (order by salary)  as sno,
  emp_name,salary
from employee ) X where sno = 2;

// To write even better, we can use the CTE feature of SQL. Below is the final query.
with temp as 
(select dense_rank() over (order by salary)  as sno,
  emp_name,salary
  from employee
)
select * from temp where sno=2;
Where kısmı olmasaydı çıktı şöyleydi
+-----+-----------+--------+
| sno | emp_name  | salary |
+-----+-----------+--------+
|   1 | anil      |   1000 |
|   1 | ruchi     |   1000 |
|   2 | bhavna    |   1100 |
|   2 | isha      |   1100 |
|   3 | christian |   1400 |
|   3 | daniel    |   1400 |
|   4 | ankit     |   1500 |
|   4 | hardik    |   1500 |
+-----+-----------+--------+
Where ile çıktı şöyle
+-----+----------+--------+
| sno | emp_name | salary |
+-----+----------+--------+
|   2 | bhavna   |   1100 |
|   2 | isha     |   1100 |
+-----+----------+--------+
Örnek - DENSE_RANK() vs RANK()
Elimizde şöyle bir sorgu olsun. 2020 yılına ait siparişler çekiliyor ve her satıra bir sayı veriliyor. Burada RANK() kullanılıyor
SELECT name, city, RANK() OVER(ORDER BY orderdate DESC) AS RANK FROM ORDERS
WHERE YEAR(orderdate) = 2020
Çıktı olarak şunu alırız. Rank sütunu sırayla artmıyor. 
name city RANK
David Atlanta 1
Mike     Irving 2
Tom         Austin 3
Mick         Dallas 3
Sam         Dallas 5
Sorguyu şöyle yaparız
SELECT name, city, DENSE_RANK() OVER(ORDER BY orderdate DESC) AS RANK FROM ORDERS
WHERE YEAR(orderdate) = 2020
Çıktı olarak şunu alırız. Bu sefer RANK sırası doğrudur
name city RANK
David Atlanta 1
Mike     Irving 2
Tom         Austin 3
Mick         Dallas 4
Sam         Dallas 5
Örnek
Soru şöyle
Find the customer with the highest daily total order cost between 2019-02-01 to 2019-05-01. If customer had more than one order on a certain day, sum the order costs on daily basis. Output customer's first name, total cost of their items, and the date.


For simplicity, you can assume that every first name in the dataset is unique.
Şöyle yaparız
SELECT first_name,
  total_order_cost,
  order_date FROM (
    SELECT first_name,
	  SUM(total_order_cost) AS 
	    total_order_cost,
	  order_date,
	  DENSE_RANK() OVER(ORDER BY SUM
	    (total_order_cost) DESC) AS
		_order_rank
	FROM customers AS c
	JOIN orders AS o
	  ON c.id = o.cust_id
	WHERE order_date BETWEEN '2019-02-01'
	  AND '2019-05-01'
	GROUP BY c.id, order_date
   ) AS orders_ranked
 WHERE order_rank = 1

16 Mayıs 2023 Salı

Jsonb ?| Array Operator - Any Strings Exist

Giriş
Açıklaması şöyle
?| - Checks if any of the strings in the text array exist as top-level keys or array elements. So generally if we have JSON property that contains an array then you can check if it contains at least of elements that you are searching by.
Aynı şeyi SQL ile şöyle yaparızjsonb_any_array_strings_exist kullanılır
CREATE OR REPLACE FUNCTION jsonb_any_array_strings_exist(jsonb, text[]) RETURNS boolean AS $$ SELECT $1 ?| $2; $$ LANGUAGE SQL; SELECT
 item0_.id as id1_0_,
 item0_.jsonb_content as jsonb_co2_0_ 
FROM
  item item0_ 
WHERE
  jsonb_any_array_strings_exist(jsonb_extract_path(item0_.jsonb_content,?), array[?,?])=true

Jsonb ?& Array Operator - All String Exists

Giriş
Açıklaması şöyle
?& - Checks if all of the strings in the text array exist as top-level keys or array elements. So generally if we have JSON property that contains an array then you can check if it contains all elements that you are searching by.
Aynı şeyi SQL ile şöyle yaparız. jsonb_all_array_strings_exist kullanılır
CREATE OR REPLACE FUNCTION jsonb_all_array_strings_exist(jsonb, text[]) RETURNS boolean AS $$ SELECT $1 ?& $2; $$ LANGUAGE SQL; SELECT
item0_.id as id1_0_, item0_.jsonb_content as jsonb_co2_0_ FROM item item0_ WHERE jsonb_all_array_strings_exist(jsonb_extract_path(item0_.jsonb_content,?), array[?,?])=true

7 Mayıs 2023 Pazar

PgPool

Giriş
Şu işleri yapıyor
1. Connection pooling
2. Replication and Load Balancing
3. Automated Failover
4. Query Caching
PgPool ve PgBouncer 
Açıklaması şöyle. Yani PgBouncer  daha basit kalıyor
A lightweight connection pooler for PostgreSQL that can help manage connections and improve performance, but lacks advanced features like replication and failover support. PGBouncer is a good option if you need a simpler solution focused primarily on connection pooling.
Bağlantı
Örnek
Şöyle yaparız
postgres://admin:xxx@pgpool.default:5432/example_db




3 Mayıs 2023 Çarşamba

Process Architecture

Giriş
Şeklen şöyle

İki tane temel process var
1. Postmaster Process
2. Backend Process
3. Auxiliary Processes
Bunlar
- Background Writer (bw)
- Checkpointer (cp)
- Startup Process (st)
- Logger (lg)
- Autovacuum Launcher (avl)
- WAL writer (ww)
- WAL receiver (wr)
Other Processes
- Autovaccume workers
- WAL senders

Backend Process
Açıklaması şöyle
The postmaster process creates a new “backend” process for every connection it accepts. The connection is then handed over to the new backend process to perform the reading of the TCP stream, request parsing, SQL query parsing (yes those are different), planning, execution and returning the results. The process uses its local virtual memory for sorting and parsing logic, this memory is controlled by the work_mem parameter.

Connection Pooling
Açıklaması şöyle
.. the number of backend processes is capped by the number of connections, the max_connections parameter defaults to 100

Açıklaması şöyle
For each client on the server, its own servicing process is spawned. A large number of connections can cause problems:
  • Each process requires memory to store the cache of the system catalog, prepared queries, intermediate query results, and other data. The more connections that are open, the more memory needs to be available.
  • If connections are performed frequently and the sessions are short (i.e., the client executes one small query and disconnects), a prohibitive amount of resources will be spent on establishing connections, spawning new processes, and unnecessary filling of local caches.
  • The more processes that are running, the more time is required to scan their list, and this operation is performed very frequently. As a result, performance can decrease as the number of clients increases.
In such cases, connection pooling is used to limit the number of servicing processes. PostgreSQL does not have a built-in connection pool, so third-party solutions have to be used: pool managers built into the application server or external programs (such as PgBouncer or Odyssey).