Start HereOrdered learning track

Event Contracts with AsyncAPI and Kafka

Learn Production Grade Contract-First Java Orchestration Platform - Part 007

Desain kontrak event-driven production-grade dengan AsyncAPI dan Kafka untuk platform regulatory case orchestration, termasuk event taxonomy, envelope, topic model, partition key, schema evolution, replay safety, dan consumer contract.

21 min read4101 words
PrevNext
Lesson 0740 lesson track0108 Start Here
#java#kafka#asyncapi#event-driven-architecture+4 more

Part 007 — Event Contracts with AsyncAPI and Kafka

HTTP contract menjawab pertanyaan: bagaimana client meminta sesuatu ke sistem?

Event contract menjawab pertanyaan yang lebih berbahaya: apa yang sistem umumkan ke dunia luar, dalam urutan apa, dengan makna apa, dan apakah pengumuman itu aman diproses ulang?

Di sistem kecil, event sering dibuat seperti log:

{
  "caseId": "CASE-2026-000001",
  "status": "ACCEPTED"
}

Itu terlihat cukup sampai ada consumer yang membangun SLA projection, consumer lain mengirim notifikasi, consumer lain membuat report, dan consumer lama masih membaca payload versi sebelumnya.

Production event tidak boleh hanya menjadi "pesan yang kebetulan dikirim ke Kafka". Event adalah kontrak integrasi yang bisa memengaruhi banyak service, laporan regulator, automasi workflow, dan proses recovery.

Part ini membangun kontrak event untuk Regulatory Enforcement & Case Orchestration Platform menggunakan pendekatan AsyncAPI + Kafka.

Kita tidak akan mengulang basic Kafka seperti apa itu broker atau consumer. Fokus kita adalah desain production-grade:

  • event taxonomy;
  • topic ownership;
  • message envelope;
  • partition key;
  • ordering boundary;
  • idempotency;
  • schema evolution;
  • replay safety;
  • consumer obligation;
  • DLQ/retry contract;
  • correlation dengan HTTP, database, dan Camunda 7.

1. Mental Model: Event adalah Fakta, Bukan Request Terselubung

Kesalahan paling umum dalam event-driven system adalah memakai event sebagai remote command yang disamarkan.

Contoh buruk:

{
  "eventType": "PleaseStartInvestigation",
  "caseId": "CASE-2026-000001"
}

Nama event itu bukan fakta. Itu permintaan.

Event yang benar harus menyatakan sesuatu yang sudah terjadi:

{
  "eventType": "CaseAccepted",
  "caseId": "CASE-2026-000001",
  "acceptedAt": "2026-07-02T09:15:22Z"
}

Bedanya besar.

Command berkata:

Please do X.

Event berkata:

X happened.

Dalam sistem produksi, perbedaan ini memengaruhi ownership:

BentukMaknaOwner keputusanConsumer boleh menolak?Aman untuk replay?
Commandpermintaan melakukan aksireceiveryabiasanya tidak tanpa idempotency kuat
Eventfakta yang sudah terjadipublisher/domain ownertidak mengubah fakta, hanya gagal memproses projectionya, jika handler idempotent
Querypermintaan membaca statereceiveryatidak relevan

Di seri ini:

  • HTTP endpoint menerima command intent dari client.
  • Domain service memutuskan state transition.
  • PostgreSQL menyimpan state dan outbox dalam satu transaksi.
  • Kafka publisher membaca outbox dan menerbitkan event fact.
  • Consumer membangun projection, memicu workflow correlation, mengirim notifikasi, atau menjalankan side effect.

Invariant penting:

Event published tidak boleh menceritakan sesuatu yang belum committed di database.

Karena itu, event publishing tidak dilakukan langsung dari request handler sebelum transaksi database commit. Kita akan bahas outbox mendalam di Part 032. Di part ini, kita mendefinisikan kontraknya dulu.


2. Event Taxonomy untuk Case Platform

Tidak semua pesan di Kafka memiliki makna sama.

Kita butuh taxonomy agar nama topic, payload, lifecycle, retention, dan consumer expectation jelas.

Untuk studi kasus ini, kita pakai kategori berikut.

KategoriContohMaknaRetentionReplay
Domain eventCaseAccepted, CaseRejectedfakta domain dari bounded context casepanjangharus replayable
Process eventInvestigationTaskCreated, SlaBreachedfakta workflow/processmenengah-panjangreplayable untuk projection
Integration eventCaseAcceptedIntegrationEventfakta yang distabilkan untuk sistem eksternalpanjangreplayable
Notification eventCaseOfficerAssignedNotificationRequestedpermintaan side effect notifikasipendek-menengahidempotent tapi tidak selalu perlu full replay
Audit eventCaseAuditEntryAppendedimmutable audit signalsangat panjangreplayable, sensitif akses
Operational eventOutboxPublisherLaggedtelemetry operasionalpendektidak untuk domain recovery

Kita akan disiplin:

  • domain event diterbitkan oleh service pemilik aggregate;
  • integration event adalah kontrak eksternal yang lebih stabil dari internal domain event;
  • notification event tidak boleh menjadi sumber kebenaran domain;
  • operational event tidak boleh dipakai consumer bisnis;
  • audit event memiliki aturan privasi lebih ketat.

2.1 Domain Event vs Integration Event

Domain event sering terlalu kaya atau terlalu dekat ke internal model.

Contoh domain event internal:

{
  "eventType": "CaseRiskAssessmentCompleted",
  "caseId": "CASE-2026-000001",
  "assessmentVersion": 12,
  "riskScore": 87,
  "riskBand": "HIGH",
  "ruleResults": [
    { "ruleCode": "LATE_DISCLOSURE", "matched": true },
    { "ruleCode": "REPEAT_OFFENDER", "matched": true }
  ]
}

Consumer eksternal mungkin tidak boleh melihat ruleResults. Maka kita terbitkan integration event yang lebih aman:

{
  "eventType": "CaseRiskClassified",
  "caseId": "CASE-2026-000001",
  "riskBand": "HIGH",
  "classifiedAt": "2026-07-02T09:15:22Z"
}

Jangan menganggap semua event internal otomatis boleh menjadi integration event.

Rule:

Integration event adalah public API. Domain event adalah internal language. Jangan membocorkan internal language kecuali memang sengaja dijadikan public contract.


3. Topic Design: Topic adalah Boundary, Bukan Folder Pesan

Topic Kafka sering dinamai sembarangan:

case-events
case-events-v2
case-events-new
case-events-final
case-events-final-2

Itu tanda tidak ada ownership dan lifecycle.

Topic production harus menjawab:

  • siapa owner topic?
  • apa jenis message di dalamnya?
  • apakah ordering dibutuhkan?
  • apa partition key?
  • apakah compaction dipakai?
  • berapa retention?
  • siapa consumer yang diizinkan?
  • apakah payload berisi data sensitif?
  • apakah topic bisa direplay?
  • apa compatibility policy?

Untuk seri ini, kita pakai konvensi:

<domain>.<stream>.<classification>.v<major>

Contoh:

enforcement.case.lifecycle.public.v1
enforcement.case.audit.restricted.v1
enforcement.case.sla.internal.v1
enforcement.case.notification.internal.v1
enforcement.case.outbox-operational.internal.v1

Makna field:

BagianContohFungsi
domainenforcementdomain besar organisasi
streamcase.lifecyclestream/subdomain
classificationpublic, internal, restrictedakses dan sensitivitas
versionv1major compatibility boundary

3.1 Jangan Satu Topic untuk Semua Hal

Satu topic besar seperti events memberi ilusi sederhana. Masalahnya muncul saat:

  • retention audit harus panjang, notification pendek;
  • consumer tertentu hanya boleh baca event publik;
  • partition key berbeda per jenis event;
  • event volume SLA tinggi, event decision rendah;
  • schema compatibility berbeda;
  • replay satu subset menjadi mahal.

Lebih baik topic dipisah berdasarkan stream semantics, bukan berdasarkan nama service semata.

Contoh pemisahan:

enforcement.case.lifecycle.public.v1
  - CaseSubmitted
  - CaseAccepted
  - CaseRejected
  - CaseClosed

enforcement.case.assignment.internal.v1
  - CaseAssigned
  - CaseUnassigned
  - CaseReassigned

enforcement.case.sla.internal.v1
  - SlaStarted
  - SlaPaused
  - SlaResumed
  - SlaBreached

enforcement.case.audit.restricted.v1
  - CaseAuditEntryAppended

3.2 Kapan Satu Topic Berisi Banyak Event Type?

Satu topic boleh berisi beberapa event type jika mereka berbagi:

  • partition key sama;
  • ordering boundary sama;
  • retention sama;
  • audience sama;
  • security classification sama;
  • lifecycle compatibility sama.

Untuk case.lifecycle, semua event memakai caseId sebagai partition key dan consumer biasanya perlu urutan lifecycle per case. Maka beberapa event type dalam satu topic masih masuk akal.


4. Partition Key: Ordering Boundary yang Sering Salah Dipilih

Kafka memberi ordering dalam satu partition, bukan seluruh topic.

Karena itu partition key bukan detail teknis kecil. Partition key menentukan:

  • urutan apa yang dijamin;
  • parallelism maksimum consumer;
  • potensi hot partition;
  • cara replay;
  • cara correlation;
  • dampak resharding.

Untuk lifecycle case, partition key natural adalah caseId.

key = CASE-2026-000001

Artinya semua event lifecycle untuk case yang sama masuk ke partition yang sama sehingga consumer melihat urutan per case.

4.1 Partition Key Decision Table

StreamCandidate keyDipilihAlasan
case lifecyclecaseId, tenantId, officerIdcaseIdperlu urutan per case
assignmentcaseId, assigneeIdcaseIdassignment adalah bagian lifecycle case
SLAcaseId, slaIdcaseId atau slaIdtergantung satu case bisa punya banyak SLA paralel atau tidak
auditcaseId, auditEntryIdcaseIdaudit timeline perlu urut per case
notificationrecipientId, caseIdtergantung deliverykalau rate limit per recipient, pilih recipientId
reporting aggregateperiod, caseTypejangan pakai lifecycle topicbuat projection sendiri

4.2 Hot Partition Risk

caseId biasanya aman jika volume tersebar. Tetapi ada domain yang tidak aman:

  • satu mega-case punya jutaan evidence event;
  • satu tenant besar menghasilkan 80% traffic;
  • satu officer menangani bulk reassignment;
  • satu scheduled job menghasilkan burst untuk satu key.

Jangan pilih tenantId sebagai partition key jika satu tenant bisa mendominasi throughput.

Rule praktis:

Pilih partition key berdasarkan ordering requirement terkecil yang benar, bukan berdasarkan field yang paling mudah.

Kalau ordering hanya perlu per case, jangan pakai tenant.


5. Event Envelope: Metadata Wajib Production

Payload event tidak boleh hanya body domain. Kita perlu envelope.

Envelope menyediakan metadata lintas event:

  • eventId untuk deduplication;
  • eventType untuk routing;
  • eventVersion untuk compatibility;
  • occurredAt untuk waktu domain terjadi;
  • publishedAt untuk waktu event dikirim;
  • producer untuk ownership;
  • correlationId untuk trace lintas request/event/process;
  • causationId untuk hubungan sebab-akibat;
  • tenantId untuk multi-tenancy;
  • subject untuk entity utama;
  • schemaRef untuk schema governance;
  • dataClassification untuk privacy/security.

Envelope dasar:

{
  "eventId": "018f9f53-8a39-7c18-b0b9-9e0d60a1f2a1",
  "eventType": "CaseAccepted",
  "eventVersion": 1,
  "occurredAt": "2026-07-02T09:15:22.123Z",
  "publishedAt": "2026-07-02T09:15:23.010Z",
  "producer": "case-command-service",
  "tenantId": "regulator-id",
  "subject": "case/CASE-2026-000001",
  "correlationId": "corr-01J1V7W1AZ5Q5Q7F1F7WQGZ0PE",
  "causationId": "cmd-01J1V7W12M8HDQK8SZE9AMY0Z1",
  "partitionKey": "CASE-2026-000001",
  "schemaRef": "asyncapi://enforcement.case.lifecycle.public.v1/CaseAccepted/1",
  "dataClassification": "PUBLIC_TO_AUTHORIZED_SYSTEMS",
  "data": {
    "caseId": "CASE-2026-000001",
    "acceptedAt": "2026-07-02T09:15:22.123Z",
    "acceptedBy": "officer-123",
    "caseType": "MARKET_ABUSE",
    "riskBand": "HIGH"
  }
}

5.1 occurredAt vs publishedAt

Jangan samakan waktu domain dengan waktu publish.

FieldMakna
occurredAtwaktu fakta domain terjadi/committed
publishedAtwaktu publisher berhasil mengirim ke Kafka

Saat outbox tertunda karena Kafka outage, occurredAt bisa jauh lebih awal daripada publishedAt.

Consumer SLA harus memakai waktu yang benar.

Contoh:

Case accepted at 09:00.
Kafka outage until 09:20.
Event published at 09:21.
SLA 4 hours should start from 09:00, not 09:21.

5.2 eventId Bukan caseId

caseId mengidentifikasi entity. eventId mengidentifikasi fakta unik.

Satu case bisa punya banyak event:

caseId = CASE-2026-000001
  eventId = evt-1 CaseSubmitted
  eventId = evt-2 CaseAccepted
  eventId = evt-3 InvestigationStarted

Consumer idempotency harus berdasarkan eventId, bukan hanya caseId. Kalau consumer menyimpan caseId saja, event kedua untuk case yang sama bisa keliru dianggap duplikat.


6. Event Body Design: Minimal Tapi Cukup

Ada dua ekstrem yang sama-sama buruk.

Ekstrem pertama: event terlalu kecil.

{
  "caseId": "CASE-2026-000001"
}

Consumer harus call API balik untuk tahu detail. Ini menciptakan synchronous dependency dan membuat replay sulit.

Ekstrem kedua: event terlalu besar.

{
  "case": {
    "everything": "including mutable fields, comments, evidence, documents, pii, internal scores"
  }
}

Consumer menjadi tergantung ke semua detail internal, data sensitif menyebar, dan perubahan payload menjadi mahal.

Rule:

Event harus membawa data yang diperlukan consumer untuk bereaksi terhadap fakta tersebut, tetapi tidak boleh menjadi dump database row.

6.1 CaseAccepted Body

{
  "caseId": "CASE-2026-000001",
  "caseType": "MARKET_ABUSE",
  "riskBand": "HIGH",
  "acceptedAt": "2026-07-02T09:15:22.123Z",
  "acceptedBy": "officer-123",
  "acceptanceReasonCode": "VALID_JURISDICTION",
  "nextExpectedAction": "START_INVESTIGATION"
}

Kenapa field ini ada?

FieldAlasan
caseIdentity identity
caseTyperouting dan reporting
riskBandSLA dan prioritization
acceptedAttimeline dan SLA
acceptedByaudit/projection
acceptanceReasonCodeexplainability
nextExpectedActionconsumer orchestration helper, bukan command wajib

nextExpectedAction perlu hati-hati. Ia bukan instruksi. Ia metadata domain yang menyatakan ekspektasi lifecycle. Consumer tetap harus punya aturan sendiri.


7. AsyncAPI Document Layout

AsyncAPI memberi cara machine-readable untuk mendokumentasikan API event-driven: server, channel, operation, message, schema, security, dan metadata.

Untuk repository seri ini:

case-contract-asyncapi/
└── src/main/asyncapi/
    ├── case-events.yaml
    ├── components/
    │   ├── messages.yaml
    │   ├── schemas.yaml
    │   ├── security.yaml
    │   └── traits.yaml
    └── examples/
        ├── case-submitted.json
        ├── case-accepted.json
        └── sla-breached.json

Kita gunakan AsyncAPI 3.x style: channel mendeskripsikan address/transport binding, operation mendeskripsikan perilaku send/receive, message mendeskripsikan payload.

Skeleton:

asyncapi: 3.0.0
info:
  title: Regulatory Enforcement Case Events
  version: 1.0.0
  description: Event contracts for case lifecycle, SLA, assignment, and audit streams.

servers:
  productionKafka:
    host: kafka.prod.internal:9092
    protocol: kafka
    description: Production Kafka cluster
  stagingKafka:
    host: kafka.staging.internal:9092
    protocol: kafka
    description: Staging Kafka cluster

channels:
  caseLifecycleEvents:
    address: enforcement.case.lifecycle.public.v1
    messages:
      CaseSubmitted:
        $ref: '#/components/messages/CaseSubmitted'
      CaseAccepted:
        $ref: '#/components/messages/CaseAccepted'
      CaseRejected:
        $ref: '#/components/messages/CaseRejected'

operations:
  publishCaseLifecycleEvents:
    action: send
    channel:
      $ref: '#/channels/caseLifecycleEvents'
    messages:
      - $ref: '#/channels/caseLifecycleEvents/messages/CaseSubmitted'
      - $ref: '#/channels/caseLifecycleEvents/messages/CaseAccepted'
      - $ref: '#/channels/caseLifecycleEvents/messages/CaseRejected'

8. AsyncAPI Message Contract

Kita definisikan envelope reusable.

components:
  schemas:
    EventEnvelope:
      type: object
      required:
        - eventId
        - eventType
        - eventVersion
        - occurredAt
        - producer
        - tenantId
        - subject
        - correlationId
        - partitionKey
        - data
      properties:
        eventId:
          type: string
          format: uuid
        eventType:
          type: string
        eventVersion:
          type: integer
          minimum: 1
        occurredAt:
          type: string
          format: date-time
        publishedAt:
          type: string
          format: date-time
        producer:
          type: string
        tenantId:
          type: string
        subject:
          type: string
          examples:
            - case/CASE-2026-000001
        correlationId:
          type: string
        causationId:
          type: string
        partitionKey:
          type: string
        schemaRef:
          type: string
        dataClassification:
          type: string
          enum:
            - PUBLIC_TO_AUTHORIZED_SYSTEMS
            - INTERNAL
            - RESTRICTED
        data:
          type: object

Untuk message tertentu, kita bisa memakai allOf agar envelope tetap konsisten.

components:
  messages:
    CaseAccepted:
      name: CaseAccepted
      title: Case Accepted
      summary: Published when a submitted case is accepted for enforcement processing.
      contentType: application/json
      payload:
        allOf:
          - $ref: '#/components/schemas/EventEnvelope'
          - type: object
            properties:
              eventType:
                const: CaseAccepted
              eventVersion:
                const: 1
              data:
                $ref: '#/components/schemas/CaseAcceptedData'

Payload data:

components:
  schemas:
    CaseAcceptedData:
      type: object
      additionalProperties: false
      required:
        - caseId
        - caseType
        - riskBand
        - acceptedAt
        - acceptedBy
        - acceptanceReasonCode
      properties:
        caseId:
          type: string
          pattern: '^CASE-[0-9]{4}-[0-9]{6}$'
        caseType:
          type: string
          enum:
            - MARKET_ABUSE
            - DISCLOSURE_FAILURE
            - LICENSING_BREACH
            - CONDUCT_RISK
        riskBand:
          type: string
          enum:
            - LOW
            - MEDIUM
            - HIGH
            - CRITICAL
        acceptedAt:
          type: string
          format: date-time
        acceptedBy:
          type: string
        acceptanceReasonCode:
          type: string
        nextExpectedAction:
          type: string
          enum:
            - START_INVESTIGATION
            - WAIT_FOR_REVIEW

8.1 additionalProperties: false — Dipakai dengan Sadar

Untuk event contract, additionalProperties: false membuat schema ketat. Ini baik untuk producer discipline, tetapi bisa mengganggu forward compatibility jika consumer gagal saat melihat field baru.

Strategi yang lebih aman:

  • schema producer boleh ketat;
  • consumer harus toleran terhadap field tambahan;
  • contract test memastikan field wajib tidak hilang;
  • field baru optional dulu;
  • field lama deprecated dulu, tidak langsung dihapus.

Kita akan memakai prinsip:

Producer strict. Consumer tolerant.

9. Message Headers vs Payload Metadata

Kafka punya record headers. Jangan semua metadata dimasukkan hanya ke payload.

Field yang sering berguna di header:

  • eventId;
  • eventType;
  • eventVersion;
  • correlationId;
  • tenantId;
  • schemaRef;
  • dataClassification.

Kenapa?

Consumer bisa melakukan routing ringan tanpa deserialize seluruh payload.

Namun metadata penting tetap disimpan di payload envelope juga agar:

  • event tetap self-contained saat diekspor;
  • replay dari object storage tetap lengkap;
  • audit/debug tidak tergantung header preservation;
  • DLQ payload tetap bisa dipahami.

Rule:

Header boleh menduplikasi metadata envelope untuk operational routing, tetapi payload tetap sumber kebenaran kontrak event.

Contoh producer header:

record.headers().add("eventType", "CaseAccepted".getBytes(StandardCharsets.UTF_8));
record.headers().add("eventId", eventId.toString().getBytes(StandardCharsets.UTF_8));
record.headers().add("correlationId", correlationId.getBytes(StandardCharsets.UTF_8));
record.headers().add("schemaRef", schemaRef.getBytes(StandardCharsets.UTF_8));

10. Event Versioning Strategy

Ada tiga level versioning:

  1. topic major version;
  2. event type version;
  3. schema revision.

10.1 Topic Major Version

Topic major version berubah jika compatibility rusak secara besar.

enforcement.case.lifecycle.public.v1
enforcement.case.lifecycle.public.v2

Jangan membuat topic baru untuk perubahan kecil optional field.

10.2 Event Type Version

eventVersion berubah ketika bentuk message untuk event type berubah.

{
  "eventType": "CaseAccepted",
  "eventVersion": 1
}

Jika CaseAccepted versi 2 menambah optional field, consumer lama harus tetap bisa jalan.

Jika versi 2 mengubah makna field, itu breaking change.

10.3 Schema Revision

Schema revision adalah detail registry/governance. Misalnya:

asyncapi://enforcement.case.lifecycle.public.v1/CaseAccepted/1.3.0

Kita tidak perlu topic baru untuk setiap schema revision.

10.4 Compatibility Rules

Perubahan yang biasanya aman:

  • menambah optional field;
  • menambah enum hanya jika consumer tolerant;
  • memperluas string pattern secara compatible;
  • menambah event type baru di topic yang sama jika consumer mengabaikan event yang tidak dikenal.

Perubahan yang berbahaya:

  • menghapus field wajib;
  • mengganti tipe field;
  • mengganti makna field;
  • mengganti partition key;
  • mengganti semantic ordering;
  • mengganti timezone meaning;
  • mengganti unit angka;
  • menambah enum saat consumer memakai exhaustive switch tanpa fallback.

Contoh Java bug:

switch (riskBand) {
    case "LOW" -> handleLow();
    case "MEDIUM" -> handleMedium();
    case "HIGH" -> handleHigh();
    case "CRITICAL" -> handleCritical();
    default -> throw new IllegalArgumentException("Unknown risk band: " + riskBand);
}

Untuk consumer projection, unknown enum seharusnya tidak selalu fatal. Ia bisa masuk quarantine atau fallback:

RiskBand parsed = RiskBand.fromWireValue(value)
    .orElse(RiskBand.UNKNOWN);

11. Event Naming: Nama adalah Kontrak Semantik

Nama event harus jelas dan lampau.

BurukLebih baikAlasan
CaseStatusChangedCaseAccepted, CaseRejected, CaseClosedstatus changed terlalu generik
ProcessCaseCaseSubmitted atau command SubmitCasecommand/event campur
InvestigationUpdateInvestigationStarted, InvestigationSuspendedupdate tidak menjelaskan fakta
UserTaskEventInvestigationTaskAssignedterlalu teknis Camunda
CaseEventV2topic/event version terpisahnama event jangan jadi tempat version chaos

Rule:

Event name harus membuat consumer bisa menebak invariants yang berubah tanpa membaca source code producer.


12. Event Granularity: Jangan Terlalu Halus, Jangan Terlalu Kasar

Jika event terlalu halus:

CaseRiskScoreUpdated
CaseRiskBandUpdated
CasePriorityUpdated
CaseQueueUpdated

Consumer harus menggabungkan banyak event untuk tahu satu keputusan.

Jika event terlalu kasar:

CaseUpdated

Consumer tidak tahu makna perubahan.

Gunakan event berdasarkan business fact.

Contoh:

CaseRiskAssessmentCompleted

Event ini boleh membawa score, band, priority, dan reason code karena semuanya bagian dari satu assessment decision.

12.1 Decision Heuristic

Buat event baru jika:

  • ada consumer yang bereaksi berbeda terhadap fakta itu;
  • fakta itu punya waktu kejadian sendiri;
  • fakta itu punya audit significance;
  • fakta itu punya authorization/privacy berbeda;
  • fakta itu bisa gagal/diulang secara terpisah;
  • fakta itu mengubah lifecycle invariant.

Jangan buat event baru jika:

  • hanya field teknis berubah;
  • tidak ada consumer nyata;
  • hanya untuk mempermudah satu query internal;
  • event akan selalu dipakai bersama event lain secara atomik.

13. Consumer Contract: Consumer Juga Punya Kewajiban

Contract-first sering hanya fokus pada producer. Itu kurang.

Consumer juga wajib mematuhi kontrak:

  • harus idempotent;
  • harus toleran terhadap duplicate delivery;
  • harus tidak mengandalkan global ordering;
  • harus mengabaikan event type yang tidak relevan;
  • harus menyimpan offset hanya setelah side effect aman;
  • harus punya poison message strategy;
  • harus punya replay mode;
  • harus tidak call balik producer secara masif saat replay;
  • harus menjaga correlation id;
  • harus tidak mengubah fakta event.

13.1 Consumer Declaration

Untuk consumer penting, kita dokumentasikan consumer contract.

x-consumers:
  - name: sla-projection-service
    purpose: Build SLA deadlines and breach projections for accepted cases.
    consumes:
      - CaseAccepted
      - InvestigationStarted
      - SlaPaused
      - SlaResumed
    idempotencyKey: eventId
    orderingAssumption: per caseId only
    replaySupported: true
    sideEffects:
      - writes PostgreSQL projection table
    failurePolicy:
      retry: exponential-backoff
      dlq: enforcement.case.lifecycle.public.v1.sla-projection.dlq

Ini extension field, bukan standar wajib. Tetapi secara governance, field seperti ini sangat berguna.

13.2 Consumer Offset and Side Effect

Consumer projection harus berpikir seperti ini:

read event
validate envelope
check inbox/event_processed table
apply side effect in local transaction
record eventId as processed
commit transaction
commit Kafka offset

Jika offset commit sebelum side effect, event bisa hilang secara logis.

Jika side effect commit sebelum offset, event bisa dibaca lagi. Karena itu handler harus idempotent.


14. Replay Safety

Replay bukan fitur nice-to-have. Replay adalah alat recovery.

Replay dipakai saat:

  • consumer projection corrupt;
  • bug handler diperbaiki;
  • service baru butuh bootstrap data;
  • audit reconstruction;
  • migration projection;
  • laporan regulator perlu dihitung ulang.

Agar replay aman, event harus:

  • immutable;
  • self-contained untuk kebutuhan consumer;
  • punya eventId stabil;
  • punya occurredAt stabil;
  • punya schema version;
  • tidak tergantung API callback untuk data historis mutable;
  • consumer side effect bisa dibedakan antara replay dan live mode jika diperlukan.

14.1 Live Mode vs Replay Mode

Consumer notifikasi tidak boleh mengirim email ulang saat replay.

Maka consumer side effect harus punya mode:

ConsumerLive modeReplay mode
SLA projectionupdate projectionrebuild projection
Search indexupdate indexrebuild index
Notificationsend notificationskip or write dry-run log
Audit projectionappend/checkreconstruct
Camunda correlationcorrelate processusually disabled unless controlled migration

Replay tidak selalu berarti semua consumer aktif.

Rule:

Consumer dengan external side effect harus replay-aware.


15. DLQ dan Poison Message Contract

DLQ bukan tempat sampah yang dilupakan. DLQ adalah kontrak operasional.

Poison message terjadi ketika event valid secara Kafka, tetapi consumer tidak bisa memprosesnya karena:

  • schema tidak dikenal;
  • field mandatory hilang;
  • referential data tidak ada;
  • bug handler;
  • data semantik tidak valid;
  • dependency downstream selalu gagal;
  • side effect ditolak permanen.

DLQ message harus membawa:

  • original topic;
  • original partition;
  • original offset;
  • original key;
  • original headers;
  • original payload;
  • consumer name;
  • failure category;
  • error message ringkas;
  • failedAt;
  • correlationId;
  • retry count.

Contoh DLQ envelope:

{
  "dlqId": "018f9f77-8a39-7c18-b0b9-9e0d60a1f999",
  "consumer": "sla-projection-service",
  "failedAt": "2026-07-02T09:20:00Z",
  "failureCategory": "SCHEMA_VALIDATION_FAILED",
  "errorCode": "UNKNOWN_RISK_BAND",
  "originalTopic": "enforcement.case.lifecycle.public.v1",
  "originalPartition": 3,
  "originalOffset": 881992,
  "originalKey": "CASE-2026-000001",
  "correlationId": "corr-01J1V7W1AZ5Q5Q7F1F7WQGZ0PE",
  "payload": {
    "eventId": "018f9f53-8a39-7c18-b0b9-9e0d60a1f2a1"
  }
}

15.1 Retry vs DLQ Decision

FailureRetry?DLQ?Contoh
network timeoutyasetelah batasDB temporarily unavailable
schema invalidtidak banyakyarequired field missing
unknown enumtergantung policyquarantine/DLQnew enum not supported
duplicate eventtidaktidakalready processed
downstream 429ya dengan backoffsetelah batasnotification provider rate limit
authorization config missingtidak cepatya + alertconsumer lacks permission

Rule:

Retry untuk failure sementara. DLQ untuk failure yang butuh perubahan data, konfigurasi, atau kode.


16. Correlation: HTTP ke Kafka ke Camunda

Contract-first harus menjaga trace semantic lintas protokol.

Saat client submit case:

POST /cases
X-Correlation-Id: corr-123
Idempotency-Key: idem-456

Sistem menyimpan:

  • command id;
  • idempotency key;
  • correlation id;
  • created case id;
  • outbox event id;
  • Camunda business key.

Event membawa correlation id yang sama:

{
  "eventId": "evt-789",
  "eventType": "CaseSubmitted",
  "correlationId": "corr-123",
  "causationId": "cmd-submit-case-id",
  "subject": "case/CASE-2026-000001"
}

Camunda process menggunakan businessKey = caseId dan menyimpan correlationId sebagai process variable atau context metadata.

16.1 correlationId vs causationId

FieldPertanyaan yang dijawab
correlationIdoperasi besar apa yang menghubungkan semua ini?
causationIdevent/command apa yang langsung menyebabkan event ini?

Contoh chain:

HTTP SubmitCase command
  correlationId = corr-123
  commandId = cmd-1

CaseSubmitted event
  correlationId = corr-123
  causationId = cmd-1
  eventId = evt-1

CaseAccepted event
  correlationId = corr-123
  causationId = evt-1 or cmd-accept-2
  eventId = evt-2

Tanpa causation id, debugging chain panjang menjadi kabur.


17. Java Representation: Jangan Bocorkan Transport ke Domain

Event payload perlu class Java. Tetapi jangan jadikan generated DTO sebagai domain object.

Layering:

case-contract-events-generated
  Generated DTO from AsyncAPI/JSON Schema

case-event-model
  Internal event envelope/value object

case-command-service
  Domain service creates event model
  Mapper converts to generated DTO for publishing

Contoh domain event internal:

public record CaseAcceptedEvent(
        EventId eventId,
        CaseId caseId,
        CaseType caseType,
        RiskBand riskBand,
        Instant acceptedAt,
        OfficerId acceptedBy,
        ReasonCode acceptanceReasonCode,
        CorrelationId correlationId,
        CausationId causationId
) implements DomainEvent {

    @Override
    public String eventType() {
        return "CaseAccepted";
    }

    @Override
    public String partitionKey() {
        return caseId.value();
    }
}

Generated DTO mapping:

public final class CaseAcceptedEventMapper {

    public CaseAcceptedMessage toWire(CaseAcceptedEvent event) {
        CaseAcceptedData data = new CaseAcceptedData()
                .caseId(event.caseId().value())
                .caseType(event.caseType().wireValue())
                .riskBand(event.riskBand().wireValue())
                .acceptedAt(event.acceptedAt())
                .acceptedBy(event.acceptedBy().value())
                .acceptanceReasonCode(event.acceptanceReasonCode().value());

        return new CaseAcceptedMessage()
                .eventId(event.eventId().value())
                .eventType("CaseAccepted")
                .eventVersion(1)
                .occurredAt(event.acceptedAt())
                .producer("case-command-service")
                .subject("case/" + event.caseId().value())
                .correlationId(event.correlationId().value())
                .causationId(event.causationId().value())
                .partitionKey(event.partitionKey())
                .data(data);
    }
}

Mapper terlihat membosankan. Justru bagus. Boundary eksplisit membuat perubahan contract tidak menyebar ke domain core.


18. Event Validation Pipeline

Producer harus validate event sebelum outbox insert atau sebelum publish?

Jawaban production-grade:

  • validasi domain dilakukan sebelum state transition;
  • validasi wire contract dilakukan sebelum outbox insert;
  • validasi publish safety dilakukan sebelum Kafka send;
  • consumer validasi envelope sebelum handler.

Pipeline:

Kenapa validasi sebelum outbox insert?

Karena outbox row adalah janji untuk publish. Jika payload invalid masuk outbox, publisher akan gagal berulang dan menciptakan operational noise.

Tetapi publisher tetap validate ringan, karena:

  • data bisa berasal dari versi service lama;
  • outbox bisa di-repair manual;
  • bug migration bisa menghasilkan payload buruk;
  • defense in depth.

19. Contract Testing untuk Event

Event contract testing harus menjawab:

  • producer menghasilkan event valid;
  • consumer bisa membaca sample event lama;
  • consumer toleran field baru;
  • breaking change terdeteksi sebelum release;
  • partition key tidak berubah tanpa review;
  • event examples tetap sinkron dengan schema.

19.1 Producer Contract Test

@Test
void caseAcceptedEventMustMatchContract() {
    CaseAcceptedEvent event = fixture.caseAcceptedEvent();
    CaseAcceptedMessage message = mapper.toWire(event);

    JsonNode json = objectMapper.valueToTree(message);

    assertThat(schemaValidator.validate("CaseAccepted", json))
            .isEmpty();

    assertThat(message.getPartitionKey())
            .isEqualTo(message.getData().getCaseId());
}

19.2 Consumer Compatibility Test

Consumer harus diuji dengan fixture historis.

src/test/resources/events/enforcement.case.lifecycle.public.v1/
├── CaseSubmitted.v1.valid.json
├── CaseAccepted.v1.valid.json
├── CaseAccepted.v1.with-extra-field.json
├── CaseRejected.v1.valid.json
└── CaseAccepted.v1.unknown-risk-band.json

Test:

@Test
void consumerMustIgnoreUnknownOptionalFields() {
    EventEnvelope envelope = loadFixture("CaseAccepted.v1.with-extra-field.json");

    assertThatCode(() -> consumer.handle(envelope))
            .doesNotThrowAnyException();
}

19.3 Partition Key Regression Test

@Test
void caseLifecyclePartitionKeyMustBeCaseId() {
    CaseAcceptedMessage message = fixture.caseAcceptedMessage();

    ProducerRecord<String, byte[]> record = producerRecordFactory.toRecord(message);

    assertThat(record.topic())
            .isEqualTo("enforcement.case.lifecycle.public.v1");
    assertThat(record.key())
            .isEqualTo(message.getData().getCaseId());
}

Partition key adalah contract. Jangan biarkan refactor mengubahnya diam-diam.


20. Security and Privacy in Event Contracts

Event menyebar lebih luas daripada database row. Begitu data masuk Kafka, banyak consumer bisa punya copy.

Karena itu event contract harus punya privacy stance.

20.1 Data Classification

ClassificationBoleh berisiConsumer
PUBLIC_TO_AUTHORIZED_SYSTEMSdata bisnis non-rahasia untuk sistem terotorisasibanyak internal/external partner
INTERNALdata operasional internalservice internal
RESTRICTEDPII/sensitive/audit detailconsumer terbatas
SECRETcredential/keytidak boleh di event

Rule:

Credential, token, secret, raw password, private key, dan session identifier tidak boleh masuk event payload.

20.2 PII Minimization

Buruk:

{
  "caseId": "CASE-2026-000001",
  "subjectName": "Jane Doe",
  "subjectNationalId": "...",
  "subjectAddress": "...",
  "riskBand": "HIGH"
}

Lebih aman:

{
  "caseId": "CASE-2026-000001",
  "subjectRef": "party/PARTY-123456",
  "riskBand": "HIGH"
}

Consumer yang memang butuh PII harus melewati API/DB boundary dengan authorization, bukan mendapatkannya dari public lifecycle topic.


21. Event Contract Review Checklist

Sebelum event baru diterima, jawab ini:

Semantics

  • Fakta apa yang sudah terjadi?
  • Apa beda event ini dengan event yang sudah ada?
  • Apakah nama event bentuk past tense?
  • Apa invariant yang berubah?
  • Apakah event ini domain, integration, process, notification, audit, atau operational event?

Topic and Ordering

  • Topic mana?
  • Mengapa topic itu benar?
  • Partition key apa?
  • Ordering apa yang dijamin?
  • Apakah ada risiko hot partition?

Payload

  • Field mana yang wajib?
  • Field mana yang optional?
  • Apakah payload self-contained untuk consumer utama?
  • Apakah ada PII atau data sensitif?
  • Apakah occurredAt jelas?
  • Apakah timezone dan unit eksplisit?

Compatibility

  • Apa compatibility policy?
  • Apakah consumer lama akan gagal?
  • Apakah enum baru aman?
  • Apakah field lama deprecated dulu?
  • Apakah ada fixture historis?

Operations

  • Apakah event replayable?
  • Apakah consumer idempotent?
  • Apakah ada DLQ policy?
  • Apakah observability metadata lengkap?
  • Apakah runbook recovery tersedia?

22. Anti-Patterns

22.1 Event sebagai Database Row Dump

Jika event sama dengan row database, maka schema internal menjadi public contract. Perubahan tabel menjadi breaking change integrasi.

22.2 statusChanged Everywhere

StatusChanged memaksa consumer memahami transition matrix sendiri. Lebih baik publish business fact spesifik.

22.3 Partition Key Berdasarkan Tenant Tanpa Analisis

Tenant key sering terlihat natural, tetapi bisa menciptakan hot partition dan ordering yang terlalu luas.

22.4 Consumer Call Back ke API untuk Semua Detail

Ini mengubah async system menjadi sync distributed call graph saat load/replay.

22.5 Event Tanpa eventId

Tanpa event id, idempotency consumer menjadi tebakan.

22.6 DLQ Tanpa Ownership

DLQ tanpa owner dan replay tool hanyalah kuburan message.

22.7 Breaking Change dengan Nama Event Sama

Mengubah makna field tanpa major version adalah salah satu bentuk integrasi paling mahal.


23. Production Failure Model

FailureDampakMitigasi desain
Kafka unavailableevent tidak publishoutbox menahan event sampai Kafka pulih
duplicate publishconsumer memproses dua kalieventId + idempotent consumer
consumer crash after DB commit before offset commitevent dibaca ulanginbox/processed_event table
bad payload in outboxpublisher stuckpre-outbox validation + quarantine
schema incompatibleconsumer gagalcompatibility test + DLQ + tolerant reader
hot partitionlag tinggi untuk key tertentupartition key review + topic split jika perlu
replay triggers email againuser menerima notifikasi duplikatreplay mode + side effect guard
event includes PII accidentallydata leakagecontract review + classification + schema lint
event order misunderstoodprojection salahdocument ordering boundary per partition

24. Minimal Contract Set untuk Seri Ini

Kita akan memakai event-event berikut sebagai baseline:

enforcement.case.lifecycle.public.v1
  CaseSubmitted
  CaseAccepted
  CaseRejected
  InvestigationStarted
  CaseDecisionRecorded
  CaseClosed

enforcement.case.assignment.internal.v1
  CaseAssigned
  CaseReassigned
  CaseAssignmentReleased

enforcement.case.sla.internal.v1
  SlaStarted
  SlaPaused
  SlaResumed
  SlaBreached

enforcement.case.audit.restricted.v1
  CaseAuditEntryAppended

enforcement.case.notification.internal.v1
  NotificationRequested
  NotificationDelivered
  NotificationFailed

Kita tidak akan membuat ratusan event. Kita mulai dari event yang punya value arsitektural.


25. Contoh AsyncAPI Lebih Lengkap

asyncapi: 3.0.0
info:
  title: Regulatory Enforcement Case Events
  version: 1.0.0
  description: Event contracts for regulatory case lifecycle.

defaultContentType: application/json

servers:
  productionKafka:
    host: kafka.prod.internal:9092
    protocol: kafka

channels:
  caseLifecycle:
    address: enforcement.case.lifecycle.public.v1
    messages:
      CaseAccepted:
        $ref: '#/components/messages/CaseAccepted'

operations:
  caseCommandServicePublishesLifecycle:
    action: send
    channel:
      $ref: '#/channels/caseLifecycle'
    messages:
      - $ref: '#/channels/caseLifecycle/messages/CaseAccepted'

components:
  messages:
    CaseAccepted:
      name: CaseAccepted
      title: Case Accepted
      contentType: application/json
      headers:
        type: object
        properties:
          eventType:
            type: string
            const: CaseAccepted
          correlationId:
            type: string
          schemaRef:
            type: string
      payload:
        $ref: '#/components/schemas/CaseAcceptedEnvelope'

  schemas:
    CaseAcceptedEnvelope:
      type: object
      additionalProperties: false
      required:
        - eventId
        - eventType
        - eventVersion
        - occurredAt
        - producer
        - tenantId
        - subject
        - correlationId
        - partitionKey
        - dataClassification
        - data
      properties:
        eventId:
          type: string
          format: uuid
        eventType:
          type: string
          const: CaseAccepted
        eventVersion:
          type: integer
          const: 1
        occurredAt:
          type: string
          format: date-time
        publishedAt:
          type: string
          format: date-time
        producer:
          type: string
          const: case-command-service
        tenantId:
          type: string
        subject:
          type: string
        correlationId:
          type: string
        causationId:
          type: string
        partitionKey:
          type: string
        schemaRef:
          type: string
        dataClassification:
          type: string
          const: PUBLIC_TO_AUTHORIZED_SYSTEMS
        data:
          $ref: '#/components/schemas/CaseAcceptedData'

    CaseAcceptedData:
      type: object
      additionalProperties: false
      required:
        - caseId
        - caseType
        - riskBand
        - acceptedAt
        - acceptedBy
        - acceptanceReasonCode
      properties:
        caseId:
          type: string
          pattern: '^CASE-[0-9]{4}-[0-9]{6}$'
        caseType:
          type: string
          enum:
            - MARKET_ABUSE
            - DISCLOSURE_FAILURE
            - LICENSING_BREACH
            - CONDUCT_RISK
        riskBand:
          type: string
          enum:
            - LOW
            - MEDIUM
            - HIGH
            - CRITICAL
        acceptedAt:
          type: string
          format: date-time
        acceptedBy:
          type: string
        acceptanceReasonCode:
          type: string
        nextExpectedAction:
          type: string
          enum:
            - START_INVESTIGATION
            - WAIT_FOR_REVIEW

26. How This Connects to Later Parts

Part ini adalah fondasi untuk beberapa bagian berikutnya:

  • Part 008 memakai event contract untuk mendesain database table outbox_event dan audit schema.
  • Part 026-030 memakai event untuk Camunda correlation dan process incident handling.
  • Part 031-034 memperdalam Kafka topic, outbox, inbox, consumer idempotency, dan process integration.
  • Part 038 memakai envelope metadata untuk observability.
  • Part 039 memakai failure model untuk drill produksi.

Jangan lanjut ke implementasi Kafka producer sebelum kontrak ini stabil. Producer tanpa contract hanya mempercepat kekacauan.


27. Production Checklist

Sebelum event contract dianggap siap:

  • event names memakai business fact, bukan command;
  • topic name mengikuti convention;
  • partition key tertulis jelas;
  • ordering guarantee tertulis eksplisit;
  • envelope punya eventId, eventType, eventVersion, occurredAt, correlationId, causationId, partitionKey;
  • data classification jelas;
  • PII diminimalkan;
  • schema punya required/optional distinction;
  • examples valid terhadap schema;
  • producer contract test ada;
  • consumer compatibility fixture ada;
  • DLQ policy ada;
  • replay behavior consumer tertulis;
  • versioning policy tertulis;
  • owner topic dan owner event jelas;
  • breaking change procedure jelas.

28. Sumber Primer dan Rujukan


29. Ringkasan

Event contract adalah public memory dari sistem. Sekali consumer bergantung pada event, event itu menjadi API.

Desain production-grade harus menjawab:

  • event ini fakta apa?
  • siapa owner-nya?
  • topic mana?
  • key apa?
  • ordering apa?
  • metadata apa?
  • schema version apa?
  • consumer boleh berasumsi apa?
  • replay aman atau tidak?
  • duplicate ditangani bagaimana?
  • data sensitif bocor atau tidak?

Di part berikutnya, kita masuk ke sisi database: bagaimana PostgreSQL menjadi kontrak internal yang menjaga invariant, bukan sekadar storage pasif.

Lesson Recap

You just completed lesson 07 in start here. Use the series map if you want to review the broader track, or continue directly into the next lesson while the context is still warm.

Continue The Track

Keep the momentum while the lesson is still fresh. Move backward for review or continue forward into the next concept.