
Reconciliation Patterns

Learn Java Data Pipeline Pattern - Part 069

Reconciliation patterns for proving pipeline outputs are complete, accurate, and explainable across source, transport, transformation, and sink boundaries.

Lesson 69 of 84 (lesson track 46–69: Deepen Practice)

Part 069 — Reconciliation Patterns

A pipeline is not correct because it ran successfully.
A pipeline is correct when you can prove that the output corresponds to the intended input under a declared transformation, time window, and version.

In earlier parts we built contracts, quality gates, lineage, SLOs, and operational observability. Those are necessary, but they still leave one dangerous question unanswered:

How do we know the pipeline output is actually the right output?

A job may finish. The Airflow task may be green. Kafka lag may be zero. The Flink checkpoint may be healthy. The Iceberg snapshot may commit. Yet the business result can still be wrong.

Common examples:

  • 1.2 million source records existed, but only 1.19 million reached the sink.
  • A CDC connector skipped deletes because tombstones were mishandled.
  • A backfill overwrote a partition with a partial result.
  • A stream aggregation double-counted retry events.
  • A late correction arrived after a report was published.
  • A join silently dropped events because reference data was stale.
  • A sink succeeded for rows but failed for the checkpoint update.
  • A manual rerun reprocessed the wrong source window.

Reconciliation is the discipline of making these failures visible.

It is not one check. It is a family of proof techniques.


1. The Core Mental Model

A data pipeline transforms an input set into an output set.

Input Dataset + Transform Version + Run Context -> Output Dataset

Reconciliation asks:

Can we prove that Output Dataset is the expected consequence of Input Dataset?

That proof can be exact, approximate, statistical, semantic, or operational. The correct choice depends on the asset.

For a payment ledger, approximate proof is not enough. For a clickstream trend chart, exact row-level matching may be too expensive. For regulatory enforcement lifecycle reports, you usually need partition-level and entity-level evidence, because auditors ask why a specific case appeared or did not appear in a report.

A reconciliation design should declare:

Dimension | Question
Scope | Which source range, partition, topic offset, file batch, API cursor, or DB snapshot is being reconciled?
Grain | Are we reconciling files, partitions, records, entities, amounts, windows, or snapshots?
Method | Count, checksum, hash, balance, anti-join, sample, invariant, or ledger verification?
Tolerance | Must it match exactly, or is bounded drift allowed?
Timing | Before publish, after publish, continuously, or during audit?
Action | Block, warn, quarantine, restate, rollback, or open incident?
Evidence | Where is the reconciliation result stored?

A pipeline without reconciliation is a trust request. A pipeline with reconciliation is an evidence-producing system.


2. Reconciliation Is Not the Same as Data Quality

Data quality checks ask whether the data satisfies rules.

Examples:

case_id must not be null
amount must be >= 0
status must be one of OPEN, CLOSED, ESCALATED
assigned_officer_id must exist in officer reference table

Reconciliation asks whether movement and transformation preserved the intended truth.

Examples:

All source cases updated between T1 and T2 are represented in the canonical case table.
The total penalty amount in the gold report equals the sum of validated silver penalty facts.
Every Kafka input offset up to N has either produced a sink effect or a quarantine record.
Every source file in the manifest has exactly one import ledger entry.

Quality checks validate shape and meaning. Reconciliation validates continuity and accounting.

Both are required.

A useful heuristic:

Data quality asks: Is this data valid?
Reconciliation asks: Is this the complete and correct consequence of the prior state?

3. The Reconciliation Boundary

Never say “we reconcile the pipeline” without naming the boundary.

A boundary is a point where loss, duplication, mutation, or misalignment can occur.

Common boundaries:

Boundary | Typical Reconciliation Question
Source DB -> CDC topic | Did every committed source change become a CDC event?
CDC topic -> canonical topic | Did every relevant CDC event become exactly one canonical event, correction, or rejection?
Kafka topic -> Flink state | Did every consumed event affect state or produce a declared skip/quarantine?
Flink state -> sink table | Did every emitted result reach the sink idempotently?
Bronze -> silver | Did every accepted raw record map to a canonical record or rejection?
Silver -> gold | Do aggregated totals match source fact tables?
File landing -> archive | Did every landed file get imported, rejected, or expired?
API sync -> target table | Did every page/cursor range produce expected entities and deletion states?
Backfill staging -> publish | Does staged output match expected partition count/checksum before replacing production?

The worse the failure mode, the stronger the reconciliation needs to be.

For example, Kafka lag zero proves only that a consumer group has advanced offsets. It does not prove that each record produced the intended sink effect. If sink writes and offset commits are not atomic, lag can be zero while output is wrong.


4. Reconciliation Result as a First-Class Artifact

Do not bury reconciliation in logs.

A reconciliation result should be a durable artifact, queryable by run, asset, partition, and time range.

A minimal model:

import java.time.Instant;
import java.util.List;
import java.util.Map;

public record ReconciliationResult(
    String reconciliationId,
    String pipelineId,
    String runId,
    String assetName,
    ReconciliationScope scope,
    ReconciliationMethod method,
    ReconciliationStatus status,
    Map<String, MetricValue> expected,
    Map<String, MetricValue> actual,
    List<ReconciliationDifference> differences,
    String transformVersion,
    Instant evaluatedAt
) {}

public enum ReconciliationStatus {
    PASSED,
    PASSED_WITH_TOLERANCE,
    FAILED,
    INCONCLUSIVE,
    SKIPPED
}

public record ReconciliationScope(
    String sourceDataset,
    String targetDataset,
    Instant sourceFromInclusive,
    Instant sourceToExclusive,
    Map<String, String> partitions,
    Map<String, String> sourcePositions
) {}

public enum ReconciliationMethod {
    COUNT,
    CHECKSUM,
    HASH_TOTAL,
    BALANCE_TOTAL,
    ANTI_JOIN,
    LEDGER,
    SAMPLE,
    INVARIANT,
    WATERMARK_COMPLETENESS
}

This artifact becomes part of the evidence trail:

run_manifest
  -> input_manifest
  -> transform_manifest
  -> output_manifest
  -> reconciliation_result
  -> publication_decision

If a stakeholder asks, “Why was yesterday’s report published?”, the answer should not be “because the job was green.”

The answer should be:

Because run R processed source snapshot S, transform version V, produced output snapshot O,
passed contract C, passed quality gate Q, passed reconciliation result K,
and publication event P promoted O at time T.

5. Count Reconciliation

Count reconciliation is the simplest and most commonly used method.

expected_count == actual_count

But naïve count checks are often misleading.

Bad Count Check

source_count = count(*) from source_table where updated_at >= T1 and updated_at < T2
sink_count   = count(*) from target_table where updated_at >= T1 and updated_at < T2

Problems:

  • Source updated_at may change after extraction.
  • Sink updated_at may represent processing time, not source update time.
  • Deletes may not exist in the sink.
  • One source record may produce zero, one, or many sink records.
  • Late corrections may belong to old business time but new processing time.
  • Retries may duplicate sink records.

Better Count Check

Declare the expected cardinality relationship.

source records (accepted + rejected) == canonical records + rejected records

Example:

-- Source ingestion ledger
select count(*) as source_records
from ingestion_record_ledger
where run_id = :run_id
  and decision in ('ACCEPTED', 'REJECTED');

-- Canonical output
select count(*) as canonical_records
from canonical_case_event
where run_id = :run_id;

-- Rejections
select count(*) as rejected_records
from ingestion_rejection
where run_id = :run_id;

Invariant:

source_records = canonical_records + rejected_records

This is stronger because every source record must end in a declared terminal state.
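A minimal Java sketch of evaluating this invariant once the three counts have been fetched with the queries above. `TerminalCountCheck` and `CountOutcome` are hypothetical names, not part of any library:

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper: checks source_records == canonical_records + rejected_records. */
final class TerminalCountCheck {
    private TerminalCountCheck() {}

    /** unaccounted > 0: records lost; unaccounted < 0: more outcomes than inputs. */
    record CountOutcome(boolean passed, long unaccounted, List<String> notes) {}

    static CountOutcome evaluate(long sourceRecords, long canonicalRecords, long rejectedRecords) {
        List<String> notes = new ArrayList<>();
        // Math.addExact throws on overflow instead of silently wrapping around.
        long accounted = Math.addExact(canonicalRecords, rejectedRecords);
        long unaccounted = sourceRecords - accounted;
        if (unaccounted > 0) {
            notes.add(unaccounted + " source records have neither a canonical record nor a rejection");
        } else if (unaccounted < 0) {
            notes.add((-unaccounted) + " more outcomes than source records (possible duplicates)");
        }
        return new CountOutcome(unaccounted == 0, unaccounted, List.copyOf(notes));
    }
}
```

For example, 1,200,000 source records against 1,190,000 canonical records and 4,000 rejections leaves 6,000 records unaccounted for, so the check fails.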

Cardinality Contract

Every transform should declare cardinality:

Transform Type | Expected Cardinality
Parser | 1 raw record -> 0 or 1 parsed record + optional rejection
Filter | 1 input -> 0 or 1 output + skip reason
Splitter | 1 input -> N outputs
Join enrichment | 1 input -> 1 output, or quarantine if required reference is missing
Aggregation | N inputs -> 1 output per group/window
CDC current projection | N changes -> 1 current state per key

Make it explicit:

public enum CardinalityMode {
    ONE_TO_ONE,
    ZERO_OR_ONE,
    ONE_TO_MANY,
    MANY_TO_ONE,
    MANY_TO_MANY
}

public record TransformContract(
    String transformName,
    String version,
    CardinalityMode cardinalityMode,
    boolean requiresTerminalDecisionForEveryInput
) {}

Count reconciliation is powerful when paired with a terminal-decision ledger.
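A count-level check against the declared mode can be sketched as follows. The bound chosen for each mode is an assumption of this sketch (for example, that a splitter emits at least one output per non-rejected input), and `CardinalityCheck` is a hypothetical name. Passing it is necessary but not sufficient: only a per-input ledger shows which input produced which output.

```java
/** Hypothetical count-level consistency check against a declared cardinality mode. */
final class CardinalityCheck {
    private CardinalityCheck() {}

    enum CardinalityMode { ONE_TO_ONE, ZERO_OR_ONE, ONE_TO_MANY, MANY_TO_ONE, MANY_TO_MANY }

    /** Returns true when the observed run counts are consistent with the declared mode. */
    static boolean consistent(CardinalityMode mode, long inputs, long outputs, long rejectedOrSkipped) {
        if (inputs < 0 || outputs < 0 || rejectedOrSkipped < 0) {
            throw new IllegalArgumentException("counts must be non-negative");
        }
        return switch (mode) {
            // Every input yields exactly one output and nothing is rejected.
            case ONE_TO_ONE -> outputs == inputs && rejectedOrSkipped == 0;
            // Every input yields one output or one declared rejection/skip.
            case ZERO_OR_ONE -> outputs + rejectedOrSkipped == inputs;
            // Assumption: each non-rejected input yields at least one output.
            case ONE_TO_MANY -> outputs >= inputs - rejectedOrSkipped;
            // Assumption: groups/windows cannot outnumber the inputs that formed them.
            case MANY_TO_ONE -> outputs <= inputs;
            // No count-level constraint exists; rely on a ledger or lineage instead.
            case MANY_TO_MANY -> true;
        };
    }
}
```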


6. Checksum Reconciliation

Counts prove quantity, not identity.

If one record is missing and another is duplicated, counts still match.

Checksum reconciliation helps detect that.

A simple pattern:

source_count == target_count
source_checksum == target_checksum

The checksum must be based on stable, canonical fields.

Bad checksum input:

JSON string as received

Why bad?

  • Field order may differ.
  • Whitespace may differ.
  • Equivalent numbers may serialize differently.
  • Non-semantic metadata may differ.
  • Timestamp formatting may differ.

Better checksum input:

canonical key + canonical payload fields + source version + business effective time

Example Java canonical hash:

import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.time.Instant;
import java.util.HexFormat;
import java.util.List;
import java.util.Objects;

public final class StableHash {
    private StableHash() {}

    public static String sha256(List<String> fields) {
        try {
            MessageDigest digest = MessageDigest.getInstance("SHA-256");
            for (String field : fields) {
                String normalized = normalize(field);
                digest.update(normalized.getBytes(StandardCharsets.UTF_8));
                digest.update((byte) 0x1F); // unit separator
            }
            return HexFormat.of().formatHex(digest.digest());
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("SHA-256 unavailable", e);
        }
    }

    private static String normalize(String value) {
        return Objects.toString(value, "<NULL>").trim();
    }

    public static String caseEventHash(
        String caseId,
        String eventType,
        String status,
        Instant businessEffectiveAt,
        String schemaVersion
    ) {
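        // Arrays.asList, unlike List.of, accepts nulls, so normalize() can turn them into <NULL>.
        return sha256(java.util.Arrays.asList(
            caseId,
            eventType,
            status,
            Objects.toString(businessEffectiveAt, null),
            schemaVersion
        ));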
    }
}

At partition level, you usually do not store one giant hash of all rows as concatenated strings. Instead, store per-record hashes and aggregate them deterministically.

Options:

Method | Use Case | Caveat
Sum of numeric hash | Fast partition-level comparison | Possible collision; use a strong hash and a large numeric type
XOR hash | Detect changed membership cheaply | Records present an even number of times cancel out
Sorted hash chain | Stronger identity proof | More expensive; requires stable ordering
Merkle tree | Large partition diff localization | More complex implementation

For critical systems, use count + strong checksum + anti-join drilldown.
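The difference between the first two aggregation options is easy to demonstrate. This sketch hashes each canonical record with SHA-256 and then aggregates either by addition modulo 2^256 or by XOR. Both are independent of row order, but only the sum notices a record that appears twice. `HashAggregation` is a hypothetical helper, and the pipe-delimited strings stand in for canonical records:

```java
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.List;

/** Hypothetical helper comparing two order-independent ways to aggregate per-record hashes. */
final class HashAggregation {
    private HashAggregation() {}

    private static final BigInteger MOD = BigInteger.ONE.shiftLeft(256); // 2^256

    static BigInteger recordHash(String canonicalRecord) {
        try {
            byte[] digest = MessageDigest.getInstance("SHA-256")
                .digest(canonicalRecord.getBytes(StandardCharsets.UTF_8));
            return new BigInteger(1, digest); // signum 1: read the digest as an unsigned number
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("SHA-256 unavailable", e);
        }
    }

    /** Sum modulo 2^256: row order does not matter, and a duplicated row changes the total. */
    static BigInteger sumTotal(List<String> records) {
        BigInteger total = BigInteger.ZERO;
        for (String record : records) {
            total = total.add(recordHash(record)).mod(MOD);
        }
        return total;
    }

    /** XOR: row order does not matter either, but a record present twice cancels itself out. */
    static BigInteger xorTotal(List<String> records) {
        BigInteger total = BigInteger.ZERO;
        for (String record : records) {
            total = total.xor(recordHash(record));
        }
        return total;
    }
}
```

Neither total says which row differs; that is what the anti-join drilldown is for.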


7. Hash Total Pattern

The hash total pattern is old but still useful.

Instead of comparing business totals, you compute a numeric total over hashed identifiers.

Example:

sum(hash(case_id))

This detects membership mismatch better than count alone.

A practical reconciliation tuple:

count_rows
count_distinct_keys
sum_hash_key
sum_hash_payload
min_event_time
max_event_time

Java model:

import java.math.BigInteger;
import java.time.Instant;

public record PartitionFingerprint(
    long rowCount,
    long distinctKeyCount,
    BigInteger keyHashTotal,
    BigInteger payloadHashTotal,
    Instant minEventTime,
    Instant maxEventTime
) {
    public boolean sameAs(PartitionFingerprint other) {
        return rowCount == other.rowCount
            && distinctKeyCount == other.distinctKeyCount
            && keyHashTotal.equals(other.keyHashTotal)
            && payloadHashTotal.equals(other.payloadHashTotal)
            && java.util.Objects.equals(minEventTime, other.minEventTime) // null when the partition is empty
            && java.util.Objects.equals(maxEventTime, other.maxEventTime);
    }
}

Hash totals are not a substitute for row-level verification where exact audit is required, but they are excellent as a first-pass partition guard.
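One way to compute the fingerprint tuple in a single pass, sketched in memory. It assumes rows are already projected to a key, a canonical payload string, and an event time, and it aggregates SHA-256 hashes by addition modulo 2^256. `FingerprintBuilder`, `Row`, and `Fingerprint` are hypothetical and mirror `PartitionFingerprint` rather than replace it:

```java
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.time.Instant;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical one-pass builder for the partition fingerprint tuple. */
final class FingerprintBuilder {
    private FingerprintBuilder() {}

    record Row(String key, String canonicalPayload, Instant eventTime) {}

    record Fingerprint(long rowCount, long distinctKeyCount, BigInteger keyHashTotal,
                       BigInteger payloadHashTotal, Instant minEventTime, Instant maxEventTime) {}

    private static final BigInteger MOD = BigInteger.ONE.shiftLeft(256);

    private static BigInteger sha256(String value) {
        try {
            return new BigInteger(1, MessageDigest.getInstance("SHA-256")
                .digest(value.getBytes(StandardCharsets.UTF_8)));
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("SHA-256 unavailable", e);
        }
    }

    static Fingerprint of(List<Row> rows) {
        Set<String> keys = new HashSet<>(); // in memory here; a real partition would use SQL aggregates
        BigInteger keyTotal = BigInteger.ZERO;
        BigInteger payloadTotal = BigInteger.ZERO;
        Instant min = null;
        Instant max = null;
        for (Row row : rows) {
            keys.add(row.key());
            keyTotal = keyTotal.add(sha256(row.key())).mod(MOD);
            // The payload hash includes the key so identical payloads under different keys differ.
            payloadTotal = payloadTotal.add(sha256(row.key() + "\u001F" + row.canonicalPayload())).mod(MOD);
            if (min == null || row.eventTime().isBefore(min)) min = row.eventTime();
            if (max == null || row.eventTime().isAfter(max)) max = row.eventTime();
        }
        // An empty partition yields null min/max event times.
        return new Fingerprint(rows.size(), keys.size(), keyTotal, payloadTotal, min, max);
    }
}
```

Keeping key and payload totals separate makes a mismatch more informative: a different key total means membership changed, while an equal key total with a different payload total means content changed for the same keys.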


8. Balance Reconciliation

For financial, inventory, quota, enforcement penalty, or obligation systems, business amounts matter.

Count and checksum can match while totals are wrong because numeric semantics changed.

Balance reconciliation checks business aggregates.

Examples:

sum(source.penalty_amount) == sum(gold.penalty_amount)
sum(opening_balance + debits - credits) == closing_balance
sum(case_obligation_created) - sum(case_obligation_resolved) == active_obligation_balance

A regulatory enforcement example:

For every enforcement case in report month M:
  total_assessed_penalty
  - total_withdrawn_penalty
  - total_reduced_penalty
  + total_increased_penalty
  = currently_enforceable_penalty

Balance reconciliation requires semantic classification.

import java.time.Instant;

public enum AmountEffect {
    INCREASE,
    DECREASE,
    NEUTRAL
}

public record AmountFact(
    String factId,
    String caseId,
    String amountType,
    AmountEffect effect,
    long amountMinorUnits,
    String currency,
    Instant effectiveAt
) {
    public long signedAmount() {
        return switch (effect) {
            case INCREASE -> amountMinorUnits;
            case DECREASE -> -amountMinorUnits;
            case NEUTRAL -> 0L;
        };
    }
}

The common anti-pattern is reconciling raw numeric columns without understanding whether they represent deltas, balances, replacements, or corrections.

A penalty amount column might mean:

  • assessed amount at decision time,
  • current amount after variation,
  • delta amount in an adjustment event,
  • display amount in a report snapshot,
  • historical amount valid at a previous time.

Balance reconciliation fails unless these meanings are explicit.
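A per-case balance reconciliation can be sketched with the same sign convention as `AmountFact.signedAmount()`. This version assumes a single currency and integer minor units; `BalanceCheck` and its nested types are hypothetical:

```java
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

/** Hypothetical per-case balance reconciliation in integer minor units (single currency assumed). */
final class BalanceCheck {
    private BalanceCheck() {}

    enum Effect { INCREASE, DECREASE, NEUTRAL }

    record Fact(String caseId, Effect effect, long amountMinorUnits) {
        long signed() {
            return switch (effect) {
                case INCREASE -> amountMinorUnits;
                case DECREASE -> -amountMinorUnits;
                case NEUTRAL -> 0L;
            };
        }
    }

    record Mismatch(String caseId, long expectedMinor, long reportedMinor) {}

    /** Compares the balance implied by the facts with the balance the report claims, per case. */
    static List<Mismatch> reconcile(List<Fact> facts, Map<String, Long> reportedBalances) {
        Map<String, Long> expected = new TreeMap<>();
        for (Fact fact : facts) {
            // addExact throws on overflow instead of silently wrapping around.
            expected.merge(fact.caseId(), fact.signed(), (x, y) -> Math.addExact(x, y));
        }
        Set<String> allCases = new TreeSet<>(expected.keySet());
        allCases.addAll(reportedBalances.keySet());
        return allCases.stream()
            .map(id -> new Mismatch(id, expected.getOrDefault(id, 0L), reportedBalances.getOrDefault(id, 0L)))
            .filter(m -> m.expectedMinor() != m.reportedMinor())
            .toList();
    }
}
```

A case present on only one side is reported only when its balance is non-zero; whether the case should be present at all is an anti-join question, not a balance question.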


9. Anti-Join Reconciliation

Count and checksum tell you that something differs. Anti-join tells you what differs.

Typical checks:

-- Source keys missing from target
select s.case_id
from source_keys s
left join target_keys t on t.case_id = s.case_id
where t.case_id is null;

-- Target keys not explainable by source
select t.case_id
from target_keys t
left join source_keys s on s.case_id = t.case_id
where s.case_id is null;

For large datasets, do this at partition level and sample or limit differences for alert payloads.

Do not run unlimited anti-joins in hot production paths.

A practical pattern:

1. Compute partition fingerprints.
2. If fingerprints match, mark partition reconciled.
3. If fingerprints differ, run anti-join drilldown for that partition.
4. Store first N differences plus difference counts.
5. Open incident or block publication depending on severity.
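Steps 3 and 4 can be sketched in memory as below, with the two key sets standing in for one mismatched partition; in production the same logic would normally run as the two SQL anti-joins above, restricted to that partition. `AntiJoinDrilldown` is a hypothetical name:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical drilldown: both anti-join directions, exact counts plus capped examples. */
final class AntiJoinDrilldown {
    private AntiJoinDrilldown() {}

    record Differences(long missingCount, List<String> missingSample,
                       long unexpectedCount, List<String> unexpectedSample) {
        boolean clean() {
            return missingCount == 0 && unexpectedCount == 0;
        }
    }

    static Differences compare(Set<String> sourceKeys, Set<String> targetKeys, int maxExamples) {
        List<String> missing = new ArrayList<>();
        long missingCount = 0;
        // Sorted iteration keeps the stored examples deterministic between runs.
        for (String key : new TreeSet<>(sourceKeys)) {
            if (!targetKeys.contains(key)) {
                missingCount++;
                if (missing.size() < maxExamples) missing.add(key);
            }
        }
        List<String> unexpected = new ArrayList<>();
        long unexpectedCount = 0;
        for (String key : new TreeSet<>(targetKeys)) {
            if (!sourceKeys.contains(key)) {
                unexpectedCount++;
                if (unexpected.size() < maxExamples) unexpected.add(key);
            }
        }
        return new Differences(missingCount, List.copyOf(missing), unexpectedCount, List.copyOf(unexpected));
    }
}
```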



10. Ledger-Style Reconciliation

Ledger reconciliation is the strongest pattern for operational pipelines.

The idea:

Every input receives a terminal accounting state.

Terminal states:

State | Meaning
ACCEPTED | Input produced intended output.
REJECTED | Input was invalid and rejection was recorded.
SKIPPED | Input was intentionally ignored with reason.
QUARANTINED | Input cannot be safely processed yet.
DUPLICATE | Input was duplicate of already-accounted input.
SUPERSEDED | Input was replaced by a newer correction or version.

A ledger table:

create table pipeline_effect_ledger (
    ledger_id uuid primary key,
    pipeline_id text not null,
    run_id text not null,
    source_system text not null,
    source_position text not null,
    source_record_id text not null,
    idempotency_key text not null,
    terminal_state text not null,
    target_asset text,
    target_key text,
    output_fingerprint text,
    error_code text,
    error_message text,
    transform_version text not null,
    recorded_at timestamptz not null,
    unique (pipeline_id, idempotency_key)
);

Invariant:

For every source input in scope,
there exists exactly one terminal ledger state for the active transform version and run mode.

This is especially useful when:

  • source data contains duplicates,
  • transform can reject records,
  • sink is external,
  • processing is retried,
  • replay/backfill is frequent,
  • audit evidence matters.
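The exactly-one invariant can be checked directly from ledger rows. This sketch assumes the rows have already been filtered to the active transform version and run mode; `TerminalStateInvariant` and its records are hypothetical:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical check of "exactly one terminal ledger state per in-scope input". */
final class TerminalStateInvariant {
    private TerminalStateInvariant() {}

    record LedgerEntry(String sourceRecordId, String terminalState) {}

    record Violations(Set<String> unaccounted, Set<String> multiplyAccounted, Set<String> outOfScope) {
        boolean holds() {
            return unaccounted.isEmpty() && multiplyAccounted.isEmpty() && outOfScope.isEmpty();
        }
    }

    static Violations check(Set<String> inScopeIds, List<LedgerEntry> entries) {
        Map<String, Integer> statesPerInput = new HashMap<>();
        Set<String> outOfScope = new TreeSet<>();
        for (LedgerEntry entry : entries) {
            if (inScopeIds.contains(entry.sourceRecordId())) {
                statesPerInput.merge(entry.sourceRecordId(), 1, Integer::sum);
            } else {
                // The ledger claims an effect for an input outside the declared scope.
                outOfScope.add(entry.sourceRecordId());
            }
        }
        Set<String> unaccounted = new TreeSet<>();
        Set<String> multiple = new TreeSet<>();
        for (String id : inScopeIds) {
            int states = statesPerInput.getOrDefault(id, 0);
            if (states == 0) unaccounted.add(id);
            else if (states > 1) multiple.add(id);
        }
        return new Violations(unaccounted, multiple, outOfScope);
    }
}
```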

A Java writer:

import java.time.Instant;

public interface EffectLedger {
    LedgerDecision recordSuccess(EffectSuccess success);
    LedgerDecision recordRejection(EffectRejection rejection);
    LedgerDecision recordQuarantine(EffectQuarantine quarantine);
    boolean alreadyRecorded(String pipelineId, String idempotencyKey);
}

public record EffectSuccess(
    String pipelineId,
    String runId,
    String sourcePosition,
    String sourceRecordId,
    String idempotencyKey,
    String targetAsset,
    String targetKey,
    String outputFingerprint,
    String transformVersion,
    Instant recordedAt
) {}

public enum LedgerDecision {
    INSERTED,
    ALREADY_EXISTS_SAME_EFFECT,
    CONFLICT_DIFFERENT_EFFECT
}

The dangerous case is CONFLICT_DIFFERENT_EFFECT.

It means the same idempotency key attempted to produce a different output. That is not just a duplicate. It is evidence of non-deterministic transformation, bad key design, or changed input semantics.
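The three decisions fall out of a single atomic insert-if-absent on the idempotency key. The in-memory sketch below is hypothetical; against the ledger table above, a PostgreSQL `insert ... on conflict (pipeline_id, idempotency_key) do nothing` followed by reading the existing row's `output_fingerprint` would give the same three outcomes:

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** Hypothetical in-memory ledger showing how the three LedgerDecision outcomes arise. */
final class InMemoryEffectLedger {
    enum LedgerDecision { INSERTED, ALREADY_EXISTS_SAME_EFFECT, CONFLICT_DIFFERENT_EFFECT }

    // Key: pipelineId + idempotencyKey; value: fingerprint of the first recorded output.
    private final ConcurrentMap<String, String> effects = new ConcurrentHashMap<>();

    LedgerDecision record(String pipelineId, String idempotencyKey, String outputFingerprint) {
        String key = pipelineId + "\u001F" + idempotencyKey;
        // putIfAbsent is atomic, so two concurrent retries cannot both observe INSERTED.
        String existing = effects.putIfAbsent(key, outputFingerprint);
        if (existing == null) {
            return LedgerDecision.INSERTED;
        }
        // Same key, same output: a harmless retry. Same key, different output: investigate.
        return existing.equals(outputFingerprint)
            ? LedgerDecision.ALREADY_EXISTS_SAME_EFFECT
            : LedgerDecision.CONFLICT_DIFFERENT_EFFECT;
    }
}
```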


11. Offset-to-Effect Reconciliation

For Kafka pipelines, a common failure is confusing offset commit with effect completion.

A robust consumer should be able to answer:

For partition P and offsets [A..B], what happened to each record?

Possible states:

PROCESSED_TO_SINK
REJECTED_TO_DLQ
QUARANTINED
SKIPPED_BY_RULE
DUPLICATE_ALREADY_APPLIED
FAILED_RETRYABLE
FAILED_NON_RETRYABLE

Model:

import java.time.Instant;

public record OffsetEffect(
    String topic,
    int partition,
    long offset,
    String eventId,
    String effectState,
    String sinkAsset,
    String sinkKey,
    String errorCode,
    Instant processedAt
) {}

Offset reconciliation query:

-- "offset" is a reserved word in PostgreSQL, so the column name must be quoted.
select
    topic,
    partition,
    min("offset") as min_offset,
    max("offset") as max_offset,
    -- distinct: a retried offset with two ledger rows must not hide a missing offset
    count(distinct "offset") as accounted_offsets,
    count(*) filter (where effect_state = 'PROCESSED_TO_SINK') as sink_success,
    count(*) filter (where effect_state = 'QUARANTINED') as quarantined,
    count(*) filter (where effect_state like 'FAILED%') as failed
from kafka_offset_effect_ledger
where topic = :topic
  and partition = :partition
  and "offset" between :from_offset and :to_offset
group by topic, partition;

If Kafka committed offset is 10,000 but the effect ledger only accounts for 9,990 offsets, you have a correctness gap.

Do not hide it behind consumer lag.
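Finding which offsets are unaccounted for, rather than only how many, can be sketched with a `BitSet` over the range. `OffsetGapFinder` is hypothetical. One caveat: Kafka offsets are not always contiguous, because transactional producers write commit/abort control markers that occupy offsets and compacted topics drop records, so each gap should be explained against the offsets the broker actually holds before it is declared data loss.

```java
import java.util.ArrayList;
import java.util.BitSet;
import java.util.Collection;
import java.util.List;

/** Hypothetical gap finder for offsets [fromOffset, toOffset] of a single topic partition. */
final class OffsetGapFinder {
    private OffsetGapFinder() {}

    record Gap(long fromInclusive, long toInclusive) {}

    static List<Gap> gaps(long fromOffset, long toOffset, Collection<Long> accountedOffsets) {
        long width = toOffset - fromOffset + 1;
        if (width <= 0 || width > Integer.MAX_VALUE) {
            throw new IllegalArgumentException("range must be non-empty and fit in a BitSet");
        }
        BitSet seen = new BitSet((int) width);
        for (long offset : accountedOffsets) {
            if (offset >= fromOffset && offset <= toOffset) {
                seen.set((int) (offset - fromOffset));
            }
        }
        List<Gap> result = new ArrayList<>();
        int start = seen.nextClearBit(0);
        while (start < width) {
            int end = seen.nextSetBit(start); // -1 when no accounted offset follows
            int endExclusive = end == -1 ? (int) width : end;
            result.add(new Gap(fromOffset + start, fromOffset + endExclusive - 1));
            start = seen.nextClearBit(endExclusive);
        }
        return result;
    }
}
```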


12. File Manifest Reconciliation

File ingestion needs reconciliation because file systems and object stores are full of ambiguous states:

  • file is visible before complete,
  • file is overwritten,
  • file appears twice,
  • file is renamed,
  • file has wrong row count,
  • file has valid format but wrong trailer,
  • archive failed after import,
  • manifest and actual files disagree.

A strong pattern uses a manifest.

{
  "batchId": "batch-2026-07-04-001",
  "producer": "case-management-exporter",
  "files": [
    {
      "path": "landing/cases-0001.csv",
      "rowCount": 250000,
      "sha256": "...",
      "schemaVersion": "case-export.v3"
    }
  ]
}

Reconciliation checks:

manifest_file_count == discovered_file_count
manifest_row_count == parsed_row_count
manifest_hash == actual_file_hash
parsed_row_count == accepted + rejected
accepted == canonical_output + skipped_with_reason
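The first three checks can be sketched as a single comparison of manifest and landing area. File contents are passed in as byte arrays to keep the example self-contained (a real importer would stream large files through the digest), and `ManifestCheck` is a hypothetical name; the accepted/rejected checks follow the terminal-decision count invariant from section 5.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.HexFormat;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical comparison of a batch manifest with the files actually discovered in landing. */
final class ManifestCheck {
    private ManifestCheck() {}

    record ManifestFile(String path, long rowCount, String sha256) {}

    static String sha256Hex(byte[] content) {
        try {
            return HexFormat.of().formatHex(MessageDigest.getInstance("SHA-256").digest(content));
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("SHA-256 unavailable", e);
        }
    }

    /** discovered: path -> file bytes; parsedRows: path -> rows the parser actually read. */
    static List<String> problems(List<ManifestFile> manifest, Map<String, byte[]> discovered,
                                 Map<String, Long> parsedRows) {
        List<String> problems = new ArrayList<>();
        Set<String> notInManifest = new TreeSet<>(discovered.keySet());
        for (ManifestFile file : manifest) {
            notInManifest.remove(file.path());
            byte[] content = discovered.get(file.path());
            if (content == null) {
                problems.add("missing file: " + file.path());
                continue;
            }
            if (!sha256Hex(content).equalsIgnoreCase(file.sha256())) {
                problems.add("hash mismatch: " + file.path());
            }
            long parsed = parsedRows.getOrDefault(file.path(), -1L); // -1: never parsed
            if (parsed != file.rowCount()) {
                problems.add("row count mismatch: " + file.path()
                    + " declared=" + file.rowCount() + " parsed=" + parsed);
            }
        }
        notInManifest.forEach(path -> problems.add("file not in manifest: " + path));
        return problems;
    }
}
```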

File ledger:

create table file_import_ledger (
    batch_id text not null,
    file_path text not null,
    file_hash text not null,
    declared_row_count bigint not null,
    parsed_row_count bigint,
    accepted_row_count bigint,
    rejected_row_count bigint,
    import_status text not null,
    imported_at timestamptz,
    archived_at timestamptz,
    primary key (batch_id, file_path)
);

A file is not “done” because a parser read it. It is done when it has an accounting state and archive/disposition evidence.


13. CDC Reconciliation

CDC reconciliation is harder than batch reconciliation because the source is moving.

You usually need multiple layers:

13.1 Snapshot Reconciliation

During initial load:

source table snapshot count == target snapshot count
source table key hash == target key hash
source max primary key / source commit position recorded

13.2 Change Stream Reconciliation

For CDC events:

source log position range -> CDC events -> canonical events -> sink effects

Metrics:

Metric | Meaning
Source LSN/binlog position | Database log progress
Connector offset | CDC engine progress
Topic offset | Kafka durability progress
Consumer offset | Pipeline read progress
Sink effect count | Actual applied output
Delete/tombstone count | Deletion handling evidence
Schema change count | Evolution events observed

13.3 Current State Reconciliation

For CDC current projection:

select count(*) from source_table where not deleted
==
select count(*) from target_current_table where is_current = true

But count alone is insufficient.

Better:

count + key hash + payload hash + delete count + max source position
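To get the expected side of that comparison, the change stream can be replayed into a current-state projection and fingerprinted the same way as the target table. A minimal sketch, assuming changes arrive ordered by source position and that a delete removes the key from the active view; `CdcProjectionFingerprint` and its types are hypothetical:

```java
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical replay of a CDC change stream into the expected current-state fingerprint. */
final class CdcProjectionFingerprint {
    private CdcProjectionFingerprint() {}

    enum Op { UPSERT, DELETE }

    record Change(long sourcePosition, String key, Op op, String payload) {}

    record Fingerprint(long rowCount, BigInteger keyHashTotal, BigInteger payloadHashTotal,
                       long deleteCount, long maxSourcePosition) {}

    private static final BigInteger MOD = BigInteger.ONE.shiftLeft(256);

    private static BigInteger sha256(String value) {
        try {
            return new BigInteger(1, MessageDigest.getInstance("SHA-256")
                .digest(value.getBytes(StandardCharsets.UTF_8)));
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("SHA-256 unavailable", e);
        }
    }

    /** Assumes changes are already ordered by source position (LSN, binlog position, ...). */
    static Fingerprint expectedCurrentState(List<Change> changes) {
        Map<String, String> current = new HashMap<>();
        long deletes = 0;
        long maxPosition = Long.MIN_VALUE;
        for (Change change : changes) {
            if (change.op() == Op.DELETE) {
                current.remove(change.key());
                deletes++;
            } else {
                current.put(change.key(), change.payload()); // last write per key wins
            }
            maxPosition = Math.max(maxPosition, change.sourcePosition());
        }
        // Modular sums are order-independent, so HashMap iteration order does not matter.
        BigInteger keyTotal = BigInteger.ZERO;
        BigInteger payloadTotal = BigInteger.ZERO;
        for (Map.Entry<String, String> entry : current.entrySet()) {
            keyTotal = keyTotal.add(sha256(entry.getKey())).mod(MOD);
            payloadTotal = payloadTotal.add(sha256(entry.getKey() + "\u001F" + entry.getValue())).mod(MOD);
        }
        return new Fingerprint(current.size(), keyTotal, payloadTotal, deletes, maxPosition);
    }
}
```

Because this fingerprint is computed from the CDC stream, it proves the stream-to-target step; deletes lost before they ever reached the stream still need the source-side delete counts from 13.4.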

13.4 Delete Reconciliation

Deletes are a common source of silent divergence.

Declare delete semantics:

Source Delete | Target Action
Hard delete | Tombstone target record
Soft delete flag | Update current state with deleted marker
Retraction event | Append correction/retraction fact
Regulatory archive | Retain historical row, remove from active view

Reconcile delete counts separately.

select count(*) from cdc_event
where op = 'DELETE'
  and source_lsn between :from and :to;

select count(*) from target_current
where deleted_at is not null
  and source_lsn between :from and :to;

14. Window Reconciliation for Streaming Aggregates

Streaming aggregations are difficult because windows close over time, late data can arrive, and corrections can restate output.

A window output should include a reconciliation envelope:

import java.time.Instant;

public record WindowAggregateOutput(
    String windowId,
    Instant windowStart,
    Instant windowEnd,
    Instant watermarkAtEmission,
    long inputEventCount,
    long distinctBusinessKeyCount,
    long rejectedEventCount,
    long lateEventCount,
    String aggregateFingerprint,
    String transformVersion,
    boolean finalForWatermark,
    boolean subjectToCorrection
) {}

Window reconciliation asks:

For window W:
  input events assigned to W
  - rejected events
  - duplicates
  + valid correction effects
  = aggregate contribution count

For regulatory reporting, never treat a streaming aggregate as eternally final unless the source domain provides a finality signal.

Better states:

PRELIMINARY
WATERMARK_FINAL
BUSINESS_FINAL
RESTATED
SUPERSEDED
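The window formula and the first two states can be combined into a small classification sketch. Treating a window as `WATERMARK_FINAL` only after the watermark reaches the window end plus an allowed-lateness period is an assumption modeled on stream processors such as Flink; `BUSINESS_FINAL` needs a domain finality signal, so the sketch never produces it. `WindowAccounting` is hypothetical:

```java
import java.time.Duration;
import java.time.Instant;

/** Hypothetical accounting and finality check for one aggregation window. */
final class WindowAccounting {
    private WindowAccounting() {}

    enum WindowState { PRELIMINARY, WATERMARK_FINAL }

    record WindowCounts(long assignedEvents, long rejectedEvents, long duplicateEvents,
                        long correctionEffects, long aggregateContributions) {}

    /** Contributions minus (assigned - rejected - duplicates + corrections); zero means balanced. */
    static long unexplained(WindowCounts c) {
        long expected = c.assignedEvents() - c.rejectedEvents() - c.duplicateEvents() + c.correctionEffects();
        return c.aggregateContributions() - expected;
    }

    /** WATERMARK_FINAL only when the watermark has passed end + lateness AND accounting balances. */
    static WindowState state(WindowCounts c, Instant windowEnd, Duration allowedLateness, Instant watermark) {
        boolean watermarkPassed = !watermark.isBefore(windowEnd.plus(allowedLateness));
        return watermarkPassed && unexplained(c) == 0 ? WindowState.WATERMARK_FINAL : WindowState.PRELIMINARY;
    }
}
```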

15. Reconciliation for Backfill

Backfill reconciliation must be stronger than normal daily processing because it intentionally rewrites history.

A backfill should have:

backfill_campaign_id
source_scope
transform_version
target_scope
expected_mutation
staging_location
validation_result
reconciliation_result
approval_record
publication_event
rollback_or_supersession_plan

Before publish:

source fingerprint == staged output explainability fingerprint
old output is not mutated directly
staged output passes quality and reconciliation
publication is atomic at partition/snapshot boundary

Backfill reconciliation checklist:

Check | Why
Input scope recorded | Prevent accidental wider/narrower reprocessing
Transform version pinned | Prevent non-reproducible outputs
Reference data version pinned | Prevent invisible enrichment drift
Output partition fingerprint stored | Enables rollback comparison
Prior output fingerprint stored | Enables before/after review
Difference summary generated | Helps reviewers understand impact
Publication event stored | Provides evidence of promotion

Backfill is not just a technical rerun. It is a controlled data mutation campaign.


16. Reconciliation Severity

Not every mismatch has the same severity.

A practical severity model:

Severity | Meaning | Action
P0 | Published data is materially wrong or unsafe | Stop publication, incident, rollback/restatement
P1 | Critical asset has unexplained mismatch before publish | Block publish, investigate
P2 | Non-critical mismatch within contained partition | Quarantine affected scope, continue unaffected partitions
P3 | Known tolerated drift | Record exception, monitor
P4 | Inconclusive due to missing evidence | Fail closed for critical assets, warn for exploratory assets

Make severity policy data-driven:

asset: gold.enforcement_monthly_report
reconciliation:
  required: true
  failClosed: true
  checks:
    - method: COUNT
      tolerance: 0
      severityOnFailure: P1
    - method: BALANCE_TOTAL
      fields: [assessed_penalty_minor_units]
      tolerance: 0
      severityOnFailure: P0
    - method: CHECKSUM
      tolerance: 0
      severityOnFailure: P1
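A policy like this can be evaluated by mapping each check outcome to an action and keeping the most severe one. The action names, and treating skipped or absent checks as missing evidence, are assumptions of this sketch; `PublicationPolicy` is hypothetical:

```java
import java.util.List;

/** Hypothetical publication decision derived from check outcomes and the asset's failClosed flag. */
final class PublicationPolicy {
    private PublicationPolicy() {}

    enum Status { PASSED, PASSED_WITH_TOLERANCE, FAILED, INCONCLUSIVE, SKIPPED }
    enum Severity { P0, P1, P2, P3, P4 }
    // Declaration order doubles as escalation order: later constants are more severe.
    enum Action { PUBLISH, PUBLISH_WITH_WARNING, QUARANTINE_SCOPE, BLOCK, STOP_AND_OPEN_INCIDENT }

    record CheckOutcome(String method, Status status, Severity severityOnFailure) {}

    static Action decide(List<CheckOutcome> outcomes, boolean failClosed) {
        if (outcomes.isEmpty()) {
            // No evidence at all is itself inconclusive.
            return failClosed ? Action.BLOCK : Action.PUBLISH_WITH_WARNING;
        }
        Action worst = Action.PUBLISH;
        for (CheckOutcome outcome : outcomes) {
            Action action = switch (outcome.status()) {
                case PASSED -> Action.PUBLISH;
                case PASSED_WITH_TOLERANCE -> Action.PUBLISH_WITH_WARNING;
                // Missing evidence: fail closed for critical assets, warn otherwise.
                case INCONCLUSIVE, SKIPPED -> failClosed ? Action.BLOCK : Action.PUBLISH_WITH_WARNING;
                case FAILED -> switch (outcome.severityOnFailure()) {
                    case P0 -> Action.STOP_AND_OPEN_INCIDENT;
                    case P1 -> Action.BLOCK;
                    case P2 -> Action.QUARANTINE_SCOPE;
                    case P3 -> Action.PUBLISH_WITH_WARNING;
                    case P4 -> failClosed ? Action.BLOCK : Action.PUBLISH_WITH_WARNING;
                };
            };
            if (action.compareTo(worst) > 0) {
                worst = action;
            }
        }
        return worst;
    }
}
```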

17. Java Reconciliation Engine Skeleton

A simple reconciliation engine should separate:

  • definition,
  • metric extraction,
  • comparison,
  • decision,
  • evidence storage.

import java.util.List;
import java.util.Map;

public interface ReconciliationCheck {
    ReconciliationMethod method();
    ReconciliationScope scope();
    MetricSet expected(ReconciliationContext context);
    MetricSet actual(ReconciliationContext context);
    ReconciliationDecision compare(MetricSet expected, MetricSet actual);
}

public record ReconciliationContext(
    String pipelineId,
    String runId,
    String assetName,
    String transformVersion,
    Map<String, String> parameters
) {}

public record MetricSet(Map<String, MetricValue> values) {}

public sealed interface MetricValue permits LongMetric, DecimalMetric, StringMetric, HashMetric {}

public record LongMetric(long value) implements MetricValue {}
public record DecimalMetric(java.math.BigDecimal value) implements MetricValue {}
public record StringMetric(String value) implements MetricValue {}
public record HashMetric(String algorithm, String value) implements MetricValue {}

public record ReconciliationDecision(
    ReconciliationStatus status,
    List<ReconciliationDifference> differences,
    String explanation
) {}

public record ReconciliationDifference(
    String metricName,
    String expected,
    String actual,
    String severity,
    String explanation
) {}

Runner:

import java.util.ArrayList;
import java.util.List;

public final class ReconciliationRunner {
    private final ReconciliationResultStore resultStore;

    public ReconciliationRunner(ReconciliationResultStore resultStore) {
        this.resultStore = resultStore;
    }

    public List<ReconciliationResult> run(
        ReconciliationContext context,
        List<ReconciliationCheck> checks
    ) {
        List<ReconciliationResult> results = new ArrayList<>();

        for (ReconciliationCheck check : checks) {
            MetricSet expected = check.expected(context);
            MetricSet actual = check.actual(context);
            ReconciliationDecision decision = check.compare(expected, actual);

            ReconciliationResult result = new ReconciliationResult(
                java.util.UUID.randomUUID().toString(),
                context.pipelineId(),
                context.runId(),
                context.assetName(),
                check.scope(),
                check.method(),
                decision.status(),
                expected.values(),
                actual.values(),
                decision.differences(),
                context.transformVersion(),
                java.time.Instant.now()
            );

            resultStore.save(result);
            results.add(result);
        }

        return results;
    }
}

public interface ReconciliationResultStore {
    void save(ReconciliationResult result);
}

This is intentionally small. The hard part is not the interface; the hard part is choosing the correct scope, grain, metric, and action.
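As one possible `compare` implementation, the sketch below maps numeric metrics onto the statuses of `ReconciliationStatus`: exact matches pass, drift within an absolute tolerance passes with tolerance, larger drift fails, and a metric missing on either side is inconclusive. It uses plain `long` maps instead of `MetricSet` to stay self-contained, and `ToleranceComparator` is hypothetical:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;

/** Hypothetical compare step for numeric metrics with per-metric absolute tolerances. */
final class ToleranceComparator {
    private ToleranceComparator() {}

    enum Status { PASSED, PASSED_WITH_TOLERANCE, FAILED, INCONCLUSIVE }

    record Outcome(Status status, List<String> differences) {}

    static Outcome compare(Map<String, Long> expected, Map<String, Long> actual, Map<String, Long> tolerance) {
        List<String> differences = new ArrayList<>();
        boolean missing = false;
        boolean drift = false;
        boolean failed = false;
        TreeSet<String> names = new TreeSet<>(expected.keySet());
        names.addAll(actual.keySet());
        for (String name : names) {
            Long e = expected.get(name);
            Long a = actual.get(name);
            if (e == null || a == null) {
                missing = true;
                differences.add(name + ": missing on " + (e == null ? "expected" : "actual") + " side");
                continue;
            }
            long delta = Math.absExact(Math.subtractExact(a, e)); // throw rather than overflow
            if (delta == 0) continue;
            long allowed = tolerance.getOrDefault(name, 0L); // default: exact match required
            differences.add(name + ": expected=" + e + " actual=" + a + " allowed=" + allowed);
            if (delta <= allowed) drift = true; else failed = true;
        }
        // A concrete failure outranks missing evidence, which outranks tolerated drift.
        Status status = failed ? Status.FAILED
            : missing ? Status.INCONCLUSIVE
            : drift ? Status.PASSED_WITH_TOLERANCE
            : Status.PASSED;
        return new Outcome(status, List.copyOf(differences));
    }
}
```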


18. Reconciliation and Publication Gates

For production assets, reconciliation should often be a publication gate.

Important: reconcile staging output before publishing it.

Bad pattern:

overwrite production partition
then reconcile
then discover mismatch

Better pattern:

write staging partition
reconcile staging against source
publish atomically
store publication event

For Iceberg-style table formats, snapshot-based publication makes this much safer because a snapshot can represent a coherent table state.
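The staging-then-publish pattern comes down to never touching the published pointer until the staged output reconciles, and then swapping it atomically. Iceberg commits work in a broadly similar way, by atomically swapping the table's current metadata pointer. The in-memory sketch below is hypothetical and uses `AtomicReference.compareAndSet`, so a publish also fails if someone else published after `basedOn` was read:

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Predicate;

/** Hypothetical publisher: reconcile staged output first, then atomically swap the published pointer. */
final class StagedPublisher<S> {
    private final AtomicReference<S> published;

    StagedPublisher(S initial) {
        this.published = new AtomicReference<>(initial);
    }

    S current() {
        return published.get();
    }

    /** Publishes staged only if it reconciles and the published pointer still equals basedOn. */
    boolean publish(S basedOn, S staged, Predicate<S> reconciles) {
        if (!reconciles.test(staged)) {
            return false; // production pointer untouched; staged output can be inspected or discarded
        }
        // compareAndSet compares references, so basedOn must be the object returned by current().
        return published.compareAndSet(basedOn, staged);
    }
}
```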


19. Reconciliation for Regulatory Defensibility

In regulatory systems, reconciliation must answer entity-specific questions.

For example:

Why is case CASE-2026-00931 in the monthly enforcement report?

A defensible answer should include:

case_id: CASE-2026-00931
source events:
  - CASE_OPENED at valid time T1, recorded time R1
  - CASE_ESCALATED at valid time T2, recorded time R2
  - PENALTY_ASSESSED at valid time T3, recorded time R3
transform version: enforcement-report-transform:4.2.1
reporting rule version: monthly-enforcement-rules:2026.07
window: 2026-07-01T00:00:00Z to 2026-08-01T00:00:00Z
output row fingerprint: sha256:...
reconciliation status: PASSED
publication snapshot: iceberg snapshot 918273645

For the opposite question:

Why is case CASE-2026-00412 not in the report?

You need skip/rejection evidence:

case_id: CASE-2026-00412
terminal decision: SKIPPED
reason: case did not reach reportable status before business cutoff
rule version: monthly-enforcement-rules:2026.07
validated_at: 2026-08-01T03:01:12Z

Without terminal decisions, absence is hard to explain.


20. Reconciliation Anti-Patterns

Anti-Pattern 1: “Job Succeeded, Therefore Data Is Correct”

A successful job proves execution, not correctness.

Anti-Pattern 2: Count Only

Count checks miss swapped, duplicated, or mutated records.

Anti-Pattern 3: Reconcile with Different Time Semantics

Source uses business time, target uses processing time. The check is meaningless.

Anti-Pattern 4: Reconcile After Destructive Publish

If reconciliation fails, you already damaged production data.

Anti-Pattern 5: No Difference Drilldown

A red reconciliation check with no examples slows incident response.

Anti-Pattern 6: No Exception Registry

Known accepted differences become tribal knowledge.

Anti-Pattern 7: No Reconciliation Version

When rules change, old results become impossible to interpret.

Anti-Pattern 8: Treating Quarantine as Data Loss

Quarantine is valid if it is explicitly accounted for and resolved.

Anti-Pattern 9: Ignoring Deletes

Deletes must be reconciled separately because they often vanish from current-state views.

Anti-Pattern 10: Sampling Critical Data

Sampling is useful for exploratory or low-risk assets. It is not sufficient for ledger, regulatory, or financial correctness.


21. Production Checklist

Before calling a pipeline production-grade, ask:

[ ] Does every critical asset have declared reconciliation methods?
[ ] Is reconciliation scope explicit and tied to run manifest?
[ ] Are source positions recorded?
[ ] Are transform and contract versions recorded?
[ ] Are source, target, and exception counts compared?
[ ] Are checksums/fingerprints used where counts are insufficient?
[ ] Are business balances reconciled for amount-bearing facts?
[ ] Are deletes/retractions/corrections reconciled separately?
[ ] Is reconciliation performed before destructive publication?
[ ] Are failed reconciliations tied to block/warn/quarantine policy?
[ ] Are differences stored with enough examples to debug?
[ ] Are results queryable by asset, run, partition, time, and source position?
[ ] Is reconciliation evidence retained according to audit policy?
[ ] Are backfills reconciled more strictly than normal runs?
[ ] Are known exceptions explicit, approved, and expiring?

22. Key Takeaways

Reconciliation is the difference between a data pipeline and a data accounting system.

The deeper lesson:

A pipeline should not merely move data.
It should produce evidence that the movement was complete, correct, bounded, and explainable.

Use count checks for basic volume. Use checksum and hash totals for identity. Use balance checks for business amounts. Use anti-joins for drilldown. Use ledger-style accounting for critical pipelines. Use offset-to-effect reconciliation for Kafka. Use manifest reconciliation for files. Use snapshot and delete reconciliation for CDC. Use window reconciliation for streaming aggregates. Use stricter reconciliation for backfill.

A production-grade pipeline does not say:

Trust me, I ran.

It says:

Here is the input scope, output scope, transform version, reconciliation proof, exceptions, and publication evidence.

That is the standard you should build toward.
