18 Mayıs 2023 Perşembe

RisingWave - Streaming Database

Giriş
Aslında Stream Processing ile Streaming Database'in bir bileşimidir.

PostgreSQL uyumlu SQL kullanır. Açıklaması şöyle:
Apache Flink, as a big-data system, has faced criticism regarding its ease-of-use and cost efficiency. Observing this, some vendors are building their own stream processing systems to offer an improved experience in the cloud. RisingWave, for example, is creating a distributed SQL streaming database from scratch, aiming to provide a PostgreSQL-like stream processing experience with a cloud-native architecture.

Stream processing is poised to be the primary battleground in the realm of data streaming. The dominance of Flink as the de-facto standard remains uncertain, as emerging alternatives such as RisingWave and Materialize show potential for capturing larger market shares. Additionally, the competitive strategies employed by Flink-based vendors in their quest for superiority are of great interest to us.

CREATE SOURCE
Örnek
Şöyle yaparız
-- Source mapping for the drivers' Kafka topic: JSON messages read from
-- 'driver_topic' are exposed through the three columns declared below.
CREATE SOURCE driver_data (
    driver_id BIGINT,
    location VARCHAR,
    speed BIGINT
) WITH (
    connector = 'kafka',
    topic = 'driver_topic',
    properties.bootstrap.server = 'message_queue:29092',
    -- Begin reading at the earliest offset of the topic.
    scan.startup.mode = 'earliest'
) ROW FORMAT JSON;
CREATE TABLE
Örnek
Şöyle yaparız
-- Table fed from the Kafka topic 'dbserver1.random_data.users', decoded with
-- the DEBEZIUM_MONGO_JSON row format. Both columns are JSONB; _id is the key.
CREATE TABLE users (
    _id JSONB PRIMARY KEY,
    payload JSONB
)
WITH (
    connector = 'kafka',
    kafka.topic = 'dbserver1.random_data.users',
    kafka.brokers = 'message_queue:29092',
    kafka.scan.startup.mode = 'earliest'
)
ROW FORMAT DEBEZIUM_MONGO_JSON;

CREATE MATERIALIZED VIEW
Örnek - Join Kullanan
Şöyle yaparız. San Francisco'da en çok müşteri taşıyan sürücüleri görüntüler.
-- Number of rides per driver located in San Francisco.
-- Drivers without any matching ride are excluded by the inner join.
CREATE MATERIALIZED VIEW most_active_drivers AS
SELECT
    drivers.driver_id,
    drivers.location,
    drivers.rating,
    COUNT(rides.ride_id) AS total_rides
FROM drivers
INNER JOIN rides
    ON drivers.driver_id = rides.driver_id
WHERE drivers.location = 'San Francisco'
GROUP BY
    drivers.driver_id,
    drivers.location,
    drivers.rating
-- NOTE(review): per the RisingWave docs, ORDER BY in a materialized view is
-- accepted but is not part of the view definition; add ORDER BY to the query
-- that reads this view if ordered output is required -- confirm.
ORDER BY total_rides DESC;
Örnek
Şöyle yaparız
-- Per-page traffic summary over website_visits: row count, number of
-- distinct user_id values, and the greatest timestamp seen for the page.
CREATE MATERIALIZED VIEW page_visits_mv AS
SELECT
    page_id,
    COUNT(*) AS total_visits,
    COUNT(DISTINCT user_id) AS unique_visitors,
    MAX(timestamp) AS last_visit_time
FROM website_visits
GROUP BY page_id;
Örnek
Şöyle yaparız
-- Raw change-event table: rows arrive from the Kafka topic
-- 'dbserver1.random_data.users' in DEBEZIUM_MONGO_JSON format.
CREATE TABLE users (
    _id JSONB PRIMARY KEY,
    payload JSONB
)
WITH (
    connector = 'kafka',
    kafka.topic = 'dbserver1.random_data.users',
    kafka.brokers = 'message_queue:29092',
    kafka.scan.startup.mode = 'earliest'
)
ROW FORMAT DEBEZIUM_MONGO_JSON;

-- Projects three fields out of the JSONB payload of users into their own
-- columns; the ->> operator returns each field as text.
CREATE MATERIALIZED VIEW normalized_users AS
SELECT
    payload ->> 'name' AS name,
    payload ->> 'email' AS email,
    payload ->> 'address' AS address
FROM users;

-- Peek at up to ten rows of the flattened view (no ORDER BY, so which rows
-- are returned is not defined).
SELECT
    name,
    email,
    address
FROM normalized_users
LIMIT 10;
TUMBLE
Örnek
Şöyle yaparız
-- Source over the 'live_stream_metrics' Kafka topic (JSON messages), carrying
-- per-client video quality reports for live rooms.
CREATE SOURCE live_stream_metrics (
    client_ip VARCHAR,
    user_agent VARCHAR,
    user_id VARCHAR,
    -- The live room.
    room_id VARCHAR,
    -- Sent bits per second.
    video_bps BIGINT,
    -- Sent frames per second. Typically 30 fps.
    video_fps BIGINT,
    -- Round-trip time (in ms). 200ms is recommended.
    video_rtt BIGINT,
    -- Lost packets per second.
    video_lost_pps BIGINT,
    -- How long was the longest freeze (in ms).
    video_longest_freeze_duration BIGINT,
    -- Total freeze duration.
    video_total_freeze_duration BIGINT,
    report_timestamp TIMESTAMPTZ,
    country VARCHAR
) WITH (
    connector = 'kafka',
    topic = 'live_stream_metrics',
    properties.bootstrap.server = 'message_queue:29092',
    -- Begin reading at the earliest offset of the topic.
    scan.startup.mode = 'earliest'
) ROW FORMAT JSON;

-- Real-time dashboard feed: distinct user_id count (UV) for each one-minute
-- tumbling window over report_timestamp, keyed by the window start.
CREATE MATERIALIZED VIEW total_user_visit_1min AS
SELECT
    window_start AS report_ts,
    COUNT(DISTINCT user_id) AS uv
FROM TUMBLE(live_stream_metrics, report_timestamp, INTERVAL '1' MINUTE)
GROUP BY
    window_start;

-- Fetch a single window's result (no ORDER BY, so which window is returned
-- is not defined).
SELECT
    report_ts,
    uv
FROM total_user_visit_1min
LIMIT 1;



Hiç yorum yok:

Yorum Gönder