
Flink Joins and Enrichment

Learn Java Data Pipeline Pattern - Part 047

Flink joins and enrichment patterns for production Java data pipelines: broadcast state, async I/O, temporal joins, versioned dimensions, missing reference policy, state cost, and replay-safe enrichment.

Lesson 47 of 84 · Deepen Practice track (lessons 46–69)


A join looks like a familiar SQL operation, but in stream processing a join is not just:

SELECT *
FROM event e
JOIN dimension d ON e.key = d.key

In a production pipeline, a join is a correctness question:

When an event is processed, which version of the reference data is valid to use?

That is a much sharper question than “how do I join two datasets?”

Examples from an enforcement lifecycle domain:

  • CaseEscalated needs to be enriched with the current officer assignment.
  • InspectionCompleted needs to be enriched with the facility risk profile.
  • PenaltyCalculated needs to be enriched with the regulation version in force when the violation occurred.
  • SlaBreached needs to be enriched with the holiday calendar, jurisdiction, and case priority.

Each example has different time semantics. “Current officer assignment” may be fine as latest-state. “The regulation version in force when the violation occurred” must not be latest-state; it needs a versioned temporal lookup.

A wrong join does not always crash. It produces data that is technically valid but wrong for the business.


1. The Core Mental Model

Flink enrichment can be modeled as an enrichment operator J that consumes an input event stream and a reference source and emits an enriched output stream.

Operator J has to answer five questions:

Question | Example
--- | ---
Join key | facilityId, caseId, officerId, jurisdictionCode
Reference source | CDC topic, broadcast config stream, external API, database lookup, file snapshot
Time semantics | latest, event-time version, processing-time version, effective business time
Missing policy | fail, delay, quarantine, default, partial output, retry
State boundary | keyed state, broadcast state, external cache, temporal table, async lookup

The correct join pattern follows from the combination of those five answers.
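The five answers can be written down as a small declarative spec before any operator code exists. A minimal plain-Java sketch; `EnrichmentSpec` and the enums are hypothetical names mirroring the table rows, not part of any Flink API.

```java
import java.util.Objects;

// Hypothetical enums mirroring the rows of the table above.
enum TimeSemantics { LATEST, EVENT_TIME_VERSION, PROCESSING_TIME_VERSION, EFFECTIVE_BUSINESS_TIME }

enum MissingPolicy { FAIL, DELAY, QUARANTINE, DEFAULT, PARTIAL_OUTPUT, RETRY }

enum StateBoundary { KEYED_STATE, BROADCAST_STATE, EXTERNAL_CACHE, TEMPORAL_TABLE, ASYNC_LOOKUP }

// One value per question; the join pattern is chosen from this combination.
record EnrichmentSpec(
    String joinKey,
    String referenceSource,
    TimeSemantics timeSemantics,
    MissingPolicy missingPolicy,
    StateBoundary stateBoundary
) {
  EnrichmentSpec {
    // Every question must be answered explicitly; no implicit defaults.
    Objects.requireNonNull(joinKey, "joinKey");
    Objects.requireNonNull(referenceSource, "referenceSource");
    Objects.requireNonNull(timeSemantics, "timeSemantics");
    Objects.requireNonNull(missingPolicy, "missingPolicy");
    Objects.requireNonNull(stateBoundary, "stateBoundary");
  }

  // Only versioned lookups by event or business time can answer "what was true at the time".
  boolean supportsHistoricalCorrectness() {
    return timeSemantics == TimeSemantics.EVENT_TIME_VERSION
        || timeSemantics == TimeSemantics.EFFECTIVE_BUSINESS_TIME;
  }
}
```

A penalty enrichment would declare `EVENT_TIME_VERSION`, while an officer-assignment decoration could declare `LATEST`; the difference is then visible in code review instead of buried in operator logic.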


2. Join Is Not One Pattern

In production, “join” usually means one of the following patterns.

Pattern | Use Case | Correctness Risk
--- | --- | ---
Static in-memory lookup | Small config loaded at startup | Config stale, no audit of version
Broadcast state join | Small/medium reference stream distributed to all tasks | State growth, inconsistent version during deployment
Keyed stream-stream join | Two event streams correlated by key/time | Unbounded state, late/missing counterpart
Temporal table join | Fact stream joined with a versioned table as of event time | Wrong time attribute, version gaps
Async I/O lookup | External DB/API enrichment | Timeout, overload, nondeterministic replay
Cache-aside lookup | External reference with local cache | Stale cache, hidden invalidation policy
Pre-materialized view join | Reference is prepared upstream | Freshness lag, ownership confusion
Offline batch join | Backfill/reporting join | Different logic from the online path

The top 1% mindset: do not pick the API first. Pick the semantics first.


3. Enrichment Correctness Invariants

Before writing Flink code, write down the following invariant.

For every input event E, enrichment output O must be reproducible from:
  E
  reference state version V
  transform version T
  missing-reference policy M
  event-time policy W

If the output cannot be reproduced, the pipeline cannot be audited.

3.1 Required Invariants

Invariant | Meaning
--- | ---
Stable join key | The key does not change silently between layers
Explicit reference version | The output records which reference version was used
Explicit time basis | The join explicitly uses event time, processing time, ingestion time, or business time
Deterministic missing policy | A missing reference is not handled differently on each retry
Bounded state | State has a TTL, window, or compaction strategy
Replay-safe output | Reprocessing an event produces the same output or a legitimate correction
Observable freshness | Reference lag is visible, not hidden
Auditable enrichment | Output carries enrichment metadata

Minimal enriched event metadata:

import java.time.Instant;

public record EnrichmentMetadata(
    String referenceDataset,
    String referenceKey,
    String referenceVersion,
    Instant referenceObservedAt,
    Instant lookupEventTime,
    String policy,
    boolean partial,
    String transformVersion
) {}

Without this metadata, debugging enrichment turns into manual forensics.


4. Latest-State vs Temporal-State

The most common mistake: using latest-state for a problem that needs temporal-state.

Latest-State Join

Latest-state means:

Use the most recent reference value the operator knows about at the moment the event is processed.

Suitable for:

  • dashboard current status,
  • low-risk label enrichment,
  • non-audit routing decisions,
  • operational alerts that genuinely look at current conditions.

Not suitable for:

  • legal/regulatory calculation,
  • financial amount,
  • historical reporting,
  • breach determination,
  • anything requiring “what was true at the time”.

Temporal-State Join

Temporal-state means:

Use the reference value that was in effect at a specific time.

That time can be:

Time Basis | Meaning
--- | ---
Event time | When the business event happened
Business effective time | When the policy/regulation takes effect
Source commit time | When the change was committed in the source DB
Processing time | When the operator processes the event

For an enforcement lifecycle, temporal-state is usually the more correct choice.

Example:

Penalty event:
  violationTime = 2026-06-01T10:00:00Z
  processedAt   = 2026-07-04T08:00:00Z

Regulation changed:
  version A valid until 2026-06-15
  version B valid from 2026-06-15

Correct enrichment: version A
Wrong latest enrichment: version B

5. Pattern Selection Matrix

Requirement | Recommended Pattern
--- | ---
Small reference data, frequently updated | Broadcast state
Huge reference table, lookup by key | Async I/O lookup or lookup join
Need event-time historical correctness | Temporal table join / versioned state
Need to correlate two event streams | Interval join / keyed co-process
Need low latency and reference is small | Broadcast state
Need high throughput and reference changes slowly | Pre-materialized local state
Need deterministic backfill | Avoid live external lookup; use versioned snapshot/table
Need audit-grade result | Temporal/versioned join with metadata
Need best-effort UI decoration | Latest-state async enrichment may be fine

6. Broadcast State Pattern

Broadcast state is useful when reference data is small enough to replicate to all parallel operator instances.

Conceptually:

The reference stream updates a map-like state available to all tasks.

6.1 When Broadcast State Fits

Use it when:

  • reference data is small or medium,
  • updates are not extremely high volume,
  • all partitions may need access to all reference keys,
  • lookup must be low-latency,
  • external lookup is too slow or unreliable,
  • reference changes need to be part of Flink checkpointed state.

Examples:

  • jurisdiction rules,
  • risk category mapping,
  • holiday calendar,
  • feature flags for pipeline behavior,
  • policy thresholds,
  • code-to-description dictionary.

Avoid it when:

  • reference table has millions of large records,
  • reference churn is very high,
  • reference contains sensitive data that should not be replicated widely,
  • update ordering is complex and versioned temporal semantics are required.

7. Broadcast State Java Skeleton

A common shape:

import java.time.Instant;

public record CaseEvent(
    String caseId,
    String jurisdictionCode,
    Instant eventTime,
    String eventType
) {}

public record JurisdictionRule(
    String jurisdictionCode,
    String ruleVersion,
    boolean escalationRequiresSupervisor,
    Instant updatedAt
) {}

public record EnrichedCaseEvent(
    CaseEvent event,
    JurisdictionRule rule,
    EnrichmentMetadata metadata
) {}

Broadcast state descriptor:

MapStateDescriptor<String, JurisdictionRule> ruleStateDescriptor =
    new MapStateDescriptor<>(
        "jurisdiction-rules",
        Types.STRING,
        TypeInformation.of(JurisdictionRule.class)
    );

Connect main stream with broadcast reference stream:

BroadcastStream<JurisdictionRule> rules = ruleUpdates.broadcast(ruleStateDescriptor);

DataStream<EnrichedCaseEvent> enriched = caseEvents
    .keyBy(CaseEvent::jurisdictionCode)
    .connect(rules)
    .process(new JurisdictionEnrichmentFunction(ruleStateDescriptor));

Process function:

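import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
import org.apache.flink.util.Collector;

public final class JurisdictionEnrichmentFunction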
    extends KeyedBroadcastProcessFunction<String, CaseEvent, JurisdictionRule, EnrichedCaseEvent> {

  private final MapStateDescriptor<String, JurisdictionRule> descriptor;

  public JurisdictionEnrichmentFunction(
      MapStateDescriptor<String, JurisdictionRule> descriptor
  ) {
    this.descriptor = descriptor;
  }

  @Override
  public void processBroadcastElement(
      JurisdictionRule rule,
      Context ctx,
      Collector<EnrichedCaseEvent> out
  ) throws Exception {
    BroadcastState<String, JurisdictionRule> state = ctx.getBroadcastState(descriptor);

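    JurisdictionRule existing = state.get(rule.jurisdictionCode());

    // Broadcast elements may arrive in a different order on each parallel instance,
    // so keep the newer rule rather than the last one seen: duplicates and reordered
    // updates then converge to the same state on every task.
    if (existing == null || rule.updatedAt().isAfter(existing.updatedAt())) {
      state.put(rule.jurisdictionCode(), rule);
    }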
  }

  @Override
  public void processElement(
      CaseEvent event,
      ReadOnlyContext ctx,
      Collector<EnrichedCaseEvent> out
  ) throws Exception {
    ReadOnlyBroadcastState<String, JurisdictionRule> state = ctx.getBroadcastState(descriptor);
    JurisdictionRule rule = state.get(event.jurisdictionCode());

    if (rule == null) {
      // In real production code, use side output or missing-reference lane.
      return;
    }

    out.collect(new EnrichedCaseEvent(
        event,
        rule,
        new EnrichmentMetadata(
            "jurisdiction-rules",
            event.jurisdictionCode(),
            rule.ruleVersion(),
            rule.updatedAt(),
            event.eventTime(),
            "require-reference-present",
            false,
            "case-enrichment-v1"
        )
    ));
  }
}

This is not complete production code yet. It lacks missing-reference side output, metrics, schema version, deletion handling, and temporal versioning. But it shows the shape.


8. Broadcast State Failure Model

Failure | What Happens | Defensive Design
--- | --- | ---
Reference update lost before Kafka | State never sees it | Source reconciliation
Reference update duplicated | Same rule applied twice | Versioned update / idempotent put
Reference update reordered | Older rule overwrites newer rule | Compare version/event time
Rule deleted | Old rule remains forever | Tombstone handling
Main event arrives before reference | Missing enrichment | Delay/quarantine/default policy
State grows forever | RocksDB/memory pressure | TTL, compaction, active cleanup
Deployment changes state shape | Restore failure | Stable UID, state migration plan
Backfill uses current rules | Historical output wrong | Temporal versioned state
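The duplicate, reorder, and delete rows can share one rule: keep the update with the highest version, and store deletes as versioned tombstones instead of removing the key. A minimal plain-Java sketch, assuming each update carries a monotonically increasing `version` number (a hypothetical field; the skeleton in Section 7 compares `updatedAt` instead). Because the merge is idempotent and order-independent, every parallel instance ends in the same state whatever the delivery order.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical reference update; deleted=true marks a tombstone.
record RuleUpdate(String key, long version, String payload, boolean deleted) {}

final class VersionedReferenceState {
  // Tombstones are kept so a late, older upsert cannot resurrect a deleted key.
  // Production code would expire tombstones after a bounded horizon (TTL).
  private final Map<String, RuleUpdate> byKey = new HashMap<>();

  // Returns true when state changed. Duplicates and stale updates are ignored.
  // Assumes one version number identifies exactly one payload.
  boolean apply(RuleUpdate update) {
    RuleUpdate existing = byKey.get(update.key());
    if (existing != null && existing.version() >= update.version()) {
      return false;
    }
    byKey.put(update.key(), update);
    return true;
  }

  // A tombstoned key behaves exactly like a missing reference.
  Optional<RuleUpdate> lookup(String key) {
    RuleUpdate current = byKey.get(key);
    return (current == null || current.deleted()) ? Optional.empty() : Optional.of(current);
  }
}
```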

Broadcast state is easy to start and easy to misuse.


9. Missing Reference Policy

Missing reference is not an exception. It is a business policy.

Options:

Policy | Behavior | Use When
--- | --- | ---
Fail fast | Throw and restart/fail job | Reference must always exist and source is trusted
Drop | Ignore event | Almost never appropriate for audit data
Partial output | Emit event with partial=true | Downstream can handle incomplete enrichment
Quarantine | Send to side output | Reference may arrive later or needs investigation
Delay and retry | Buffer until reference appears | Bounded wait is acceptable
Default value | Use explicit default | Domain has safe default
External fallback | Query source of truth | Low volume, fallback is reliable
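Treating the policy as configuration, validated once at startup, keeps the per-event decision deterministic across retries. A minimal plain-Java sketch; `MissingReferencePolicy`, `MissingAction`, and `MissingReferenceResolver` are hypothetical names, and the actions only describe what the operator should do (timers, side outputs, and I/O are left to the Flink function).

```java
import java.util.Optional;

// Hypothetical policy names mirroring the table rows above.
enum MissingReferencePolicy {
  FAIL_FAST, DROP, PARTIAL_OUTPUT, QUARANTINE, DELAY_AND_RETRY, DEFAULT_VALUE, EXTERNAL_FALLBACK
}

// What the operator does with one event whose reference is missing.
enum MissingAction {
  THROW, DISCARD, EMIT_PARTIAL, SIDE_OUTPUT, BUFFER_WITH_TIMER, EMIT_DEFAULT, FALLBACK_LOOKUP
}

final class MissingReferenceResolver {
  private final MissingReferencePolicy policy;
  private final String defaultValue;

  MissingReferenceResolver(MissingReferencePolicy policy, String defaultValue, boolean auditGrade) {
    // Reject configurations the table warns against at startup, not per event.
    if (auditGrade && policy == MissingReferencePolicy.DROP) {
      throw new IllegalArgumentException("DROP silently loses audit data; use QUARANTINE or PARTIAL_OUTPUT");
    }
    if (policy == MissingReferencePolicy.DEFAULT_VALUE && defaultValue == null) {
      throw new IllegalArgumentException("DEFAULT_VALUE requires an explicit default");
    }
    this.policy = policy;
    this.defaultValue = defaultValue;
  }

  // Same policy, same action: the decision never varies between retries.
  MissingAction resolve() {
    return switch (policy) {
      case FAIL_FAST -> MissingAction.THROW;
      case DROP -> MissingAction.DISCARD;
      case PARTIAL_OUTPUT -> MissingAction.EMIT_PARTIAL;
      case QUARANTINE -> MissingAction.SIDE_OUTPUT;
      case DELAY_AND_RETRY -> MissingAction.BUFFER_WITH_TIMER;
      case DEFAULT_VALUE -> MissingAction.EMIT_DEFAULT;
      case EXTERNAL_FALLBACK -> MissingAction.FALLBACK_LOOKUP;
    };
  }

  Optional<String> defaultValue() {
    return Optional.ofNullable(defaultValue);
  }
}
```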

Production recommendation:

For regulatory/event-sourced data:
  prefer quarantine or partial output over silent drop.

A missing reference lane should include:

import java.time.Instant;

public record MissingReference(
    String inputEventId,
    String referenceDataset,
    String referenceKey,
    Instant eventTime,
    String pipelineName,
    String transformVersion,
    String reason
) {}

Use side output:

public static final OutputTag<MissingReference> MISSING_REFERENCE =
    new OutputTag<>("missing-reference") {};

Then in processElement:

if (rule == null) {
  ctx.output(MISSING_REFERENCE, new MissingReference(
      event.caseId(),
      "jurisdiction-rules",
      event.jurisdictionCode(),
      event.eventTime(),
      "case-enrichment",
      "v1",
      "REFERENCE_NOT_FOUND"
  ));
  return;
}

The key point: missing reference should be visible, counted, routed, and replayable.


10. Versioned Broadcast State

If reference data has versions, do not store only latest value.

Instead of:

BroadcastState<String, JurisdictionRule> latestRuleByJurisdiction;

Use:

BroadcastState<String, NavigableMap<Instant, JurisdictionRule>> ruleTimelineByJurisdiction;

Conceptually:

jurisdiction = ID-JK
  2025-01-01 -> rule-v1
  2026-01-01 -> rule-v2
  2026-06-15 -> rule-v3

Lookup:

eventTime = 2026-03-01
choose floorEntry(eventTime) => rule-v2

Production issue: Java NavigableMap inside Flink state can become large and serialization-heavy. For large versioned dimensions, prefer Table/SQL temporal join or external versioned store designed for range lookup.
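The `floorEntry` lookup maps directly onto `java.util.TreeMap`, which implements `NavigableMap`. A minimal plain-Java sketch of one key's timeline; in a Flink job this map would sit inside state (with the serialization cost just described), and `RuleTimeline` is a hypothetical name.

```java
import java.time.Instant;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Optional;
import java.util.TreeMap;

// Versions of one reference key, ordered by the instant each version becomes valid.
final class RuleTimeline {
  private final NavigableMap<Instant, String> versionsByValidFrom = new TreeMap<>();

  void addVersion(Instant validFrom, String ruleVersion) {
    versionsByValidFrom.put(validFrom, ruleVersion);
  }

  // As-of lookup: the latest version whose validFrom is <= eventTime (inclusive boundary).
  // Empty means the event predates every known version, i.e. a missing reference.
  Optional<String> versionAt(Instant eventTime) {
    Map.Entry<Instant, String> entry = versionsByValidFrom.floorEntry(eventTime);
    return entry == null ? Optional.empty() : Optional.of(entry.getValue());
  }
}
```

With the three versions above, `versionAt(2026-03-01)` returns `rule-v2`, and an event exactly at `2026-06-15` picks `rule-v3` because the boundary is inclusive; whichever convention you choose, write it down and test it.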


11. Async I/O Enrichment

Async I/O is useful when the reference data is too large to hold inside Flink state.

Use it when:

  • reference table is huge,
  • only a small subset is needed,
  • lookup service is designed for high QPS,
  • latency budget can tolerate remote call,
  • replay determinism is not strict or reference lookup is versioned.

Avoid it when:

  • external lookup is not versioned but historical correctness matters,
  • external system cannot handle replay/backfill load,
  • lookup latency is unstable,
  • output must be reproducible years later,
  • enrichment must continue during reference service outage.

12. Async I/O Java Skeleton

Input and output:

import java.time.Instant;

public record FacilityEvent(
    String eventId,
    String facilityId,
    Instant eventTime,
    String eventType
) {}

public record FacilityProfile(
    String facilityId,
    String riskBand,
    String version,
    Instant effectiveFrom
) {}

public record EnrichedFacilityEvent(
    FacilityEvent event,
    FacilityProfile profile,
    EnrichmentMetadata metadata
) {}

Async function:

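import java.util.List;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

public final class FacilityProfileAsyncFunction
    extends RichAsyncFunction<FacilityEvent, EnrichedFacilityEvent> {

  // FacilityProfileClient is an application-specific, non-blocking client;
  // lookup(facilityId, eventTime) is assumed to return a CompletionStage<FacilityProfile>.
  private transient FacilityProfileClient client;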

  @Override
  public void open(Configuration parameters) {
    this.client = new FacilityProfileClient(/* connection config */);
  }

  @Override
  public void asyncInvoke(
      FacilityEvent event,
      ResultFuture<EnrichedFacilityEvent> resultFuture
  ) {
    client.lookup(event.facilityId(), event.eventTime())
        .whenComplete((profile, error) -> {
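          if (error != null) {
            // An exceptional completion fails the job, which then restarts per its restart strategy.
            resultFuture.completeExceptionally(error);
            return;
          }

          if (profile == null) {
            // Completing with an empty collection silently drops the event.
            // In production, emit a partial or missing-reference record instead.
            resultFuture.complete(List.of());
            return;
          }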

          resultFuture.complete(List.of(new EnrichedFacilityEvent(
              event,
              profile,
              new EnrichmentMetadata(
                  "facility-profile",
                  event.facilityId(),
                  profile.version(),
                  profile.effectiveFrom(),
                  event.eventTime(),
                  "async-versioned-lookup",
                  false,
                  "facility-enrichment-v1"
              )
          )));
        });
  }

  @Override
  public void timeout(
      FacilityEvent input,
      ResultFuture<EnrichedFacilityEvent> resultFuture
  ) {
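    // Failing here fails the job on every timeout. For enrichment, completing
    // with a partial or missing-reference record is often the safer choice.
    resultFuture.completeExceptionally(
        new RuntimeException("Facility profile lookup timeout for " + input.facilityId())
    );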
  }
}

Apply async I/O:

DataStream<EnrichedFacilityEvent> enriched = AsyncDataStream
    .unorderedWait(
        facilityEvents,
        new FacilityProfileAsyncFunction(),
        3,                 // timeout per lookup
        TimeUnit.SECONDS,
        500                // capacity: max in-flight lookups per parallel instance
    );

12.1 Ordered vs Unordered Wait

Mode | Meaning | Use When
--- | --- | ---
unorderedWait | Output may arrive in lookup completion order | Ordering not required, better throughput
orderedWait | Output preserves input order | Downstream depends on order, lower throughput

In most enrichment pipelines, ordering should not be used as hidden correctness. Prefer unordered plus explicit event identity, unless downstream really needs per-key order.


13. Async I/O Failure Model

Failure | Impact | Defensive Pattern
--- | --- | ---
Timeout | Event fails or is retried | Timeout budget + side output
Reference service slow | Backpressure | Concurrency limit + circuit breaker
Reference service outage | Job instability | Degrade/quarantine policy
Lookup returns current value | Historical backfill wrong | Versioned lookup by event time
Lookup has side effects | Replay unsafe | Lookup must be read-only
High replay volume | Reference service overload | Offline snapshot or bulk join
Partial response | Bad enrichment | Response validation
Non-deterministic response | Audit failure | Include reference version in output

Async I/O is operationally dangerous when it hides an external dependency inside stream processing.

A stream processor with async lookup is not just Flink anymore. It is a distributed system involving Flink, network, external service, its database, its cache, and its deployment cycle.
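One way to apply “timeout budget + side output” is to turn every lookup result, including timeouts and failures, into an explicit outcome instead of an exception. A minimal plain-Java sketch using `CompletableFuture.orTimeout` (Java 9+); `LookupStatus`, `LookupOutcome`, and `BudgetedLookup` are hypothetical. Inside a Flink `AsyncFunction`, the equivalent is completing the `ResultFuture` with a tagged record and routing non-`FOUND` outcomes downstream, rather than calling `completeExceptionally`.

```java
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;

enum LookupStatus { FOUND, MISSING, TIMEOUT, FAILED }

// Every lookup ends in exactly one explicit status.
record LookupOutcome(String key, LookupStatus status, String value, String error) {}

final class BudgetedLookup {
  private final Function<String, CompletableFuture<String>> remoteLookup;
  private final Duration budget;

  BudgetedLookup(Function<String, CompletableFuture<String>> remoteLookup, Duration budget) {
    this.remoteLookup = remoteLookup;
    this.budget = budget;
  }

  CompletableFuture<LookupOutcome> lookupWithBudget(String key) {
    return remoteLookup.apply(key)
        // Completes the future with TimeoutException once the budget is spent.
        .orTimeout(budget.toMillis(), TimeUnit.MILLISECONDS)
        .handle((value, error) -> {
          if (error == null) {
            return value == null
                ? new LookupOutcome(key, LookupStatus.MISSING, null, null)
                : new LookupOutcome(key, LookupStatus.FOUND, value, null);
          }
          // Upstream failures can arrive wrapped in CompletionException.
          Throwable cause = (error instanceof CompletionException && error.getCause() != null)
              ? error.getCause()
              : error;
          LookupStatus status = cause instanceof TimeoutException ? LookupStatus.TIMEOUT : LookupStatus.FAILED;
          return new LookupOutcome(key, status, null, cause.toString());
        });
  }
}
```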


14. Cache-Aside Enrichment

A common optimization:

for each event:
  if local cache has key:
    use cache
  else:
    call external lookup
    store in cache

This is fine for low-risk decoration. It is dangerous for audit-grade transformation unless cache semantics are explicit.

Questions:

  • What is the TTL?
  • Is TTL processing-time or event-time?
  • Is cache invalidated by CDC update?
  • Does cache store versioned values?
  • Does replay bypass cache?
  • Does output include cache hit/miss metadata?
  • Does cache survive restart?

A better cache entry:

import java.time.Instant;

public record ReferenceCacheEntry<T>(
    String key,
    T value,
    String version,
    Instant effectiveFrom,
    Instant loadedAt,
    Instant expiresAt
) {}

Do not hide cache from the output. At least expose metrics:

reference_cache_hit_total
reference_cache_miss_total
reference_cache_expired_total
reference_lookup_latency_ms
reference_lookup_timeout_total
reference_version_missing_total
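A cache that makes TTL, version, and hit/miss behavior explicit might look like this. A minimal plain-Java sketch, assuming a processing-time TTL and an injectable `Supplier<Instant>` clock so tests are deterministic; `VersionedReferenceCache` is hypothetical, and the plain `long` counters stand in for the metrics above rather than a real metrics library.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;
import java.util.function.Supplier;

final class VersionedReferenceCache {
  // What the loader returns: the value and the reference version it belongs to.
  record Loaded(String value, String version) {}

  // A cached value keeps its version so enrichment output can still report it.
  record Entry(String value, String version, Instant expiresAt) {}

  private final Map<String, Entry> entries = new HashMap<>();
  private final Function<String, Optional<Loaded>> loader;
  private final Duration ttl;           // processing-time TTL
  private final Supplier<Instant> now;  // injectable clock

  // Stand-ins for reference_cache_hit_total, _miss_total, _expired_total.
  long hits;
  long misses;
  long expired;

  VersionedReferenceCache(Function<String, Optional<Loaded>> loader, Duration ttl, Supplier<Instant> now) {
    this.loader = loader;
    this.ttl = ttl;
    this.now = now;
  }

  Optional<Entry> get(String key) {
    Instant at = now.get();
    Entry cached = entries.get(key);
    if (cached != null && at.isBefore(cached.expiresAt())) {
      hits++;
      return Optional.of(cached);
    }
    if (cached != null) {
      expired++;
      entries.remove(key);
    }
    misses++;
    // Unknown keys are not cached, so a later insert at the source is seen on the next lookup.
    Optional<Entry> fresh = loader.apply(key)
        .map(l -> new Entry(l.value(), l.version(), at.plus(ttl)));
    fresh.ifPresent(e -> entries.put(key, e));
    return fresh;
  }

  // Hook for CDC-driven invalidation when the source row changes.
  void invalidate(String key) {
    entries.remove(key);
  }
}
```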

15. Temporal Joins

Temporal join answers:

What was the reference value as of this event's time?

This is the correct pattern for many regulatory and historical pipelines.

Example:

CaseEvent(caseId=C-1, jurisdiction=JKT, eventTime=2026-05-10)
JurisdictionRule(JKT, version=v1, validFrom=2026-01-01)
JurisdictionRule(JKT, version=v2, validFrom=2026-06-01)

Join result for eventTime 2026-05-10 => v1

In Flink SQL/Table world, this is commonly represented with a versioned table and FOR SYSTEM_TIME AS OF style temporal semantics.

Conceptual SQL:

SELECT
  e.case_id,
  e.event_time,
  e.jurisdiction_code,
  r.rule_version,
  r.escalation_requires_supervisor
FROM case_events AS e
LEFT JOIN jurisdiction_rules FOR SYSTEM_TIME AS OF e.event_time AS r
ON e.jurisdiction_code = r.jurisdiction_code;

The important part is not syntax. The important part is the time basis.

15.1 Temporal Join Requirements

Requirement | Why It Matters
--- | ---
Main stream has event-time attribute | Join needs as-of time
Reference table is versioned | Historical lookup must be possible
Primary key is defined | Lookup identity must be stable
Watermark is sane | Late data semantics need bounded progress
Version gaps handled | No reference may exist for some period
Reference corrections modeled | Historical reference may be corrected
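Version gaps only become visible when each version carries an end as well as a start. A minimal plain-Java sketch of an as-of lookup over half-open `[validFrom, validTo)` intervals, assuming non-overlapping versions and a null `validTo` meaning “still valid”; `RuleVersion` and `VersionedDimension` are hypothetical. Unlike a bare floor lookup, it reports a gap instead of silently stretching the previous version forward.

```java
import java.time.Instant;
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;

// One version of a reference row; validTo == null means open-ended.
record RuleVersion(String version, Instant validFrom, Instant validTo) {}

final class VersionedDimension {
  private final TreeMap<Instant, RuleVersion> byValidFrom = new TreeMap<>();

  void add(RuleVersion v) {
    byValidFrom.put(v.validFrom(), v);
  }

  // Half-open interval: validFrom <= asOf < validTo.
  Optional<RuleVersion> asOf(Instant asOf) {
    Map.Entry<Instant, RuleVersion> candidate = byValidFrom.floorEntry(asOf);
    if (candidate == null) {
      return Optional.empty(); // before the first version
    }
    RuleVersion v = candidate.getValue();
    boolean stillValid = v.validTo() == null || asOf.isBefore(v.validTo());
    return stillValid ? Optional.of(v) : Optional.empty(); // empty here means a version gap
  }
}
```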

16. Lookup Join vs Temporal Join

These are often confused.

Join Type | Meaning
--- | ---
Lookup join | Query an external table/system at processing time or lookup time
Temporal join | Join against a version of a table as of a specific time

A lookup join can be versioned if the lookup key includes time:

lookup(facilityId, eventTime)

A lookup join is not automatically temporal if it only does:

lookup(facilityId)

That returns latest/current, not historical.


17. Stream-Stream Join

Sometimes both sides are event streams.

Example:

  • InspectionStarted
  • InspectionCompleted

You want duration and outcome.

Important questions:

  • How long can completion arrive after start?
  • What if completion arrives first?
  • What if one side never arrives?
  • What if duplicate start/completion arrives?
  • What if event is corrected?
  • What if start and completion use different clocks?

A stream-stream join always implies state retention. If you do not bound it, state grows forever.


18. Keyed CoProcess Join Skeleton

For custom stream-stream correlation, use keyed state and timers.

import java.time.Duration;
import java.time.Instant;

public record InspectionStarted(
    String inspectionId,
    String caseId,
    Instant startedAt
) {}

public record InspectionCompleted(
    String inspectionId,
    String result,
    Instant completedAt
) {}

public record InspectionDurationCalculated(
    String inspectionId,
    String caseId,
    Duration duration,
    String result
) {}

Function shape:

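import java.time.Duration;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
import org.apache.flink.util.Collector;

public final class InspectionJoinFunction extends KeyedCoProcessFunction<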
    String,
    InspectionStarted,
    InspectionCompleted,
    InspectionDurationCalculated> {

  private transient ValueState<InspectionStarted> startedState;
  private transient ValueState<InspectionCompleted> completedState;

  @Override
  public void open(Configuration parameters) {
    startedState = getRuntimeContext().getState(
        new ValueStateDescriptor<>("inspection-started", InspectionStarted.class)
    );
    completedState = getRuntimeContext().getState(
        new ValueStateDescriptor<>("inspection-completed", InspectionCompleted.class)
    );
  }

  @Override
  public void processElement1(
      InspectionStarted started,
      Context ctx,
      Collector<InspectionDurationCalculated> out
  ) throws Exception {
    InspectionCompleted completed = completedState.value();

    if (completed != null) {
      emit(started, completed, out);
      clear();
    } else {
      startedState.update(started);
      ctx.timerService().registerEventTimeTimer(started.startedAt().plus(Duration.ofDays(7)).toEpochMilli());
    }
  }

  @Override
  public void processElement2(
      InspectionCompleted completed,
      Context ctx,
      Collector<InspectionDurationCalculated> out
  ) throws Exception {
    InspectionStarted started = startedState.value();

    if (started != null) {
      emit(started, completed, out);
      clear();
    } else {
      completedState.update(completed);
      ctx.timerService().registerEventTimeTimer(completed.completedAt().plus(Duration.ofDays(7)).toEpochMilli());
    }
  }

  @Override
  public void onTimer(
      long timestamp,
      OnTimerContext ctx,
      Collector<InspectionDurationCalculated> out
  ) throws Exception {
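    // Timers are not deleted after a successful match, so this can fire on already-empty state.
    // Only a side that is still pending is a genuine missing counterpart.
    if (startedState.value() != null || completedState.value() != null) {
      // In production, emit a missing-counterpart record to a side output here.
    }
    clear();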
  }

  private void emit(
      InspectionStarted started,
      InspectionCompleted completed,
      Collector<InspectionDurationCalculated> out
  ) {
    out.collect(new InspectionDurationCalculated(
        started.inspectionId(),
        started.caseId(),
        Duration.between(started.startedAt(), completed.completedAt()),
        completed.result()
    ));
  }

  private void clear() throws Exception {
    startedState.clear();
    completedState.clear();
  }
}

This pattern is powerful because it makes missing counterpart and timeout explicit.


19. Join Output Semantics

Every join output should declare its output mode.

Mode | Meaning
--- | ---
Append | Once emitted, never changed
Upsert | Same key may be emitted again with newer version
Retract | Previous result can be invalidated
Correction | New event corrects prior output
Partial | Output may be incomplete

Example:

public enum OutputMode {
  APPEND,
  UPSERT,
  RETRACT,
  CORRECTION,
  PARTIAL
}

Why it matters:

  • A reporting table can handle upsert.
  • A Kafka fact topic may not accept mutation semantics.
  • An audit ledger should not silently replace prior facts.
  • A downstream alert engine may need retraction to avoid false alerts.

20. Reference Data Modeling

Reference data should be modeled as a stream or table with explicit lifecycle.

Bad:

{
  "facilityId": "F-100",
  "riskBand": "HIGH"
}

Better:

{
  "facilityId": "F-100",
  "riskBand": "HIGH",
  "version": "risk-profile-v7",
  "validFrom": "2026-01-01T00:00:00Z",
  "validTo": null,
  "sourceCommitTime": "2026-01-02T03:10:00Z",
  "deleted": false,
  "schemaVersion": "facility-risk-profile.v2"
}

Reference table columns:

Field | Purpose
--- | ---
reference_key | Join key
version | Reproducible enrichment
valid_from | Temporal lookup start
valid_to | Temporal lookup end
source_commit_time | Source change order
payload_hash | Change detection
deleted | Tombstone semantics
classification | Privacy/security policy
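The `payload_hash` column can be computed over a canonical serialization so that a re-emitted but unchanged row does not create a new version. A minimal plain-Java sketch using SHA-256 from `java.security.MessageDigest` and `java.util.HexFormat` (Java 17+); the canonical form (fields sorted by name, each name and value length-prefixed) is an assumption, and a real pipeline needs a canonicalization that matches its schema and types.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.HexFormat;
import java.util.Map;
import java.util.TreeMap;

final class PayloadHash {
  private PayloadHash() {}

  // Field order in the input map does not matter; content does.
  static String of(Map<String, String> payload) {
    StringBuilder canonical = new StringBuilder();
    for (Map.Entry<String, String> field : new TreeMap<>(payload).entrySet()) {
      append(canonical, field.getKey());
      append(canonical, field.getValue());
    }
    try {
      MessageDigest sha256 = MessageDigest.getInstance("SHA-256");
      byte[] digest = sha256.digest(canonical.toString().getBytes(StandardCharsets.UTF_8));
      return HexFormat.of().formatHex(digest);
    } catch (NoSuchAlgorithmException e) {
      throw new IllegalStateException("SHA-256 is required on every Java platform", e);
    }
  }

  // Length prefixes keep ("ab", "c") and ("a", "bc") distinct; -1 marks null.
  private static void append(StringBuilder sb, String s) {
    if (s == null) {
      sb.append("-1:");
      return;
    }
    sb.append(s.length()).append(':').append(s);
  }
}
```

An upsert whose hash equals the stored `payload_hash` can then be skipped as a no-op instead of minting a new version.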

21. CDC-Derived Reference Tables

A common production pattern is to build reference tables from change data capture (CDC) events emitted by the source database.

Critical detail: CDC event time is not the same as business effective time.

Time | Meaning
--- | ---
Source commit time | DB transaction committed
CDC read time | Connector observed log entry
Ingestion time | Event entered Kafka/Flink
Business effective time | Reference value applies from this time

For regulatory logic, business effective time is often the correct join time. For replication/materialization, source commit time may be enough.


22. Enrichment as a Contract

An enrichment stage should have a contract like:

name: case-jurisdiction-enrichment
input: case-events.v3
reference:
  dataset: jurisdiction-rules.v2
  key: jurisdictionCode
  timeBasis: eventTime
  versionRequired: true
missingPolicy: quarantine
outputMode: append
output: enriched-case-events.v1
metadata:
  includeReferenceVersion: true
  includeTransformVersion: true
  includePartialFlag: true

This contract lets platform tooling answer:

  • Which pipelines depend on jurisdiction-rules.v2?
  • What breaks if jurisdictionCode changes?
  • Which outputs used rule version v17?
  • Can this pipeline be backfilled deterministically?
  • How many records were quarantined because reference was missing?

23. Replay and Backfill

Backfill changes the economics of enrichment.

Live stream volume:

500 events/sec

Backfill volume:

5 years of history at maximum cluster throughput

If enrichment calls an external API, backfill can destroy that API.

Backfill-safe enrichment usually needs one of these:

Strategy | Description
--- | ---
Versioned reference snapshot | Join against historical reference dataset
Preloaded broadcast state | Load reference before backfill
Offline batch join | Use batch engine/lakehouse table
Synthetic replay mode | Replace live lookup with deterministic reference source
Throttled async lookup | Accept slower backfill with strict QPS cap

Do not run historical backfill through a live lookup path unless it was designed for it.
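A strict QPS cap for a throttled backfill lookup can be as simple as spacing calls at a fixed interval. A minimal plain-Java sketch of a pacer, assuming a caller-supplied nanosecond clock so the schedule can be tested without sleeping; `BackfillPacer` is hypothetical, and a production job would more likely use an established rate-limiter implementation.

```java
import java.util.function.LongSupplier;

// Grants at most maxPerSecond lookups per second by spacing them evenly.
// Idle time does not accumulate burst credit, so the cap stays strict.
final class BackfillPacer {
  private final long intervalNanos;
  private final LongSupplier nanoClock;
  private long nextAllowedNanos;

  BackfillPacer(int maxPerSecond, LongSupplier nanoClock) {
    if (maxPerSecond <= 0) {
      throw new IllegalArgumentException("maxPerSecond must be positive");
    }
    this.intervalNanos = 1_000_000_000L / maxPerSecond;
    this.nanoClock = nanoClock;
    this.nextAllowedNanos = nanoClock.getAsLong();
  }

  // Reserves the next slot and returns how long the caller must wait before
  // issuing its lookup (0 = go now). Synchronized so concurrent callers share one budget.
  synchronized long reserveWaitNanos() {
    long now = nanoClock.getAsLong();
    long slot = Math.max(now, nextAllowedNanos);
    nextAllowedNanos = slot + intervalNanos;
    return slot - now;
  }
}
```

A caller would sleep for the returned duration (for example with `TimeUnit.NANOSECONDS.sleep(wait)`) before calling the reference service.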


24. Enrichment State Cost Model

State cost depends on:

number_of_keys
× versions_retained_per_key   (grows with reference churn and retention_horizon)
× serialized_value_size
× storage/checkpoint overhead

Example:

500,000 facilities
× 12 versions average
× 2 KB/profile
≈ 12 GB logical state

After RocksDB overhead, indexes, checkpointing, and serialization, real footprint can be much larger.
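Keeping the estimate as code next to the capacity plan makes it easy to rerun when assumptions change. A minimal plain-Java sketch; 2 KB is taken as 2,000 bytes to match the decimal ≈ 12 GB figure, and the overhead factor is a placeholder to be replaced by measurements of the real job, not a RocksDB constant.

```java
// Logical state for versioned enrichment, before storage and checkpoint overhead.
final class StateCostEstimate {
  private StateCostEstimate() {}

  static long logicalBytes(long keys, long versionsPerKey, long bytesPerVersion) {
    // multiplyExact throws on overflow instead of silently wrapping.
    return Math.multiplyExact(Math.multiplyExact(keys, versionsPerKey), bytesPerVersion);
  }

  // overheadFactor is an assumption until measured on the actual job.
  static double physicalBytes(long logicalBytes, double overheadFactor) {
    return logicalBytes * overheadFactor;
  }

  static double toGigabytes(double bytes) {
    return bytes / 1_000_000_000d;
  }
}
```

`logicalBytes(500_000, 12, 2_000)` yields 12,000,000,000 bytes, i.e. 12 GB; with an assumed overhead factor of 3 the plan would budget 36 GB.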

Questions before choosing stateful enrichment:

  • How many reference keys exist?
  • How many versions per key?
  • How long must history be retained?
  • What is the largest payload?
  • Are hot keys expected?
  • How fast does reference churn?
  • What is checkpoint interval and storage cost?
  • How long can recovery take?

25. Hot Key Problems

Join key skew can create one overloaded operator.

Examples:

  • jurisdictionCode = NATIONAL
  • tenantId = default
  • facilityType = UNKNOWN
  • caseOwner = unassigned

Symptoms:

one subtask has high busy time
one key has huge state
watermark stalls
checkpoint alignment slows
lag grows on one partition

Mitigations:

Approach | Trade-off
--- | ---
Better key | Best fix when domain allows
Salt key | Requires downstream recombine
Split hot category | Domain-specific routing
Broadcast reference | Removes keyed lookup pressure for reference side
Pre-aggregate | Reduces event volume
Dedicated pipeline for hot key | Operational complexity
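Salting spreads one hot key over several sub-keys in a first stage, then recombines per original key downstream. A minimal plain-Java sketch of both halves; the `#` separator, the bucket count, and using `eventId` as the salt source are assumptions. Deriving the salt from a stable field rather than a random number keeps the bucket assignment identical on replay.

```java
import java.util.HashMap;
import java.util.Map;

final class KeySalting {
  private KeySalting() {}

  // Stage 1 key: original key plus a deterministic bucket derived from the event id.
  static String saltedKey(String key, String eventId, int buckets) {
    int bucket = Math.floorMod(eventId.hashCode(), buckets);
    return key + "#" + bucket;
  }

  // Stage 2 key: strip the salt (expects a key produced by saltedKey).
  static String originalKey(String saltedKey) {
    return saltedKey.substring(0, saltedKey.lastIndexOf('#'));
  }

  // Example recombination: sum per-bucket partial counts into one total per original key.
  static Map<String, Long> recombine(Map<String, Long> partialCountsBySaltedKey) {
    Map<String, Long> totals = new HashMap<>();
    partialCountsBySaltedKey.forEach((salted, count) -> totals.merge(originalKey(salted), count, Long::sum));
    return totals;
  }
}
```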

26. Privacy and Security Boundary

Enrichment often increases sensitivity.

Input:

caseId, facilityId, eventType

After enrichment:

caseId, facilityId, facilityName, address, ownerName, riskBand, jurisdiction

That output may have different classification than input.

Enrichment contract must define:

  • allowed fields,
  • masking/tokenization rules,
  • output classification,
  • access policy,
  • retention policy,
  • audit logging,
  • downstream sharing limits.

Do not treat enrichment as harmless decoration. It often creates sensitive derived data.
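Allowed fields and masking rules are easiest to review when they are enforced in one place before the enriched record leaves the operator. A minimal plain-Java sketch, assuming flat `Map<String, String>` records and a per-field rule of keep or mask, with every unlisted field dropped; `FieldRule` and `FieldPolicy` are hypothetical. Masking here is plain redaction; tokenization would need a consistent mapping owned by a security service.

```java
import java.util.LinkedHashMap;
import java.util.Map;

enum FieldRule { KEEP, MASK }

final class FieldPolicy {
  private final Map<String, FieldRule> rules;

  FieldPolicy(Map<String, FieldRule> rules) {
    this.rules = Map.copyOf(rules);
  }

  // Allowlist: fields without a rule are dropped, so new upstream fields stay out by default.
  Map<String, String> apply(Map<String, String> enriched) {
    Map<String, String> out = new LinkedHashMap<>();
    enriched.forEach((field, value) -> {
      FieldRule rule = rules.get(field);
      if (rule == FieldRule.KEEP) {
        out.put(field, value);
      } else if (rule == FieldRule.MASK) {
        out.put(field, "***");
      }
    });
    return out;
  }
}
```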


27. Observability

Minimum metrics:

enrichment_input_total
enrichment_output_total
enrichment_missing_reference_total
enrichment_partial_output_total
enrichment_lookup_latency_ms
enrichment_lookup_timeout_total
enrichment_reference_update_total
enrichment_reference_lag_ms
enrichment_state_size_bytes
enrichment_cache_hit_ratio
enrichment_quarantine_total
enrichment_version_unknown_total

Useful dimensions:

pipeline
reference_dataset
reference_version
tenant
jurisdiction
missing_reason
lookup_mode
output_mode

Dashboards should show:

  • input vs output rate,
  • missing reference spike,
  • reference lag,
  • async timeout rate,
  • state size trend,
  • checkpoint duration,
  • hot key symptoms,
  • watermark lag,
  • quarantine backlog.

28. Testing Enrichment

28.1 Unit Tests

Test pure mapping:

input event + reference value => enriched output

Include:

  • missing reference,
  • deleted reference,
  • stale reference,
  • future-dated reference,
  • correction event,
  • duplicate event,
  • replay mode.

28.2 Temporal Tests

Given:

rule-v1 valid from 2026-01-01
rule-v2 valid from 2026-06-01

Assert:

event at 2026-05-01 uses v1
event at 2026-06-01 uses v2
event at 2025-12-01 is missing/no-rule

28.3 Replay Tests

Run same input twice:

same input + same reference snapshot + same transform version => same output
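A replay test runs the same pure enrichment function twice over the same input and reference snapshot and compares the results. A minimal plain-Java sketch; `InputEvent`, `EnrichedOutput`, and `ReplayCheck` are hypothetical. The design point is that the function receives everything it depends on (snapshot, snapshot version, transform version) as arguments and never reads the wall clock or a live service, which is what makes the comparison meaningful.

```java
import java.util.List;
import java.util.Map;
import java.util.Objects;

record InputEvent(String eventId, String facilityId) {}

record EnrichedOutput(String eventId, String riskBand, boolean partial,
                      String referenceVersion, String transformVersion) {}

final class ReplayCheck {
  private ReplayCheck() {}

  // Pure function: output depends only on its arguments (no clock, no network).
  // A missing reference is marked partial rather than silently defaulted.
  static List<EnrichedOutput> enrich(List<InputEvent> input, Map<String, String> riskSnapshot,
                                     String snapshotVersion, String transformVersion) {
    return input.stream()
        .map(e -> {
          String riskBand = riskSnapshot.get(e.facilityId());
          return new EnrichedOutput(e.eventId(), riskBand, riskBand == null, snapshotVersion, transformVersion);
        })
        .toList();
  }

  static boolean replayIsDeterministic(List<InputEvent> input, Map<String, String> snapshot,
                                       String snapshotVersion, String transformVersion) {
    List<EnrichedOutput> first = enrich(input, snapshot, snapshotVersion, transformVersion);
    List<EnrichedOutput> second = enrich(input, snapshot, snapshotVersion, transformVersion);
    return Objects.equals(first, second);
  }
}
```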

28.4 Failure Tests

Inject:

  • reference update reorder,
  • duplicate reference update,
  • missing reference,
  • lookup timeout,
  • lookup slow response,
  • external service outage,
  • checkpoint restore,
  • late main event,
  • late reference update.

29. Production Runbook

When enrichment output is wrong:

  1. Identify affected output key range.
  2. Identify transform version.
  3. Identify reference dataset and version used.
  4. Check reference update lag at event time.
  5. Check missing/quarantine counters.
  6. Compare output with deterministic replay from reference snapshot.
  7. Decide whether to emit correction, rebuild materialized view, or reprocess topic range.
  8. Document root cause in reference contract or transform contract.

When missing reference spikes:

  1. Check source reference pipeline health.
  2. Check schema changes in reference events.
  3. Check join key normalization.
  4. Check watermark/late data policy.
  5. Check deployment version mismatch.
  6. Check new tenant/jurisdiction/code not onboarded.
  7. Check source deletion/tombstone semantics.

30. Common Anti-Patterns

Anti-Pattern | Why It Fails
--- | ---
Calling production API for every event | Overload, nondeterministic replay
Latest-state join for historical calculation | Wrong historical output
Silent default on missing reference | Hides data quality issue
No reference version in output | Cannot audit enrichment
Unbounded stream-stream join | State grows forever
Broadcast huge table | Memory/checkpoint explosion
Processing-time temporal logic | Non-reproducible output
Cache with hidden TTL | Stale and unauditable enrichment
Joining on human-readable name | Key drift and collisions
No side output for bad enrichment | Either job fails or data disappears

31. Architecture Review Checklist

Before approving an enrichment pipeline, ask:

  • What is the join key and is it stable?
  • Is the reference latest-state or temporal-state?
  • What time basis is used?
  • What happens when reference is missing?
  • Is output append, upsert, retract, correction, or partial?
  • Is reference version included in output?
  • Can output be reproduced during audit?
  • Can backfill run without overloading external systems?
  • How large can state become?
  • What is the TTL/retention policy?
  • What are the top missing-reference causes?
  • How are reference deletes handled?
  • How are reference corrections handled?
  • Does enrichment increase data sensitivity?
  • What metrics prove correctness and freshness?

32. Practical Decision Tree
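One way to express a decision tree for these choices is to encode the selection matrix from Section 5 as a small function. A minimal plain-Java sketch; the order of the checks, the reduced set of yes/no requirements, and the names (`JoinPattern`, `Requirements`, `PatternChooser`) are assumptions rather than a definitive tree. Historical correctness is checked before size or latency because a fast join against the wrong version is exactly the failure this lesson warns about.

```java
enum JoinPattern {
  INTERVAL_OR_KEYED_COPROCESS_JOIN,
  TEMPORAL_VERSIONED_JOIN,
  VERSIONED_SNAPSHOT_OR_OFFLINE_JOIN,
  BROADCAST_STATE,
  PRE_MATERIALIZED_LOCAL_STATE,
  ASYNC_LOOKUP
}

record Requirements(
    boolean bothSidesAreEventStreams,
    boolean needsHistoricalCorrectness,  // audit-grade, "what was true at the time"
    boolean needsDeterministicBackfill,
    boolean referenceIsSmall,
    boolean referenceChangesSlowly
) {}

final class PatternChooser {
  private PatternChooser() {}

  static JoinPattern choosePattern(Requirements r) {
    if (r.bothSidesAreEventStreams()) {
      return JoinPattern.INTERVAL_OR_KEYED_COPROCESS_JOIN;
    }
    if (r.needsHistoricalCorrectness()) {
      return JoinPattern.TEMPORAL_VERSIONED_JOIN;
    }
    if (r.needsDeterministicBackfill()) {
      return JoinPattern.VERSIONED_SNAPSHOT_OR_OFFLINE_JOIN;
    }
    if (r.referenceIsSmall()) {
      return JoinPattern.BROADCAST_STATE;
    }
    // Large reference: materialize locally if it changes slowly, otherwise look it up remotely.
    return r.referenceChangesSlowly() ? JoinPattern.PRE_MATERIALIZED_LOCAL_STATE : JoinPattern.ASYNC_LOOKUP;
  }
}
```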


33. Closing Mental Model

Flink joins are not about combining two streams. They are about making a statement:

This output fact was produced from this input fact, using this reference state, under this time policy, with this missing-reference policy, at this transform version.

That sentence is what production-grade enrichment must preserve.

API choices come later:

  • Broadcast state for small replicated reference data.
  • Async I/O for large remote lookup.
  • Temporal joins for historical correctness.
  • Keyed state and timers for custom correlation.
  • Offline join for deterministic backfill.

The real skill is knowing which correctness boundary you are building.

