Lineage and Impact Analysis

Learn Java Data Pipeline Pattern - Part 067

Lineage and impact analysis for production Java data pipelines: OpenLineage object model, asset graphs, run-level lineage, column-level lineage, blast radius analysis, schema impact, privacy lineage, and Java implementation patterns.

Part 067 — Lineage and Impact Analysis

A data pipeline without lineage is a distributed system with amnesia.

It may be able to produce data, but when the data is wrong, late, sensitive, stale, duplicated, or structurally changed, the platform cannot answer the important questions:

Where did this value come from?
Which job wrote this table?
Which source position was used?
Which transform version produced this field?
Which downstream reports depend on this column?
Which assets become invalid if this upstream dataset changes?
Which consumers see PII derived from this source?
Which pipeline run introduced the bad output?
Which backfill superseded the previous result?

Lineage is not a pretty diagram. It is the evidence graph of a data platform.

Impact analysis is the operational use of that graph.

A top-tier engineer does not implement lineage as an afterthought. They treat it as part of the pipeline control plane, observability model, contract system, and compliance evidence trail.


1. The Core Model

Data lineage answers:

What produced what, from what, when, using which code/config/schema, under which run context?

Impact analysis answers:

If this thing changes or fails, what else is affected, how badly, and what action should be taken?

They are two sides of the same graph.

Lineage is the graph itself.

Impact analysis is the act of querying it, for example:

If source.case.status changes type from string to enum, who breaks?
If case-normalizer v8 produced wrong event_time, which published assets are tainted?
If gold.case_sla_breach fails freshness SLO, which dashboards and APIs are degraded?

2. Lineage Is Not Just Dependency

Many systems confuse these:

| Thing | Meaning | Example |
|---|---|---|
| Dependency | Static relation | gold.case_sla_breach depends on silver.case_event |
| Lineage | Runtime evidence of production | Run 2026-07-04T01:00Z read snapshot s-189 and produced snapshot s-190 |
| Provenance | Origin of a specific value/record | sla_deadline derived from case.accepted_at + policy.response_days |
| Impact analysis | Downstream blast radius | policy.response_days change affects gold.case_sla_breach, dashboard, alerts |
| Data catalog | Inventory and description | Table owner, schema, tags, description |
| Observability | Runtime state and signals | Freshness, row count, rejected records, lag |

A dependency graph says what should depend on what.

Runtime lineage says what actually happened.

For production systems, static dependency is insufficient. You need run-level evidence.


3. Lineage Granularity Levels

Lineage has levels. Each level answers a different class of question.

| Level | Unit | Questions Answered |
|---|---|---|
| System lineage | System/service | Which platform writes to the warehouse? |
| Pipeline/job lineage | Job/DAG/workflow | Which job produces this asset? |
| Run lineage | Execution instance | Which run produced this version? |
| Dataset lineage | Table/topic/file asset | Which datasets are inputs/outputs? |
| Partition lineage | Date/hour/tenant shard | Which partition is affected? |
| Column lineage | Field/column | Which downstream columns use this field? |
| Record lineage | Record/event | Where did this specific case event come from? |
| Value lineage | Cell/value | How was this value calculated? |

Do not try to implement value-level lineage everywhere. It is expensive and often unnecessary.

Use the least expensive level that can answer the operational question.

A practical enterprise setup usually needs:

  • dataset-level lineage for platform navigation,
  • run-level lineage for incident investigation,
  • partition-level lineage for backfill and invalidation,
  • column-level lineage for schema impact and privacy,
  • record-level lineage only for regulated/audited workflows.

4. The Minimum Useful Lineage Event

A lineage event should capture:

run identity
job identity
input datasets
output datasets
code version
config version
schema versions
source positions or snapshots
quality result
publication state
owner/service identity
timestamp

Minimal event shape:

{
  "eventType": "COMPLETE",
  "eventTime": "2026-07-04T02:10:12Z",
  "run": {
    "runId": "run-20260704-0200-case-normalizer"
  },
  "job": {
    "namespace": "regulatory-data-platform",
    "name": "case-normalizer"
  },
  "inputs": [
    {
      "namespace": "kafka://prod-cluster",
      "name": "case-events.v3"
    }
  ],
  "outputs": [
    {
      "namespace": "iceberg://prod-catalog/enforcement",
      "name": "silver.case_event"
    }
  ]
}

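This is close to the OpenLineage mental model: run, job, input dataset, output dataset, and facets. It is a simplified shape rather than a spec-complete event: the OpenLineage spec also requires producer and schemaURL fields and expects runId to be a UUID.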

The event by itself is not enough for a serious platform, but it is the correct primitive.


5. OpenLineage Mental Model

OpenLineage is useful because it gives a shared vocabulary:

Job    = abstract process that consumes/produces datasets
Run    = one execution instance of a job
Dataset = input or output data asset
Facet  = additional metadata attached to run/job/dataset

OpenLineage events are emitted as runs transition through lifecycle states.

In a pipeline platform, OpenLineage should not be treated as a diagramming feature. Treat it as a standard event protocol for metadata emission.

Useful facets for production pipelines

| Facet Type | Example |
|---|---|
| Source version | Kafka offsets, Iceberg snapshot ID, API cursor |
| Schema | Input/output schema version |
| Code | Git SHA, artifact version, container digest |
| Config | config version, feature flags |
| Quality | checks run, severity, pass/fail count |
| Error | exception class, stage, error code |
| Ownership | team, service account, on-call group |
| Security | classification, PII tags, tenant boundary |
| Performance | row count, bytes, duration, cost estimate |

The standard facets rarely cover every internal requirement. That is normal. Extend carefully, keep extensions stable, and document them.


6. Asset Graph vs Run Graph

Lineage often needs two graphs.

Asset graph

Long-lived relation between data assets.

Asset graph is used for:

  • dependency navigation,
  • ownership mapping,
  • contract review,
  • schema impact,
  • downstream notification,
  • access review,
  • documentation.

Run graph

Concrete execution evidence.

Run graph is used for:

  • incident diagnosis,
  • rollback/supersession,
  • audit evidence,
  • backfill tracking,
  • reproducibility,
  • root cause analysis.

A serious platform stores both.


7. Lineage as Evidence, Not Decoration

The lineage system must support evidence-grade questions.

Poor lineage record:

job A produced table B

Better lineage record:

runId=case-normalizer/2026-07-04T02:00
job=case-normalizer
codeVersion=git:abc123
artifact=case-normalizer:2.8.1
configVersion=normalizer-config:v17
input=topic case-events.v3 offsets {0: 8001-9110, 1: 7010-8022}
inputSchema=case-event-avro:3.2.0
output=iceberg silver.case_event snapshot 885
outputSchema=silver-case-event:5.1.0
qualityResult=PASS
rowCount=17_812
rejected=21
publishedAt=2026-07-04T02:07:18Z

This is enough to investigate.

The lineage event should not rely only on log parsing. Logs are helpful, but lineage must be explicit, structured, and queryable.


8. Java Domain Model for Lineage

Start with stable domain objects.

public record DatasetRef(
        String namespace,
        String name,
        DatasetType type,
        Optional<String> version
) {}

public enum DatasetType {
    KAFKA_TOPIC,
    ICEBERG_TABLE,
    POSTGRES_TABLE,
    OBJECT_PREFIX,
    API_ENDPOINT,
    SEARCH_INDEX,
    DASHBOARD,
    REPORT,
    MODEL,
    UNKNOWN
}

public record JobRef(
        String namespace,
        String name,
        String owner,
        String service
) {}

public record RunRef(
        String runId,
        Instant startedAt,
        Optional<String> parentRunId,
        Optional<String> backfillId
) {}

public record CodeRef(
        String gitSha,
        String artifactName,
        String artifactVersion,
        Optional<String> containerDigest
) {}

public record LineageEvent(
        LineageEventType type,
        Instant eventTime,
        RunRef run,
        JobRef job,
        List<DatasetRef> inputs,
        List<DatasetRef> outputs,
        CodeRef code,
        Map<String, Object> facets
) {}

public enum LineageEventType {
    START,
    RUNNING,
    COMPLETE,
    FAIL,
    ABORT
}

Keep the internal model richer than the external protocol.

Then build adapters:

Internal LineageEvent
    -> OpenLineage JSON
    -> platform lineage store
    -> audit store
    -> observability event

Do not let an external spec dictate every internal invariant.
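
As a rough illustration of the adapter idea, the sketch below maps a trimmed internal event onto the minimal event shape from section 4. The names `SimpleDataset`, `SimpleEvent`, and `OpenLineageShapeAdapter` are hypothetical stand-ins for the richer records above, and the output is a plain nested map, not a spec-complete OpenLineage event.

```java
import java.time.Instant;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, trimmed stand-ins for the domain records (assumption, not the real model).
record SimpleDataset(String namespace, String name) {}

record SimpleEvent(String type, Instant eventTime, String runId,
                   String jobNamespace, String jobName,
                   List<SimpleDataset> inputs, List<SimpleDataset> outputs,
                   String gitSha) {}

final class OpenLineageShapeAdapter {
    // Builds a nested map mirroring the minimal event shape.
    // gitSha stays internal here: the external shape carries less than the internal model.
    static Map<String, Object> toWireShape(SimpleEvent e) {
        Map<String, Object> out = new LinkedHashMap<>();
        out.put("eventType", e.type());
        out.put("eventTime", e.eventTime().toString());
        out.put("run", Map.of("runId", e.runId()));
        out.put("job", Map.of("namespace", e.jobNamespace(), "name", e.jobName()));
        out.put("inputs", e.inputs().stream().map(OpenLineageShapeAdapter::dataset).toList());
        out.put("outputs", e.outputs().stream().map(OpenLineageShapeAdapter::dataset).toList());
        return out;
    }

    private static Map<String, Object> dataset(SimpleDataset d) {
        return Map.of("namespace", d.namespace(), "name", d.name());
    }
}
```

Keeping the adapter as a pure function makes it easy to add a second adapter (audit store, observability event) without touching the internal model.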


9. Dataset Naming

Lineage becomes useless when dataset identity is inconsistent.

Bad:

case_event
prod.case_event
silver.case_event
s3://bucket/path/case_event
iceberg.case_event
Case Event Silver

Good:

namespace = iceberg://prod-catalog/enforcement
name      = silver.case_event

For Kafka:

namespace = kafka://prod-msk-a
name      = regulatory.case-events.v3

For PostgreSQL:

namespace = postgres://case-db-prod
name      = public.case

For S3/Object storage prefix:

namespace = s3://regulatory-prod-landing
name      = vendor-a/cases/YYYY/MM/DD/

Rules:

  • Dataset identity must be stable.
  • Environment must be explicit.
  • Do not put run ID in dataset name.
  • Do not put snapshot ID in logical dataset name.
  • Version belongs in metadata/facet unless version is part of logical contract.
  • PII classification belongs in metadata, not name.
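
Some of these rules can be checked mechanically. The sketch below is one possible validator; the class name, the violation codes, and both regular expressions are assumptions chosen for illustration, and the "volatile identifier" check is only a heuristic.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

final class DatasetIdentityCheck {
    // Assumed convention: namespace looks like "<scheme>://<location>", which keeps
    // the system and environment explicit.
    private static final Pattern NAMESPACE = Pattern.compile("^[a-z][a-z0-9+.-]*://\\S+$");
    // Heuristic: run or snapshot identifiers leaking into the logical name.
    private static final Pattern VOLATILE = Pattern.compile("(?i)(run[-_]?\\d+|snapshot[-_]?\\d+)");

    static List<String> violations(String namespace, String name) {
        List<String> found = new ArrayList<>();
        if (namespace == null || !NAMESPACE.matcher(namespace).matches()) {
            found.add("NAMESPACE_NOT_URI");
        }
        if (name == null || name.isBlank()) {
            found.add("NAME_EMPTY");
        } else {
            if (name.chars().anyMatch(Character::isWhitespace)) {
                found.add("NAME_HAS_WHITESPACE");
            }
            if (VOLATILE.matcher(name).find()) {
                found.add("NAME_CONTAINS_VOLATILE_ID");
            }
        }
        return found;
    }
}
```

A check like this fits naturally at the point where a `DatasetRef` is constructed, so inconsistent identities are rejected before they reach the lineage store.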

10. Source Position Lineage

Dataset-level lineage is not enough. You often need source position.

For Kafka:

{
  "kafka": {
    "topic": "case-events.v3",
    "partitions": {
      "0": {"from": 882001, "toExclusive": 883120},
      "1": {"from": 450010, "toExclusive": 451004}
    }
  }
}

For CDC:

{
  "cdc": {
    "connector": "case-db-postgres",
    "sourceLsnFrom": "16/B374D848",
    "sourceLsnTo": "16/B389FF20",
    "snapshot": false
  }
}

For Iceberg:

{
  "iceberg": {
    "table": "silver.case_event",
    "snapshotId": 884,
    "sequenceNumber": 1901
  }
}

For API ingestion:

{
  "api": {
    "endpoint": "GET /cases",
    "cursorFrom": "eyJwYWdlIjo1MDA...",
    "cursorTo": "eyJwYWdlIjo1MjA...",
    "watermark": "2026-07-04T01:59:59Z"
  }
}

For file ingestion:

{
  "file": {
    "manifestId": "vendor-a-20260704-0200",
    "files": [
      {"path": "landing/vendor-a/cases/2026-07-04/file-01.csv", "sha256": "..."}
    ]
  }
}

This lets you reproduce or bound the run.
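
To make "bound the run" concrete, here is a minimal sketch of the Kafka position facet as Java records. `OffsetRange` and `KafkaPosition` are hypothetical names. Note that `offsetSpan()` is an upper bound on the number of records read, since compaction and transaction markers can leave gaps in the offset sequence.

```java
import java.util.Map;

// Half-open offset range [from, toExclusive), matching the JSON facet above.
record OffsetRange(long from, long toExclusive) {
    OffsetRange {
        if (from < 0 || toExclusive < from) {
            throw new IllegalArgumentException("invalid offset range");
        }
    }

    long offsetSpan() {
        return toExclusive - from;
    }

    boolean contains(long offset) {
        return offset >= from && offset < toExclusive;
    }
}

record KafkaPosition(String topic, Map<Integer, OffsetRange> partitions) {
    // Sum of spans across partitions: an upper bound on records consumed by the run.
    long totalOffsetSpan() {
        return partitions.values().stream().mapToLong(OffsetRange::offsetSpan).sum();
    }

    // Answers "was this offset inside the run's input?" during an incident.
    boolean covers(int partition, long offset) {
        OffsetRange range = partitions.get(partition);
        return range != null && range.contains(offset);
    }
}
```

With the ranges from the Kafka example, `covers(0, 882001)` is true and `covers(0, 883120)` is false, because the upper bound is exclusive.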


11. Column-Level Lineage

Column-level lineage answers:

If input column X changes, which output columns break?
Which output column contains derived PII?
Which metric depends on policy table field response_days?

Column lineage can be:

  • manually declared,
  • inferred from SQL AST,
  • inferred from Spark/Flink plan,
  • emitted by transformation code,
  • approximated through mapping metadata.

For Java pipelines, explicit declaration is often more reliable than magical inference.

public record ColumnLineage(
        DatasetColumn output,
        List<DatasetColumn> inputs,
        TransformationKind kind,
        String expressionId,
        String description
) {}

public record DatasetColumn(
        DatasetRef dataset,
        String column
) {}

public enum TransformationKind {
    COPY,
    RENAME,
    CAST,
    HASH,
    MASK,
    DERIVE,
    AGGREGATE,
    FILTER_DEPENDENCY,
    JOIN_DEPENDENCY,
    CONSTANT,
    UNKNOWN
}

Example declaration:

List<ColumnLineage> lineage = List.of(
    new ColumnLineage(
        col(silverCaseEvent, "sla_deadline"),
        List.of(col(sourceCase, "accepted_at"), col(policyTable, "response_days")),
        TransformationKind.DERIVE,
        "sla_deadline_v2",
        "accepted_at plus policy response days using business calendar"
    ),
    new ColumnLineage(
        col(goldBreach, "is_breached"),
        List.of(col(silverCaseEvent, "sla_deadline"), col(silverCaseEvent, "closed_at")),
        TransformationKind.DERIVE,
        "breach_flag_v4",
        "closed_at after sla_deadline or still open after deadline"
    )
);

Column lineage should be versioned with the transformation.


12. Record-Level Lineage

Record-level lineage is costly but valuable in regulated workflows.

Example envelope fields:

public record RecordProvenance(
        String sourceSystem,
        String sourceEntity,
        String sourceRecordId,
        Optional<String> sourceVersion,
        Optional<String> sourcePosition,
        String producingRunId,
        String transformVersion,
        List<String> inputEventIds
) {}

For a regulatory enforcement case:

{
  "outputRecordId": "case-sla-breach:CASE-9001:2026-07-04",
  "sourceRecordId": "CASE-9001",
  "inputEventIds": [
    "case-accepted:CASE-9001:9",
    "policy-updated:POLICY-ENF-2026:2"
  ],
  "producingRunId": "breach-detector/20260704T0200",
  "transformVersion": "breach-detector-v4.3.1"
}

Use record-level lineage when:

  • auditors ask for “why did this decision happen?”
  • downstream action is high-impact,
  • output must be explainable,
  • correction propagation must be precise,
  • legal/regulatory defensibility matters.

Avoid storing full sensitive input values unless necessary. Store references, hashes, and evidence pointers.


13. Lineage Emission Points

Emit lineage at important lifecycle transitions.

Do not wait until the end to emit everything. Emit:

  • START when run begins,
  • RUNNING progress events for long runs,
  • COMPLETE after output is committed,
  • FAIL with failure stage and partial evidence,
  • ABORT for cancelled/superseded runs.

The final event must reflect committed reality, not intention.


14. The Commit Boundary Problem

Lineage must not claim output exists before it is safely published.

Bad sequence:

emit COMPLETE lineage
write output fails

Now metadata lies.

Better sequence:

write staging
validate staging
publish output atomically
observe published version
emit COMPLETE lineage with published version

For Iceberg:

write data files
commit snapshot
record snapshot id
emit COMPLETE lineage with snapshot id

For Kafka output:

produce events
transaction commit succeeds
record output topic + partition offset range if available
emit COMPLETE lineage

For external API side effect:

write effect ledger
perform idempotent API call
confirm outcome or mark unknown
emit lineage with side effect status

Lineage should match durable side effects.
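
The ordering can be sketched as follows. This is a simplified illustration, assuming a hypothetical `commit` callback that publishes atomically and returns the published version (for example a snapshot ID) or throws on failure; events are collected in a list in place of a real emitter.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

final class CommitThenEmit {
    record Emitted(String type, String detail) {}

    // COMPLETE is only emitted after the commit callback has returned a version,
    // so the lineage record never claims an output that was not published.
    static List<Emitted> run(Supplier<Long> commit) {
        List<Emitted> events = new ArrayList<>();
        events.add(new Emitted("START", ""));
        try {
            long publishedVersion = commit.get();
            events.add(new Emitted("COMPLETE", "snapshotId=" + publishedVersion));
        } catch (RuntimeException e) {
            // Failure stage is recorded so partial evidence is still queryable.
            events.add(new Emitted("FAIL", "stage=publish error=" + e.getClass().getSimpleName()));
        }
        return events;
    }
}
```

The same structure applies to Kafka transactions and external side effects: observe the durable outcome first, then describe it.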


15. Java Lineage Emitter

A simple interface:

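public interface LineageEmitter {
    // LineageEmissionException is assumed to be unchecked (extends RuntimeException),
    // so implementations such as the composite below can propagate it without a throws clause.
    void emit(LineageEvent event) throws LineageEmissionException;
}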

public final class CompositeLineageEmitter implements LineageEmitter {
    private final List<LineageEmitter> delegates;

    public CompositeLineageEmitter(List<LineageEmitter> delegates) {
        this.delegates = List.copyOf(delegates);
    }

    @Override
    public void emit(LineageEvent event) {
        RuntimeException failure = null;

        for (LineageEmitter delegate : delegates) {
            try {
                delegate.emit(event);
            } catch (RuntimeException e) {
                if (failure == null) failure = e;
                else failure.addSuppressed(e);
            }
        }

        if (failure != null) {
            throw failure;
        }
    }
}

But decide carefully: should lineage emission failure fail the pipeline?

The answer depends on the asset criticality.

| Asset Type | Lineage Emission Failure Policy |
|---|---|
| Experimental analytics | warn and continue |
| Internal operational table | warn, alert, retry async |
| Regulatory report | fail publication or mark output non-certified |
| Legal/audit evidence | fail closed |
| Backfill campaign | continue only if run manifest is durable elsewhere |

Do not use one global policy.
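
One way to avoid a global policy is to resolve the action from asset criticality at the call site. The enum and class names below are hypothetical, and the mapping simply restates the table above; for regulatory reports it picks the "mark non-certified" option as an assumption.

```java
enum AssetCriticality { EXPERIMENTAL_ANALYTICS, INTERNAL_OPERATIONAL, REGULATORY_REPORT, LEGAL_AUDIT_EVIDENCE }

enum EmissionFailureAction { WARN_AND_CONTINUE, ALERT_AND_RETRY_ASYNC, MARK_NON_CERTIFIED, FAIL_CLOSED }

final class EmissionFailurePolicy {
    // Exhaustive switch: adding a new criticality forces a policy decision at compile time.
    static EmissionFailureAction onFailure(AssetCriticality criticality) {
        return switch (criticality) {
            case EXPERIMENTAL_ANALYTICS -> EmissionFailureAction.WARN_AND_CONTINUE;
            case INTERNAL_OPERATIONAL -> EmissionFailureAction.ALERT_AND_RETRY_ASYNC;
            case REGULATORY_REPORT -> EmissionFailureAction.MARK_NON_CERTIFIED;
            case LEGAL_AUDIT_EVIDENCE -> EmissionFailureAction.FAIL_CLOSED;
        };
    }

    // Backfills may continue only when the run manifest is durable elsewhere.
    static EmissionFailureAction onBackfillFailure(boolean runManifestIsDurable) {
        return runManifestIsDurable
                ? EmissionFailureAction.ALERT_AND_RETRY_ASYNC
                : EmissionFailureAction.FAIL_CLOSED;
    }
}
```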


16. Durable Lineage Outbox

Lineage emission itself can fail.

If the pipeline commits output but fails to send lineage to the metadata service, you need recovery.

Use a lineage outbox.

Outbox table shape:

CREATE TABLE pipeline_lineage_outbox (
    id UUID PRIMARY KEY,
    run_id TEXT NOT NULL,
    event_type TEXT NOT NULL,
    event_time TIMESTAMPTZ NOT NULL,
    payload JSONB NOT NULL,
    status TEXT NOT NULL,
    attempts INT NOT NULL DEFAULT 0,
    last_error TEXT,
    created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
    sent_at TIMESTAMPTZ
);

Pipeline:

commit output
insert lineage outbox event in same durable control transaction if possible
relay sends to lineage sink

For lakehouse jobs where output commit and DB outbox are not in the same transaction, use run manifest as the recovery source.
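
The relay side of the outbox can be sketched in memory as below. `OutboxRelay`, its `Row` type, and the `DEAD` status are assumptions for illustration; a real relay would read and update the `pipeline_lineage_outbox` table instead of a list. Delivery is at-least-once in this design, so the sink is assumed to be idempotent.

```java
import java.util.List;
import java.util.function.Consumer;

final class OutboxRelay {
    enum Status { PENDING, SENT, DEAD }

    // In-memory stand-in for one outbox row.
    static final class Row {
        final String payload;
        Status status = Status.PENDING;
        int attempts = 0;
        String lastError;

        Row(String payload) {
            this.payload = payload;
        }
    }

    private final int maxAttempts;

    OutboxRelay(int maxAttempts) {
        this.maxAttempts = maxAttempts;
    }

    // One relay pass: each pending row gets one delivery attempt.
    // The sink signals failure by throwing.
    void drainOnce(List<Row> rows, Consumer<String> sink) {
        for (Row row : rows) {
            if (row.status != Status.PENDING) {
                continue;
            }
            row.attempts++;
            try {
                sink.accept(row.payload);
                row.status = Status.SENT;
                row.lastError = null;
            } catch (RuntimeException e) {
                row.lastError = e.getMessage();
                if (row.attempts >= maxAttempts) {
                    row.status = Status.DEAD; // needs operator attention or manifest-based repair
                }
            }
        }
    }
}
```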


17. Run Manifest as Lineage Source of Truth

Every serious pipeline run should produce a run manifest.

{
  "runId": "gold-sla-breach/20260704T0200",
  "job": "gold-sla-breach",
  "status": "PUBLISHED",
  "startedAt": "2026-07-04T02:00:00Z",
  "finishedAt": "2026-07-04T02:06:42Z",
  "inputs": [
    {
      "dataset": "iceberg://prod/enforcement/silver.case_event",
      "snapshotId": 884
    }
  ],
  "outputs": [
    {
      "dataset": "iceberg://prod/enforcement/gold.case_sla_breach",
      "snapshotId": 885
    }
  ],
  "code": {
    "gitSha": "abc123",
    "artifact": "breach-detector:4.3.1"
  },
  "config": {
    "version": "breach-config:17"
  },
  "quality": {
    "status": "PASS",
    "failedCriticalChecks": 0
  },
  "metrics": {
    "inputRows": 1200041,
    "outputRows": 9904,
    "rejectedRows": 31
  }
}

The manifest can be:

  • stored in object storage,
  • indexed into a run store,
  • emitted as OpenLineage facets,
  • attached to audit evidence,
  • used for rerun/reproduce operations.

If lineage service is down, the run manifest lets you reconstruct lineage later.


18. Impact Analysis Use Cases

Impact analysis should be designed around concrete decisions.

Schema change impact

Field case.priority renamed to risk_priority.
Who breaks?

Need:

  • column lineage,
  • schema contract registry,
  • consumer assumption registry,
  • downstream asset graph,
  • severity classification.

Data incident impact

Normalizer v8 parsed timezone incorrectly for accepted_at.
Which outputs are tainted?

Need:

  • run lineage,
  • code version,
  • output snapshot/version,
  • downstream materialization runs,
  • time window affected,
  • partition lineage.

Freshness impact

silver.case_event is 4 hours late.
Which SLA dashboards or alerting services are degraded?

Need:

  • asset dependency graph,
  • SLO registry,
  • consumer criticality,
  • dashboard/API lineage.

Privacy impact

Source field subject_national_id is reclassified as restricted PII.
Which downstream assets contain direct or derived values?

Need:

  • column lineage,
  • transformation kind,
  • masking/hash annotations,
  • access policy registry,
  • retention rules.

Backfill impact

Backfill campaign BF-20260704 will rewrite 180 days of gold assets.
Who must be notified?

Need:

  • downstream consumers,
  • publication windows,
  • dashboard refresh dependencies,
  • external exports,
  • restatement policy.

19. Impact Analysis Algorithm

At its simplest, impact analysis is graph traversal.

public record ImpactRequest(
        AssetNode changedAsset,
        Optional<String> changedColumn,
        ChangeType changeType,
        Severity sourceSeverity,
        Instant effectiveAt
) {}

public enum ChangeType {
    SCHEMA_BREAKING,
    SCHEMA_COMPATIBLE,
    DATA_INCORRECT,
    FRESHNESS_BREACH,
    PRIVACY_RECLASSIFICATION,
    ASSET_DEPRECATED,
    BACKFILL_RESTATEMENT
}

public record ImpactResult(
        AssetNode asset,
        ImpactPath path,
        ImpactSeverity severity,
        RecommendedAction action,
        String reason
) {}

Traversal:

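public final class ImpactAnalyzer {
    private final LineageGraph graph;
    private final PolicyEvaluator policyEvaluator;

    // Final fields must be assigned, so the collaborators are injected here.
    public ImpactAnalyzer(LineageGraph graph, PolicyEvaluator policyEvaluator) {
        this.graph = graph;
        this.policyEvaluator = policyEvaluator;
    }
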
    public List<ImpactResult> downstreamImpact(ImpactRequest request) {
        var results = new ArrayList<ImpactResult>();
        var queue = new ArrayDeque<ImpactPath>();
        var visited = new HashSet<AssetNode>();

        queue.add(ImpactPath.start(request.changedAsset()));

        while (!queue.isEmpty()) {
            ImpactPath path = queue.removeFirst();
            AssetNode current = path.last();

            if (!visited.add(current)) {
                continue;
            }

            for (LineageEdge edge : graph.outgoing(current)) {
                var next = edge.to();
                var nextPath = path.extend(edge, next);

                ImpactResult result = policyEvaluator.evaluate(request, nextPath);
                results.add(result);

                if (result.severity().shouldPropagate()) {
                    queue.addLast(nextPath);
                }
            }
        }

        return results;
    }
}

The hard part is not traversal. The hard part is policy.


20. Impact Severity

Not every downstream node is equally affected.

| Severity | Meaning | Example Action |
|---|---|---|
| Informational | Potentially relevant, no immediate break | notify owner |
| Warning | Data may be stale or partial | mark degraded |
| Blocking | Publication should stop | block downstream publish |
| Tainted | Output may be wrong | quarantine or supersede |
| Restricted | Privacy/security classification changed | revoke access / reclassify |
| Critical | External/regulatory impact | incident + management/audit notification |

Example policy:

public enum ImpactSeverity {
    INFO,
    WARNING,
    BLOCKING,
    TAINTED,
    RESTRICTED,
    CRITICAL;

    public boolean shouldPropagate() {
        return this != INFO;
    }
}

Severity should consider:

  • asset criticality,
  • consumer type,
  • freshness SLO,
  • quality gate result,
  • transformation semantics,
  • column usage,
  • privacy classification,
  • publication status,
  • external reporting deadlines.

21. Column-Level Impact Propagation

Dataset-level traversal overestimates impact.

If case.internal_notes changes, not every downstream table breaks.

Column-level impact needs field mapping.

public boolean edgeUsesColumn(LineageEdge edge, String column) {
    return edge.columnLineage().stream()
        .anyMatch(cl -> cl.inputs().stream()
            .anyMatch(input -> input.column().equals(column)));
}

Example:

| Changed Column | Downstream Field | Impact |
|---|---|---|
| case.accepted_at | case_event.sla_deadline | high |
| case.internal_notes | none | no direct impact |
| case.subject_national_id | case_subject_hash | privacy-derived impact |
| case.priority | risk_bucket | semantic impact |

Column-level impact should account for transformation kind.

| Transformation Kind | Impact Behavior |
|---|---|
| COPY | direct propagation |
| CAST | schema/value risk |
| MASK | privacy impact may be reduced but not removed |
| HASH | derived sensitive data may remain sensitive |
| AGGREGATE | many input records affect one output |
| FILTER_DEPENDENCY | input controls inclusion even if not output |
| JOIN_DEPENDENCY | input affects enrichment or cardinality |

A field that is used in a filter but not emitted can still have downstream impact.
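
A minimal sketch of transitive column propagation follows. It assumes column lineage has been flattened into hypothetical `ColumnEdge` pairs (one input column feeding one output column) identified by fully qualified strings. The `kind` field is carried along so a policy layer could weigh edges differently, but this traversal treats every kind, including `FILTER_DEPENDENCY`, as propagating.

```java
import java.util.ArrayDeque;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

final class ColumnImpact {
    // Hypothetical flattened edge: one input column feeds one output column.
    record ColumnEdge(String input, String output, String kind) {}

    // Breadth-first walk over column edges, returning every column reachable
    // from the changed one. The reached set also guards against cycles.
    static Set<String> downstream(String changedColumn, List<ColumnEdge> edges) {
        Set<String> reached = new LinkedHashSet<>();
        ArrayDeque<String> queue = new ArrayDeque<>(List.of(changedColumn));
        while (!queue.isEmpty()) {
            String current = queue.removeFirst();
            for (ColumnEdge edge : edges) {
                if (edge.input().equals(current) && reached.add(edge.output())) {
                    queue.addLast(edge.output());
                }
            }
        }
        return reached;
    }
}
```

A column with no outgoing edges, such as `case.internal_notes` in the table above, yields an empty set, which is exactly the over-estimation that dataset-level traversal cannot avoid.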


22. Schema Change Impact Matrix

| Change | Dataset-Level Impact | Column-Level Impact | Typical Action |
|---|---|---|---|
| Add nullable field | low | none unless consumed | notify |
| Add required field | high for producers/writers | downstream schema parser risk | compatibility gate |
| Remove field | high | consumers using field break | block until migrated |
| Rename field | breaking unless alias supported | old field consumers break | dual-write/alias |
| Type widening | medium | parser/semantics risk | test consumers |
| Type narrowing | high | potential loss/rejection | block |
| Enum value added | medium/high | consumers with exhaustive match break | compatibility test |
| Meaning changed | high | not detectable by schema alone | contract review |
| Unit changed | critical | silent data corruption | new field/version |

Lineage alone cannot detect semantic changes. It must integrate with data contracts.


23. Impact Analysis for Data Quality Failures

Quality failure should propagate differently based on rule type.

| Failed Rule | Example | Propagation |
|---|---|---|
| Completeness | row count lower than expected | downstream count/aggregate assets tainted |
| Uniqueness | duplicate case ID | joins/projections may be wrong |
| Referential integrity | missing policy ID | enrichment outputs tainted |
| Freshness | source late | downstream assets degraded |
| Validity | invalid status value | semantic outputs tainted |
| Distribution drift | priority mix changed | warning unless business-critical |
| Privacy | unmasked PII present | restricted/critical propagation |

Quality result should be attached to lineage.

public record QualityFacet(
        String suiteId,
        String suiteVersion,
        QualityStatus status,
        int totalChecks,
        int failedChecks,
        List<QualityFailure> failures
) {}

Then impact analysis can answer:

Quality gate failed on silver.case_event partition 2026-07-04.
Which gold assets depend on that partition?
Which dashboards should be marked degraded?

24. Partition-Level Impact

Partition lineage prevents overreaction.

If only one date partition is tainted, do not invalidate a whole table unless necessary.

public record PartitionRef(
        DatasetRef dataset,
        Map<String, String> spec
) {}

Examples:

{"dataset":"silver.case_event", "partition":{"event_date":"2026-07-04"}}
{"dataset":"gold.case_sla_breach", "partition":{"report_date":"2026-07-04"}}
{"dataset":"case_events.v3", "partition":{"kafka_partition":"4"}}

Partition lineage is tricky because transformation can expand or compress partitions:

| Transform | Input Partition | Output Partition |
|---|---|---|
| daily normalize | same day | same day |
| 7-day rolling metric | last 7 days | one report day |
| monthly aggregation | all days in month | month |
| late correction | old event day | current correction partition and old effective partition |
| bitemporal table | valid date + recorded date | multiple query views |

Record the partition mapping in the run manifest.
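
For the rolling-window case, the mapping is simple date arithmetic. The sketch below assumes that a report day D reads input days D-(windowDays-1) through D inclusive; `RollingWindowPartitions` and its method names are hypothetical.

```java
import java.time.LocalDate;
import java.util.List;
import java.util.stream.IntStream;

final class RollingWindowPartitions {
    // Forward mapping: one tainted input day affects every report day whose
    // window still includes it.
    static List<LocalDate> affectedReportDates(LocalDate taintedInputDay, int windowDays) {
        return IntStream.range(0, windowDays)
                .mapToObj(i -> taintedInputDay.plusDays(i))
                .toList();
    }

    // Reverse mapping: the input days a given report day was computed from,
    // oldest first.
    static List<LocalDate> inputDaysFor(LocalDate reportDay, int windowDays) {
        return IntStream.range(0, windowDays)
                .mapToObj(i -> reportDay.minusDays(windowDays - 1 - i))
                .toList();
    }
}
```

With a 7-day window, a tainted `event_date=2026-07-04` affects report dates 2026-07-04 through 2026-07-10, so seven output partitions need invalidation rather than the whole table.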


25. Lineage for Streaming Pipelines

A high-volume streaming pipeline cannot emit one lineage event per record.

Use interval lineage.

job=case-normalizer
run=stream-run-20260704
interval=2026-07-04T02:00:00Z/PT5M
input topic offsets:
  p0: 8001-9200
  p1: 7010-8112
output topic offsets:
  p0: 10001-11202
checkpoint=chk-9002
watermark=2026-07-04T01:58:00Z
quality=pass-with-warnings

For long-running Flink jobs, lineage can be emitted:

  • at job start,
  • periodically per checkpoint interval,
  • per committed sink transaction,
  • when savepoint is taken,
  • when job version changes,
  • when quality status changes.

Do not model an infinite stream as one run with no structure. Introduce logical intervals.


26. Lineage for Kafka Topics

Kafka topics are datasets.

Important lineage metadata:

  • topic name,
  • cluster namespace,
  • partitions,
  • offset range,
  • key schema,
  • value schema,
  • header contract,
  • retention policy,
  • compaction policy,
  • producer service,
  • consumer group,
  • transactional ID where relevant.

Producer lineage:

source dataset(s) -> producer job -> output topic offset range

Consumer lineage:

input topic offset range -> consumer job -> output dataset(s)

Kafka lineage should not rely only on consumer group names. Consumer group names are runtime identities, not semantic ownership.


27. Lineage for CDC

CDC lineage should capture:

  • source database,
  • source schema/table,
  • transaction log position,
  • connector name/version,
  • snapshot vs streaming phase,
  • transaction metadata if enabled,
  • schema history version,
  • topic output,
  • operation type semantics.

CDC lineage is not the same as domain event lineage.

The raw CDC topic tells you database row changes.

The canonical event topic tells you domain facts.

Both can be lineage nodes, but they mean different things.


28. Lineage for Lakehouse Tables

Lakehouse lineage should capture snapshot IDs.

For Iceberg-like table formats:

input table snapshot(s)
output table snapshot
operation: append / overwrite / replace partitions / merge / delete / compact
manifest/run id

Example:

{
  "inputSnapshots": [
    {"table": "silver.case_event", "snapshotId": 884},
    {"table": "reference.policy", "snapshotId": 102}
  ],
  "outputSnapshots": [
    {"table": "gold.case_sla_breach", "snapshotId": 885}
  ],
  "operation": "REPLACE_PARTITIONS",
  "partitions": ["report_date=2026-07-04"]
}

This allows time-travel debugging:

Which input snapshots produced output snapshot 885?

Without snapshot-level lineage, reproducibility is often impossible.


29. Lineage for Airflow, Spark, Flink, and Beam

Airflow

Airflow controls task execution. It can emit DAG/task-level lineage.

Good for:

  • DAG run lineage,
  • task dependency,
  • scheduling evidence,
  • task-level input/output metadata.

Not enough for:

  • record-level provenance,
  • detailed Spark/Flink operator lineage,
  • exact output snapshot unless job reports it.

Spark

Spark jobs should emit:

  • input table/file snapshots,
  • output table snapshots,
  • SQL plan or transform version,
  • row counts,
  • quality results,
  • partition write info.

Flink

Flink jobs should emit:

  • job ID,
  • checkpoint/savepoint ID,
  • source offset/watermark range,
  • sink commit ID,
  • operator version,
  • state schema version.

Beam

Beam jobs should emit:

  • runner/job identity,
  • transform graph identity,
  • input/output PCollections mapped to real datasets,
  • window/trigger semantics,
  • state/timer metadata if relevant.

Never assume the orchestrator knows all data lineage. Often the worker knows the real datasets and positions.


30. Manual Lineage Is Not Evil

Automatic lineage is attractive but incomplete.

Manual lineage is acceptable when:

  • transformation is in Java code,
  • SQL parser cannot infer complex UDF behavior,
  • API output becomes a data asset,
  • dashboard/report dependencies are outside pipeline engine,
  • external side effects matter,
  • privacy semantics need human annotation.

But manual lineage must be code-reviewed and versioned.

Example manifest:

job: breach-detector
version: 4.3.1
inputs:
  - dataset: iceberg://prod/enforcement/silver.case_event
  - dataset: iceberg://prod/reference/policy_calendar
outputs:
  - dataset: iceberg://prod/enforcement/gold.case_sla_breach
columnLineage:
  - output: gold.case_sla_breach.case_id
    inputs:
      - silver.case_event.case_id
    kind: COPY
  - output: gold.case_sla_breach.is_breached
    inputs:
      - silver.case_event.sla_deadline
      - silver.case_event.closed_at
    kind: DERIVE

This file should be validated in CI.
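
One CI check worth having is that every column reference points at a declared dataset. The sketch below works on an already-parsed manifest, so no YAML library is assumed. The type names and violation codes are hypothetical, and the matching rule (the table part of a column reference must equal the last path segment of a declared dataset URI) is an assumption based on the example manifest.

```java
import java.util.ArrayList;
import java.util.List;

final class ManifestCheck {
    record ColumnMapping(String output, List<String> inputs) {}

    record Manifest(List<String> inputDatasets, List<String> outputDatasets,
                    List<ColumnMapping> columnLineage) {}

    static List<String> violations(Manifest manifest) {
        List<String> found = new ArrayList<>();
        for (ColumnMapping mapping : manifest.columnLineage()) {
            if (!declared(mapping.output(), manifest.outputDatasets())) {
                found.add("UNDECLARED_OUTPUT: " + mapping.output());
            }
            for (String input : mapping.inputs()) {
                if (!declared(input, manifest.inputDatasets())) {
                    found.add("UNDECLARED_INPUT: " + input);
                }
            }
        }
        return found;
    }

    // "silver.case_event.case_id" -> table "silver.case_event", matched against
    // the last path segment of each declared dataset URI.
    private static boolean declared(String columnRef, List<String> datasets) {
        int lastDot = columnRef.lastIndexOf('.');
        if (lastDot <= 0) {
            return false;
        }
        String table = columnRef.substring(0, lastDot);
        return datasets.stream().anyMatch(d -> d.endsWith("/" + table));
    }
}
```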


31. Lineage Drift

Lineage can become wrong.

Common causes:

  • job reads new input but manifest not updated,
  • table renamed without alias,
  • SQL dynamic string escapes parser,
  • Java transform changes but column lineage manifest is stale,
  • dashboard connects directly to raw table,
  • ad hoc notebook writes production table,
  • backfill writes output without lineage emission,
  • failed run emits incomplete lineage as success.

Detect drift with:

  • runtime observed datasets vs declared datasets,
  • query logs vs catalog dependencies,
  • table write audit logs,
  • object storage write events,
  • schema registry usage,
  • CI checks comparing transform manifest to code/config,
  • periodic lineage reconciliation.

Lineage itself needs quality checks.
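
The first detection method, runtime observed datasets versus declared datasets, reduces to two set differences. The sketch below uses hypothetical names (`LineageDrift`, `Drift`) and assumes both sides use the same dataset identity strings.

```java
import java.util.Set;
import java.util.TreeSet;

final class LineageDrift {
    record Drift(Set<String> undeclaredReads, Set<String> unusedDeclarations) {
        boolean isClean() {
            return undeclaredReads.isEmpty() && unusedDeclarations.isEmpty();
        }
    }

    // declared: datasets listed in the job manifest.
    // observed: datasets the run actually touched (query logs, audit logs, and so on).
    static Drift compare(Set<String> declared, Set<String> observed) {
        Set<String> undeclared = new TreeSet<>(observed);
        undeclared.removeAll(declared);
        Set<String> unused = new TreeSet<>(declared);
        unused.removeAll(observed);
        return new Drift(undeclared, unused);
    }
}
```

Undeclared reads usually deserve a blocking alert, while unused declarations are more often a cleanup task, so it helps to report the two sets separately.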


32. Lineage Quality Checks

Lineage quality rules:

Every published asset version must have a producing run.
Every producing run must have at least one code version.
Every certified asset must have owner and SLO.
Every sensitive output must have upstream sensitive lineage or documented derivation.
Every gold asset must have at least one quality result.
Every backfill output must link to a backfill campaign.
Every table write outside platform must be flagged.

Java validator:

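import java.util.ArrayList;
import java.util.List;

public final class LineageQualityValidator {
    // Collects every violation instead of failing fast, so one report lists all gaps.
    // error(...) is assumed to be a helper that builds an ERROR-severity LineageViolation.
    public List<LineageViolation> validate(AssetPublication publication) {
        var violations = new ArrayList<LineageViolation>();

        // Every published asset version must have a producing run.
        if (publication.producingRun().isEmpty()) {
            violations.add(error("MISSING_PRODUCING_RUN"));
        }
        // Every producing run must have a code version.
        if (publication.codeRef().isEmpty()) {
            violations.add(error("MISSING_CODE_REF"));
        }
        // Certified assets need an owner.
        if (publication.asset().certified() && publication.owner().isEmpty()) {
            violations.add(error("CERTIFIED_ASSET_MISSING_OWNER"));
        }
        // Sensitive outputs need privacy lineage.
        if (publication.asset().classification().isSensitive()
                && publication.privacyLineage().isEmpty()) {
            violations.add(error("SENSITIVE_ASSET_MISSING_PRIVACY_LINEAGE"));
        }

        return violations;
    }
}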

Lineage quality should be part of publication gates.
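
One possible shape for such a gate is sketched below. It is self-contained for illustration, so `LineageViolation` here is a simplified stand-in with an assumed `severity` field, and the three-way `Decision` is a hypothetical policy rather than a prescribed one.

```java
import java.util.List;

final class PublicationGate {
    enum Severity { WARNING, ERROR }
    enum Decision { PUBLISH, PUBLISH_WITH_WARNINGS, BLOCK }

    // Simplified stand-in for the validator's violation type.
    record LineageViolation(String code, Severity severity) {}

    // Any ERROR blocks publication; warnings are surfaced but do not block.
    static Decision decide(List<LineageViolation> violations) {
        boolean hasError = violations.stream()
                .anyMatch(v -> v.severity() == Severity.ERROR);
        if (hasError) {
            return Decision.BLOCK;
        }
        return violations.isEmpty() ? Decision.PUBLISH : Decision.PUBLISH_WITH_WARNINGS;
    }
}
```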


33. Privacy and Sensitive Data Lineage

Privacy lineage asks:

Where did sensitive data go?
Was it copied, masked, hashed, aggregated, tokenized, or leaked?

Model field classification:

public enum Sensitivity {
    PUBLIC,
    INTERNAL,
    CONFIDENTIAL,
    PII,
    RESTRICTED_PII,
    SECRET
}

public record PrivacyTransform(
        DatasetColumn output,
        List<DatasetColumn> inputs,
        Sensitivity outputSensitivity,
        PrivacyOperation operation
) {}

public enum PrivacyOperation {
    COPY,
    MASK,
    TOKENIZE,
    HASH,
    ENCRYPT,
    AGGREGATE,
    DROP,
    DERIVE,
    UNKNOWN
}

Important rule:

Hashed PII may still be sensitive.

A stable hash of national ID can still be linkable. Do not mark it public by default.

Privacy impact analysis:

source.subject_national_id reclassified as RESTRICTED_PII
-> case_subject_hash derives from it via HASH
-> gold.case_subject_risk contains derived restricted identifier
-> export.daily_case_risk is sent to external vendor
-> access/export policy review required

This is where lineage becomes governance-critical.
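
The "hashed PII may still be sensitive" rule can be enforced mechanically with a conservative default: an output inherits the highest sensitivity among its inputs unless someone explicitly approves a downgrade. The sketch below assumes the enum declaration order runs from least to most sensitive, which is an interpretation of the enum above rather than something it states. `PrivacyPropagation` and `needsReview` are hypothetical names.

```java
import java.util.Collection;
import java.util.Comparator;

final class PrivacyPropagation {
    // Copied from the model above; declaration order is treated as increasing sensitivity.
    enum Sensitivity { PUBLIC, INTERNAL, CONFIDENTIAL, PII, RESTRICTED_PII, SECRET }
    enum PrivacyOperation { COPY, MASK, TOKENIZE, HASH, ENCRYPT, AGGREGATE, DROP, DERIVE, UNKNOWN }

    // Highest sensitivity among the inputs; PUBLIC when there are none.
    static Sensitivity inherited(Collection<Sensitivity> inputs) {
        return inputs.stream().max(Comparator.naturalOrder()).orElse(Sensitivity.PUBLIC);
    }

    // True when a human should review the declared output classification:
    // either the operation is unknown, or the output claims to be less
    // sensitive than its most sensitive input.
    static boolean needsReview(Collection<Sensitivity> inputs,
                               PrivacyOperation operation,
                               Sensitivity declaredOutput) {
        if (operation == PrivacyOperation.UNKNOWN) {
            return true;
        }
        return declaredOutput.compareTo(inherited(inputs)) < 0;
    }
}
```

Under this rule a HASH of a RESTRICTED_PII column that is declared PUBLIC is flagged, while the same hash declared RESTRICTED_PII passes. Legitimate downgrades, such as a sufficiently coarse AGGREGATE, go through review instead of being assumed.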


34. Consumer Registry

Impact analysis is incomplete without consumers.

A table can be healthy, but if no one knows who uses it, you cannot manage change.

Consumer registry should track:

  • asset consumers,
  • consumer type,
  • owner,
  • criticality,
  • expected freshness,
  • contract assumptions,
  • access mode,
  • notification channel,
  • breakage tolerance,
  • regulatory exposure.

Example:

asset: iceberg://prod/enforcement/gold.case_sla_breach
consumers:
  - name: regulatory-sla-dashboard
    type: DASHBOARD
    owner: enforcement-analytics
    criticality: HIGH
    freshnessSlo: PT2H
  - name: case-risk-api
    type: API
    owner: case-platform
    criticality: CRITICAL
    freshnessSlo: PT15M
  - name: monthly-regulatory-report
    type: REPORT
    owner: compliance-reporting
    criticality: CRITICAL
    deadline: business-day-3

Consumers are lineage leaves.

Do not stop lineage at warehouse tables.
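
As an illustration, a registry lookup that returns the consumers of an asset with the most critical first might look like the sketch below. The `ConsumerRegistry` class, the `Criticality` levels below HIGH, and the record fields are assumptions based on the example entries above, not a fixed schema.

```java
import java.util.Comparator;
import java.util.List;

final class ConsumerRegistry {
    // Declaration order doubles as the ranking, lowest first.
    enum Criticality { LOW, MEDIUM, HIGH, CRITICAL }

    record Consumer(String asset, String name, String type, String owner,
                    Criticality criticality) {}

    private final List<Consumer> consumers;

    ConsumerRegistry(List<Consumer> consumers) {
        this.consumers = List.copyOf(consumers);
    }

    // Consumers of one asset, most critical first. The sort is stable, so
    // consumers with equal criticality keep their registration order.
    List<Consumer> consumersOf(String asset) {
        return consumers.stream()
                .filter(c -> c.asset().equals(asset))
                .sorted(Comparator.comparing(Consumer::criticality).reversed())
                .toList();
    }
}
```

Impact analysis can call `consumersOf` for every affected asset to build a notification list that starts with the consumers least able to tolerate breakage.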


35. Change Review Workflow

When a producer changes an asset, impact analysis should run before merge/deploy.

Inputs:

  • changed schema,
  • changed transform manifest,
  • changed quality contract,
  • changed privacy tag,
  • changed retention policy,
  • changed output asset.

Outputs:

  • affected consumers,
  • required migrations,
  • rollout plan,
  • dual-run requirement,
  • backfill requirement,
  • notification list.
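
For the "changed schema" input, a pre-merge check could start from a plain schema diff. The sketch below is deliberately simplified: schemas are modeled as column name to type name maps, and it assumes that removing a column or changing its type is breaking while adding a column is not. Real compatibility rules depend on the format and the consumers. `SchemaDiff` and its nested types are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

final class SchemaDiff {
    enum ChangeKind { COLUMN_ADDED, COLUMN_REMOVED, TYPE_CHANGED }

    record Change(String column, ChangeKind kind) {
        // Simplifying assumption: only additions are non-breaking.
        boolean breaking() {
            return kind != ChangeKind.COLUMN_ADDED;
        }
    }

    // Compares two schemas given as column name -> type name.
    static List<Change> diff(Map<String, String> before, Map<String, String> after) {
        List<Change> changes = new ArrayList<>();
        for (Map.Entry<String, String> column : before.entrySet()) {
            String newType = after.get(column.getKey());
            if (newType == null) {
                changes.add(new Change(column.getKey(), ChangeKind.COLUMN_REMOVED));
            } else if (!newType.equals(column.getValue())) {
                changes.add(new Change(column.getKey(), ChangeKind.TYPE_CHANGED));
            }
        }
        for (String column : after.keySet()) {
            if (!before.containsKey(column)) {
                changes.add(new Change(column, ChangeKind.COLUMN_ADDED));
            }
        }
        return changes;
    }
}
```

Each breaking change can then be fed into column-level downstream traversal to produce the affected consumers and the notification list.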

36. Incident Workflow

When an incident happens, lineage must quickly answer:

  • first bad run,
  • last good run,
  • affected input range,
  • affected output versions,
  • downstream assets produced from bad output,
  • external consumers notified,
  • replacement/superseding runs.

If lineage cannot answer these, incident response becomes manual archaeology.
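
The first two questions can be answered directly from run records. The sketch below assumes the defect is tied to a known code version and that runs carry a start time. `IncidentWindow`, `Run`, and `Window` are hypothetical names, and `Run` is a trimmed-down stand-in for a real run record.

```java
import java.time.Instant;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;

final class IncidentWindow {
    record Run(String runId, Instant startedAt, String codeVersion) {}
    record Window(Optional<Run> lastGood, Optional<Run> firstBad) {}

    // Orders runs by start time and returns the first run on the bad code
    // version together with the run immediately before it.
    static Window locate(List<Run> runs, String badCodeVersion) {
        List<Run> ordered = runs.stream()
                .sorted(Comparator.comparing(Run::startedAt))
                .toList();

        Run lastGood = null;
        for (Run run : ordered) {
            if (run.codeVersion().equals(badCodeVersion)) {
                return new Window(Optional.ofNullable(lastGood), Optional.of(run));
            }
            lastGood = run;
        }
        // No run used the bad version: everything seen so far is good.
        return new Window(Optional.ofNullable(lastGood), Optional.empty());
    }
}
```

The first bad run's inputs and outputs then bound the affected input range and output versions.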


37. Supersession and Tainted Data

Wrong output is not always deleted. In regulated platforms, it is often superseded.

Model supersession:

public record Supersession(
        DatasetVersion badVersion,
        DatasetVersion replacementVersion,
        String reason,
        String incidentId,
        Instant supersededAt
) {}

Do not pretend bad data never existed. Record:

  • what was wrong,
  • who saw it,
  • when it was replaced,
  • what replacement version should be used,
  • whether external reports must be corrected.
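
Because a replacement can itself be superseded later, consumers need a way to resolve "which version should be used now". A minimal sketch, assuming supersession records have been loaded into a map from bad version ID to replacement version ID (the `SupersessionResolver` name and the string IDs are illustrative):

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

final class SupersessionResolver {
    // Follows bad -> replacement links until reaching a version that has not
    // been superseded. Throws if the records form a cycle, which would mean
    // the supersession data itself is corrupt.
    static String currentVersion(String version, Map<String, String> supersededBy) {
        Set<String> seen = new HashSet<>();
        String current = version;
        while (supersededBy.containsKey(current)) {
            if (!seen.add(current)) {
                throw new IllegalStateException("Supersession cycle at " + current);
            }
            current = supersededBy.get(current);
        }
        return current;
    }
}
```

The intermediate versions stay in the store, so the chain remains auditable.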

38. Lineage Storage Architecture

A lineage platform commonly has the following components, each with several implementation choices:

Component | Options
Event protocol | OpenLineage, internal JSON, protobuf
Transport | HTTP, Kafka, queue, outbox relay
Store | relational DB, graph DB, document store, metadata platform
Query | REST/GraphQL, SQL, graph traversal API
UI | metadata platform UI, custom impact view
Governance | policy engine, approvals, tickets

Do not over-optimize storage before defining questions and invariants.


39. Relational Lineage Store Shape

A relational store can go far.

CREATE TABLE dataset (
    dataset_id UUID PRIMARY KEY,
    namespace TEXT NOT NULL,
    name TEXT NOT NULL,
    type TEXT NOT NULL,
    owner TEXT,
    classification TEXT,
    UNIQUE(namespace, name)
);

CREATE TABLE pipeline_job (
    job_id UUID PRIMARY KEY,
    namespace TEXT NOT NULL,
    name TEXT NOT NULL,
    owner TEXT,
    service TEXT,
    UNIQUE(namespace, name)
);

CREATE TABLE pipeline_run (
    run_id TEXT PRIMARY KEY,
    job_id UUID NOT NULL REFERENCES pipeline_job(job_id),
    status TEXT NOT NULL,
    started_at TIMESTAMPTZ NOT NULL,
    finished_at TIMESTAMPTZ,
    code_version TEXT,
    config_version TEXT,
    backfill_id TEXT
);

CREATE TABLE run_input (
    run_id TEXT NOT NULL REFERENCES pipeline_run(run_id),
    dataset_id UUID NOT NULL REFERENCES dataset(dataset_id),
    version_ref TEXT,
    position JSONB,
    PRIMARY KEY(run_id, dataset_id)
);

CREATE TABLE run_output (
    run_id TEXT NOT NULL REFERENCES pipeline_run(run_id),
    dataset_id UUID NOT NULL REFERENCES dataset(dataset_id),
    version_ref TEXT NOT NULL, -- part of the primary key, so it cannot be NULL
    position JSONB,
    publication_status TEXT NOT NULL,
    PRIMARY KEY(run_id, dataset_id, version_ref)
);

CREATE TABLE dataset_edge (
    from_dataset_id UUID NOT NULL REFERENCES dataset(dataset_id),
    to_dataset_id UUID NOT NULL REFERENCES dataset(dataset_id),
    edge_type TEXT NOT NULL,
    confidence TEXT NOT NULL,
    updated_at TIMESTAMPTZ NOT NULL,
    PRIMARY KEY(from_dataset_id, to_dataset_id, edge_type)
);

Column lineage:

CREATE TABLE column_lineage (
    id UUID PRIMARY KEY,
    from_dataset_id UUID NOT NULL,
    from_column TEXT NOT NULL,
    to_dataset_id UUID NOT NULL,
    to_column TEXT NOT NULL,
    transform_kind TEXT NOT NULL,
    transform_version TEXT NOT NULL,
    expression_id TEXT,
    updated_at TIMESTAMPTZ NOT NULL
);

This is enough to build useful impact analysis without starting with a graph database.


40. Graph Traversal Queries

Downstream assets:

WITH RECURSIVE downstream(depth, from_dataset_id, to_dataset_id, path) AS (
    SELECT
        1,
        from_dataset_id,
        to_dataset_id,
        ARRAY[from_dataset_id, to_dataset_id]
    FROM dataset_edge
    WHERE from_dataset_id = :changed_dataset_id

    UNION ALL

    SELECT
        d.depth + 1,
        e.from_dataset_id,
        e.to_dataset_id,
        d.path || e.to_dataset_id
    FROM downstream d
    JOIN dataset_edge e ON e.from_dataset_id = d.to_dataset_id
    WHERE d.depth < 10
      AND NOT e.to_dataset_id = ANY(d.path)
)
SELECT * FROM downstream;

Column impact:

WITH RECURSIVE col_downstream AS (
    SELECT
        from_dataset_id,
        from_column,
        to_dataset_id,
        to_column,
        1 AS depth
    FROM column_lineage
    WHERE from_dataset_id = :dataset_id
      AND from_column = :column

    UNION ALL

    SELECT
        cl.from_dataset_id,
        cl.from_column,
        cl.to_dataset_id,
        cl.to_column,
        cd.depth + 1
    FROM col_downstream cd
    JOIN column_lineage cl
      ON cl.from_dataset_id = cd.to_dataset_id
     AND cl.from_column = cd.to_column
    WHERE cd.depth < 10
)
SELECT * FROM col_downstream;

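Keep a depth limit and cycle protection. The dataset query has both. The column query relies on the depth limit alone, so add a path array to it as well if column lineage can contain cycles.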
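
The same traversal can be done in application code once edges are loaded into memory. The sketch below is a breadth-first search with a depth limit and a visited map for cycle protection. Unlike the recursive SQL, which returns one row per path step, it returns each reachable dataset once with its minimum depth. The adjacency-map input and the `DownstreamTraversal` name are assumptions for illustration.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class DownstreamTraversal {
    // edges: dataset -> datasets that read from it.
    // Returns every downstream dataset within maxDepth hops, with its minimum depth.
    static Map<String, Integer> downstream(Map<String, List<String>> edges,
                                           String start, int maxDepth) {
        Map<String, Integer> depthByDataset = new LinkedHashMap<>();
        Deque<String> queue = new ArrayDeque<>();
        depthByDataset.put(start, 0);
        queue.add(start);

        while (!queue.isEmpty()) {
            String dataset = queue.remove();
            int depth = depthByDataset.get(dataset);
            if (depth == maxDepth) {
                continue; // depth limit reached on this branch
            }
            for (String next : edges.getOrDefault(dataset, List.of())) {
                // putIfAbsent returns null only on first visit, which both
                // records the minimum depth and prevents revisiting cycles.
                if (depthByDataset.putIfAbsent(next, depth + 1) == null) {
                    queue.add(next);
                }
            }
        }
        depthByDataset.remove(start); // the changed dataset is not its own downstream
        return depthByDataset;
    }
}
```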


41. Lineage API Design

Useful API endpoints:

GET /assets/{id}/upstream
GET /assets/{id}/downstream
GET /assets/{id}/runs?from=&to=
GET /runs/{runId}
GET /runs/{runId}/inputs
GET /runs/{runId}/outputs
POST /impact/analyze
POST /lineage/events
POST /lineage/reconcile
GET /columns/{asset}/{column}/downstream
GET /incidents/{incidentId}/affected-assets

Impact request:

{
  "asset": "iceberg://prod/enforcement/silver.case_event",
  "column": "accepted_at",
  "changeType": "DATA_INCORRECT",
  "effectiveWindow": {
    "from": "2026-07-01T00:00:00Z",
    "to": "2026-07-04T02:00:00Z"
  },
  "sourceSeverity": "TAINTED"
}

Impact response:

{
  "affected": [
    {
      "asset": "iceberg://prod/enforcement/gold.case_sla_breach",
      "severity": "CRITICAL",
      "path": ["silver.case_event", "gold.case_sla_breach"],
      "reason": "uses accepted_at to compute sla_deadline and breach flag",
      "recommendedAction": "block publication and reprocess affected partitions"
    }
  ]
}

42. Lineage and Access Control

Lineage APIs expose sensitive architecture and data relationships.

Protect them.

Rules:

  • Users should only see assets they are allowed to know exist.
  • Sensitive column lineage should respect classification.
  • PII paths should require elevated permission.
  • Audit access to lineage for regulated assets.
  • Do not leak secret names, credentials, or full URLs with tokens.
  • External vendor lineage must be sanitized.

Impact analysis may need privileged computation but filtered presentation.

System computes full blast radius.
User sees only assets they are authorized to view.
Privileged incident commander sees restricted paths.
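
A minimal sketch of the "privileged computation, filtered presentation" split: the full blast radius is computed once, then reduced per viewer. `ImpactPresentation`, `AffectedAsset`, and the set of authorized asset names are hypothetical. A real system would consult its own policy engine instead of a plain set.

```java
import java.util.List;
import java.util.Set;

final class ImpactPresentation {
    record AffectedAsset(String asset, String severity) {}

    // Returns only the affected assets the viewer is authorized to see.
    // Hidden assets are dropped silently, so the response does not reveal
    // that restricted assets exist.
    static List<AffectedAsset> visibleTo(List<AffectedAsset> fullBlastRadius,
                                         Set<String> authorizedAssets) {
        return fullBlastRadius.stream()
                .filter(affected -> authorizedAssets.contains(affected.asset()))
                .toList();
    }
}
```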

43. Anti-Patterns

Anti-pattern: lineage as a static diagram

Problem:

Architecture diagram says A -> B -> C, but no run evidence exists.

Fix:

Emit run-level lineage with input/output versions.

Anti-pattern: table-only lineage

Problem:

No downstream dashboard/API/report consumers are tracked.

Fix:

Add consumer registry and non-table assets.

Anti-pattern: ignoring failed runs

Problem:

Only successful runs are recorded. Incident root cause disappears.

Fix:

Emit START/FAIL/ABORT events with partial evidence.

Anti-pattern: lineage after publication only

Problem:

Lineage service outage causes permanent metadata gap.

Fix:

Use run manifest and lineage outbox.

Anti-pattern: no column-level privacy lineage

Problem:

Sensitive data is copied/derived downstream without visibility.

Fix:

Track column lineage and privacy operations.

Anti-pattern: trusting inference blindly

Problem:

SQL parser misses UDF/API/dynamic transformation.

Fix:

Combine automatic capture with versioned manual declarations.

44. Production Checklist

Before calling lineage production-grade, verify:

  • Every certified asset has owner, classification, and SLO.
  • Every published asset version has producing run.
  • Every run records code version and config version.
  • Every run records input datasets and output datasets.
  • Kafka pipelines record topic and offset ranges or logical intervals.
  • CDC pipelines record source table and log position/snapshot state.
  • Lakehouse pipelines record input/output snapshot IDs.
  • Backfill runs link to backfill campaign ID.
  • Failed runs emit failure lineage.
  • Lineage emission is retried or recoverable through outbox/manifest.
  • Column-level lineage exists for critical and sensitive assets.
  • Privacy lineage tracks copy/mask/hash/tokenize/drop/derive.
  • Consumer registry includes dashboards, reports, APIs, exports, ML features.
  • Impact analysis can traverse downstream and upstream.
  • Schema changes run impact analysis in CI.
  • Data incidents can identify affected output versions.
  • Supersession is modeled for wrong outputs.
  • Access to lineage is permissioned and audited.
  • Lineage quality checks run regularly.

45. Mental Model Recap

Lineage is not:

a diagram generated by a catalog tool

Lineage is:

a queryable evidence graph of dataset production, transformation, ownership, quality, code version, runtime execution, and downstream usage

Impact analysis is not:

manually asking five teams who might be affected

Impact analysis is:

policy-driven traversal of the evidence graph to determine blast radius, severity, and required action

The engineering invariant:

No important data output should be published without enough lineage to explain how it was produced and who may be affected if it is wrong.

46. Further Reading

Use these as factual anchors, not as substitutes for internal design:

  • OpenLineage specification: https://openlineage.io/docs/spec/
  • OpenLineage facets: https://openlineage.io/docs/spec/facets/
  • Airflow OpenLineage provider: https://airflow.apache.org/docs/apache-airflow-providers-openlineage/stable/index.html
  • OpenMetadata lineage guides: https://docs.open-metadata.org/
  • Apache Iceberg metadata and snapshots: https://iceberg.apache.org/spec/
  • OpenTelemetry: https://opentelemetry.io/docs/

End of Part 067
