Start HereOrdered learning track

Pipeline Invariants

Learn Java Data Pipeline Pattern - Part 003

Invariant inti data pipeline production-grade: completeness, ordering, freshness, idempotency, replayability, determinism, bounded side effects, dan auditability. Membahas cara berpikir, desain Java, failure mode, dan review checklist.

19 min read3613 words
PrevNext
Lesson 0384 lesson track01–15 Start Here
#java#data-pipeline#data-engineering#distributed-systems+3 more

Part 003 — Pipeline Invariants

Pipeline yang reliable bukan pipeline yang “tidak pernah error”. Pipeline yang reliable adalah pipeline yang punya invariant eksplisit: aturan kebenaran yang tetap dijaga walaupun data terlambat, source berubah, worker restart, job di-retry, sink timeout, atau operator manusia melakukan backfill.

Bagian ini adalah fondasi desain. Kita tidak akan mulai dari tool, karena Kafka, Flink, Spark, Beam, Debezium, Airflow, Temporal, database, dan object storage semuanya hanya cara implementasi. Yang lebih fundamental adalah pertanyaan:

Dalam kondisi gagal, apa yang tetap harus benar?

Itulah invariant.

Di production-grade data pipeline, invariant lebih penting daripada “best practice”. Best practice sering berubah tergantung stack. Invariant adalah kontrak kebenaran yang bisa kamu bawa lintas stack.


1. Mengapa Invariant Lebih Penting daripada Pattern

Pattern menjawab:

Biasanya orang membangun pipeline seperti apa?

Invariant menjawab:

Properti apa yang tidak boleh rusak?

Contoh sederhana:

Source DB -> Kafka -> Stream Processor -> Reporting Table

Pattern yang terlihat:

  • CDC ingestion
  • Kafka topic
  • stream processing
  • materialized view
  • reporting sink

Namun invariant yang sebenarnya menentukan kualitas sistem:

  • setiap perubahan penting dari source harus masuk ke Kafka;
  • event untuk entity yang sama harus diproses dalam urutan yang dapat dijelaskan;
  • duplicate event tidak boleh menggandakan saldo, hitungan, atau status;
  • transformasi harus bisa diulang tanpa hasil yang berubah secara liar;
  • output harus punya jejak lineage ke input;
  • jika pipeline gagal di tengah, recovery tidak boleh menghasilkan state yang mustahil;
  • jika source mengirim koreksi, hasil downstream harus punya strategi koreksi.

Top engineer tidak hanya berkata, “pakai Kafka biar reliable”. Ia bertanya:

Reliable terhadap invariant yang mana?

Kafka menyimpan event secara durable dan memberi model offset. Flink memberi stateful stream processing dan checkpoint. Beam memberi model bounded/unbounded dan event-time. Debezium membaca perubahan database. Tetapi tidak ada tool yang otomatis menjawab seluruh invariant bisnismu.


2. Apa Itu Pipeline Invariant

Dalam konteks pipeline:

Invariant adalah pernyataan kebenaran yang harus tetap berlaku pada seluruh lifecycle data, termasuk saat retry, replay, partial failure, late arrival, schema evolution, dan operasi manual.

Invariant harus bisa diuji, diobservasi, atau minimal diaudit. Kalau tidak bisa, ia masih berupa harapan.

Contoh invariant buruk:

Pipeline harus cepat.

Terlalu kabur.

Contoh invariant yang lebih baik:

Untuk setiap approved_case_event yang diterima sebelum watermark T,
reporting.case_status_current harus berisi status final terbaru untuk case_id tersebut
maksimal 5 menit setelah event diterima, kecuali event masuk quarantine karena gagal validasi kontrak.

Invariant ini punya beberapa dimensi:

  • input yang dimaksud jelas;
  • batas waktu jelas;
  • output jelas;
  • pengecualian jelas;
  • bisa dimonitor;
  • bisa diuji lewat replay.

3. Daftar Invariant Inti Pipeline

Seri ini memakai delapan invariant utama.

Kedelapannya:

  1. Completeness — data yang seharusnya diproses benar-benar sampai dan tercakup.
  2. Ordering — urutan yang penting bagi makna bisnis tidak rusak diam-diam.
  3. Freshness — output cukup baru untuk use case-nya.
  4. Idempotency — retry/duplicate/replay tidak menghasilkan efek ganda yang salah.
  5. Replayability — pipeline bisa diulang untuk recovery, backfill, migrasi, atau audit.
  6. Determinism — input dan versi logic yang sama menghasilkan output yang sama.
  7. Bounded side effects — efek eksternal dikendalikan agar partial failure tidak merusak state.
  8. Auditability — output bisa dijelaskan: dari mana asalnya, versi apa yang dipakai, kapan berubah.

Kedelapan invariant ini saling terkait. Contoh: replayability tanpa idempotency bisa merusak sink; freshness tanpa completeness bisa menghasilkan angka cepat tetapi salah; ordering tanpa watermark bisa salah saat late events.


4. Invariant #1 — Completeness

4.1 Makna Completeness

Completeness berarti:

Semua input yang berada dalam scope pipeline tercakup dalam output atau punya status terminal yang eksplisit.

Status terminal bisa berupa:

  • successfully processed;
  • skipped by rule;
  • quarantined;
  • dead-lettered;
  • superseded;
  • ignored because out of scope;
  • failed after retry budget exhausted.

Yang tidak boleh terjadi adalah silent loss.

Silent loss adalah kondisi paling berbahaya karena sistem tampak sehat, tetapi data hilang.

4.2 Contoh Completeness Invariant

Untuk pipeline ingestion file:

Setiap file yang muncul di landing bucket dengan manifest valid harus berada pada salah satu status:
RECEIVED, VALIDATED, PROCESSED, QUARANTINED, atau FAILED_TERMINAL.
Tidak boleh ada file yang tidak tercatat di ingestion ledger lebih dari 2 menit setelah manifest diterima.

Untuk CDC pipeline:

Setiap committed transaction dari source database yang mengubah tabel enforcement_case harus muncul sebagai event downstream atau tercatat sebagai skipped berdasarkan filter eksplisit.

Untuk API ingestion:

Untuk setiap cursor window yang berhasil di-fetch, seluruh item di halaman tersebut harus dipersist sebagai raw record sebelum transformasi downstream dimulai.

4.3 Completeness vs Success Count

Banyak tim mengira completeness berarti jumlah sukses tinggi. Itu keliru.

processed_success = 99.9%

Angka ini tidak menjawab:

  • 0.1% yang gagal itu data apa?
  • apakah ada source partition yang tidak terbaca?
  • apakah ada page API yang ter-skip?
  • apakah ada Kafka consumer group yang tertinggal?
  • apakah ada file yang tidak terdeteksi?
  • apakah output sink punya jumlah yang sesuai?

Completeness harus dilihat dari coverage.

Completeness = scoped input - explicitly terminal records = 0 unknown records

Yang dikejar bukan “semua sukses”, tetapi “tidak ada unknown”.

4.4 Java Design: Processing Ledger

Untuk pipeline penting, buat ledger.

public enum ProcessingStatus {
    RECEIVED,
    VALIDATED,
    PROCESSING,
    PROCESSED,
    QUARANTINED,
    FAILED_RETRYABLE,
    FAILED_TERMINAL,
    SKIPPED_OUT_OF_SCOPE
}

public record ProcessingLedgerEntry(
        String pipelineName,
        String runId,
        String sourceId,
        String recordKey,
        String inputFingerprint,
        ProcessingStatus status,
        int attempt,
        Instant firstSeenAt,
        Instant updatedAt,
        String errorCode,
        String errorMessage
) {}

Ledger bukan sekadar log. Log bersifat naratif. Ledger adalah state machine kecil untuk coverage.

Minimal ledger harus menjawab:

  • record/file/batch mana yang masuk scope;
  • status terakhirnya apa;
  • berapa kali dicoba;
  • error terminalnya apa;
  • transform version mana yang memprosesnya;
  • sink write id mana yang dibuat.

4.5 Failure Mode Completeness

FailureGejalaAkar MasalahMitigasi
File dibaca saat belum selesai ditulisrecord parse error acaktidak ada atomic publish protocolmanifest atau atomic rename
API page ter-skipjumlah downstream kurangcursor disimpan sebelum page dipersistpersist raw page dulu, baru advance cursor
Kafka commit terlalu cepatevent hilang setelah crashoffset commit sebelum sink durablecommit setelah sink acknowledged
Filter diam-diam salahdata dianggap tidak adarule tidak diauditexplicit skip ledger
CDC snapshot tidak konsistenrow hilang/duplikatsnapshot dan stream tidak disambung benarsnapshot+offset boundary yang jelas

5. Invariant #2 — Ordering

5.1 Makna Ordering

Ordering berarti:

Pipeline menjaga urutan yang diperlukan untuk mempertahankan makna data.

Tidak semua data butuh global ordering. Global ordering mahal dan sering tidak perlu. Yang penting adalah menemukan ordering scope.

Jenis ordering:

  1. Per-key ordering — event untuk case_id=123 diproses sesuai urutan.
  2. Causal ordering — event B yang bergantung pada event A tidak boleh diterapkan sebelum A.
  3. Event-time ordering — agregasi berdasarkan waktu kejadian, bukan waktu diterima.
  4. Transaction ordering — perubahan dalam transaksi source tetap konsisten.
  5. Correction ordering — koreksi historis punya aturan menang terhadap event lama.

5.2 Ordering Tidak Sama dengan Sorting

Sorting adalah operasi teknis. Ordering adalah invariant semantik.

Contoh:

CASE_CREATED at 10:00
CASE_ESCALATED at 10:05
CASE_CLOSED at 10:10

Jika pipeline memproses CASE_CLOSED sebelum CASE_CREATED, output bisa mustahil:

case_status = CLOSED
case_created_at = null

Masalahnya bukan hanya urutan array. Masalahnya adalah state domain menjadi tidak valid.

5.3 Java Design: Versioned Event Application

Untuk entity pipeline, hindari update buta.

Buruk:

caseProjection.setStatus(event.status());
repository.save(caseProjection);

Lebih baik:

public record CaseEvent(
        String eventId,
        String caseId,
        long aggregateVersion,
        Instant eventTime,
        CaseEventType type,
        JsonNode payload
) {}

public final class CaseProjectionUpdater {
    public CaseProjection apply(CaseProjection current, CaseEvent event) {
        if (event.aggregateVersion() <= current.lastAppliedVersion()) {
            return current; // duplicate or older event
        }

        if (event.aggregateVersion() != current.lastAppliedVersion() + 1) {
            throw new GapDetectedException(
                    current.caseId(),
                    current.lastAppliedVersion(),
                    event.aggregateVersion()
            );
        }

        return switch (event.type()) {
            case CASE_CREATED -> current.created(event);
            case CASE_ESCALATED -> current.escalated(event);
            case CASE_CLOSED -> current.closed(event);
            case CASE_REOPENED -> current.reopened(event);
        };
    }
}

Di sini ordering invariant dimodelkan sebagai aturan:

next_event.version == current.lastAppliedVersion + 1

Jika gap terjadi, jangan asal lanjut. Masukkan ke holding area atau trigger replay.

5.4 Partition Key dan Ordering

Di Kafka-style system, ordering biasanya dijamin per partition, bukan global. Maka pemilihan key sangat kritis.

Good key for case lifecycle: case_id
Bad key for case lifecycle: random UUID
Bad key for lifecycle order: event_type

Jika semua event untuk satu case harus urut, partition key harus stabil berdasarkan case_id.

Namun ada trade-off:

  • key terlalu sempit bisa menyebabkan hot partition;
  • key terlalu acak merusak order;
  • key berdasarkan event type memecah lifecycle entity;
  • key berdasarkan tenant bisa menjaga tenant order tapi bisa membuat satu tenant besar menjadi bottleneck.

5.5 Ordering Review Question

Pertanyaan review:

Urutan apa yang benar-benar diperlukan?

Bukan:

Apakah sistem menjamin ordering?

Karena jawaban yang benar hampir selalu:

Tergantung scope ordering-nya.

6. Invariant #3 — Freshness

6.1 Makna Freshness

Freshness berarti:

Output pipeline cukup baru dibandingkan kebutuhan bisnisnya.

Freshness bukan hanya latency teknis. Ada beberapa jam:

event_time       = kapan kejadian bisnis terjadi
ingestion_time   = kapan pipeline menerima data
processing_time  = kapan worker memproses data
publish_time     = kapan output tersedia
observed_time    = kapan user/system melihat output

Freshness yang baik harus menyebut jam mana yang dipakai.

Contoh buruk:

Pipeline harus real-time.

Contoh baik:

95% CASE_ESCALATED events harus tersedia di case_risk_dashboard
maksimal 60 detik sejak ingestion_time, dan 99% maksimal 5 menit,
kecuali source CDC lag melebihi 2 menit.

6.2 Freshness vs Completeness

Freshness sering trade-off dengan completeness.

Misal agregasi harian:

Total enforcement actions per region per day

Jika output diterbitkan jam 00:01, fresh tetapi mungkin belum complete karena ada late events. Jika menunggu 24 jam, complete tetapi stale.

Solusinya bukan memilih salah satu secara dogmatis. Solusinya mendefinisikan state output:

PRELIMINARY -> CORRECTED -> FINAL

atau:

as_of_watermark = 2026-07-04T10:00:00Z
completeness_confidence = 99.5%

6.3 Lag Metrics yang Perlu Dibedakan

MetricMaknaContoh
Source lagsource belum mengeluarkan perubahanDebezium tertinggal dari WAL
Broker lagconsumer belum membaca messageKafka consumer lag
Processing lagworker lambat memprosesqueue internal penuh
Watermark lagevent-time progress tertinggallate/out-of-order events
Sink lagoutput belum commitwarehouse write lambat
Visibility lagoutput commit tapi belum terlihat usercache/report refresh

Jika semua disebut “lag”, debugging akan lambat.

6.4 Java Design: Freshness Stamp

Setiap output penting sebaiknya punya metadata freshness.

public record OutputFreshness(
        Instant maxEventTimeIncluded,
        Instant maxIngestionTimeIncluded,
        Instant producedAt,
        Duration eventTimeLag,
        Duration ingestionLag,
        String sourceCursor,
        String pipelineRunId
) {}

Untuk reporting table:

CREATE TABLE report_case_status_current (
    case_id TEXT PRIMARY KEY,
    status TEXT NOT NULL,
    last_event_time TIMESTAMPTZ NOT NULL,
    last_event_id TEXT NOT NULL,
    pipeline_produced_at TIMESTAMPTZ NOT NULL,
    source_cursor TEXT NOT NULL,
    transform_version TEXT NOT NULL
);

Freshness tanpa metadata hanya klaim.


7. Invariant #4 — Idempotency

7.1 Makna Idempotency

Idempotency berarti:

Operasi yang sama dapat dijalankan lebih dari sekali tanpa mengubah hasil akhir secara salah.

Dalam pipeline, duplicate bukan edge case. Duplicate adalah normal consequence dari retry, rebalance, timeout, reconnect, replay, dan operator manual.

Jika desainmu tidak duplicate-safe, pipeline-mu belum production-grade.

7.2 Idempotency Scope

Idempotency harus didefinisikan per boundary:

BoundaryPertanyaan
Source readJika source dibaca ulang, apakah raw record dobel?
TransformJika transform dijalankan ulang, apakah output sama?
Sink writeJika write diulang, apakah row/event/side effect dobel?
NotificationJika alert dikirim ulang, apakah user menerima spam?
External APIJika call timeout tapi berhasil di server, apakah retry aman?

7.3 Idempotency Key

Idempotency membutuhkan key stabil.

Pilihan key:

  • source event ID;
  • database transaction ID + row primary key + operation sequence;
  • file path + line number + file checksum;
  • API cursor + item ID + version;
  • business entity ID + version;
  • deterministic hash dari canonical payload.

Key buruk:

  • random UUID dibuat saat processing;
  • timestamp processing;
  • auto-increment sink tanpa natural key;
  • hash payload mentah yang berubah karena field non-semantik.

7.4 Java Design: Idempotent Sink

public interface IdempotentSink<T> {
    SinkResult write(IdempotencyKey key, T value) throws SinkException;
}

public record IdempotencyKey(
        String namespace,
        String naturalKey,
        String version,
        String fingerprint
) {}

public enum SinkResult {
    INSERTED,
    UPDATED,
    ALREADY_APPLIED,
    REJECTED_STALE,
    REJECTED_CONFLICT
}

Implementation ke PostgreSQL-style sink:

CREATE TABLE pipeline_applied_event (
    namespace TEXT NOT NULL,
    natural_key TEXT NOT NULL,
    version TEXT NOT NULL,
    fingerprint TEXT NOT NULL,
    applied_at TIMESTAMPTZ NOT NULL DEFAULT now(),
    PRIMARY KEY (namespace, natural_key, version)
);

Pseudocode:

public SinkResult write(IdempotencyKey key, CaseProjection projection) {
    try {
        appliedEventRepository.insert(key);
        projectionRepository.upsert(projection);
        return SinkResult.INSERTED;
    } catch (DuplicateKeyException duplicate) {
        AppliedEvent existing = appliedEventRepository.get(key.namespace(), key.naturalKey(), key.version());
        if (existing.fingerprint().equals(key.fingerprint())) {
            return SinkResult.ALREADY_APPLIED;
        }
        return SinkResult.REJECTED_CONFLICT;
    }
}

Perhatikan: duplicate dengan fingerprint sama aman. Duplicate dengan key sama tapi fingerprint beda adalah konflik serius.

7.5 Idempotent Bukan Selalu “Do Nothing”

Kadang event yang sama harus menghasilkan output yang sama, bukan sekadar diabaikan.

Contoh:

  • rebuild materialized view;
  • recompute aggregate;
  • replay ke sink baru;
  • re-run correction pipeline.

Maka idempotency perlu dipahami sebagai:

same semantic operation -> same final state

bukan selalu:

second call -> skip

8. Invariant #5 — Replayability

8.1 Makna Replayability

Replayability berarti:

Pipeline dapat memproses ulang data historis untuk menghasilkan output yang dapat dijelaskan.

Replay dibutuhkan untuk:

  • recovery setelah bug;
  • backfill data lama;
  • migrasi schema;
  • rebuild projection;
  • audit/regulatory evidence;
  • melatih ulang model;
  • mengisi sink baru;
  • memperbaiki transform logic.

Pipeline yang tidak bisa replay biasanya akan berubah menjadi kumpulan patch manual.

8.2 Replay Source

Untuk replay, kamu perlu sumber yang cukup durable.

Kemungkinan sumber:

  • raw landing files;
  • Kafka topic dengan retention cukup;
  • compacted changelog;
  • database snapshot;
  • CDC log;
  • object storage bronze layer;
  • event store;
  • audit table;
  • immutable append-only ledger.

Jika source hanya API yang mengembalikan current state, replay historis mungkin mustahil.

8.3 Replay Boundary

Replay harus punya boundary jelas:

Replay what?
From where?
To where?
Using which transform version?
With which side effects enabled?
With which correction rules?

Contoh replay spec:

replayId: replay-20260704-case-status-v3
source:
  type: kafka
  topic: case.lifecycle.events.v1
  fromOffset: 12500000
  toOffset: 15300000
transform:
  name: case-status-projection
  version: 3.2.1
sink:
  table: report_case_status_current_shadow
sideEffects:
  sendNotifications: false
  callExternalApi: false
validation:
  compareWith: report_case_status_current
  tolerance: zero-for-status

8.4 Java Design: Replay Mode

Pipeline harus tahu mode eksekusinya.

public enum ExecutionMode {
    LIVE,
    BACKFILL,
    REPLAY,
    DRY_RUN,
    SHADOW
}

public record PipelineExecutionContext(
        String pipelineName,
        String runId,
        ExecutionMode mode,
        String transformVersion,
        boolean sideEffectsEnabled,
        Instant startedAt
) {}

Gunakan context ini di boundary side effect:

public final class AlertPublisher {
    public void publish(PipelineExecutionContext context, Alert alert) {
        if (!context.sideEffectsEnabled()) {
            audit.info("alert suppressed in mode {} for {}", context.mode(), alert.alertId());
            return;
        }
        notificationClient.send(alert);
    }
}

Replay yang mengirim email/notifikasi eksternal tanpa kontrol adalah insiden.

8.5 Replayability Anti-Patterns

Anti-patternDampak
Transform membaca Instant.now() langsungreplay menghasilkan output berbeda
Sink hanya insert append tanpa idempotencyreplay menggandakan data
Raw input tidak disimpantidak bisa audit ulang
Schema lama tidak bisa dibacadata historis tidak bisa diproses
External API dipanggil saat replayside effect tidak terkendali
Offset range tidak dicatatreplay tidak bisa direproduksi

9. Invariant #6 — Determinism

9.1 Makna Determinism

Determinism berarti:

Dengan input, konfigurasi, transform version, dan reference data version yang sama, pipeline menghasilkan output yang sama.

Determinism bukan berarti output tidak pernah berubah. Output boleh berubah jika:

  • input berubah;
  • logic version berubah;
  • reference data version berubah;
  • correction policy berubah;
  • window/watermark policy berubah.

Yang tidak boleh adalah output berubah tanpa penyebab yang bisa dijelaskan.

9.2 Sumber Non-Determinism

SumberContohMitigasi
Current timeInstant.now() di transforminject clock dari event/context
Random valueUUID random untuk output keydeterministic ID dari input
External lookup mutablelookup current exchange rateversioned reference data
Race conditionparallel update aggregateper-key serialization atau commutative aggregation
Floating pointaggregate financial amount pakai doubleBigDecimal + rounding policy
Unordered collectionoutput tergantung HashMap iterationexplicit sorting
Config driftjob lama dan baru beda configconfig snapshot per run

9.3 Java Design: Deterministic Transform

Buruk:

public CaseRiskOutput transform(CaseEvent event) {
    return new CaseRiskOutput(
            UUID.randomUUID().toString(),
            event.caseId(),
            Instant.now(),
            riskClient.currentScore(event.caseId())
    );
}

Lebih baik:

public record TransformContext(
        String transformVersion,
        Instant processingCutoff,
        ReferenceDataSnapshot referenceData,
        Clock clockForMetadataOnly
) {}

public CaseRiskOutput transform(CaseEvent event, TransformContext context) {
    RiskScore score = context.referenceData().riskScoreFor(event.caseId(), event.eventTime());

    return new CaseRiskOutput(
            deterministicOutputId(event),
            event.caseId(),
            event.eventTime(),
            score.value(),
            context.transformVersion(),
            context.referenceData().version()
    );
}

private String deterministicOutputId(CaseEvent event) {
    return "case-risk:" + event.caseId() + ":" + event.aggregateVersion();
}

Kuncinya:

  • output ID deterministik;
  • waktu bisnis berasal dari event;
  • reference data versi eksplisit;
  • transform version disimpan.

9.4 Determinism vs Parallelism

Parallelism sering mengancam determinism.

Contoh aggregate:

sum(amount)

Secara matematis komutatif. Namun jika memakai floating point, urutan penjumlahan bisa mengubah hasil kecil. Untuk uang, jangan pakai double. Gunakan BigDecimal atau integer minor unit.

Contoh lain:

first event wins

Jika “first” berarti processing time, parallelism bisa membuat hasil berbeda antar run. Definisikan first berdasarkan:

  • event time;
  • sequence number;
  • source offset;
  • deterministic tie-breaker.

10. Invariant #7 — Bounded Side Effects

10.1 Makna Bounded Side Effects

Pipeline sering tidak hanya menulis data. Ia bisa:

  • mengirim email;
  • memanggil external API;
  • membuka case;
  • membuat ticket;
  • update search index;
  • menghapus data;
  • trigger workflow;
  • mengirim alert.

Side effect berbahaya karena tidak selalu bisa di-replay atau di-rollback.

Bounded side effects berarti:

Efek eksternal pipeline dibatasi, diberi idempotency key, dicatat, dan dipisahkan dari transformasi murni sebisa mungkin.

10.2 Pisahkan Pure Transform dan Effect

Desain yang buruk:

public void process(Event event) {
    Output output = transform(event);
    repository.save(output);
    emailClient.send(output.email());
    kafkaProducer.send(output.event());
}

Jika emailClient.send berhasil tapi kafkaProducer.send gagal, recovery menjadi ambigu.

Desain lebih baik:

Transform menghasilkan effect intent, bukan langsung melakukan effect.

public record EffectIntent(
        String effectId,
        String effectType,
        String idempotencyKey,
        JsonNode payload,
        String sourceEventId,
        String transformVersion
) {}

Dispatcher memproses intent dengan ledger:

public enum EffectStatus {
    PENDING,
    SENT,
    CONFIRMED,
    FAILED_RETRYABLE,
    FAILED_TERMINAL,
    SUPPRESSED
}

10.3 Side Effect Rules

Rule umum:

  1. Pure transform boleh replay kapan saja.
  2. Sink data harus idempotent.
  3. External side effect harus punya effect ledger.
  4. Replay/backfill default-nya tidak mengirim side effect eksternal.
  5. Side effect yang irreversible harus dipisah dari hot path.
  6. Setiap external call harus punya timeout, retry budget, dan idempotency key jika sistem tujuan mendukung.

10.4 Compensation Bukan Rollback

Di sistem terdistribusi, rollback sering tidak mungkin. Yang mungkin adalah compensation.

Contoh:

  • email salah tidak bisa “di-unemail”; kirim koreksi;
  • ticket salah tidak bisa hilang dari audit; close dengan reason;
  • payment/refund harus entry baru, bukan delete transaksi lama.

Untuk pipeline regulated, jangan hapus jejak. Buat event koreksi.


11. Invariant #8 — Auditability

11.1 Makna Auditability

Auditability berarti:

Output pipeline bisa dijelaskan secara faktual: input apa yang berkontribusi, logic versi apa yang dipakai, kapan diproses, siapa/apa yang memicu, dan bagaimana error ditangani.

Auditability penting bukan hanya untuk regulator. Ia juga mempercepat debugging.

11.2 Pertanyaan Audit

Pipeline production-grade harus bisa menjawab:

  • Dari input mana row ini berasal?
  • Event mana yang terakhir mengubah output ini?
  • Transform version apa yang menghasilkan output?
  • Schema version apa yang dipakai?
  • Reference data version mana yang dipakai?
  • Apakah output ini hasil live processing, replay, atau backfill?
  • Apakah ada data yang di-quarantine?
  • Apakah ada late correction setelah output awal?
  • Siapa yang menjalankan rerun?
  • Apakah hasil rerun sama dengan hasil lama?

11.3 Java Metadata untuk Audit

public record AuditMetadata(
        String sourceSystem,
        String sourceRecordId,
        String sourceOffset,
        String sourceTransactionId,
        String pipelineName,
        String pipelineRunId,
        String transformVersion,
        String schemaVersion,
        String referenceDataVersion,
        ExecutionMode executionMode,
        Instant eventTime,
        Instant ingestedAt,
        Instant processedAt
) {}

Simpan metadata ini di output penting atau side table.

Untuk high-volume data, metadata bisa disimpan sebagai:

  • columns utama untuk field yang sering dicari;
  • JSON metadata untuk detail tambahan;
  • lineage table untuk many-to-many input-output;
  • OpenLineage-style event untuk dataset-level lineage;
  • immutable audit log untuk perubahan penting.

11.4 Auditability vs Observability

Observability menjawab:

Sistem sedang sehat atau tidak?

Auditability menjawab:

Mengapa output ini seperti ini?

Keduanya berbeda.

Metric records_processed_total tidak cukup untuk audit. Log “processed successfully” juga tidak cukup. Audit membutuhkan relasi input-output dan versi logic.


12. Composite Invariants: Ketika Invariant Saling Bertabrakan

Dalam sistem nyata, invariant sering trade-off.

12.1 Freshness vs Completeness

Dashboard real-time ingin cepat. Finance report ingin lengkap.

Solusi:

  • output bertingkat: preliminary/final;
  • watermark metadata;
  • correction pipeline;
  • SLA berbeda per use case;
  • expose freshness dan completeness confidence.

12.2 Ordering vs Throughput

Per-key ordering menjaga lifecycle entity. Tetapi global ordering membunuh throughput.

Solusi:

  • per-key ordering;
  • partition key sesuai aggregate;
  • commutative aggregation jika memungkinkan;
  • holdback buffer hanya untuk key yang bermasalah;
  • detect gap, bukan serialize seluruh dunia.

12.3 Replayability vs Cost

Menyimpan raw input selamanya mahal.

Solusi:

  • retention tiering;
  • compacted representation;
  • raw immutable bronze untuk domain penting;
  • archive cold storage;
  • replay window berdasarkan regulatory dan bisnis;
  • transform output snapshot untuk mempercepat rebuild.

12.4 Determinism vs Mutable Reference Data

Transform sering butuh lookup data yang berubah.

Solusi:

  • versioned reference data;
  • effective-dated lookup;
  • snapshot reference per run;
  • store reference version in output;
  • rerun with explicit reference snapshot.

13. Invariant-First Pipeline Design Process

Gunakan proses berikut sebelum menulis implementasi.

13.1 Step 1 — Define Business Fact

Jangan mulai dari tabel atau topic. Mulai dari fakta.

Contoh:

A case was escalated from level L1 to L2 at time T by actor A for reason R.

Ini berbeda dari:

row in case_status table changed

Row change adalah representasi teknis. Business fact adalah makna.

13.2 Step 2 — Define Source Scope

Tentukan input yang termasuk scope:

  • tabel mana;
  • event type mana;
  • tenant mana;
  • time range mana;
  • status mana;
  • source system mana;
  • schema version mana.

Tanpa source scope, completeness tidak bisa diukur.

13.3 Step 3 — Define Output Contract

Tentukan output:

  • bentuk data;
  • semantic meaning;
  • key uniqueness;
  • freshness expectation;
  • correction behavior;
  • consumer assumption;
  • retention;
  • ownership.

13.4 Step 4 — Define Invariants

Minimal jawab:

Completeness: apa yang tidak boleh hilang?
Ordering: urutan apa yang harus dijaga?
Freshness: seberapa cepat output harus terlihat?
Idempotency: duplicate/retry aman di mana?
Replayability: dari mana dan sampai mana bisa replay?
Determinism: apa yang membuat hasil bisa berubah?
Side effects: efek eksternal apa yang harus dibatasi?
Auditability: bukti apa yang harus tersedia?

13.5 Step 5 — Map Failure Modes

Untuk setiap invariant, tulis failure mode.

Contoh:

Invariant: every approved case event updates reporting projection exactly once semantically.
Failure modes:
- duplicate Kafka delivery
- consumer crash after DB write before offset commit
- DB timeout after successful commit
- event schema missing status field
- out-of-order event version
- replay to same table

Kemudian desain mitigasi.


14. Implementation Skeleton: Invariant-Aware Java Pipeline

Berikut skeleton sederhana yang akan kita kembangkan di part berikutnya.

public interface Pipeline<I, O> {
    void run(PipelineExecutionContext context);
}

public interface Source<I> {
    List<SourceRecord<I>> poll(SourceCursor cursor, int maxRecords);
}

public interface Transformer<I, O> {
    List<TransformResult<O>> transform(SourceRecord<I> input, TransformContext context);
}

public interface Sink<O> {
    SinkResult write(OutputRecord<O> output, PipelineExecutionContext context);
}

public interface CheckpointStore {
    Optional<SourceCursor> load(String pipelineName);
    void save(String pipelineName, SourceCursor cursor, CheckpointMetadata metadata);
}

Record envelope:

public record SourceRecord<T>(
        String sourceSystem,
        String sourceRecordId,
        String sourceCursor,
        String partitionKey,
        Instant eventTime,
        Instant ingestedAt,
        String schemaVersion,
        T payload,
        Map<String, String> headers
) {}

Output envelope:

public record OutputRecord<T>(
        String outputKey,
        String outputVersion,
        String fingerprint,
        T payload,
        AuditMetadata audit
) {}

Runner dengan urutan aman:

public final class InvariantAwarePipeline<I, O> implements Pipeline<I, O> {
    private final Source<I> source;
    private final Transformer<I, O> transformer;
    private final Sink<O> sink;
    private final CheckpointStore checkpoints;
    private final ProcessingLedger ledger;

    @Override
    public void run(PipelineExecutionContext context) {
        SourceCursor cursor = checkpoints.load(context.pipelineName()).orElse(SourceCursor.initial());

        while (true) {
            List<SourceRecord<I>> records = source.poll(cursor, 500);
            if (records.isEmpty()) {
                return;
            }

            for (SourceRecord<I> input : records) {
                ledger.received(context, input);

                try {
                    List<TransformResult<O>> outputs = transformer.transform(input, context.toTransformContext());

                    for (TransformResult<O> output : outputs) {
                        sink.write(output.toOutputRecord(input, context), context);
                    }

                    ledger.processed(context, input);
                    cursor = cursor.advanceTo(input.sourceCursor());
                } catch (RetryablePipelineException retryable) {
                    ledger.failedRetryable(context, input, retryable);
                    throw retryable;
                } catch (NonRetryablePipelineException terminal) {
                    ledger.quarantined(context, input, terminal);
                    cursor = cursor.advanceTo(input.sourceCursor());
                }
            }

            checkpoints.save(context.pipelineName(), cursor, CheckpointMetadata.from(context));
        }
    }
}

Ini belum sempurna. Namun ada prinsip penting:

  • ledger mencatat coverage;
  • sink harus idempotent;
  • checkpoint tidak maju sebelum record punya terminal status;
  • non-retryable masuk quarantine agar tidak memblokir seluruh stream;
  • retryable menghentikan loop agar tidak melompati data yang belum jelas.

15. Testing Invariants, Bukan Hanya Code Path

Testing pipeline production-grade harus menyerang invariant.

15.1 Completeness Test

@Test
void everyInputRecordMustReachTerminalStatus() {
    PipelineRunResult result = harness.runWithInputs(List.of(validA, invalidB, validC));

    assertThat(result.ledgerStatus(validA)).isEqualTo(PROCESSED);
    assertThat(result.ledgerStatus(invalidB)).isEqualTo(QUARANTINED);
    assertThat(result.ledgerStatus(validC)).isEqualTo(PROCESSED);
    assertThat(result.unknownRecords()).isEmpty();
}

15.2 Idempotency Test

@Test
void duplicateInputMustNotDuplicateSinkEffect() {
    harness.runWithInputs(List.of(eventA));
    harness.runWithInputs(List.of(eventA));

    assertThat(sink.rowsFor(eventA.caseId())).hasSize(1);
    assertThat(effectLedger.effectsFor(eventA.eventId())).hasSize(1);
}

15.3 Replay Determinism Test

@Test
void sameInputAndTransformVersionMustProduceSameOutput() {
    List<OutputRecord<CaseProjection>> first = harness.runReplay(inputs, transformV3);
    List<OutputRecord<CaseProjection>> second = harness.runReplay(inputs, transformV3);

    assertThat(second).isEqualTo(first);
}

15.4 Ordering Test

@Test
void outOfOrderAggregateVersionMustBeHeldOrRejected() {
    CaseEvent v2 = eventVersion(2);

    PipelineRunResult result = harness.runWithInputs(List.of(v2));

    assertThat(result.holdingArea()).contains(v2);
    assertThat(result.sinkRows()).isEmpty();
}

16. Operational Metrics Mapped to Invariants

Jangan membuat metrics acak. Map metric ke invariant.

InvariantMetric
Completenessinput_received_total, terminal_status_total, unknown_record_count
Orderinggap_detected_total, out_of_order_total, holding_area_size
Freshnessingestion_lag_seconds, watermark_lag_seconds, sink_visibility_lag_seconds
Idempotencyduplicate_detected_total, idempotent_replay_total, conflict_detected_total
Replayabilityreplay_runs_total, replay_input_count, replay_diff_count
Determinismreplay_hash_mismatch_total, output_fingerprint_change_total
Side effectseffect_pending_count, effect_duplicate_suppressed_total, effect_failed_total
Auditabilityoutput_without_lineage_count, unknown_transform_version_count

Metric yang bagus punya tindakan jelas.

Jika unknown_record_count > 0, operator tahu ada data yang belum terminal. Jika output_without_lineage_count > 0, itu bug audit.


17. Production Review Checklist

Gunakan checklist ini saat review desain pipeline.

17.1 Scope and Contract

  • Apa business fact yang dipindahkan atau diturunkan?
  • Apa source-of-truth-nya?
  • Apa input scope eksplisit?
  • Apa output contract eksplisit?
  • Siapa consumer output?
  • Apakah output preliminary, final, atau correctable?

17.2 Completeness

  • Bagaimana mendeteksi silent loss?
  • Apakah semua input punya terminal status?
  • Apakah skip/quarantine dicatat eksplisit?
  • Apakah source cursor disimpan setelah raw data durable?
  • Apakah ada reconciliation source-to-sink?

17.3 Ordering

  • Ordering scope apa yang diperlukan?
  • Apakah partition key sesuai aggregate?
  • Apa yang terjadi jika event out-of-order?
  • Apa yang terjadi jika ada gap versi?
  • Apakah late event dikoreksi atau diabaikan?

17.4 Freshness

  • Freshness dihitung dari event time, ingestion time, atau processing time?
  • Apa SLO p95/p99?
  • Apa behavior saat lag melebihi SLO?
  • Apakah output membawa watermark/as-of metadata?

17.5 Idempotency

  • Apa idempotency key?
  • Apakah sink write duplicate-safe?
  • Apa yang terjadi jika timeout setelah commit berhasil?
  • Apakah replay ke sink yang sama aman?
  • Apakah fingerprint konflik dideteksi?

17.6 Replayability and Determinism

  • Dari mana replay dilakukan?
  • Apakah retention cukup?
  • Apakah transform version disimpan?
  • Apakah reference data versioned?
  • Apakah output bisa dibandingkan antar run?
  • Apakah side effect dimatikan saat replay?

17.7 Auditability

  • Apakah output punya source lineage?
  • Apakah schema version disimpan?
  • Apakah run ID disimpan?
  • Apakah quarantine bisa ditelusuri?
  • Apakah operator action tercatat?

18. Mental Model Final

Jangan desain pipeline dari diagram ini:

A -> B -> C

Desain dari pertanyaan ini:

What must remain true when A, B, or C fails independently?

Sebuah pipeline production-grade adalah gabungan dari:

  • data movement;
  • semantic transformation;
  • state transition;
  • failure containment;
  • replay mechanism;
  • audit evidence;
  • operational control.

Tools hanya membantu. Invariant yang membuat sistem bisa dipercaya.


19. Ringkasan

Kita sudah membangun delapan invariant inti:

  1. Completeness — tidak ada data hilang diam-diam.
  2. Ordering — urutan yang bermakna tetap terjaga sesuai scope.
  3. Freshness — output cukup baru dan freshness-nya terukur.
  4. Idempotency — retry, duplicate, dan replay aman.
  5. Replayability — pipeline bisa diproses ulang dengan boundary jelas.
  6. Determinism — hasil bisa direproduksi untuk input dan versi yang sama.
  7. Bounded side effects — efek eksternal dikendalikan dan diaudit.
  8. Auditability — output bisa dijelaskan secara faktual.

Mulai part berikutnya, kita akan membedah taxonomy pipeline: batch, streaming, CDC, request-driven, reverse ETL, dan materialization. Tujuannya bukan hafal kategori, tetapi tahu konsekuensi desain masing-masing terhadap invariant di atas.


References

Lesson Recap

You just completed lesson 03 in start here. Use the series map if you want to review the broader track, or continue directly into the next lesson while the context is still warm.

Continue The Track

Keep the momentum while the lesson is still fresh. Move backward for review or continue forward into the next concept.