
Consumer Inbox and Idempotency

Learn Production Grade Contract-First Java Orchestration Platform - Part 033

Consumer Inbox and Idempotency for Kafka-driven Java services with PostgreSQL, MyBatis, and production-grade failure handling.


Part 033 — Consumer Inbox and Idempotency

In the previous part we built a producer outbox so that a domain change and its event publication never become a direct dual write between PostgreSQL and Kafka.

Now we move to the other side: the consumer.

The problem looks simple on the surface:

"The consumer reads an event from Kafka and performs a side effect."

The production problem is much harder:

"How do we make sure the same event does not change state twice when the consumer crashes, an offset commit fails, a pod restarts, a rebalance happens, the DB is slow, Kafka redelivers, or an operator runs a replay?"

In production systems, a Kafka consumer should almost always be treated as an at-least-once input. Kafka offers strong primitives inside the Kafka ecosystem, but once the consumer performs side effects against PostgreSQL, Camunda, an HTTP API, email, storage, or another system, we still have to design idempotency at the application boundary.

Goals for this part:

  • build a mental model of offsets, processing, side effects, and duplicate delivery;
  • design a consumer inbox table as proof of processing;
  • write a consumer algorithm that is safe across crash windows;
  • distinguish retry, quarantine, DLQ, and replay;
  • relate Kafka offsets to PostgreSQL transactions honestly;
  • write a MyBatis mapper and a Java service skeleton;
  • prepare a production runbook for duplicates, poison events, and consumer lag.

We will not repeat the basic Kafka consumer API. We focus on the reliability pattern.


1. Mental Model: An Offset Is Not Proof of Business Processing

A Kafka offset is a read position within a partition. An offset answers:

"How far has this consumer group read?"

An offset does not automatically answer:

"Did the domain side effect succeed?"

A side effect can be:

  • insert/update PostgreSQL;
  • complete human task;
  • correlate Camunda message;
  • call external API;
  • publish a new event;
  • generate audit record;
  • update projection;
  • enqueue notification.

A consumer that relies on offsets alone is fragile.

1.1 Crash Windows That Must Be Treated as Normal

Suppose the consumer processes a CaseAccepted event. It applies the update and commits the DB transaction, then crashes before the offset commit. After the restart, Kafka delivers the same record again and the consumer applies the update a second time.

If that second update is not idempotent, the system is corrupted.

Examples of damage:

  • a counter is incremented twice;
  • an SLA obligation is created twice;
  • a Camunda message is correlated twice;
  • a derived outbox event is published twice;
  • the audit trail shows the same decision twice;
  • a user receives two notifications;
  • the case enters an invalid state transition.

Design conclusion:

A Kafka offset commit must not be the only proof of business processing. We need proof of processing in storage that the consumer owns.

That is the role of the consumer inbox.


2. Consumer Inbox Pattern

A consumer inbox is a database table that records incoming events and their processing status for a specific consumer.

The inbox answers:

  • which events have already been seen;
  • which consumer processed them;
  • whether an event was processed successfully;
  • if it failed, with which error and after how many retries;
  • which original payload was received;
  • when to retry;
  • whether the event is parked for manual repair.

2.1 Important Boundaries

The inbox is not:

  • a replacement for Kafka;
  • the place for primary business queries;
  • a data warehouse;
  • the canonical event store;
  • an excuse to keep every payload forever without retention.

The inbox is an operational deduplication and recovery ledger.

2.2 Inbox State Machine

The states are deliberately explicit. Do not record every failure as FAILED; FAILED carries too little meaning.

Use:

  • RECEIVED: the event has just been stored;
  • PROCESSING: a worker is handling it;
  • PROCESSED: the side effect succeeded;
  • RETRY_PENDING: a temporary failure, safe to try again;
  • QUARANTINED: not safe to process automatically;
  • IGNORED: valid to skip for an explicit reason.
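
The states above can be sketched as a Java enum that also encodes which transitions are legal. The transitions mirror the SQL used later in this part (the claim query, the mark* updates, stale lock recovery, and the requeue statement). The transitions into IGNORED are an assumption, since the text does not define when a row becomes IGNORED, and the method names are hypothetical.

```java
import java.util.EnumSet;
import java.util.Set;

/** Inbox states from section 2.2, with the transitions implied by the SQL in this part. */
enum InboxStatus {
    RECEIVED, PROCESSING, PROCESSED, RETRY_PENDING, QUARANTINED, IGNORED;

    /** Allowed next states. The IGNORED transitions are an assumption of this sketch. */
    Set<InboxStatus> next() {
        return switch (this) {
            case RECEIVED -> EnumSet.of(PROCESSING, IGNORED);
            case RETRY_PENDING -> EnumSet.of(PROCESSING);
            case PROCESSING -> EnumSet.of(PROCESSED, RETRY_PENDING, QUARANTINED);
            case QUARANTINED -> EnumSet.of(RETRY_PENDING, IGNORED);
            case PROCESSED, IGNORED -> EnumSet.noneOf(InboxStatus.class);
        };
    }

    /** True when a row in this state may move to the target state. */
    boolean canMoveTo(InboxStatus target) {
        return next().contains(target);
    }

    /** True when no automatic or manual transition leaves this state. */
    boolean isTerminal() {
        return next().isEmpty();
    }
}
```

A guard like `canMoveTo` is only a second line of defense in application code. The `and status = 'PROCESSING'` predicates in the update statements remain the actual enforcement.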

3. Event Identity: The Key to Idempotency

For a consumer to recognize duplicates, every event needs a stable identity.

From Part 031, the event envelope should carry at least:

{
  "eventId": "evt_01HZX...",
  "eventType": "regulatory.case.accepted.v1",
  "eventVersion": 1,
  "occurredAt": "2026-07-03T09:15:30Z",
  "producer": "case-command-service",
  "correlationId": "corr_...",
  "causationId": "cmd_...",
  "aggregateType": "case",
  "aggregateId": "case_123",
  "partitionKey": "case_123",
  "payload": {}
}

3.1 The Right Unique Key

Minimum unique key:

consumer_name + event_id

Why not event_id alone?

Because the same event may be consumed by several different consumers:

  • case-projection-consumer;
  • sla-consumer;
  • camunda-correlation-consumer;
  • notification-consumer;
  • analytics-consumer.

Each consumer has a different obligation.

3.2 An Offset Is Not an Event ID

Do not use topic + partition + offset as the only event identity for domain idempotency.

Offsets are useful for observability and debugging. But the same logical event can appear again through a replay, republish, migration, backfill, or a different source connector.

Store both:

  • event_id for logical idempotency;
  • topic/partition/offset for traceability.
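
As a minimal illustration of that split, the sketch below keys deduplication on consumer name plus event id and treats the offset as trace data only. The class and record names are hypothetical, and the in-memory set merely stands in for the unique constraint on (consumer_name, event_id).

```java
import java.util.HashSet;
import java.util.Set;

/** Logical identity used for deduplication: one entry per consumer and event. */
record InboxKey(String consumerName, String eventId) {}

/** In-memory stand-in for the unique constraint (consumer_name, event_id). */
final class InboxDedupSketch {
    private final Set<InboxKey> seen = new HashSet<>();

    /**
     * Returns true the first time a consumer sees an event id.
     * topic, partition, and offset are accepted for traceability only and
     * deliberately do not take part in the identity check.
     */
    boolean firstSeen(String consumerName, String eventId, String topic, int partition, long offset) {
        return seen.add(new InboxKey(consumerName, eventId));
    }
}
```

With this key, a replay of evt_1 at a new offset is a duplicate for sla-consumer, while notification-consumer still gets to process evt_1 once.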

4. PostgreSQL Schema for the Consumer Inbox

Here is the core schema.

create schema if not exists integration;

create table integration.consumer_inbox (
    inbox_id              uuid primary key default gen_random_uuid(),

    consumer_name         text not null,
    event_id              text not null,
    event_type            text not null,
    event_version         integer not null,

    aggregate_type        text not null,
    aggregate_id          text not null,

    topic_name            text not null,
    partition_no          integer not null,
    offset_no             bigint not null,

    correlation_id        text null,
    causation_id          text null,
    producer              text not null,

    occurred_at           timestamptz not null,
    received_at           timestamptz not null default now(),

    status                text not null,
    attempt_count         integer not null default 0,
    next_attempt_at       timestamptz null,

    locked_by             text null,
    locked_at             timestamptz null,

    processed_at          timestamptz null,
    quarantined_at        timestamptz null,

    error_code            text null,
    error_message         text null,
    error_detail          jsonb null,

    payload               jsonb not null,
    headers               jsonb not null default '{}'::jsonb,

    created_at            timestamptz not null default now(),
    updated_at            timestamptz not null default now(),

    constraint consumer_inbox_status_ck check (
        status in (
            'RECEIVED',
            'PROCESSING',
            'PROCESSED',
            'RETRY_PENDING',
            'QUARANTINED',
            'IGNORED'
        )
    ),

    constraint consumer_inbox_attempt_ck check (attempt_count >= 0),
    constraint consumer_inbox_event_version_ck check (event_version > 0),

    constraint consumer_inbox_unique_event_per_consumer_uk
        unique (consumer_name, event_id)
);

Add operational indexes.

create index consumer_inbox_retry_idx
on integration.consumer_inbox (consumer_name, status, next_attempt_at, received_at)
where status in ('RECEIVED', 'RETRY_PENDING');

create index consumer_inbox_aggregate_idx
on integration.consumer_inbox (aggregate_type, aggregate_id, received_at desc);

create index consumer_inbox_topic_offset_idx
on integration.consumer_inbox (topic_name, partition_no, offset_no);

create index consumer_inbox_quarantine_idx
on integration.consumer_inbox (consumer_name, quarantined_at desc)
where status = 'QUARANTINED';

4.1 Why Store the Payload?

The payload is stored for:

  • technical audit;
  • local replay;
  • debugging contract mismatches;
  • manual repair;
  • proving which event triggered a side effect;
  • keeping context after Kafka retention has expired.

But payload retention must be bounded. Do not keep sensitive data forever without a retention policy.


5. Two Ways to Use the Inbox

There are two main patterns.

5.1 Inline Inbox

The consumer reads from Kafka, stores the inbox row, performs the domain side effect, and then commits the offset.

Advantages:

  • simple;
  • low latency;
  • a good fit for local DB side effects;
  • easy to test.

Drawbacks:

  • the Kafka consumer thread waits for domain processing;
  • a slow side effect can disturb the poll loop;
  • calling external systems makes the transaction boundary dangerous.

5.2 Staged Inbox

The Kafka consumer only stores the event in the inbox; a database-driven worker then processes the inbox rows.

Advantages:

  • the Kafka poll loop stays light;
  • retries are controlled from the DB;
  • a good fit for complex side effects;
  • processing can be paused and resumed without pausing consumption;
  • works well for Camunda correlation, projections, and external calls.

Drawbacks:

  • added latency;
  • needs a separate worker;
  • needs a runbook for inbox backlog;
  • offsets may already be committed while the side effect is still pending, so Kafka lag looks healthy while business lag remains.

For a case orchestration system with Camunda, SLAs, and external dependencies, a staged inbox is often safer.


6. Recommendation for This Series

For our platform, use a combination:

| Consumer Type | Pattern |
| --- | --- |
| Simple local read model projection | Inline inbox is acceptable |
| Camunda correlation | Staged inbox |
| External API call | Staged inbox |
| Notification/email | Staged inbox |
| SLA obligation update | Inline or staged, depending on complexity |
| Analytics sink | Staged/batch |

Practical rule:

If the side effect is only a fast local DB write, an inline inbox is enough. If the side effect touches an engine, a workflow, or an external dependency, a staged inbox is more defensible.


7. Inline Inbox Algorithm

Pseudocode:

for each Kafka record:
    parse envelope
    validate event contract
    begin DB transaction
        inserted = insert inbox row if absent
        if not inserted:
            if existing status is PROCESSED or IGNORED:
                commit DB
                commit Kafka offset
                continue
            if existing status is QUARANTINED:
                commit DB
                do not process automatically
                commit Kafka offset only if policy allows quarantine progress
                continue

        apply domain side effect idempotently
        insert domain audit
        optionally insert outbox events
        mark inbox PROCESSED
    commit DB transaction
    commit Kafka offset

7.1 Critical Ordering

The offset commit must happen after the DB commit.

Wrong:

commit Kafka offset
apply DB update

If the consumer crashes after the offset commit but before the DB update, the event is lost from the consumer group's point of view.

Right:

apply DB update
commit DB
commit Kafka offset

If it crashes after the DB commit but before the offset commit, the event is read again and the inbox deduplicates it.
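
The crash window can be modeled in a few lines of plain Java. This is an in-memory sketch, not a Kafka or PostgreSQL client: a set stands in for the inbox table, a counter for the domain side effect, and a long for the committed group offset. All names are hypothetical.

```java
import java.util.HashSet;
import java.util.Set;

/** In-memory model of the inline inbox: DB commit first, offset commit second. */
final class InlineInboxSketch {
    private final Set<String> processedEventIds = new HashSet<>(); // stands in for the inbox table
    private int acceptedCounter = 0;                               // stands in for the domain side effect
    private long committedOffset = -1;                             // stands in for the Kafka group offset

    /**
     * Handles one delivery. When crashBeforeOffsetCommit is true, the method returns
     * after the "DB commit" without advancing the offset, like a crash in that window.
     */
    void handle(String eventId, long offset, boolean crashBeforeOffsetCommit) {
        // "DB transaction": the inbox insert and the side effect succeed or fail together.
        boolean inserted = processedEventIds.add(eventId);
        if (inserted) {
            acceptedCounter++;
        }
        if (crashBeforeOffsetCommit) {
            return; // the offset stays behind, so the record will be delivered again
        }
        committedOffset = offset;
    }

    int acceptedCounter() { return acceptedCounter; }

    long committedOffset() { return committedOffset; }
}
```

Calling `handle("evt_1", 42, true)` and then `handle("evt_1", 42, false)` models the redelivery: the counter is incremented once, and the offset only advances on the second call.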


8. Staged Inbox Algorithm

Kafka consumer:

for each record:
    parse envelope
    validate envelope minimally
    begin DB transaction
        insert inbox if absent
    commit DB transaction
    commit Kafka offset

Inbox worker:

loop:
    begin DB transaction
        claim due inbox rows with FOR UPDATE SKIP LOCKED
    commit

    for each row:
        begin DB transaction
            reload row for update
            if status not PROCESSING: skip
            apply side effect
            mark PROCESSED
        commit

        on transient failure:
            mark RETRY_PENDING with next_attempt_at

        on non-retryable failure:
            mark QUARANTINED

Claim query:

with candidate as (
    select inbox_id
    from integration.consumer_inbox
    where consumer_name = #{consumerName}
      and status in ('RECEIVED', 'RETRY_PENDING')
      and (next_attempt_at is null or next_attempt_at <= now())
    order by received_at asc
    limit #{limit}
    for update skip locked
)
update integration.consumer_inbox i
set status = 'PROCESSING',
    locked_by = #{workerId},
    locked_at = now(),
    attempt_count = attempt_count + 1,
    updated_at = now()
from candidate c
where i.inbox_id = c.inbox_id
returning i.*;

8.1 Stale Lock Recovery

A worker can die after claiming rows.

Add a repair statement:

update integration.consumer_inbox
set status = 'RETRY_PENDING',
    next_attempt_at = now(),
    locked_by = null,
    locked_at = null,
    error_code = 'STALE_PROCESSING_LOCK',
    error_message = 'Processing lock exceeded timeout',
    updated_at = now()
where status = 'PROCESSING'
  and locked_at < now() - interval '10 minutes';

Do not be too aggressive. If an external side effect can take a long time, tune the stale timeout accordingly.


9. MyBatis Mapper

9.1 Interface

public interface ConsumerInboxMapper {

    int insertIfAbsent(ConsumerInboxInsertRow row);

    ConsumerInboxRow findByConsumerAndEventId(
        @Param("consumerName") String consumerName,
        @Param("eventId") String eventId
    );

    List<ConsumerInboxRow> claimDue(
        @Param("consumerName") String consumerName,
        @Param("workerId") String workerId,
        @Param("limit") int limit
    );

    int markProcessed(
        @Param("inboxId") UUID inboxId,
        @Param("processedAt") OffsetDateTime processedAt
    );

    int markRetryPending(
        @Param("inboxId") UUID inboxId,
        @Param("errorCode") String errorCode,
        @Param("errorMessage") String errorMessage,
        @Param("errorDetail") String errorDetailJson,
        @Param("nextAttemptAt") OffsetDateTime nextAttemptAt
    );

    int markQuarantined(
        @Param("inboxId") UUID inboxId,
        @Param("errorCode") String errorCode,
        @Param("errorMessage") String errorMessage,
        @Param("errorDetail") String errorDetailJson
    );
}

9.2 XML Mapper

<mapper namespace="com.acme.integration.inbox.ConsumerInboxMapper">

  <insert id="insertIfAbsent" parameterType="ConsumerInboxInsertRow">
    insert into integration.consumer_inbox (
      consumer_name,
      event_id,
      event_type,
      event_version,
      aggregate_type,
      aggregate_id,
      topic_name,
      partition_no,
      offset_no,
      correlation_id,
      causation_id,
      producer,
      occurred_at,
      status,
      payload,
      headers
    )
    values (
      #{consumerName},
      #{eventId},
      #{eventType},
      #{eventVersion},
      #{aggregateType},
      #{aggregateId},
      #{topicName},
      #{partitionNo},
      #{offsetNo},
      #{correlationId},
      #{causationId},
      #{producer},
      #{occurredAt},
      'RECEIVED',
      #{payload,jdbcType=OTHER,typeHandler=com.acme.mybatis.JsonbTypeHandler},
      #{headers,jdbcType=OTHER,typeHandler=com.acme.mybatis.JsonbTypeHandler}
    )
    on conflict (consumer_name, event_id) do nothing
  </insert>

  <select id="findByConsumerAndEventId" resultMap="ConsumerInboxRowMap">
    select *
    from integration.consumer_inbox
    where consumer_name = #{consumerName}
      and event_id = #{eventId}
  </select>

  <select id="claimDue" resultMap="ConsumerInboxRowMap">
    with candidate as (
      select inbox_id
      from integration.consumer_inbox
      where consumer_name = #{consumerName}
        and status in ('RECEIVED', 'RETRY_PENDING')
        and (next_attempt_at is null or next_attempt_at &lt;= now())
      order by received_at asc
      limit #{limit}
      for update skip locked
    )
    update integration.consumer_inbox i
    set status = 'PROCESSING',
        locked_by = #{workerId},
        locked_at = now(),
        attempt_count = attempt_count + 1,
        updated_at = now()
    from candidate c
    where i.inbox_id = c.inbox_id
    returning i.*
  </select>

  <update id="markProcessed">
    update integration.consumer_inbox
    set status = 'PROCESSED',
        processed_at = #{processedAt},
        locked_by = null,
        locked_at = null,
        error_code = null,
        error_message = null,
        error_detail = null,
        updated_at = now()
    where inbox_id = #{inboxId}
      and status = 'PROCESSING'
  </update>

  <update id="markRetryPending">
    update integration.consumer_inbox
    set status = 'RETRY_PENDING',
        next_attempt_at = #{nextAttemptAt},
        locked_by = null,
        locked_at = null,
        error_code = #{errorCode},
        error_message = #{errorMessage},
        error_detail = #{errorDetail,jdbcType=OTHER,typeHandler=com.acme.mybatis.JsonbTypeHandler},
        updated_at = now()
    where inbox_id = #{inboxId}
      and status = 'PROCESSING'
  </update>

  <update id="markQuarantined">
    update integration.consumer_inbox
    set status = 'QUARANTINED',
        quarantined_at = now(),
        locked_by = null,
        locked_at = null,
        error_code = #{errorCode},
        error_message = #{errorMessage},
        error_detail = #{errorDetail,jdbcType=OTHER,typeHandler=com.acme.mybatis.JsonbTypeHandler},
        updated_at = now()
    where inbox_id = #{inboxId}
      and status = 'PROCESSING'
  </update>

</mapper>

10. Java Consumer Skeleton

We keep the consumer thin. It contains no business logic.

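import java.time.Duration;
import java.util.List;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;

// Assumes enable.auto.commit=false so that offsets move only through commitSync().
public final class KafkaInboxConsumer implements Runnable {

    private final KafkaConsumer<String, String> consumer;
    private final EventEnvelopeParser parser;
    private final TransactionRunner tx;
    private final ConsumerInboxMapper inboxMapper;
    private final String consumerName;

    private volatile boolean running = true;

    public KafkaInboxConsumer(KafkaConsumer<String, String> consumer, EventEnvelopeParser parser,
                              TransactionRunner tx, ConsumerInboxMapper inboxMapper, String consumerName) {
        this.consumer = consumer;
        this.parser = parser;
        this.tx = tx;
        this.inboxMapper = inboxMapper;
        this.consumerName = consumerName;
    }

    @Override
    public void run() {
        try {
            consumer.subscribe(List.of("regulatory.case.events.v1"));

            while (running) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));

                for (ConsumerRecord<String, String> record : records) {
                    handleRecord(record);
                }

                // Commit only after every record in the batch has been stored in the inbox.
                consumer.commitSync();
            }
        } catch (WakeupException ex) {
            // poll() throws this after shutdown() calls wakeup(); otherwise it is unexpected.
            if (running) {
                throw ex;
            }
        } finally {
            consumer.close();
        }
    }

    private void handleRecord(ConsumerRecord<String, String> record) {
        // A parse failure propagates from here; section 10.1 covers the poison-record policy.
        EventEnvelope envelope = parser.parse(record.value());

        tx.required(() -> {
            ConsumerInboxInsertRow row = ConsumerInboxInsertRow.from(envelope, record, consumerName);
            inboxMapper.insertIfAbsent(row);
            return null;
        });
    }

    public void shutdown() {
        running = false;
        consumer.wakeup();
    }
}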

Note:

  • the consumer performs no domain side effect;
  • the consumer only stores the event in the inbox;
  • the offset commit happens after the DB insert;
  • duplicates are safe because of the unique constraint;
  • minimal parsing must be enough to reject a broken payload.

10.1 What Happens If the Payload Cannot Be Parsed?

If the envelope is completely broken, we have no event_id.

Options:

  1. do not commit the offset, so the consumer is stuck;
  2. send the record to a raw DLQ with topic/partition/offset metadata;
  3. store the raw poison record in a consumer_raw_poison table;
  4. stop the consumer and call an operator.

For a production system, define an explicit policy.

Example raw poison table:

create table integration.consumer_raw_poison (
    poison_id      uuid primary key default gen_random_uuid(),
    consumer_name  text not null,
    topic_name     text not null,
    partition_no   integer not null,
    offset_no      bigint not null,
    record_key     text null,
    raw_value      text null,
    headers        jsonb not null default '{}'::jsonb,
    error_message  text not null,
    created_at     timestamptz not null default now(),

    constraint consumer_raw_poison_offset_uk
        unique (consumer_name, topic_name, partition_no, offset_no)
);

Do not let one broken payload halt an entire partition without a runbook.
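
Option 3 can be sketched as follows. This is a simplified in-memory model under stated assumptions: `RawRecord`, `PoisonRow`, `StoreOutcome`, and `PoisonAwareIntake` are hypothetical names, the parser is any function that throws on a broken envelope, and the two lists stand in for consumer_inbox and consumer_raw_poison.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

/** Hypothetical view of a raw Kafka record; a real consumer would read these fields from ConsumerRecord. */
record RawRecord(String topic, int partition, long offset, String key, String value) {}

/** Hypothetical row mirroring the columns of consumer_raw_poison. */
record PoisonRow(String consumerName, RawRecord record, String errorMessage) {}

enum StoreOutcome { INBOX, RAW_POISON }

final class PoisonAwareIntake {
    private final String consumerName;
    private final Function<String, String> eventIdParser;         // assumed to throw on a broken envelope
    private final List<String> inboxEventIds = new ArrayList<>(); // stands in for consumer_inbox
    private final List<PoisonRow> poisonRows = new ArrayList<>(); // stands in for consumer_raw_poison

    PoisonAwareIntake(String consumerName, Function<String, String> eventIdParser) {
        this.consumerName = consumerName;
        this.eventIdParser = eventIdParser;
    }

    /** Stores the record durably in either case, so the offset is allowed to move on. */
    StoreOutcome store(RawRecord record) {
        String eventId;
        try {
            eventId = eventIdParser.apply(record.value());
        } catch (RuntimeException ex) {
            // No event_id is available, so the row is identified by topic/partition/offset.
            poisonRows.add(new PoisonRow(consumerName, record, ex.toString()));
            return StoreOutcome.RAW_POISON;
        }
        inboxEventIds.add(eventId);
        return StoreOutcome.INBOX;
    }

    List<PoisonRow> poisonRows() { return List.copyOf(poisonRows); }

    List<String> inboxEventIds() { return List.copyOf(inboxEventIds); }
}
```

The design choice is that a record is never dropped silently: the offset advances only because the record now lives in one of the two tables, where an operator can find it.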


11. Inbox Worker Skeleton

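import java.time.Duration;
import java.time.OffsetDateTime;
import java.util.List;

public final class InboxWorker implements Runnable {

    private final TransactionRunner tx;
    private final ConsumerInboxMapper inboxMapper;
    private final DomainEventDispatcher dispatcher;
    private final RetryPolicy retryPolicy;
    private final String consumerName;
    private final String workerId;
    private final int batchSize;

    private volatile boolean running = true;

    public InboxWorker(TransactionRunner tx, ConsumerInboxMapper inboxMapper,
                       DomainEventDispatcher dispatcher, RetryPolicy retryPolicy,
                       String consumerName, String workerId, int batchSize) {
        this.tx = tx;
        this.inboxMapper = inboxMapper;
        this.dispatcher = dispatcher;
        this.retryPolicy = retryPolicy;
        this.consumerName = consumerName;
        this.workerId = workerId;
        this.batchSize = batchSize;
    }

    @Override
    public void run() {
        while (running) {
            // The claim commits in its own transaction, so PROCESSING is visible to other workers.
            List<ConsumerInboxRow> rows = tx.required(() ->
                inboxMapper.claimDue(consumerName, workerId, batchSize)
            );

            if (rows.isEmpty()) {
                sleepQuietly(Duration.ofMillis(250));
                continue;
            }

            for (ConsumerInboxRow row : rows) {
                processOne(row);
            }
        }
    }

    // Exceptions other than the two caught below propagate; the row then stays
    // PROCESSING until stale lock recovery (8.1) releases it.
    private void processOne(ConsumerInboxRow row) {
        try {
            tx.required(() -> {
                // findByIdForUpdate is assumed to be an additional mapper method
                // (select ... where inbox_id = ? for update); section 9 does not list it.
                ConsumerInboxRow locked = inboxMapper.findByIdForUpdate(row.inboxId());

                if (locked == null || !locked.isProcessingBy(workerId)) {
                    return null;
                }

                dispatcher.dispatch(locked);
                inboxMapper.markProcessed(locked.inboxId(), OffsetDateTime.now());
                return null;
            });
        } catch (TransientProcessingException ex) {
            tx.requiresNew(() -> {
                if (retryPolicy.shouldQuarantine(row.attemptCount())) {
                    // Retry budget exhausted: park the row for manual review.
                    inboxMapper.markQuarantined(row.inboxId(), ex.code(), ex.getMessage(), ex.detailJson());
                } else {
                    OffsetDateTime next = retryPolicy.nextAttempt(row.attemptCount());
                    inboxMapper.markRetryPending(row.inboxId(), ex.code(), ex.getMessage(), ex.detailJson(), next);
                }
                return null;
            });
        } catch (NonRetryableProcessingException ex) {
            tx.requiresNew(() -> {
                inboxMapper.markQuarantined(row.inboxId(), ex.code(), ex.getMessage(), ex.detailJson());
                return null;
            });
        }
    }

    public void shutdown() {
        running = false;
    }

    private void sleepQuietly(Duration duration) {
        try {
            Thread.sleep(duration.toMillis());
        } catch (InterruptedException ex) {
            // Treat an interrupt as a shutdown request and keep the interrupt flag set.
            Thread.currentThread().interrupt();
            running = false;
        }
    }
}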

11.1 The Dispatcher Must Be Deterministic

public final class DomainEventDispatcher {

    private final Map<String, DomainEventHandler> handlers;

    public void dispatch(ConsumerInboxRow row) {
        DomainEventHandler handler = handlers.get(row.eventType());

        if (handler == null) {
            throw new NonRetryableProcessingException(
                "UNSUPPORTED_EVENT_TYPE",
                "No handler for event type " + row.eventType()
            );
        }

        handler.handle(row);
    }
}

Do not build a dispatcher that looks up classes through unchecked reflection based on the event name in the payload. That opens an attack surface and makes behavior hard to audit.


12. Handler Idempotency

The inbox prevents the same event from being processed twice by the same consumer. But the handler must still be idempotent at the domain level.

Why?

Because:

  • an operator can run a manual repair;
  • an old event can be republished with a different event id;
  • the consumer name can change;
  • a historical bug can cause duplicates;
  • external side effects are not necessarily protected by the inbox.

12.1 Example Handler: Update Projection

public final class CaseAcceptedProjectionHandler implements DomainEventHandler {

    private final CaseProjectionMapper projectionMapper;

    @Override
    public void handle(ConsumerInboxRow row) {
        CaseAcceptedPayload payload = row.payloadAs(CaseAcceptedPayload.class);

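        // Returns 1 when the row is inserted or updated, and 0 when last_event_id already
        // equals this event id (the WHERE clause of the upsert skips the update).
        int updated = projectionMapper.upsertAcceptedState(
            payload.caseId(),
            payload.acceptedAt(),
            row.eventId()
        );

        // 0 rows is a valid idempotent no-op and must not trigger a retry.
        if (updated > 1) {
            throw new TransientProcessingException(
                "PROJECTION_UPSERT_FAILED",
                "Expected at most one projection row to be upserted"
            );
        }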
    }
}

SQL:

insert into read_model.case_summary (
    case_id,
    case_status,
    accepted_at,
    last_event_id,
    updated_at
)
values (
    #{caseId},
    'ACCEPTED',
    #{acceptedAt},
    #{eventId},
    now()
)
on conflict (case_id) do update
set case_status = excluded.case_status,
    accepted_at = excluded.accepted_at,
    last_event_id = excluded.last_event_id,
    updated_at = now()
where read_model.case_summary.last_event_id is distinct from excluded.last_event_id;

12.2 Example Handler: Create SLA Obligation

SLA creation must not produce duplicates.

create unique index sla_obligation_case_type_uk
on case_core.sla_obligation (case_id, obligation_type)
where cancelled_at is null;

Handler:

public void handle(ConsumerInboxRow row) {
    CaseAcceptedPayload payload = row.payloadAs(CaseAcceptedPayload.class);

    slaMapper.insertIfAbsent(
        payload.caseId(),
        "INITIAL_ASSESSMENT_DUE",
        payload.acceptedAt().plusDays(5),
        row.eventId()
    );
}

SQL:

insert into case_core.sla_obligation (
    case_id,
    obligation_type,
    due_at,
    source_event_id,
    status
)
values (
    #{caseId},
    #{obligationType},
    #{dueAt},
    #{sourceEventId},
    'OPEN'
)
on conflict do nothing;

The best idempotency is often a business unique constraint, not an if in Java.


13. Retry Taxonomy

Not every error should be retried.

| Error | Retry? | Action |
| --- | --- | --- |
| DB connection timeout | yes | retry with backoff |
| PostgreSQL serialization failure | yes | retry transaction |
| Lock timeout | yes | retry |
| Unsupported event version | no | quarantine |
| Missing required field after schema validation | no | quarantine |
| Referenced case not found | depends | retry if eventual consistency expected; otherwise quarantine |
| External API 500 | yes | retry |
| External API 400 | no | quarantine |
| Camunda no matching execution | depends | retry if process may not yet wait; quarantine after threshold |
| Duplicate event | no | mark processed/ignored |

13.1 Backoff

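import java.time.OffsetDateTime;
import java.util.concurrent.ThreadLocalRandom;

public final class RetryPolicy {

    // attemptCount is the value after the claim query incremented it,
    // so the first failure arrives here as 1.
    public OffsetDateTime nextAttempt(int attemptCount) {
        long seconds = switch (attemptCount) {
            case 0, 1 -> 5;
            case 2 -> 30;
            case 3 -> 120;
            case 4 -> 600;
            default -> 1800;
        };

        // Random jitter below both 60 seconds and the base delay.
        long jitter = ThreadLocalRandom.current().nextLong(0, Math.min(60, seconds));
        return OffsetDateTime.now().plusSeconds(seconds + jitter);
    }

    public boolean shouldQuarantine(int attemptCount) {
        return attemptCount >= 10;
    }
}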

Backoff without jitter can create a retry storm.
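
Because the policy reads the system clock and a random source, it is awkward to unit test as written. One possible variant, shown here as a sketch, injects both. `TestableRetryPolicy` and its constructor parameters are hypothetical names; the delay table is copied from the policy above.

```java
import java.time.Clock;
import java.time.OffsetDateTime;
import java.util.function.LongUnaryOperator;

/** Variant of the retry policy with an injectable clock and jitter source. */
final class TestableRetryPolicy {
    private final Clock clock;
    private final LongUnaryOperator jitterSeconds; // receives the exclusive upper bound, returns the jitter

    TestableRetryPolicy(Clock clock, LongUnaryOperator jitterSeconds) {
        this.clock = clock;
        this.jitterSeconds = jitterSeconds;
    }

    /** Base delay table, identical to the switch in the policy above. */
    static long baseDelaySeconds(int attemptCount) {
        return switch (attemptCount) {
            case 0, 1 -> 5;
            case 2 -> 30;
            case 3 -> 120;
            case 4 -> 600;
            default -> 1800;
        };
    }

    /** Next attempt time: now (from the injected clock) plus base delay plus jitter. */
    OffsetDateTime nextAttempt(int attemptCount) {
        long base = baseDelaySeconds(attemptCount);
        long jitter = jitterSeconds.applyAsLong(Math.min(60, base));
        return OffsetDateTime.now(clock).plusSeconds(base + jitter);
    }
}
```

In production wiring this could be constructed with `Clock.systemUTC()` and `bound -> ThreadLocalRandom.current().nextLong(0, bound)`. In a test, `Clock.fixed(...)` and `bound -> 0` make the schedule exact.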


14. DLQ vs Quarantine

DLQs are often reached for too quickly.

A DLQ is good for:

  • payloads that cannot be parsed;
  • unsupported schemas;
  • poison records that block a partition;
  • events that cannot be processed automatically after a threshold.

But a DLQ is not a magic solution.

If you move a record to a DLQ without storing:

  • consumer name;
  • original topic/partition/offset;
  • event id;
  • error code;
  • contract version;
  • handler name;
  • correlation id;
  • aggregate id;
  • failure timestamp;
  • retry count;

then the DLQ is just a payload graveyard.

14.1 A Quarantine Table Is More Operational

Because we already have an inbox, QUARANTINED is often more useful than publishing to a DLQ topic.

Advantages of a quarantine table:

  • it can be queried with SQL;
  • it can be linked to a case;
  • it can have an internal UI;
  • it can be retried with a status update;
  • it can be audited;
  • it can be assigned an owner.

A DLQ topic can still be used for integration with platform observability, but it should not replace the operational ledger.


15. Partition Ordering and Head-of-Line Blocking

Kafka guarantees ordering within a single partition. If our key is case_id, all events for the same case land in the same partition, ordered relative to the producer and topic.

But an inbox worker can break that ordering if it processes rows in parallel without rules.

15.1 Risk

Events:

  1. CaseAccepted
  2. EvidenceRequested
  3. CaseClosed

If a worker processes CaseClosed before CaseAccepted, the projection is corrupted.

15.2 Strategy

For events that need per-aggregate ordering:

  • claim rows ordered by received_at;
  • limit parallelism per aggregate;
  • take an advisory lock per aggregate while processing;
  • or carry a sequence number in the event contract.

Example advisory lock:

select pg_advisory_xact_lock(hashtext(#{aggregateType} || ':' || #{aggregateId}));

Use advisory locks with discipline. Do not lock too many resources in a single transaction.
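
The hashtext call above yields a 32-bit key, so unrelated aggregates can occasionally collide and serialize behind each other. One alternative, sketched here as an assumption rather than the series' chosen approach, is to derive a 64-bit key in the application and pass it to pg_advisory_xact_lock as a bigint parameter. `AdvisoryLockKeys` and `forAggregate` are hypothetical names.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

/** Derives a stable 64-bit advisory lock key for an aggregate. */
final class AdvisoryLockKeys {
    private AdvisoryLockKeys() {}

    /** Uses the first 8 bytes of SHA-256("type:id") as a signed 64-bit key. */
    static long forAggregate(String aggregateType, String aggregateId) {
        try {
            MessageDigest sha256 = MessageDigest.getInstance("SHA-256");
            byte[] digest = sha256.digest(
                (aggregateType + ":" + aggregateId).getBytes(StandardCharsets.UTF_8));
            return ByteBuffer.wrap(digest).getLong();
        } catch (NoSuchAlgorithmException ex) {
            // SHA-256 is a required algorithm on every Java platform implementation.
            throw new IllegalStateException("SHA-256 not available", ex);
        }
    }
}
```

The mapper statement would then read `select pg_advisory_xact_lock(#{lockKey})`, with lockKey set to the value returned by `forAggregate`. A collision is still possible in principle, and it only causes extra waiting, never incorrect results.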

15.3 Sequence Number Contract

If the producer can supply an aggregateVersion, the consumer can detect gaps.

{
  "aggregateId": "case_123",
  "aggregateVersion": 17
}

Projection table:

alter table read_model.case_summary
add column last_aggregate_version bigint not null default 0;

Update only if the incoming version is the next valid one:

update read_model.case_summary
set case_status = #{newStatus},
    last_aggregate_version = #{aggregateVersion},
    updated_at = now()
where case_id = #{caseId}
  and last_aggregate_version = #{aggregateVersion} - 1;

If the update touches 0 rows:

  • duplicate version: ignore if already processed;
  • gap: retry later;
  • out-of-order: retry;
  • invalid event: quarantine.
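
When the update touches 0 rows, the handler still has to decide which of these cases it is in. A minimal sketch of that decision, assuming the handler has read last_aggregate_version for the case, could look like this. `VersionCheck` and `AggregateVersionGuard` are hypothetical names, and treating non-positive versions as invalid is an assumption.

```java
/** Outcome of comparing an incoming aggregateVersion with the last applied one. */
enum VersionCheck { APPLY, DUPLICATE, GAP, INVALID }

final class AggregateVersionGuard {
    private AggregateVersionGuard() {}

    /**
     * Classifies an incoming event version against the projection's last applied version.
     * APPLY: exactly the next version. DUPLICATE: already applied, safe to ignore.
     * GAP: an earlier version is still missing, so retry later. INVALID: quarantine.
     */
    static VersionCheck classify(long lastAppliedVersion, long incomingVersion) {
        if (incomingVersion <= 0) {
            return VersionCheck.INVALID; // assumption: versions start at 1
        }
        if (incomingVersion <= lastAppliedVersion) {
            return VersionCheck.DUPLICATE;
        }
        if (incomingVersion == lastAppliedVersion + 1) {
            return VersionCheck.APPLY;
        }
        return VersionCheck.GAP;
    }
}
```

A handler could map DUPLICATE to a no-op, GAP to a TransientProcessingException, and INVALID to a NonRetryableProcessingException, which lines up with the retry taxonomy in section 13.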

16. Rebalance, Pod Shutdown, and Offset Commit

Kubernetes will restart pods. The Kafka consumer group will rebalance. This is normal.

The consumer must:

  • handle graceful shutdown;
  • stop polling;
  • finish the current DB transaction;
  • commit the offsets that are safe;
  • close the consumer;
  • avoid long processing inside the poll thread;
  • report readiness false during shutdown.

16.1 Readiness During Shutdown

SIGTERM received
-> readiness = false
-> stop accepting new work
-> stop poll loop
-> finish current record/batch
-> commit offsets
-> close consumer
-> exit before terminationGracePeriodSeconds

For a staged inbox, shutdown is easier because the poll thread only inserts into the inbox.


17. Observability

Minimum metrics:

| Metric | Meaning |
| --- | --- |
| kafka_consumer_lag | Kafka lag per topic/partition |
| consumer_inbox_received_total | event inserted |
| consumer_inbox_duplicate_total | duplicate event detected |
| consumer_inbox_processed_total | processed success |
| consumer_inbox_retry_total | retry scheduled |
| consumer_inbox_quarantined_total | poison/non-retryable |
| consumer_inbox_oldest_pending_age_seconds | business lag |
| consumer_inbox_processing_duration_seconds | side effect latency |
| consumer_inbox_stale_processing_total | worker crash indicator |

Structured log fields:

{
  "event": "consumer_inbox_processed",
  "consumerName": "case-camunda-correlation-consumer",
  "eventId": "evt_01HZX...",
  "eventType": "regulatory.case.accepted.v1",
  "aggregateId": "case_123",
  "topic": "regulatory.case.events.v1",
  "partition": 4,
  "offset": 120932,
  "correlationId": "corr_...",
  "attemptCount": 1,
  "durationMs": 37
}

Important:

Low Kafka lag does not mean the system is healthy if the staged inbox backlog is high.

The dashboard must show two kinds of lag:

  • Kafka lag;
  • inbox business lag.

18. Test Matrix

18.1 Unit Tests

  • the parser rejects an envelope without an event id;
  • the retry policy produces a backoff;
  • the dispatcher picks the correct handler;
  • an unsupported event type becomes non-retryable;
  • a duplicate event does not invoke the handler.

18.2 PostgreSQL Integration Tests

Use Testcontainers PostgreSQL.

Test:

  • insertIfAbsent with a duplicate;
  • batch claim with SKIP LOCKED;
  • stale lock recovery;
  • markProcessed only works from PROCESSING;
  • unique constraint per consumer/event;
  • index query plan for a large backlog.

18.3 Kafka Integration Tests

Use a Kafka test environment.

Test crash windows:

| Scenario | Expected |
| --- | --- |
| crash after DB commit before offset commit | event re-read, inbox duplicate, no double side effect |
| crash before DB commit | event re-read, side effect eventually done |
| duplicate event id with different offset | duplicate detected |
| poison payload | raw poison saved or DLQ |
| handler transient failure | retry scheduled |
| handler non-retryable failure | quarantined |
| worker crash after claim | stale lock repair |

18.4 Replay Tests

Replay is a production feature, not a manual experiment.

Replay tests must answer:

  • Can old events be processed without corrupting state?
  • Are the handlers idempotent?
  • Are old event versions still supported?
  • Does replay trigger external notifications again?
  • Are derived outbox events created again?

For side effects such as email, use a suppression flag or a separate handler.


19. Failure Model

| Failure | Detection | Recovery |
| --- | --- | --- |
| Kafka record duplicate | unique constraint conflict | mark skip/processed |
| DB commit success, offset commit fail | duplicate on restart | inbox dedup |
| offset commit success, DB commit fail | should not happen if ordering correct | fix code; data repair if happened |
| worker crash after claim | stale PROCESSING | stale lock recovery |
| unsupported event | quarantine | deploy handler or ignore with audit |
| event arrives before referenced aggregate | retry pending | eventual consistency wait |
| event version unsupported | quarantine | compatibility release |
| handler bug | quarantine/retry storm | disable worker, patch, replay |
| backlog grows | metrics | scale worker, inspect bottleneck |
| poison event blocks partition | raw poison policy | DLQ/quarantine and commit offset |

20. Production Runbook

20.1 Duplicate Spike

Check:

select consumer_name, event_type, count(*)
from integration.consumer_inbox
where received_at > now() - interval '1 hour'
group by consumer_name, event_type
order by count(*) desc;

Questions:

  • Did producer retry aggressively?
  • Did outbox publisher republish?
  • Did consumer group reset offset?
  • Did replay job run?
  • Are duplicates harmless?

20.2 Quarantine Review

select consumer_name,
       event_type,
       error_code,
       count(*) as total,
       min(quarantined_at) as first_seen,
       max(quarantined_at) as last_seen
from integration.consumer_inbox
where status = 'QUARANTINED'
group by consumer_name, event_type, error_code
order by total desc;

Decision:

  • deploy handler fix;
  • patch reference data;
  • ignore event with explicit reason;
  • replay after repair;
  • update contract compatibility.

20.3 Requeue Quarantined Event

Do not update rows directly without an audit trail.

update integration.consumer_inbox
set status = 'RETRY_PENDING',
    next_attempt_at = now(),
    quarantined_at = null,
    error_code = null,
    error_message = null,
    error_detail = null,
    updated_at = now()
where inbox_id = #{inboxId}
  and status = 'QUARANTINED';

Record the audit operation in a separate table.
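A sketch of the intended shape, using in-memory collections instead of PostgreSQL: the status change is guarded by the current state and the audit entry is written in the same operation. `QuarantineRequeue`, `AuditEntry`, the `PROCESSED` status, and the string inbox id are hypothetical; in a real service both writes would belong in a single database transaction.

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory model of "requeue plus audit"; not the series' actual service.
final class QuarantineRequeue {
    enum Status { RETRY_PENDING, PROCESSING, PROCESSED, QUARANTINED }

    record AuditEntry(String inboxId, String operator, String reason, Instant at) {}

    private final Map<String, Status> inbox = new HashMap<>();
    private final List<AuditEntry> auditLog = new ArrayList<>();

    void put(String inboxId, Status status) {
        inbox.put(inboxId, status);
    }

    // Mirrors the guarded UPDATE: only a QUARANTINED row may move to RETRY_PENDING.
    boolean requeue(String inboxId, String operator, String reason) {
        if (reason == null || reason.isBlank()) {
            throw new IllegalArgumentException("a requeue reason is required");
        }
        if (inbox.get(inboxId) != Status.QUARANTINED) {
            return false; // nothing updated, so nothing to audit
        }
        inbox.put(inboxId, Status.RETRY_PENDING);
        auditLog.add(new AuditEntry(inboxId, operator, reason, Instant.now()));
        return true;
    }

    Status statusOf(String inboxId) { return inbox.get(inboxId); }

    List<AuditEntry> auditLog() { return List.copyOf(auditLog); }
}
```

Requiring a non-blank reason keeps the audit table useful during a later incident review.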


21. Anti-Patterns

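21.1 Committing the Offset Before the Side Effect

This can lose events: if the consumer crashes after the offset commit but before the side effect completes, the consumer group resumes past that record and the event is never processed.

21.2 Non-Idempotent Consumer Handler

The inbox alone is not enough if the same logical event can be republished with a different ID.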
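A sketch of layering a domain-level check on top of the inbox. It assumes that a case can be accepted only once, so `eventType + aggregateId` works as a business key; that rule, the class name `DomainIdempotentHandler`, and the in-memory sets are illustrative assumptions.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical handler with two dedup layers: technical (event id) and domain (business key).
final class DomainIdempotentHandler {
    private final Set<String> inboxEventIds = new HashSet<>();        // stands in for the inbox table
    private final Set<String> appliedBusinessKeys = new HashSet<>();  // stands in for domain state
    private int applied = 0;

    // Returns true only when the business effect was actually applied.
    boolean handle(String eventId, String eventType, String aggregateId) {
        if (!inboxEventIds.add(eventId)) {
            return false; // same event id: caught by the inbox
        }
        // Assumption: one acceptance per case, so type plus aggregate identifies the logical event.
        String businessKey = eventType + "|" + aggregateId;
        if (!appliedBusinessKeys.add(businessKey)) {
            return false; // new event id, same logical event: caught by the domain check
        }
        applied++;
        return true;
    }

    int applied() { return applied; }
}
```

The second check is what protects the system when a producer republishes the same fact under a fresh event id.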

21.3 DLQ as a Dumping Ground

A DLQ without an owner, metadata, a dashboard, and a replay procedure is a graveyard.

21.4 Retrying Every Error

An unsupported contract will not be fixed by retrying.
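One way to keep this explicit is a small classifier that decides between retry and quarantine. The sketch below is an assumption-laden illustration: `RetryPolicy`, the two exception types, and the capped exponential backoff formula are hypothetical and not taken from the series' code.

```java
import java.time.Duration;

// Hypothetical retry taxonomy: only transient failures are retried, everything else is quarantined.
final class RetryPolicy {
    enum Decision { RETRY, QUARANTINE }

    static final class TransientDependencyException extends RuntimeException {
        TransientDependencyException(String message) { super(message); }
    }

    static final class UnsupportedContractException extends RuntimeException {
        UnsupportedContractException(String message) { super(message); }
    }

    private final int maxAttempts;
    private final Duration baseDelay;
    private final Duration maxDelay;

    RetryPolicy(int maxAttempts, Duration baseDelay, Duration maxDelay) {
        this.maxAttempts = maxAttempts;
        this.baseDelay = baseDelay;
        this.maxDelay = maxDelay;
    }

    // attemptCount is the number of attempts already made, including the one that just failed.
    Decision classify(Throwable error, int attemptCount) {
        if (error instanceof TransientDependencyException && attemptCount < maxAttempts) {
            return Decision.RETRY;
        }
        return Decision.QUARANTINE; // unsupported contract, unknown error, or attempts exhausted
    }

    // Exponential backoff: baseDelay * 2^(attemptCount - 1), capped at maxDelay.
    Duration backoff(int attemptCount) {
        int exponent = Math.min(Math.max(attemptCount - 1, 0), 20); // bounded to avoid overflow
        long millis = baseDelay.toMillis() * (1L << exponent);
        return Duration.ofMillis(Math.min(millis, maxDelay.toMillis()));
    }
}
```

Defaulting unknown errors to quarantine is a deliberate choice in this sketch: it trades some manual review for protection against retry storms.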

21.5 Parallel Workers Without an Ordering Contract

Throughput goes up, correctness goes down: without an explicit ordering rule, two workers can apply events for the same aggregate out of order.

21.6 Storing Sensitive Payloads Forever

The inbox needs a retention and redaction policy.

21.7 Assuming Kafka Exactly-Once Solves Every Side Effect

A Kafka transaction does not automatically make PostgreSQL, Camunda, email, and external APIs exactly-once.


22. Production Checklist

Before a consumer goes to production:

  • The event envelope has eventId, eventType, eventVersion, aggregateId, correlationId.
  • consumer_name + event_id is unique.
  • The offset is committed only after the DB insert/processing is safe.
  • The handler is idempotent at the domain level.
  • The retry taxonomy is explicit.
  • Quarantine has an owner and a dashboard.
  • Raw poison payloads have a policy.
  • Stale processing repair is available.
  • Kafka lag and inbox business lag are monitored.
  • Replay tests exist.
  • Duplicate tests exist.
  • Shutdown behavior is tested on Kubernetes.
  • Payload retention/redaction is agreed.
  • Consumer version compatibility is tested.
  • Runbooks for duplicate/quarantine/replay exist.

23. What You Should Internalize

The consumer inbox is not a "nice to have" add-on pattern.

For production event-driven systems, the consumer inbox is the fence between:

  • delivery and processing;
  • offset and side effect;
  • message arrival and business truth;
  • automatic retry and manual recovery.

Top engineers do not ask:

"Will Kafka deliver duplicates?"

They ask:

"When a duplicate happens, which invariants stay safe, and what evidence do we have to prove it?"

In the next part, we use this consumer inbox for a harder problem: connecting Kafka events to Camunda process instances without making process correlation fragile, duplicated, or lost when racing with a wait state.
