
title: Learn Java Messaging and Event Streaming - Part 013
description: RabbitMQ Streams deep dive: append-only log semantics, stream declaration, retention, producer/consumer API, offset tracking, deduplication, batching, and operational boundaries.
series: learn-java-messaging-event-streaming
seriesTitle: Learn Java Messaging and Event Streaming
order: 13
partTitle: RabbitMQ Streams Log-Based Messaging
tags: java, rabbitmq, rabbitmq-streams, messaging, event-streaming, distributed-systems, reliability, offset, retention
date: 2026-06-28

Part 013 — RabbitMQ Streams: Log-Based Messaging Inside RabbitMQ

Goals of This Part

Earlier parts covered RabbitMQ as a broker built on these mechanisms: exchanges, queues, bindings, routing keys, consumer acknowledgements, publisher confirms, DLX, TTL, and retry topology. This part does not repeat that material. The focus now is RabbitMQ Streams, the RabbitMQ feature whose communication model is closer to an append-only log than to a traditional queue.

After completing this part, you should be able to:

  1. Explain why RabbitMQ Streams are not a replacement for every queue.
  2. Distinguish destructive queue consumption from non-destructive stream consumption.
  3. Design streams for large fan-out, replay, large backlogs, and high-throughput ingestion.
  4. Choose when to use AMQP 0-9-1 access and when to use the RabbitMQ Stream Java Client.
  5. Apply a mental model of offsets, retention, segments, producer confirmation, deduplication, batching, and compression.
  6. Recognize RabbitMQ Streams failure modes before they become production incidents.

Core principle: RabbitMQ Streams give RabbitMQ log-based messaging capabilities, but they do not automatically turn RabbitMQ into Kafka. Streams add a new model inside RabbitMQ. That model has its own trade-offs, limits, and operational personality.


Kaufman Deconstruction

Following Josh Kaufman's approach, break the big skill of “mastering RabbitMQ Streams” into small sub-skills that can be practiced quickly.

Sub-skill | What to Master | Effective Practice
Semantic model | Stream as an append-only log, not a queue that drains to empty | Compare queue consumption with stream replay using the same payload
Declaration | x-queue-type=stream, retention, segment sizing | Create a stream with size/time limits, then observe truncation
Producer | Publish confirmation, message properties, batching, dedup | Send 100k events with a confirm handler and bounded retry
Consumer | Offset specification, replay, named consumer | Consume from first, next, a timestamp, and an explicit offset
Offset tracking | Automatic, manual, external store | Simulate a crash after processing but before the offset is stored
Reliability | Duplicates, replay, retention expiry, producer recovery | Kill producer/consumer connections in the middle of traffic
Performance | Batch size, sub-entry batching, compression | Measure throughput, latency, CPU, disk writes, and duplicate risk
Operations | Leader/replica locality, disk, retention, metrics | Run a load test while monitoring disk and consumer lag

The goal is not to “know the syntax”. The goal is to be able to answer these questions:

  • Does this stream need to be replayable?
  • How long must events be retained?
  • May consumers see duplicates?
  • When is the offset stored?
  • Must consumers be able to restart from where they left off?
  • Can events expire before a compliance replay completes?
  • Should this be a single stream or a superstream?

1. Mental Model: A Queue Drains, a Stream Retains

RabbitMQ classic and quorum queues are usually used for work distribution. A message enters the queue and one consumer processes it. The message is then acked and leaves the queue. Normally the goal is a queue that stays close to empty.

A RabbitMQ Stream works differently. A stream is a log that is stored and read. It can be read again until retention removes the data.

The conceptual differences:

Dimension | Queue | Stream
Consumption | Destructive after ack | Non-destructive
Normal state | Empty or near empty | Holds a backlog up to the retention limit
Replay | Not natural | Natural
Fan-out to many consumers | Needs a queue per subscriber or extra topology | Many consumers can read the same stream
Consumer progress | Ack/delivery tag | Offset
Retention | Message removed after consumption or by TTL/length limit | Messages removed by size/time retention
Good for | Work queues, task distribution, command processing | Event history, replay, audit, high fan-out, large backlogs

The consequence is significant. In a queue, “processed” usually means the message no longer exists. In a stream, “processed” means the consumer has stored its progress.
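The following in-memory sketch makes the difference concrete. It does not use RabbitMQ. An ArrayDeque stands in for a queue, and a List stands in for a stream. The class and method names (QueueVsStreamDemo, drainQueue, readStream) are illustrative only. Draining the queue leaves it empty. Two independent readers can each read the whole stream, and only their own offsets move.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// In-memory illustration only: no RabbitMQ involved.
final class QueueVsStreamDemo {

    // Queue-style: every poll removes the message, for every consumer.
    static List<String> drainQueue(Deque<String> queue) {
        List<String> processed = new ArrayList<>();
        while (!queue.isEmpty()) {
            processed.add(queue.poll());
        }
        return processed;
    }

    // Stream-style: the log is untouched; only this reader's offset advances.
    // Returns the next offset to read, i.e. this consumer's progress.
    static long readStream(List<String> log, long fromOffset, List<String> out) {
        long offset = fromOffset;
        while (offset < log.size()) {
            out.add(log.get((int) offset));
            offset++;
        }
        return offset;
    }

    public static void main(String[] args) {
        List<String> events = List.of("CaseOpened", "CaseAssigned", "CaseClosed");

        Deque<String> queue = new ArrayDeque<>(events);
        drainQueue(queue);
        System.out.println("queue size after consume: " + queue.size()); // 0

        List<String> stream = new ArrayList<>(events);
        List<String> audit = new ArrayList<>();
        List<String> sla = new ArrayList<>();
        readStream(stream, 0, audit); // audit projection reads everything
        readStream(stream, 0, sla);   // SLA projection reads the same history
        System.out.println("stream size after consume: " + stream.size()); // 3
        System.out.println(audit.equals(sla)); // true
    }
}
```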


2. When RabbitMQ Streams Make Sense

RabbitMQ Streams make the most sense when one or more of the following conditions hold.

2.1 Large Fan-Out

For example, a CaseStatusChanged event may need to be read by:

  • SLA projection service
  • audit trail service
  • notification service
  • enforcement scoring service
  • analytics pipeline
  • investigation dashboard cache

With regular queues, you often create a separate queue for each subscriber. That works, but every extra subscriber adds queues, bindings, storage, replication, and operational overhead.

With a stream, many consumers can read the same data from the same log.

2.2 Replay / Time Travel

Replay is needed when:

  • a projection is corrupted and must be rebuilt,
  • a transformation bug has been fixed and old events must be reprocessed,
  • audit or compliance requires reconstructing the sequence of events,
  • a new scoring model must be computed from historical events,
  • a new consumer needs to bootstrap from old events.

Regular queues are unsuitable for replay because acked messages are gone. Streams are suitable because a consumer can attach at an old offset or timestamp, as long as retention has not yet removed the data.

2.3 Throughput Performance

Streams are designed for high throughput. They achieve it through appends, batching, segment files, and the stream protocol. This suits high-volume event ingestion, especially when consumers do not need granular routing through a traditional exchange/queue topology.

2.4 Large Backlogs

Queues usually perform best when the backlog does not grow continuously. Streams handle large backlogs better because they are designed to store data in a log/segment structure.

A regulatory platform example:

  • case-events is retained for 90 days for operational replay,
  • case-audit-events is retained for 7 years in separate compliance storage,
  • the local RabbitMQ stream serves as a high-throughput operational log, not as the only final archive.

3. When RabbitMQ Streams Do Not Make Sense

RabbitMQ Streams are not the default for everything.

Do not use a stream just because it looks “more modern”. Use a regular queue if:

  1. Each message is a task that only one worker may execute.
  2. Messages do not need replay.
  3. The normal backlog is small.
  4. You need per-rejected-message DLX semantics, as in a regular queue.
  5. You need message priority.
  6. You need queue-level features that streams do not provide.
  7. Consumer processing is closer to command execution than to event observation.

Examples better suited to a queue:

  • a PDF report generation job,
  • an email-sending job,
  • an external API call,
  • an OCR document task,
  • a payment retry command,
  • a long-running enrichment task with a DLQ per job.

Examples better suited to a stream:

  • CaseOpened, CaseAssigned, EvidenceReceived, EnforcementActionIssued,
  • an audit event feed,
  • high-volume telemetry,
  • immutable domain event history,
  • a projection rebuild source.

4. RabbitMQ Streams Inside RabbitMQ

Operationally, a stream still lives in the RabbitMQ ecosystem. It can be declared as a queue of type stream. However, its storage and consumption semantics differ from those of classic and quorum queues.

There are two common ways to interact with a stream:

Option | When to Use | Trade-off
AMQP 0-9-1 client | Quick integration using the regular RabbitMQ client library | Does not get all stream-specific features or the best performance
RabbitMQ Stream Java Client | High throughput, offset tracking, dedup, stream-native features | Requires a different API model and new operational understanding

For production Java systems that use streams as a primary primitive, use the RabbitMQ Stream Java Client. AMQP access is suitable for compatibility or migration, but not for getting the most out of stream-specific capabilities.


5. Declaring a Stream with AMQP 0-9-1

A stream can be declared as a queue with the argument x-queue-type=stream.

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.util.Collections;

public class DeclareCaseEventsStream {

    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");

        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {

            // A stream is declared like a queue; its type is fixed by x-queue-type.
            channel.queueDeclare(
                "case-events",
                true,          // durable (required for streams)
                false,         // not exclusive
                false,         // not auto-delete
                Collections.singletonMap("x-queue-type", "stream")
            );
        }
    }
}

Key points:

  • The queue type must be set at declaration time.
  • The queue type cannot be changed later through a dynamic policy.
  • A stream can still be bound to an exchange, like any other RabbitMQ queue.
  • A stream is always persistent.
  • A stream has its own retention policy.

6. Retention: Size, Time, and the Danger of Wrong Assumptions

Retention sets how long data lives in a stream. A stream does not delete messages when consumers ack them. Data must therefore be truncated based on size or age.

Example declaration with retention arguments:

import java.util.HashMap;
import java.util.Map;

Map<String, Object> arguments = new HashMap<>();
arguments.put("x-queue-type", "stream");

// Maximum stream size: 20 GB
arguments.put("x-max-length-bytes", 20_000_000_000L);

// Maximum age: 7 days
arguments.put("x-max-age", "7D");

// Segment size: 100 MB
arguments.put("x-stream-max-segment-size-bytes", 100_000_000L);

channel.queueDeclare(
    "case-events",
    true,
    false,
    false,
    arguments
);

Retention Invariant

Retention must be longer than the sum of three periods:

  • the maximum time a consumer can fall behind,
  • the maximum recovery time,
  • the maximum operational investigation time.

Suppose a consumer is 3 days behind, retention is 2 days, and the consumer crashes. Some events may no longer be available for replay. From the broker's perspective this is not “message loss”. It is a retention design error.

Practical Retention Formula

retention_age >= max_expected_lag
               + max_incident_detection_time
               + max_recovery_time
               + replay_safety_margin

Example:

max_expected_lag              = 12 hours
max_incident_detection_time   = 4 hours
max_recovery_time             = 8 hours
replay_safety_margin          = 24 hours
retention_age minimum         = 48 hours

In the regulatory domain, operational retention usually needs to be much longer, because an issue may be investigated several days after the incident.
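The formula can be turned into a small calculator whose output uses the same x-max-age string format as the declaration example ("7D"). This is a sketch, and the class name RetentionBudget is hypothetical. Emitting whole hours ("48h") and rounding partial hours up are assumptions of the sketch: rounding up ensures the configured value is never shorter than the computed budget.

```java
import java.time.Duration;

final class RetentionBudget {

    // retention_age >= lag + detection + recovery + safety margin
    static Duration minimumRetention(Duration maxExpectedLag,
                                     Duration maxIncidentDetection,
                                     Duration maxRecovery,
                                     Duration replaySafetyMargin) {
        return maxExpectedLag
            .plus(maxIncidentDetection)
            .plus(maxRecovery)
            .plus(replaySafetyMargin);
    }

    // Renders a non-negative Duration as an x-max-age value in whole hours,
    // rounding any partial hour up.
    static String toMaxAgeArgument(Duration retention) {
        long hours = retention.toHours();
        if (!retention.minusHours(hours).isZero()) {
            hours++;
        }
        return hours + "h";
    }

    public static void main(String[] args) {
        Duration retention = minimumRetention(
            Duration.ofHours(12),  // max expected lag
            Duration.ofHours(4),   // max incident detection time
            Duration.ofHours(8),   // max recovery time
            Duration.ofHours(24)   // replay safety margin
        );
        // prints "48 hours -> x-max-age=48h"
        System.out.println(retention.toHours() + " hours -> x-max-age=" + toMaxAgeArgument(retention));
    }
}
```

In practice the computed value is a floor. For regulatory streams you would add the longer investigation window mentioned above before writing it into the declaration arguments.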


7. Offset Specification: Where the Consumer Starts Reading

A stream consumer does not “take the next message off a draining queue”. Instead, it chooses a starting point in the log.

The options, as a mental model:

Offset Specification | Meaning | Use Case
first | Start from the first message still available | Full replay, projection rebuild
last | Start from the last chunk | Warm start near the tail
next | Start from messages published after the consumer starts | Live-only consumer
explicit offset | Start from a numeric offset | Precise recovery, controlled replay
timestamp | Start from a given arrival time | Replay since the incident time
interval | Start at a point relative to the current time | Debugging or partial replay

Example of AMQP-style consumption from the beginning of the stream:

channel.basicQos(100); // required for stream consumption via AMQP

channel.basicConsume(
    "case-events",
    false,
    Map.of("x-stream-offset", "first"),
    (consumerTag, delivery) -> {
        try {
            byte[] body = delivery.getBody();
            process(body);
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        } catch (Exception ex) {
            // Do not blindly requeue in stream processing.
            // Prefer idempotent recovery or explicit failure handling.
            channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, false);
        }
    },
    consumerTag -> {}
);

Important: when a stream is consumed via AMQP, the ack acts as a credit/progress mechanism. It does not “delete the message from the queue” as it does in a classic queue.
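The offset specifications in the table above can be pictured with a toy model of a retained log. This is a simplification, not the broker's algorithm:

  • `last` is omitted because it depends on chunk boundaries.
  • `interval` is treated as a timestamp computed from the current time.
  • Falling back to the first retained offset when an explicit offset has already expired is an assumption made for this model.

The names OffsetStartResolver, Kind, and Log are hypothetical.

```java
import java.util.List;

// Toy model of a retained log; not RabbitMQ's implementation.
final class OffsetStartResolver {

    enum Kind { FIRST, NEXT, OFFSET, TIMESTAMP }

    // firstAvailable: lowest offset not yet removed by retention.
    // arrivalMillis.get(i): arrival time of offset firstAvailable + i (non-decreasing).
    record Log(long firstAvailable, List<Long> arrivalMillis) {
        long nextOffset() {
            return firstAvailable + arrivalMillis.size();
        }
    }

    // `value` is the offset for OFFSET and epoch millis for TIMESTAMP; ignored otherwise.
    static long resolve(Log log, Kind kind, long value) {
        return switch (kind) {
            case FIRST -> log.firstAvailable();
            case NEXT -> log.nextOffset();
            // Model assumption: an expired offset falls back to the oldest retained one.
            case OFFSET -> Math.max(value, log.firstAvailable());
            case TIMESTAMP -> firstAtOrAfter(log, value);
        };
    }

    private static long firstAtOrAfter(Log log, long timestampMillis) {
        List<Long> times = log.arrivalMillis();
        for (int i = 0; i < times.size(); i++) {
            if (times.get(i) >= timestampMillis) {
                return log.firstAvailable() + i;
            }
        }
        return log.nextOffset(); // nothing that recent yet: wait at the tail
    }

    public static void main(String[] args) {
        // Offsets 0..99 were truncated by retention; 100..104 remain.
        Log log = new Log(100, List.of(1_000L, 2_000L, 3_000L, 4_000L, 5_000L));
        System.out.println(resolve(log, Kind.FIRST, 0));         // 100
        System.out.println(resolve(log, Kind.NEXT, 0));          // 105
        System.out.println(resolve(log, Kind.OFFSET, 42));       // 100
        System.out.println(resolve(log, Kind.TIMESTAMP, 2_500)); // 102
    }
}
```

The FIRST case shows that “first” means the first retained message, not offset 0. That distinction matters once retention has truncated the stream.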


8. Stream Java Client: Environment, Producer, Consumer

The RabbitMQ Stream Java Client provides a stream-native API.

Minimal flow:

import com.rabbitmq.stream.Consumer;
import com.rabbitmq.stream.Environment;
import com.rabbitmq.stream.OffsetSpecification;
import com.rabbitmq.stream.Producer;

import java.nio.charset.StandardCharsets;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;

public class CaseEventStreamSmokeTest {

    public static void main(String[] args) throws Exception {
        Environment environment = Environment.builder().build();

        String stream = "case-events-" + UUID.randomUUID();
        environment.streamCreator().stream(stream).create();

        int messageCount = 10_000;
        CountDownLatch publishConfirmLatch = new CountDownLatch(messageCount);

        Producer producer = environment.producerBuilder()
            .stream(stream)
            .build();

        IntStream.range(0, messageCount).forEach(i -> {
            byte[] payload = ("{\"eventNo\":" + i + "}")
                .getBytes(StandardCharsets.UTF_8);

            producer.send(
                producer.messageBuilder()
                    .addData(payload)
                    .build(),
                confirmationStatus -> {
                    if (!confirmationStatus.isConfirmed()) {
                        // In production: route to retry/outbox/reconciliation path.
                    }
                    publishConfirmLatch.countDown();
                }
            );
        });

        publishConfirmLatch.await(10, TimeUnit.SECONDS);

        CountDownLatch consumeLatch = new CountDownLatch(messageCount);

        Consumer consumer = environment.consumerBuilder()
            .stream(stream)
            .offset(OffsetSpecification.first())
            .messageHandler((context, message) -> {
                // The handler receives a Context (offset, timestamp, storeOffset()), not a raw offset.
                byte[] body = message.getBodyAsBinary();
                process(body, context.offset());
                consumeLatch.countDown();
            })
            .build();

        consumeLatch.await(10, TimeUnit.SECONDS);

        consumer.close();
        producer.close();
        environment.deleteStream(stream);
        environment.close();
    }

    private static void process(byte[] body, long offset) {
        // Domain processing goes here.
    }
}

Note the following:

  • Environment is the entry point for connections, stream management, producers, and consumers.
  • Producer#send is asynchronous and takes a confirmation callback.
  • Use a MessageBuilder instance for one message only.
  • Keep the confirmation callback short and never block the connection thread.
  • A consumer can start from OffsetSpecification.first(), last(), next(), offset(long), or timestamp(long).

9. Message Anatomy: Don't Send Bytes Without a Contract

A RabbitMQ Stream message can carry:

  • standard properties,
  • application properties,
  • body,
  • annotations.

Example of a message with metadata:

Message message = producer.messageBuilder()
    .properties()
        .messageId(UUID.randomUUID())
        .correlationId(correlationId)
        .contentType("application/json")
    .messageBuilder()
    .applicationProperties()
        .entry("eventType", "CaseStatusChanged")
        .entry("schemaVersion", "1")
        .entry("tenantId", tenantId)
    .messageBuilder()
    .addData(payloadBytes)
    .build();

producer.send(message, confirmation -> {
    if (!confirmation.isConfirmed()) {
        scheduleReconciliation(message);
    }
});

Use metadata only for infrastructure, routing, or observability concerns:

Field | Recommended Content
messageId | Unique event/message id for idempotency
correlationId | Trace/correlation across command, event, and side effect
contentType | application/json, application/avro, application/protobuf
application property eventType | Semantic event name
application property schemaVersion | Payload contract version
application property tenantId | Tenant or multi-tenant boundary, when safe to expose
application property producer | Name of the producing service

Avoid payloads without an envelope if events will be long-lived. Streams tend to keep data available for longer, so a poor event contract becomes long-term debt.


10. Offset Tracking: Progress, Not an Ordinary Ack

In a stream, consumer progress is usually expressed as an offset. The RabbitMQ Stream Java Client provides server-side offset tracking.

10.1 Named Consumer

Offset tracking requires a consumer name that is meaningful to the application.

Consumer consumer = environment.consumerBuilder()
    .stream("case-events")
    .name("case-audit-projection")
    .messageHandler((context, message) -> {
        process(message);
    })
    .build();

With a consumer name, the client can store the offset so that the next instance can resume.

10.2 Automatic Offset Tracking

Automatic tracking stores the offset periodically, triggered by a message count or a time interval.

import java.time.Duration;

Consumer consumer = environment.consumerBuilder()
    .stream("case-events")
    .name("sla-projection")
    .autoTrackingStrategy()
        .messageCountBeforeStorage(50_000)
        .flushInterval(Duration.ofSeconds(10))
    .builder()
    .messageHandler((context, message) -> {
        process(message);
    })
    .build();

Trade-off:

  • Storing the offset more often shrinks the duplicate window on restart.
  • Storing the offset more often also raises overhead, because each stored offset is itself persisted.
  • Storing the offset less often means more reprocessing after a crash.
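One rough way to see the duplicate window is to simulate count-based storage. The offset is stored after every N messages, the consumer crashes, and the simulation counts how many messages a restart delivers again. This is a hypothetical model (CheckpointWindow is not a client class). It ignores flushInterval, which in the real client also triggers storage on a timer. It assumes a restart resumes right after the last stored offset.

```java
// Hypothetical model of count-based automatic offset storage (flushInterval ignored).
final class CheckpointWindow {

    // Processes offsets 0..crashAfter (inclusive), storing the offset after every
    // `storeEvery` messages, then crashes. Returns how many already-processed
    // messages a restart will deliver again.
    static long reprocessedAfterCrash(long crashAfter, int storeEvery) {
        long lastStored = -1; // nothing stored yet
        for (long offset = 0; offset <= crashAfter; offset++) {
            // domain processing of `offset` would happen here
            if ((offset + 1) % storeEvery == 0) {
                lastStored = offset;
            }
        }
        long resumeFrom = lastStored + 1; // restart resumes after the stored offset
        return crashAfter - resumeFrom + 1;
    }

    public static void main(String[] args) {
        for (int storeEvery : new int[] {1, 5, 50_000}) {
            System.out.println("storeEvery=" + storeEvery
                + " reprocessed=" + reprocessedAfterCrash(12, storeEvery));
        }
        // storeEvery=1 -> 0, storeEvery=5 -> 3, storeEvery=50000 -> 13
    }
}
```

In this model the worst case is N - 1 reprocessed messages. With messageCountBeforeStorage(50_000), an idempotent consumer is therefore not optional.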

10.3 Manual Offset Tracking

Manual tracking gives explicit control, so the offset is stored only after domain processing has completed.

Consumer consumer = environment.consumerBuilder()
    .stream("case-events")
    .name("enforcement-score-projection")
    .manualTrackingStrategy()
    .builder()
    .messageHandler((context, message) -> {
        processIdempotently(message);

        if (safeToCheckpoint()) {
            context.storeOffset();
        }
    })
    .build();

Manual tracking fits when:

  • processing uses batch transactions,
  • the offset may only be stored after a database commit,
  • the consumer performs external side effects,
  • you want to control the duplicate window.

10.4 External Offset Store

Sometimes the offset is stored in your own database so that it is updated atomically with the projection state.

This is more complex, but often safer for projections that must stay consistent.

Rule:

There are two failure cases, depending on which write succeeds:

  • If the offset is stored in the broker but the projection commit fails, an event can be skipped.
  • If the projection commit succeeds but the offset has not yet been stored, an event can be duplicated.

Duplicates are easier to handle than skips, as long as the consumer is idempotent.
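The rule can be checked with a toy simulation that injects a crash between the two steps for one message. No RabbitMQ or database is involved, and the names CheckpointOrder, Order, and run are hypothetical. Storing the offset first skips the crashed message. Committing the projection first applies it twice.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model: counts how often each offset is applied to a projection when a
// crash is injected between "store offset" and "commit projection".
final class CheckpointOrder {

    enum Order { STORE_OFFSET_FIRST, COMMIT_PROJECTION_FIRST }

    static Map<Long, Integer> run(Order order, long messageCount, long crashOn) {
        Map<Long, Integer> applied = new HashMap<>();
        long storedNext = 0;   // where a restart resumes
        boolean crashed = false;
        long offset = 0;

        while (offset < messageCount) {
            boolean crashHere = !crashed && offset == crashOn;
            if (order == Order.STORE_OFFSET_FIRST) {
                storedNext = offset + 1;
                if (crashHere) {            // crash before the projection commit
                    crashed = true;
                    offset = storedNext;    // restart: resume after the stored offset
                    continue;
                }
                applied.merge(offset, 1, Integer::sum);
            } else {
                applied.merge(offset, 1, Integer::sum);
                if (crashHere) {            // crash before the offset is stored
                    crashed = true;
                    offset = storedNext;    // restart: resume from the older offset
                    continue;
                }
                storedNext = offset + 1;
            }
            offset++;
        }
        return applied;
    }

    public static void main(String[] args) {
        for (Order order : Order.values()) {
            int times = run(order, 5, 2).getOrDefault(2L, 0);
            System.out.println(order + ": offset 2 applied " + times + " time(s)");
        }
        // STORE_OFFSET_FIRST -> 0 (skip), COMMIT_PROJECTION_FIRST -> 2 (duplicate)
    }
}
```

With a real external store, the restart step corresponds to reading the stored offset from the database. The consumer then attaches with OffsetSpecification.offset(storedOffset + 1).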


11. Idempotency in Stream Consumers

RabbitMQ Streams make replay easier, and replay means duplicates must be treated as normal.

A correct consumer:

void handle(CaseEvent event, long offset) {
    if (inboxRepository.alreadyProcessed(event.messageId(), consumerName)) {
        return;
    }

    transactionTemplate.executeWithoutResult(tx -> {
        projection.apply(event);
        inboxRepository.markProcessed(
            consumerName,
            event.messageId(),
            "case-events",
            offset
        );
    });
}

Invariant:

Applying the same event more than once must not corrupt business state.

Bad example:

caseCounter.increment(event.caseType());

If the event is delivered twice, the counter is incremented twice.

Safer example:

projection.upsertCaseStatus(
    event.caseId(),
    event.status(),
    event.occurredAt(),
    event.eventId()
);

You still need an ordering rule: an older status must never overwrite a newer one.
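Here is an in-memory sketch of the safer pattern. A set of processed event ids provides idempotency, and an occurredAt comparison enforces the ordering rule. The types are assumptions made for illustration. So is the tie-breaking choice: an event with the same timestamp as the current row is ignored. In a real projection, the same checks would live in the database, ideally with a unique constraint on the event id. If the producer emits a per-case version number, comparing versions is more robust than comparing clocks.

```java
import java.time.Instant;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// In-memory stand-in for a projection table plus an inbox of processed event ids.
final class CaseStatusProjection {

    record CaseStatusChanged(String eventId, String caseId, String status, Instant occurredAt) {}

    record Row(String status, Instant occurredAt, String lastEventId) {}

    private final Map<String, Row> rows = new HashMap<>();
    private final Set<String> processedEventIds = new HashSet<>();

    // Returns true if the event changed state, false if it was a duplicate or stale.
    boolean apply(CaseStatusChanged e) {
        if (!processedEventIds.add(e.eventId())) {
            return false; // duplicate delivery or replay: already handled
        }
        Row current = rows.get(e.caseId());
        if (current != null && !e.occurredAt().isAfter(current.occurredAt())) {
            return false; // older (or same-time) event must not overwrite newer state
        }
        rows.put(e.caseId(), new Row(e.status(), e.occurredAt(), e.eventId()));
        return true;
    }

    String statusOf(String caseId) {
        Row row = rows.get(caseId);
        return row == null ? null : row.status();
    }

    public static void main(String[] args) {
        CaseStatusProjection p = new CaseStatusProjection();
        var escalated = new CaseStatusChanged("e2", "CASE-1", "ESCALATED",
            Instant.parse("2026-06-28T10:15:30Z"));
        var underReview = new CaseStatusChanged("e1", "CASE-1", "UNDER_REVIEW",
            Instant.parse("2026-06-28T09:00:00Z"));
        p.apply(escalated);
        p.apply(escalated);    // duplicate: ignored
        p.apply(underReview);  // late, older event: ignored
        System.out.println(p.statusOf("CASE-1")); // ESCALATED
    }
}
```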


12. Producer Confirmation dan Duplicate Window

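A stream producer receives confirmations from the broker. A confirmation tells the producer whether the broker has accepted and persisted the message.

But there is a classic window:

  1. The broker persists the message.
  2. The confirmation is lost, for example because the connection drops.
  3. The producer does not know the outcome and publishes the same message again.

Without deduplication, the stream now holds the event twice.

RabbitMQ Streams support deduplication based on:

  • the producer name,
  • a strictly increasing publishing ID.

Example of a producer with an explicit publishing ID: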

Producer producer = environment.producerBuilder()
    .name("case-event-outbox-publisher")
    .stream("case-events")
    .build();

long publishingId = outboxRow.sequence();

Message message = producer.messageBuilder()
    .publishingId(publishingId)
    .properties()
        .messageId(outboxRow.eventId())
    .messageBuilder()
    .addData(outboxRow.payload())
    .build();

producer.send(message, confirmation -> {
    if (confirmation.isConfirmed()) {
        outbox.markPublished(outboxRow.id());
    } else {
        outbox.markPublishUnknown(outboxRow.id());
    }
});

Remember:

  • the publishing ID does not replace the business event id,
  • the publishing ID must be strictly increasing per producer name,
  • do not use the same producer name from multiple active instances unless the sequence is coordinated correctly,
  • producer dedup does not remove the need for an idempotent consumer.
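The dedup rule can be modeled in a few lines to show both the intended effect and the misconfiguration covered later under failure modes. This is a simplified model, not broker code. For each producer name it keeps the highest publishing ID stored so far and drops any message whose ID is not greater. DedupModel and its methods are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified model of name-based deduplication; not RabbitMQ's implementation.
final class DedupModel {

    private final Map<String, Long> lastPublishingId = new HashMap<>();
    private final List<String> log = new ArrayList<>();

    // Returns true if the message was appended, false if it was filtered.
    boolean publish(String producerName, long publishingId, String payload) {
        long last = lastPublishingId.getOrDefault(producerName, -1L);
        if (publishingId <= last) {
            return false; // treated as a duplicate of something already stored
        }
        lastPublishingId.put(producerName, publishingId);
        log.add(payload);
        return true;
    }

    List<String> log() {
        return log;
    }

    public static void main(String[] args) {
        DedupModel stream = new DedupModel();
        stream.publish("case-event-outbox-publisher", 1, "CaseOpened");
        stream.publish("case-event-outbox-publisher", 2, "CaseAssigned");
        // Confirmation for id 2 was lost, so the outbox retries it: filtered (good).
        stream.publish("case-event-outbox-publisher", 2, "CaseAssigned");
        // Misconfiguration: a second instance reuses the name with its own counter.
        stream.publish("case-event-outbox-publisher", 1, "EvidenceReceived");
        System.out.println(stream.log()); // [CaseOpened, CaseAssigned]
    }
}
```

The last call shows why sharing a producer name is dangerous. A perfectly valid EvidenceReceived event is silently dropped because its ID is lower than one already stored under that name.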

13. Batching, Sub-Entry Batching, dan Compression

The RabbitMQ Stream Java Client offers several levels of throughput tuning.

Mechanism | Effect | Risk
producer batch | Less publish overhead | Higher latency
max unconfirmed messages | Bounds in-flight publishes | Producer may block
sub-entry batching | Significantly higher throughput | Higher latency; duplicate risk may grow
compression | Lower bandwidth/storage | Higher CPU
batch publishing delay | Controls flush cadence | Higher tail latency

Example:

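import com.rabbitmq.stream.compression.Compression;

Producer producer = environment.producerBuilder()
    .stream("case-events")
    .batchSize(100)
    .subEntrySize(10)                // sub-entry batching: up to 10 messages per entry
    .compression(Compression.ZSTD)   // compresses sub-entries, so it needs subEntrySize > 1
    .build();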

Use sub-entry batching only after you have measured:

  • p50/p95/p99 publish latency,
  • end-to-end event latency,
  • broker disk write throughput,
  • producer and consumer CPU,
  • duplicate behavior under failure,
  • consumer compatibility.
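Comparing batching settings requires percentiles computed the same way on every run. Below is a minimal nearest-rank helper. It assumes latency samples in microseconds, collected by your own harness, for example from timestamps taken around producer.send and in the confirmation callback. Nearest-rank is only one of several percentile definitions, and tools that interpolate will report slightly different values. The class name is hypothetical.

```java
import java.util.Arrays;

final class LatencyPercentiles {

    // Nearest-rank percentile: the smallest sample such that at least p% of
    // samples are <= it. Use one definition consistently across benchmark runs.
    static long percentile(long[] samplesMicros, double p) {
        if (samplesMicros.length == 0) {
            throw new IllegalArgumentException("no samples");
        }
        if (p <= 0 || p > 100) {
            throw new IllegalArgumentException("p must be in (0, 100]");
        }
        long[] sorted = samplesMicros.clone();
        Arrays.sort(sorted);
        int rank = (int) Math.ceil(p * sorted.length / 100.0); // 1-based rank
        return sorted[rank - 1];
    }

    public static void main(String[] args) {
        // Hypothetical publish-to-confirm latencies: 10, 20, ..., 1000 microseconds.
        long[] samples = new long[100];
        for (int i = 0; i < samples.length; i++) {
            samples[i] = (i + 1) * 10L;
        }
        // prints "p50=500 p95=950 p99=990"
        System.out.printf("p50=%d p95=%d p99=%d%n",
            percentile(samples, 50), percentile(samples, 95), percentile(samples, 99));
    }
}
```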

Throughput vs Latency Trade-off

For regulatory case events, do not optimize for throughput alone. Many events, such as escalations, SLA breaches, enforcement actions, or legal deadlines, carry a latency SLO and audit importance.


14. Consumer Design Pattern for Projections

Stream consumers are often used to build projections.

Pattern:

  1. The consumer reads the event.
  2. The consumer parses the envelope and payload.
  3. The consumer checks idempotency/inbox.
  4. The consumer applies the event to the projection in a transaction.
  5. The consumer stores the offset or processed marker.
  6. The consumer checkpoints safely.

Pseudo-code:

void onMessage(Message message, long offset) {
    CaseEventEnvelope envelope = decoder.decode(message.getBodyAsBinary());

    transactionTemplate.executeWithoutResult(tx -> {
        if (inbox.exists(consumerName, envelope.eventId())) {
            return;
        }

        switch (envelope.eventType()) {
            case "CaseOpened" -> projection.onCaseOpened(envelope);
            case "CaseStatusChanged" -> projection.onCaseStatusChanged(envelope);
            case "EvidenceReceived" -> projection.onEvidenceReceived(envelope);
            default -> unknownEventPolicy.handle(envelope);
        }

        inbox.insert(consumerName, envelope.eventId(), offset);
        offsets.store("case-events", consumerName, offset);
    });
}

The unknown-event policy must be explicit:

Policy | When to Use
Fail fast | The consumer must always understand every event
Skip with metric | The event is irrelevant to the projection
Quarantine | The event type or schema is invalid
Custom dead-letter-equivalent stream | Manual investigation and replay are needed

Streams do not provide the same DLX semantics as queues, so you must design your own failure outputs.
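One way to keep the policy explicit is to make it a configuration value of the dispatcher rather than an ad hoc default branch. This sketch is hypothetical. The enum and the Envelope record are illustrative. The in-memory quarantine list stands in for a quarantine stream, and the counter stands in for a metric.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Hypothetical dispatcher with an explicit, configured unknown-event policy.
final class UnknownEventDispatch {

    enum UnknownEventPolicy { FAIL_FAST, SKIP_WITH_METRIC, QUARANTINE }

    record Envelope(String eventId, String eventType) {}

    private static final Set<String> KNOWN =
        Set.of("CaseOpened", "CaseStatusChanged", "EvidenceReceived");

    private final UnknownEventPolicy policy;
    final List<Envelope> quarantined = new ArrayList<>(); // stand-in for a quarantine stream
    long skippedUnknown;                                  // stand-in for a metrics counter

    UnknownEventDispatch(UnknownEventPolicy policy) {
        this.policy = policy;
    }

    void dispatch(Envelope e) {
        if (KNOWN.contains(e.eventType())) {
            // projection.onCaseOpened(...) etc. would run here
            return;
        }
        switch (policy) {
            case FAIL_FAST -> throw new IllegalStateException("Unknown event type " + e.eventType());
            case SKIP_WITH_METRIC -> skippedUnknown++;
            case QUARANTINE -> quarantined.add(e);
        }
    }

    public static void main(String[] args) {
        UnknownEventDispatch d = new UnknownEventDispatch(UnknownEventPolicy.QUARANTINE);
        d.dispatch(new Envelope("e1", "CaseOpened"));
        d.dispatch(new Envelope("e2", "CaseMerged")); // not known to this projection
        System.out.println(d.quarantined); // [Envelope[eventId=e2, eventType=CaseMerged]]
    }
}
```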


15. Failure Modes RabbitMQ Streams

15.1 Retention Expired Before Replay

Symptoms:

  • a consumer wants to replay from an old offset,
  • that offset is no longer available,
  • the projection cannot be fully rebuilt from the stream.

Causes:

  • retention is too short,
  • consumer lag is not monitored,
  • no event archive is available,
  • compliance replay expectations do not match stream retention.

Mitigation:

  • an explicit retention formula,
  • monitoring of the oldest available offset/timestamp,
  • an external archival pipeline,
  • periodic projection snapshots,
  • replay drills.

15.2 Offset Stored Too Early

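Sequence:

  1. The consumer receives the event at offset N.
  2. It stores offset N.
  3. It crashes before the projection commit.
  4. On restart it resumes after offset N, so event N is never applied (skip).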

Mitigation:

  • store offset after successful processing,
  • prefer external offset store in same transaction as projection,
  • make replay procedure available.

15.3 Offset Stored Too Late

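Sequence:

  1. The consumer applies event N and commits the projection.
  2. It crashes before storing offset N.
  3. On restart it resumes from an earlier stored offset and applies event N again (duplicate).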

Mitigation:

  • idempotent projection,
  • inbox table,
  • deterministic upsert,
  • duplicate metrics.

This is usually preferable to a skip.

15.4 Producer Dedup Misconfigured

Problem:

  • two producer instances use same producer name,
  • publishing IDs overlap or go backward,
  • broker filters valid events as duplicate or stores duplicates unexpectedly.

Mitigation:

  • one active publisher per producer name,
  • producer name includes shard identity,
  • sequence generated from outbox primary key,
  • alert on negative confirmation and duplicate rate.

15.5 Confirmation Callback Blocking

Problem:

  • confirmation handler calls database synchronously,
  • callback blocks connection thread,
  • producer/consumer becomes sluggish.

Mitigation:

  • keep callback short,
  • push work to executor,
  • use bounded queue,
  • separate publish path from reconciliation path.
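The sketch below applies these mitigations through a hypothetical ConfirmationInbox class:

  • The callback only offers the outcome to a bounded queue and returns.
  • When the queue is full, the overflow is counted instead of blocking.
  • A separate worker, not shown, would drain the queue and update the outbox.

With the Stream Java Client, the callback could be wired as `producer.send(message, status -> inbox.onConfirmation(outboxId, status.isConfirmed()))`, where `outboxId` is whatever key your outbox uses.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicLong;

// Sketch: keeps the confirmation callback non-blocking. Names are hypothetical,
// not part of the RabbitMQ client API.
final class ConfirmationInbox {

    record Outcome(long outboxId, boolean confirmed) {}

    private final BlockingQueue<Outcome> pending;
    private final AtomicLong overflow = new AtomicLong();

    ConfirmationInbox(int capacity) {
        this.pending = new ArrayBlockingQueue<>(capacity);
    }

    // Called from the confirmation callback: offer() never blocks.
    void onConfirmation(long outboxId, boolean confirmed) {
        if (!pending.offer(new Outcome(outboxId, confirmed))) {
            // Queue full: do not block the client thread. The outbox row stays
            // in its unpublished/unknown state for a reconciliation job to pick up.
            overflow.incrementAndGet();
        }
    }

    // Called by the worker thread; returns null when nothing is pending.
    Outcome poll() {
        return pending.poll();
    }

    long overflowCount() {
        return overflow.get();
    }

    public static void main(String[] args) {
        ConfirmationInbox inbox = new ConfirmationInbox(2);
        inbox.onConfirmation(1, true);
        inbox.onConfirmation(2, false);
        inbox.onConfirmation(3, true); // capacity 2: counted as overflow
        System.out.println(inbox.poll() + " overflow=" + inbox.overflowCount());
    }
}
```

Dropping to reconciliation instead of blocking relies on the outbox keeping the row in an unpublished or unknown state until the worker updates it. Without that, an overflowed confirmation would be lost information.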

15.6 Stream Churn

Problem:

  • app creates/deletes streams per request/tenant/job,
  • cluster does filesystem and metadata housekeeping continuously,
  • stream management becomes bottleneck.

Mitigation:

  • streams are long-lived infrastructure resources,
  • manage centrally via IaC/admin process,
  • avoid per-entity streams unless explicitly justified.

16. Operational Invariants

For production, record these invariants in an architecture decision record.

Invariant 1 — Stream Retention Must Cover Recovery

No consumer is allowed to depend on replay older than configured retention
unless an external archive exists and the replay path is tested.

Invariant 2 — Consumer Must Be Idempotent

Every RabbitMQ Stream consumer must tolerate duplicate delivery and replay.

Invariant 3 — Offset Must Reflect Durable Processing

A stored offset means all business effects up to that offset are durably applied
or safely ignored through idempotency policy.

Invariant 4 — Producer Confirmation Is Not Business Completion

Broker confirmation means the broker accepted the event. It does not mean
any downstream consumer has processed it.

Invariant 5 — Streams Are Not DLQ Topologies

Poison event handling must be modelled explicitly with quarantine streams,
error projections, or operator workflow.

17. RabbitMQ Streams vs Kafka: Compare the Right Things

RabbitMQ Streams and Kafka both provide a log-like model, but the decision must not rest on the word “stream” alone.

Dimension | RabbitMQ Streams | Kafka
Ecosystem fit | Fits organizations that are already RabbitMQ-heavy | Fits when the event log is the main platform
Routing integration | Coexists with the RabbitMQ exchange/queue model | Topic/partition-first
Stream unit | Stream, superstream | Topic partitions
Consumer progress | Offset tracking via the stream client, server side, or an external store | Consumer group offsets
Scale-out | Superstreams | Partitions
Stream processing ecosystem | More limited | Kafka Streams, ksqlDB, Connect, broad ecosystem
Operational personality | RabbitMQ cluster semantics | Kafka broker/log/controller semantics
Best use | RabbitMQ-native event log, replay, fan-out | Large-scale event platform, durable event backbone

Practical rule:

  • If you already run RabbitMQ and need replay, fan-out, or large backlogs for a few flows, RabbitMQ Streams can make a lot of sense.
  • Kafka is often the more natural fit when the whole organization needs:
      • an event backbone,
      • a schema registry,
      • stream processing,
      • a connector ecosystem,
      • long retention,
      • a cross-domain data platform.
  • If the use case is only a task queue, do not use a stream just because you can.

18. Case Study: Regulatory Case Event Feed

Problem

A regulatory case-management platform needs:

  • a near-real-time audit trail,
  • an SLA projection,
  • a case status dashboard,
  • notification triggers,
  • projection replay after a bug,
  • event fan-out to several services.

Design

Event Envelope

{
  "eventId": "01J...",
  "eventType": "CaseStatusChanged",
  "schemaVersion": 1,
  "aggregateType": "Case",
  "aggregateId": "CASE-2026-000912",
  "tenantId": "regulator-id",
  "occurredAt": "2026-06-28T10:15:30Z",
  "producer": "case-service",
  "correlationId": "trace-...",
  "payload": {
    "fromStatus": "UNDER_REVIEW",
    "toStatus": "ESCALATED",
    "reasonCode": "SLA_RISK"
  }
}

Key Decisions

Decision | Rationale
Use stream instead of queue | Multiple consumers need the same event and replay
Use outbox publisher | Avoid dual-write bug between DB and broker
Use messageId = eventId | Idempotency across consumers
Use named consumers | Server-side or external offset tracking
Use projection-specific inbox | Duplicate-safe processing
Retention = 14 days operational | Supports bug fix replay and incident recovery
Archive audit events externally | Compliance retention exceeds stream retention

Anti-Pattern Avoided

Do not use a single “case-events” stream as legal archive unless retention, immutability, backup, access control, and compliance requirements are explicitly satisfied. Operational stream and compliance archive are different responsibilities.


19. Testing Matrix

Minimum tests before production:

Test | What to Verify
Replay from first | Projection rebuild works
Replay from timestamp | Incident-time recovery works
Crash after DB commit before offset store | Duplicate handled
Crash after offset store before DB commit | No skip if design claims exactly-once effect
Producer network drop after persist before confirm | Dedup/reconciliation works
Consumer lag exceeds expected | Alert fires before retention risk
Retention truncation | Old data is unavailable as expected
Bad schema event | Consumer quarantines or fails predictably
Slow consumer | Backpressure and lag metrics visible
Confirmation callback pressure | Producer does not block indefinitely

20. Common Pitfalls

Pitfall 1 — Treating Stream Ack Like Queue Ack

Ack in stream consumption does not mean delete message. It participates in progress/credit mechanics. If your mental model says “ack removes message”, you will misread storage growth.

Pitfall 2 — No Retention Budget

A stream without retention planning can either grow until disk pressure or delete data before replay requirements are satisfied.

Pitfall 3 — Offset Without Idempotency

Offset tracking only controls where to resume. It does not make processing exactly-once.

Pitfall 4 — Producer Dedup Without Business Event ID

Publishing ID helps broker dedup producer retries. Consumer still needs business/event id for idempotency.

Pitfall 5 — Stream Per Tenant Without Capacity Model

Per-tenant stream sounds clean until thousands of streams create metadata and operational overhead. Prefer partitioning/routing strategy unless tenant isolation truly requires separate streams.

Pitfall 6 — Blocking Confirmation Handler

Confirmation callback is not a place for heavy business processing.

Pitfall 7 — Assuming RabbitMQ Streams Equals Kafka

Both expose log-like semantics, but ecosystem, operations, partitioning, stream processing, and governance differ.


21. Practice: 90-Minute Lab

Lab Goal

Build a local RabbitMQ Stream event feed and projection consumer.

Steps

  1. Enable the RabbitMQ Stream plugin.
  2. Create the stream case-events.
  3. Publish 100k synthetic case events.
  4. Consume from first and build a projection.
  5. Stop the consumer halfway.
  6. Restart with a named consumer and offset tracking.
  7. Replay from first into a new projection table.
  8. Reduce retention in the test environment and observe old data being truncated.
  9. Publish a duplicate event and verify idempotency.
  10. Add confirmation handler metrics.

Expected Learning

At the end, you should be able to explain:

  • why replay is easy but idempotency is mandatory,
  • why offset storage timing matters,
  • why retention is a correctness parameter,
  • why stream throughput tuning changes latency and duplicate window,
  • why stream is not a drop-in replacement for queue retry/DLX behavior.

Summary

RabbitMQ Streams give RabbitMQ a persistent, replicated, append-only log model with non-destructive consumption semantics. This opens up use cases that are hard or expensive with regular queues: large fan-out, replay, high throughput, and large backlogs.

However, RabbitMQ Streams bring new obligations:

  • retention must be designed,
  • offsets must be understood,
  • consumers must be idempotent,
  • producer confirmations must be reconciled,
  • dedup must be configured with a correct sequence,
  • poison event handling must be made explicit,
  • streams must not be used as a replacement for every queue.

The most important mental model:

Queue asks: who should do this work?
Stream asks: who wants to observe this history, from which point, and how safely?

The next part goes one level up, to Superstreams. A superstream is RabbitMQ's way of partitioning a logical stream into multiple streams so that throughput and storage can scale out.


Official References Checked

  • RabbitMQ Documentation — Streams and Superstreams
  • RabbitMQ Stream Java Client Documentation
  • RabbitMQ Documentation — Consumer Acknowledgements and Publisher Confirms
  • RabbitMQ Documentation — Reliability and Data Safety