Önce DB'ye mi Önce Kafka'ya mı: Dual-Write Sorunu ve Outbox Pattern'i

Bir kullanıcı bir yorum yazdı ve sistemimizin bu yorumun sentiment'ini analiz edip sonucu iki ayrı yere yazması gerekiyor. Bunlardan biri bir REST API üzerinden sorgular yapılabilmesi için PostgreSQL, diğeriyse pipeline'daki diğer analytic, dashboard gibi servislerin dinleyebilmesi için bir Kafka topic'i. Yani elimizde bir mesaj ve onu yazmamız gereken iki yer var. İlk akla gelen önce DB'ye yazmak sonra da Kafka'ya publish etmek ya da tam tersi. Önce hangisini yaparsak yapalım kulağa oldukça basit ve işe yarar gibi geliyor.

Yaramıyor. Bu yazıda iki sistem arasındaki "iki yazma işlemini sırasıyla yap" gibi bir kararın production'da nasıl veri tutarsızlığına dönüşebileceği ve outbox pattern ile nasıl çözüldüğünü anlatacağım.

Önce ilk akla gelen sıralamayı deneyelim. DB'yi güncelle sonra Kafka'ya yaz. Yorumu kalıcı olarak kayda geçirdikten sonra pipeline'a duyur.

func (w *Worker) Process(ctx context.Context, c Comment) error {
    sentiment, err := w.sentimentClient.Analyze(ctx, c.Text)
    if err != nil {
        return err
    }
    if err := w.repo.MarkProcessed(ctx, c.ID, sentiment); err != nil {
        return err
    }
    return w.kafka.Publish(ctx, processedTopic, c.ID, buildEvent(c, sentiment))
}

Düşük yük altında, kimse crash etmiyorken bu kod işini güzelce yapıyor. Sorun bu iki çağrının arasındaki o küçücük zaman diliminde meydana geliyor.

Diyelim ki MarkProcessed methodu başarıyla çalıştı. DB'de artık comment "processed" ve sentiment kolonu dolu. Ardından kafka.Publish methodu çalışıyor ama tam o anda process crash ediyor. Worker tekrar ayağa kalktığında ne yapacak? MarkProcessed zaten yapıldığı için yorum zaten DB'de "processed" durumunda. Dolayısıyla sonraki seferlerde servis idempotent davranıp bu yorumu atlayacak ve Kafka'ya bu yorumun işlendiği duyurusu hiçbir zaman gitmemiş olacak. DB ve Kafka'yı dinleyen pipeline artık birbirini yalanlıyor.

İlk akla gelen çözüm sıralamayı tersine çevirmek. Kafka'ya yaz sonra DB'yi güncelle.

func (w *Worker) Process(ctx context.Context, c Comment) error {
    sentiment, err := w.sentimentClient.Analyze(ctx, c.Text)
    if err != nil {
        return err
    }
    if err := w.kafka.Publish(ctx, processedTopic, c.ID, buildEvent(c, sentiment)); err != nil {
        return err
    }
    return w.repo.MarkProcessed(ctx, c.ID, sentiment)
}

Şimdi de tam tersine kafka.Publish başarıyla çalıştı, broker event'i kabul etti, pipeline'daki consumer'lar bu yorumun işlendiğini görmek üzere sıraya girdiler. Sonra MarkProcessed çalışmaya başladı ama fail etti. DB connection pool'u tıkandı, retry budget tükendi, ne olduysa kalıcı bir hata oluştu. Kafka'ya bu yorumun işlendiğine dair duyuru çoktan gitti. Ama API'da yorum hâlâ "pending" görünüyor. Tutarsızlık çözülmedi, sadece yönü değişti.
Birinci sıralamada DB ileride, Kafka geride kalıyordu. İkinci sıralamadaysa Kafka ileride, DB geride kalmış oldu. Hangi yönden bakarsak bakalım, bu stratejide iki sistemin senkronize kalması pamuk ipliğine bağlı ve herhangi bir failure noktasında kırılıyor. Çünkü sorun sıralamada değil, iki sistemin aynı anda ve atomik olarak güncellenememesinde.

İlk akla gelen bu iki yazma işlemini bir transaction'a koymak olacaktır. PostgreSQL transaction'ını başlatıp içinde hem comment'i update edelim hem de Kafka'ya publish edelim. İkisi de başarılı olursa commit, biri fail ederse rollback. Olur mu?

Olmaz. PostgreSQL transaction'ı Kafka'yı rollback edemez. Kafka tarafındaki transaction desteği de birden fazla event yazma işleminin atomikliğini sağlamaya yarar. Sonuçta ortak bir transaction kuramazlar. Bunu yapmak için Two-phase commit gibi bir distributed transaction protokolü kullanabiliriz ama bu da pratikte bazı istenmeyen sonuçlara yol açar. Two-phase commit'in koordinatörü, yani süreci başlatıp yöneten servis, göçerse tüm sistemi etkileyecek tek bir hata noktası yaratmış olur. Fazladan locking maliyeti yaratır. Yani iki sistem arasında bir atomik transaction olmayacak, bunu kabul edip devam etmemiz gerekiyor.

Atomicity yoksa bir pattern de mi yok? Aslında var. İki sistem arasında atomiklik kuramıyorsak atomicity'yi tek bir sisteme indirgeriz. PostgreSQL'de zaten elimizde mevcut. Kafka'ya gönderilecek event'i de aynı PostgreSQL transaction'ında ayrı bir tabloya yazarız. Sonra ayrı bir process bu tabloyu okuyup Kafka'ya gönderme işini üstlenir.

func (r *Repository) MarkProcessed(ctx context.Context, c Comment, sentiment string) error {
    return r.db.WithTx(ctx, func(tx pgx.Tx) error {
        if _, err := tx.Exec(ctx, `
            UPDATE comments
            SET sentiment = $2, status = 'processed', processed_at = now()
            WHERE comment_id = $1
        `, c.ID, sentiment); err != nil {
            return err
        }
        _, err := tx.Exec(ctx, `
            INSERT INTO outbox_processed (comment_id, payload_json, topic)
            VALUES ($1, $2, $3)
        `, c.ID, buildPayload(c, sentiment), processedTopic)
        return err
    })
}

Bu tablo genelde posta kutusu metaforuyla outbox olarak isimlendiriliyor. Bir mektup yazıp posta kutusuna attığında onunla işin biter. Sonrasında bir postacının gelip o mektubu alıp dağıtmasıyla sen ilgilenmezsin. Bizde de worker outbox'a yazdığı anda aslında işi bitiyor. Kafka'ya yazma sorumluluğunu başka bir servise devrediyoruz.

Bu başka servis outbox publisher dediğimiz ayrı bir process. Tek işi var o da outbox tablosunu okumak, henüz publish edilmemiş row'ları Kafka'ya yazmak, başarılı olanları publish edilmiş olarak işaretlemek.

func (p *Publisher) Run(ctx context.Context) error {
    ticker := time.NewTicker(p.loopInterval)
    defer ticker.Stop()

    for {
        select {
        case <-ctx.Done():
            return ctx.Err()
        case <-ticker.C:
            rows, err := p.repo.FetchUnpublished(ctx, p.batchSize)
            if err != nil {
                log.Error("fetch failed", "err", err)
                continue
            }
            for _, row := range rows {
                if err := p.kafka.Publish(ctx, row.Topic, row.CommentID, row.Payload); err != nil {
                    p.repo.MarkPublishError(ctx, row.ID, err)
                    continue
                }
                p.repo.MarkPublished(ctx, row.ID)
            }
        }
    }
}

FetchUnpublished basitçe SELECT ... WHERE published_at IS NULL ORDER BY created_at LIMIT n çalıştırıyor. Her bir satırı Kafka'ya publish etmeyi deneyip başarılıysa published_at'i now() yapıyoruz, başarısızsa last_error'a yazıp bırakıyoruz.

Pattern temiz görünüyor ama sorulabilecek bazı ayrıntılar var.

Örneğin publisher Kafka'ya yazdı ama MarkPublished çağrısından önce crash etti. Restart sonrası publisher aynı row'u tekrar çekecek çünkü published_at hâlâ NULL, Kafka'ya tekrar yazacak. Aynı event Kafka'ya iki kez gitmiş olacak. Öyleyse dual-write sorununu sadece daha ileri ötelemiş olmuyor muyuz? Aslında tam olarak olmuyoruz. Birincisi Kafka zaten at-least-once delivery garantisi veriyor yani duplication consumer'ın çözmesi gereken verili bir problem. İkincisiyse baştaki naive dual-write'ın yarattığı sorun DB ve dış dünyanın kalıcı olarak birbirlerini yalanlıyor oluşuyken burada en kötü ihtimalle aynı doğru bilgi iki kez söyleniyor.

Bir başka senaryo: publisher Kafka'ya yazamadı, hata aldı. Outage boyunca her iterasyon fail edecek ama outage bitince row'lar olduğu yerde duruyor olacak. Eninde sonunda gönderilecek.

Aslında failure bile sayılmayan bir senaryodaysa worker outbox'a yazdı, transaction commit oldu ama publisher henüz row'u görmedi. Bu bir failure değil, sadece publisher'ın bir sonraki polling iterasyonuna kadarki gecikmesi. LOOP_INTERVAL=200ms ise worst-case latency 200ms civarında.
Kritik nokta şu: worker bir kez outbox'a yazıp transaction'ı commit ettiyse event'in Kafka'ya gideceği garanti. Publisher crash olsa, restart olsa, Kafka 10 dakika down kalsa fark etmez. Row durduğu sürece publisher onu eninde sonunda alır ve gönderir. Worker artık tek bir sistemle yani PostgreSQL ile konuşuyor.

Pattern teoride tamamlandı. Production'da iki tane daha incelikli soru var, ikisi de pattern'i kıracak nitelikte değil ama bilsek iyi olur.

İlki publisher'ı kaç instance çalıştıracağınız sorusu. Yukarıdaki kod tek başına çalıştığını varsayıyor. Eğer iki publisher olursa ikisi de aynı anda FetchUnpublished'i çağırdığında aynı satırları görüp Kafka'ya yazmaya başlayacaktır.

Çözüm PostgreSQL'in FOR UPDATE SKIP LOCKED mekanizması:

SELECT ...
FROM outbox_processed
WHERE published_at IS NULL
ORDER BY created_at
LIMIT 100
FOR UPDATE SKIP LOCKED

Birinci publisher row'ları seçerken üzerlerine row-level lock alıyor. İkinci publisher aynı sorguyu çalıştırınca PostgreSQL ona "kilitli olanları atla" diyor dolayısıyla ikinci publisher farklı row'ları görüyor. Lock'lar transaction sonunda otomatik release ediliyor, yani publisher Kafka publish'i ve MarkPublished UPDATE'ini transaction kapanmadan yapmalı. Aksi halde başka bir publisher aynı row'u alıp tekrar publish edebilir. Bu sayede birden çok publisher paralel olarak çalışıp farklı row alt kümelerini işler. Race kaynaklı duplicate yaşanmaz.

Comment-processing-service şu an için bunu kullanmıyor, tek instance çalışıyor. Tek publisher şu an throughput ihtiyacını karşılıyor. Horizontal scale gerektiğinde tek satırlık değişiklikle ve tabii ekstra lock maliyetini göze alarak eklenebilir. Ama o zamana kadar 'tek instance çalışmalı' kuralını deployment'ta unutmamak lazım çünkü kazara iki instance açıldığında pattern sessiz sedasız bozulur, log'da hiçbir uyarı çıkmaz.

İkinci sorun publisher'ın Kafka outage'ında nasıl davrandığı. Şu anki kod her LOOP_INTERVAL'da unpublished row'ları çekiyor, hepsini Kafka'ya yazmaya çalışıyor, fail edenleri olduğu yerde bırakıyor. LOOP_INTERVAL=200ms ise Kafka 10 dakika down kalsa publisher 3000 iterasyon yapıyor, her birinde batch'teki tüm row'lar fail ediyor.

Bu bir correctness sorunu değil, Kafka geri gelince row'lar zaten gönderilecek. Ama log şişiyor (3000 iterasyon × 100 row = 300k hata satırı), ve daha kötüsü Kafka cluster'ı kısmi recovery'deyse publisher'ın hızlı poll'u recovery'yi zorlaştırabilir.

Doğru çözüm backoff eklemek. Outbox tablosuna next_attempt_at TIMESTAMPTZ kolonu ekleyip sorguyu WHERE published_at IS NULL AND (next_attempt_at IS NULL OR next_attempt_at <= now()) yapmak. Bu sayede publish fail edince next_attempt_at = now() + backoff(publish_attempts) set ediliyor. Backoff exponential, bir önceki makaledeki retry gibi jittered. Dolayısıyla outage süresince publisher row'ları görmüyor.

Bu da implement edilmedi. Production'da Kafka outage'ı yeterince nadir ve kısa ki şu anki implementasyon kabul edilebilir.

Sonuç olarak outbox pattern'i temelde atomicity'yi tek sisteme indirgeyip diğerini async besleme stratejisi. Worker'ın iki sistemle birden uğraşmasını ortadan kaldırıyor ve naive dual-write'ın kalıcı inconsistency riskini at-least-once delivery'nin tolere edilebilir duplicate riskine indiriyor. Pattern'in çekirdeği üç şey: aynı transaction'da iş kaydı + niyet kaydı, ayrı bir process'in bu niyetleri okuyup gerçekleştirmesi ve gerçekleştirilen niyetlerin kalıcı olarak işaretlenmesi. Hangi veritabanı, hangi broker, hangi dil önemli değil. Hepsi bu üç parçanın etrafında değişebilir.

Bu prensibin tersi de mevcut: dış dünyadan gelen event'leri DB'ye yazarken aynı atomicity sorunuyla karşılaşıyorsunuz ve onu çözen pattern'e inbox deniyor. Aynı fikrin dağıtık iş akışlarına yayılmış hali ise saga pattern'ine doğru gidiyor. Üçü de "iki sistem arası transaction kuramazsın, o yüzden transactional sınırı tek sistemde tut" prensibinin farklı bağlamlardaki uygulamaları.


Bu yazıda anlattığım pattern'in ve trade-off'ların implementasyonunu, yorum işleyen bir data pipeline'ı bağlamında comment-processing-service reposunda bulabilirsiniz. İnceleyip geri bildirim vermek isterseniz issue veya PR açmaktan çekinmeyin.