
Observability for Data Pipelines

Learn Java Data Pipeline Pattern - Part 066

Observability for production Java data pipelines: logs, metrics, traces, lag, throughput, watermark, checkpoint, data quality, lineage, run manifests, OpenTelemetry, dashboards, alerts, and debugging workflows.


Part 066 — Observability for Data Pipelines

A pipeline is observable when you can answer why a data product is wrong, late, incomplete, expensive, or unavailable without guessing.

Not merely:

Is the job running?

But:

Which source position has been processed?
Which asset version is published?
Which records were rejected?
Which transform version produced this output?
Which consumers are affected?
Which upstream asset caused the delay?
Which checkpoint restored this job?
Which partition/key/window is hot?
Which data quality rule blocked publication?
Which lineage path explains this field value?

Classic application observability uses logs, metrics, and traces. Data pipeline observability needs those, but also needs data-specific signals:

  • source lag,
  • freshness,
  • completeness,
  • watermark,
  • checkpoint health,
  • output count,
  • rejection rate,
  • schema drift,
  • state size,
  • partition skew,
  • file counts,
  • lineage,
  • run manifest,
  • data quality results,
  • reconciliation results,
  • publication state.

A pipeline without these signals is not operable. It may run, but it cannot be trusted.


1. Observability Mental Model

Think in four layers.

Most immature systems stop at L1 and L2, the infrastructure and job-health layers.

They know the pod is healthy. They know the Airflow task succeeded. They know the Kafka consumer is running.

But they do not know the published table is missing 12% of expected records.

Production-grade pipeline observability starts from the consumer-visible data product and works backward.


2. The Debugging Questions

Every observability design should be tested against actual incident questions.

Freshness incident

Why is case_projection 45 minutes stale?

You need:

  • current published source position,
  • source high-watermark,
  • consumer group lag,
  • processor throughput,
  • retry/DLQ volume,
  • sink latency,
  • checkpoint duration,
  • backpressure status,
  • deployment/version timeline,
  • upstream availability.

Completeness incident

Why is partition 2026-07-03 missing 4,211 cases?

You need:

  • expected count evidence,
  • observed output count,
  • rejected count,
  • quarantine count,
  • source extraction manifest,
  • transform filtering counts,
  • join miss counts,
  • publish manifest,
  • run ID and transform version.

Accuracy incident

Why does case C-9001 show status CLOSED instead of ESCALATED?

You need:

  • source event history for key,
  • pipeline event envelope metadata,
  • transform decision log or audit rule trace,
  • reference data version,
  • correction/retraction events,
  • sink write history,
  • lineage path to output field,
  • data quality rule results.

Cost incident

Why did yesterday's job cost 9x more?

You need:

  • input volume,
  • output volume,
  • partition count,
  • file count,
  • shuffle/read/write bytes,
  • state size growth,
  • retry count,
  • query plan changes,
  • skew metrics,
  • backfill/replay markers.

If your telemetry cannot answer these, you do not have observability. You have monitoring fragments.


3. Signals: Logs, Metrics, Traces, Events, Manifests

A mature pipeline emits five classes of telemetry.

| Signal | Purpose | Example |
|---|---|---|
| Logs | local narrative for specific events/errors | record rejected because enum unknown |
| Metrics | aggregated behavior over time | lag, throughput, failure rate |
| Traces | causal path across services/operators | API ingest -> transform -> sink write |
| Domain events | significant state changes | asset published, quality gate failed |
| Manifests | immutable run/publication evidence | source offsets, counts, version, snapshot |

Do not force all observability into logs.

Logs are expensive to query, hard to aggregate, and often missing structure.

Use metrics for trends and alerts. Use traces for causality. Use manifests for audit and replay evidence. Use logs for detailed local context.


4. The Pipeline Observability Taxonomy

Runtime metrics       = is the machinery healthy?
Data metrics          = is the data healthy?
Control-plane events  = what did the platform decide?
Lineage metadata      = what produced what?
Audit evidence        = can we reconstruct the output?

Examples:

Runtime: records_per_second, checkpoint_duration, retry_count, sink_latency
Data: input_count, output_count, reject_count, null_rate, duplicate_count
Control: run_started, run_blocked, backfill_approved, asset_published
Lineage: job -> input dataset -> output dataset, column mapping, code version
Audit: run manifest, source position, quality gate result, reconciliation result

A pipeline may be runtime-healthy and data-unhealthy.

That is the central reason pipeline observability differs from ordinary service observability.


5. Metrics Design

Metrics should be low-cardinality, stable, and tied to action.

Bad metric labels:

record_id
user_id
full_error_message
raw_file_name
SQL query text

Good labels:

pipeline
asset
stage
source
sink
partition
quality_rule_id
severity
tenant_class
processing_mode

For high-cardinality debugging, use logs, traces, manifests, or sampled diagnostic stores.
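One way to enforce the label guidance above is a small guard in front of the metrics backend. The following is a minimal sketch, assuming a hypothetical `MetricLabelPolicy` class (not part of any library) whose allow-list is copied from the good-labels list:

```java
import java.util.Map;
import java.util.Set;

/** Hypothetical guard that rejects metric labels outside a controlled allow-list. */
final class MetricLabelPolicy {
    // Allowed label keys, copied from the "good labels" list in this section.
    private static final Set<String> ALLOWED = Set.of(
            "pipeline", "asset", "stage", "source", "sink", "partition",
            "quality_rule_id", "severity", "tenant_class", "processing_mode");

    private MetricLabelPolicy() {}

    /** Returns the labels unchanged, or throws if any key is not allow-listed. */
    static Map<String, String> validate(Map<String, String> labels) {
        for (String key : labels.keySet()) {
            if (!ALLOWED.contains(key)) {
                throw new IllegalArgumentException("metric label not allowed: " + key);
            }
        }
        return labels;
    }
}
```

This only checks label keys. A label such as `partition` can still explode if its values are unbounded, so value cardinality would need a separate check.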

Core metric groups

pipeline_runtime_*
pipeline_source_*
pipeline_transform_*
pipeline_sink_*
pipeline_checkpoint_*
pipeline_quality_*
pipeline_reconciliation_*
pipeline_asset_*
pipeline_slo_*

Example metric names

pipeline_records_in_total
pipeline_records_out_total
pipeline_records_rejected_total
pipeline_processing_duration_seconds
pipeline_source_lag_seconds
pipeline_sink_write_duration_seconds
pipeline_checkpoint_duration_seconds
pipeline_checkpoint_failures_total
pipeline_watermark_lag_seconds
pipeline_quality_violations_total
pipeline_reconciliation_mismatch_total
pipeline_asset_publish_total
pipeline_asset_freshness_lag_seconds
pipeline_slo_budget_consumed_ratio

Metric names should encode units.

Good: pipeline_source_lag_seconds
Bad:  source_lag

6. Java Metrics Instrumentation

Keep instrumentation at pipeline boundaries.

import java.time.Duration;

public interface PipelineMetrics {
    void recordInput(String pipeline, String stage, long count);
    void recordOutput(String pipeline, String stage, long count);
    void recordRejected(String pipeline, String ruleId, String severity, long count);
    void recordProcessingDuration(String pipeline, String stage, Duration duration);
    void recordSourceLag(String pipeline, Duration lag);
    void recordSinkLatency(String pipeline, String sink, Duration latency);
    void recordCheckpointDuration(String pipeline, Duration duration);
    void recordAssetFreshness(String asset, Duration lag);
}

A simple adapter can later map this interface to Micrometer, OpenTelemetry Metrics, Prometheus, CloudWatch, or another backend.

Do not scatter vendor-specific metric calls throughout transform logic.

import java.time.Duration;
import java.util.List;

// Decorator: wraps any Processor and records counts and duration at the stage boundary.
public final class InstrumentedProcessor<I, O> implements Processor<I, O> {
    private final String pipelineName;
    private final String stageName;
    private final Processor<I, O> delegate;
    private final PipelineMetrics metrics;

    public InstrumentedProcessor(
            String pipelineName,
            String stageName,
            Processor<I, O> delegate,
            PipelineMetrics metrics
    ) {
        this.pipelineName = pipelineName;
        this.stageName = stageName;
        this.delegate = delegate;
        this.metrics = metrics;
    }

    @Override
    public List<Envelope<O>> process(Envelope<I> input) throws Exception {
        metrics.recordInput(pipelineName, stageName, 1);
        long started = System.nanoTime();
        try {
            List<Envelope<O>> out = delegate.process(input);
            metrics.recordOutput(pipelineName, stageName, out.size());
            return out;
        } catch (ValidationException ex) {
            // Count the rejection, then let the caller decide on quarantine or retry.
            metrics.recordRejected(pipelineName, ex.ruleId(), ex.severity().name(), 1);
            throw ex;
        } finally {
            // Recorded on both success and failure paths; the concrete adapter
            // decides how to aggregate it (typically a histogram).
            metrics.recordProcessingDuration(
                    pipelineName, stageName, Duration.ofNanos(System.nanoTime() - started));
        }
    }
}

Instrumentation should be composable. The business transform should not need to know the metrics backend.


7. Logs: Structured, Sparse, Useful

Pipeline logs should be structured and correlated.

Minimum fields:

{
  "timestamp": "2026-07-04T04:12:33.000Z",
  "level": "WARN",
  "pipeline": "case-cdc-to-projection",
  "runId": "run-20260704-001",
  "asset": "serving.case_projection",
  "stage": "validate-status-transition",
  "recordKey": "case-9001",
  "eventId": "evt-abc",
  "sourcePosition": "lsn:16/B374D848",
  "traceId": "...",
  "ruleId": "CASE_STATUS_TRANSITION_004",
  "message": "invalid transition CLOSED -> ESCALATED; record quarantined"
}

Do not log raw payload by default.

Reasons:

  • PII leakage,
  • high cost,
  • noisy incidents,
  • retention risk,
  • hard-to-control access.

Use safe diagnostic references:

record_key_hash
payload_hash
source_position
quarantine_ref
manifest_ref
trace_id
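
A reference such as `record_key_hash` can be derived without logging the key itself. The following is a minimal sketch, assuming a hypothetical `DiagnosticRefs` helper and a deployment-specific salt. Truncating to 16 hex characters is an illustrative choice, not a recommendation from this lesson:

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

/** Hypothetical helper that turns a raw record key into a log-safe reference. */
final class DiagnosticRefs {
    private DiagnosticRefs() {}

    /** Hex-encoded SHA-256 of salt + key, truncated to the first 8 bytes (16 hex chars). */
    static String recordKeyHash(String salt, String recordKey) {
        try {
            MessageDigest digest = MessageDigest.getInstance("SHA-256");
            digest.update(salt.getBytes(StandardCharsets.UTF_8));
            byte[] hash = digest.digest(recordKey.getBytes(StandardCharsets.UTF_8));
            StringBuilder hex = new StringBuilder();
            for (int i = 0; i < 8; i++) {
                hex.append(String.format("%02x", hash[i]));
            }
            return hex.toString();
        } catch (NoSuchAlgorithmException e) {
            // SHA-256 is a required algorithm on every Java platform.
            throw new IllegalStateException(e);
        }
    }
}
```

A salted hash of a low-entropy key can still be guessed by anyone who knows the salt, so the salt should be treated as a secret. A keyed MAC may be a better fit for sensitive identifiers.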

Log levels

DEBUG = local development or short-lived diagnostic mode
INFO  = lifecycle event: run started, asset published, checkpoint committed
WARN  = recoverable anomaly: retry, quarantine, schema drift warning
ERROR = pipeline effect failed or record permanently rejected

Do not log every successful record in production.


8. Tracing Pipeline Work

Tracing is useful when a pipeline crosses process boundaries:

  • API ingestion service,
  • Kafka producer,
  • Kafka consumer,
  • transform service,
  • sink writer,
  • metadata store,
  • quality service,
  • publication service.

For batch jobs, traces can also model large steps:

run -> extract partition -> transform stage -> quality gate -> publish snapshot

Trace spans should not be created for every record in high-throughput streams unless sampled carefully.

Span model

pipeline.run
  source.read
  transform.validate
  transform.enrich
  sink.write
  checkpoint.commit
  asset.publish

Attributes:

pipeline.name
run.id
asset.name
asset.version
source.name
source.position
processing.mode
partition.id
transform.version
quality.rule.id

Java trace context in envelope

public record TraceContext(
        String traceId,
        String spanId,
        String traceparent,
        Map<String, String> baggage
) {}

public record Envelope<T>(
        String eventId,
        String key,
        T payload,
        RecordPosition position,
        Instant eventTime,
        Instant ingestionTime,
        TraceContext traceContext,
        Map<String, String> headers
) {}

Trace propagation matters across Kafka and file boundaries. Put traceparent or equivalent context in headers/metadata where appropriate.
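
When the context travels as a W3C `traceparent` header, the receiving side has to validate it before trusting it. The following is a minimal sketch, assuming a hypothetical `Traceparent` helper that handles only version `00` of the header (`00-<32 hex trace id>-<16 hex parent id>-<2 hex flags>`). In practice an OpenTelemetry propagator would normally do this work:

```java
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical parser/formatter for version-00 W3C traceparent values. */
final class Traceparent {
    private static final Pattern FORMAT =
            Pattern.compile("^00-([0-9a-f]{32})-([0-9a-f]{16})-([0-9a-f]{2})$");

    record Parsed(String traceId, String spanId, boolean sampled) {}

    private Traceparent() {}

    /** Returns the parsed context, or empty if the header is missing or malformed. */
    static Optional<Parsed> parse(String header) {
        if (header == null) {
            return Optional.empty();
        }
        Matcher m = FORMAT.matcher(header.trim());
        if (!m.matches()) {
            return Optional.empty();
        }
        String traceId = m.group(1);
        String spanId = m.group(2);
        // All-zero IDs are defined as invalid by the specification.
        if (traceId.equals("0".repeat(32)) || spanId.equals("0".repeat(16))) {
            return Optional.empty();
        }
        boolean sampled = (Integer.parseInt(m.group(3), 16) & 0x01) == 1;
        return Optional.of(new Parsed(traceId, spanId, sampled));
    }

    /** Formats a header value; only the sampled flag bit is preserved. */
    static String format(Parsed p) {
        return "00-" + p.traceId() + "-" + p.spanId() + "-" + (p.sampled() ? "01" : "00");
    }
}
```

The formatted value can be written to a Kafka record header or a file manifest field and parsed again on the consuming side, which is what makes the envelope's `traceparent` field useful across boundaries.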


9. OpenTelemetry in Pipeline Systems

OpenTelemetry gives a vendor-neutral model for telemetry: traces, metrics, and logs.

A practical Java setup often has:

Java app/instrumentation -> OpenTelemetry SDK/agent -> Collector -> backend(s)

Why this matters for data platforms:

  • one telemetry model across services and jobs,
  • correlation between ingest API, Kafka, processor, sink, and metadata store,
  • vendor neutrality,
  • ability to enrich telemetry with pipeline attributes,
  • consistent context propagation.

But OpenTelemetry alone does not give data observability. You still need pipeline-specific semantic signals.

OpenTelemetry tells you how operations behaved.
Pipeline metadata tells you what data was produced and whether it is trustworthy.

Use both.


10. Lag Metrics

Lag is not one metric.

Offset lag       = broker high-watermark offset - consumer committed/processed offset
Time lag         = now - event/source timestamp of last processed record
Watermark lag    = now/event-time frontier - current watermark
Sink lag         = last transformed position - last published position
Consumer lag     = producer-visible progress - consumer-visible progress
Business lag     = deadline - actual publication time

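Kafka offset lag can be high during backfill and not indicate an incident. Time lag can be low while offset lag is high on a high-volume topic, where a large offset backlog may represent only a few seconds of data; conversely, replaying old partitions inflates time lag without indicating a live incident. Watermark lag can reveal idle or late sources. Sink lag can reveal slow external writes even when transform keeps up.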

So dashboards should show lag decomposition.

Alert on consumer-impacting lag, not every backlog.
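
The first three definitions above can be computed side by side from one snapshot, which is what a lag-decomposition panel needs. The following is a minimal sketch, assuming a hypothetical `LagSnapshot` record whose fields are supplied by the caller. How the offsets and watermark are obtained depends on the runtime and is not shown:

```java
import java.time.Duration;
import java.time.Instant;

/** Hypothetical point-in-time snapshot used to decompose lag into separate signals. */
record LagSnapshot(
        long brokerEndOffset,            // broker high-watermark offset for the partition
        long committedOffset,            // consumer committed/processed offset
        Instant now,
        Instant lastProcessedEventTime,  // event/source timestamp of the last processed record
        Instant currentWatermark) {

    /** Offset lag: how many records the consumer is behind. */
    long offsetLag() {
        return Math.max(0, brokerEndOffset - committedOffset);
    }

    /** Time lag: how old the last processed record is. */
    Duration timeLag() {
        return Duration.between(lastProcessedEventTime, now);
    }

    /** Watermark lag: how far event-time progress trails wall-clock time. */
    Duration watermarkLag() {
        return Duration.between(currentWatermark, now);
    }
}
```

Keeping the three values separate makes it visible when, for example, offset lag is large but time lag is only a few seconds.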


11. Throughput Metrics

Throughput tells whether the pipeline can keep up.

Track:

records_in_per_second
records_out_per_second
bytes_in_per_second
bytes_out_per_second
records_rejected_per_second
records_retried_per_second
records_quarantined_per_second
sink_writes_per_second

Throughput must be interpreted with lag.

| Throughput | Lag | Meaning |
|---|---|---|
| high | low | healthy high volume |
| low | low | low source volume or idle |
| high | high | catching up, but maybe insufficient capacity |
| low | high | bottleneck, blocked source/sink, backpressure, failure |
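
The throughput/lag matrix above can be encoded directly so dashboards and alerts share one interpretation. The following is a minimal sketch, assuming hypothetical names (`PipelineState`, `ThroughputLagClassifier`) and caller-supplied thresholds that would need tuning per pipeline:

```java
/** The four quadrants of the throughput/lag matrix. */
enum PipelineState { HEALTHY_HIGH_VOLUME, IDLE_OR_LOW_VOLUME, CATCHING_UP, BOTTLENECKED }

/** Hypothetical classifier; thresholds are assumptions supplied by the caller. */
final class ThroughputLagClassifier {
    private ThroughputLagClassifier() {}

    static PipelineState classify(
            double recordsPerSecond, double lagSeconds,
            double throughputThreshold, double lagThreshold) {
        boolean highThroughput = recordsPerSecond >= throughputThreshold;
        boolean highLag = lagSeconds >= lagThreshold;
        if (!highLag) {
            return highThroughput
                    ? PipelineState.HEALTHY_HIGH_VOLUME
                    : PipelineState.IDLE_OR_LOW_VOLUME;
        }
        // High lag: throughput tells us whether the job is recovering or stuck.
        return highThroughput ? PipelineState.CATCHING_UP : PipelineState.BOTTLENECKED;
    }
}
```

Only the `BOTTLENECKED` quadrant is an obvious paging candidate. `CATCHING_UP` usually needs a trend check to see whether lag is actually shrinking.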

For batch:

records_per_partition
partitions_per_hour
bytes_scanned
bytes_written
shuffle_bytes
files_written

For stateful streaming:

records_per_key_distribution
hot_key_count
state_reads_per_second
state_writes_per_second
state_size_bytes

12. Watermark Observability

Watermarks are among the most commonly misunderstood stream-processing signals.

A watermark is not “current time.” It is the pipeline's belief about event-time progress.

Track:

current_watermark_timestamp
watermark_lag_seconds
late_events_total
late_events_by_reason
idle_partitions
allowed_lateness_drops_total
side_output_late_events_total

Questions watermark metrics answer:

Are windows closing?
Is one partition holding back event-time progress?
Are events arriving later than expected?
Did a source become idle without being marked idle?
Is allowed lateness too strict?

For regulatory systems, late event handling must be explicit:

late_but_accepted
late_and_corrected
late_and_quarantined
late_and_dropped_by_policy

Dropping late data silently is an audit risk.
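
Making late handling explicit can start with a single classification function whose result is counted and logged. The following is a minimal sketch, assuming a hypothetical `LatenessPolicy` that treats an event as late when its timestamp is not after the watermark. It models three of the four dispositions above. `late_and_corrected` is left out because it also requires emitting a correction:

```java
import java.time.Duration;
import java.time.Instant;

enum LateDisposition { ON_TIME, LATE_BUT_ACCEPTED, LATE_AND_QUARANTINED, LATE_AND_DROPPED_BY_POLICY }

/** Hypothetical policy: every late event gets an explicit, countable disposition. */
final class LatenessPolicy {
    private final Duration allowedLateness;
    private final boolean quarantineTooLate;

    LatenessPolicy(Duration allowedLateness, boolean quarantineTooLate) {
        this.allowedLateness = allowedLateness;
        this.quarantineTooLate = quarantineTooLate;
    }

    LateDisposition classify(Instant eventTime, Instant watermark) {
        // Assumption: a timestamp at or before the watermark counts as late.
        if (eventTime.isAfter(watermark)) {
            return LateDisposition.ON_TIME;
        }
        Duration behind = Duration.between(eventTime, watermark);
        if (behind.compareTo(allowedLateness) <= 0) {
            return LateDisposition.LATE_BUT_ACCEPTED;
        }
        return quarantineTooLate
                ? LateDisposition.LATE_AND_QUARANTINED
                : LateDisposition.LATE_AND_DROPPED_BY_POLICY;
    }
}
```

Each returned disposition can be used as the reason value for a counter such as `late_events_by_reason`, so that even policy drops leave evidence.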


13. Checkpoint Observability

Checkpoint health is reliability health for stateful pipelines.

Track:

checkpoint_completed_total
checkpoint_failed_total
last_checkpoint_duration_seconds
last_checkpoint_size_bytes
checkpoint_alignment_time_seconds
checkpoint_start_delay_seconds
checkpoint_restore_duration_seconds
checkpoint_age_seconds
state_backend_size_bytes
savepoint_created_total

Important patterns:

| Symptom | Possible Cause |
|---|---|
| duration increasing | state growth, slow storage, backpressure |
| size increasing unexpectedly | state leak, missing TTL, key explosion |
| frequent failures | unstable storage, sink coordination issue, timeout too low |
| old checkpoint age | job cannot complete checkpoint, recovery risk rising |
| restore slow | state too large, bad state backend/configuration |

Checkpoint metrics should be linked to deploys and backfills.

A new code version that doubles state size is a release risk.


14. Data Quality Observability

Quality checks need metrics, logs, sample evidence, and policy state.

Track:

quality_rule_evaluations_total
quality_violations_total{rule_id,severity}
quality_violation_rate{rule_id,severity}
quality_gate_status{asset}
quarantined_records_total
null_rate{field}
unknown_enum_total{field}
referential_miss_total{reference_asset}
duplicate_key_total
schema_drift_total

Do not create a metric label for every field in a huge schema if it explodes cardinality. Use a controlled registry of monitored fields.

Quality observability should answer:

Which rule changed?
Which input caused the spike?
Which asset publication was blocked?
Which consumers are affected?
Is this a real source issue or a transform bug?

Quality gate event

{
  "eventType": "quality_gate_failed",
  "asset": "gold.case_daily_summary",
  "runId": "run-20260704-001",
  "gateId": "case_summary_publish_gate",
  "criticalViolations": 2,
  "highViolations": 17,
  "blockedPublication": true,
  "evidenceRef": "quality-results/run-20260704-001"
}

Quality telemetry should be part of publish evidence, not separate side notes.
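
A gate decision like the one in the event above can be derived from per-rule violation counts. The following is a minimal sketch, assuming hypothetical types (`Severity`, `RuleViolation`, `GateDecision`, `QualityGate`) and an illustrative policy: any critical violation blocks publication, and high-severity violations block once they exceed a configured budget:

```java
import java.util.List;

enum Severity { LOW, HIGH, CRITICAL }

record RuleViolation(String ruleId, Severity severity, long count) {}

record GateDecision(long criticalViolations, long highViolations, boolean blockedPublication) {}

/** Hypothetical publish gate; the blocking policy here is an assumption. */
final class QualityGate {
    private QualityGate() {}

    static GateDecision evaluate(List<RuleViolation> violations, long maxHighViolations) {
        long critical = total(violations, Severity.CRITICAL);
        long high = total(violations, Severity.HIGH);
        boolean blocked = critical > 0 || high > maxHighViolations;
        return new GateDecision(critical, high, blocked);
    }

    private static long total(List<RuleViolation> violations, Severity severity) {
        return violations.stream()
                .filter(v -> v.severity() == severity)
                .mapToLong(RuleViolation::count)
                .sum();
    }
}
```

The resulting `GateDecision` carries the same counts as the `quality_gate_failed` event, so the event can be built from it rather than assembled separately.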


15. Reconciliation Observability

Reconciliation is where pipeline correctness becomes provable.

Track:

reconciliation_checks_total
reconciliation_failures_total
expected_count
observed_count
count_difference
expected_checksum
observed_checksum
balance_difference
missing_key_count
extra_key_count

Do not only show pass/fail. Store evidence.

public record ReconciliationResult(
        String runId,
        String assetName,
        String checkName,
        long expectedCount,
        long observedCount,
        BigDecimal expectedAmount,
        BigDecimal observedAmount,
        String expectedChecksum,
        String observedChecksum,
        boolean passed,
        String evidenceRef
) {}

For high-risk assets, reconciliation results should be immutable.
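
Values such as `missing_key_count` and `extra_key_count` come from comparing the expected key set with the observed one. The following is a minimal sketch, assuming a hypothetical `KeyReconciliation` record and key sets small enough to hold in memory. Large assets would more likely compare counts and checksums per partition:

```java
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical key-level reconciliation that keeps the evidence, not just pass/fail. */
record KeyReconciliation(
        long expectedCount,
        long observedCount,
        Set<String> missingKeys,   // expected but not observed
        Set<String> extraKeys) {   // observed but not expected

    boolean passed() {
        return missingKeys.isEmpty() && extraKeys.isEmpty();
    }

    long countDifference() {
        return observedCount - expectedCount;
    }

    static KeyReconciliation compare(Set<String> expected, Set<String> observed) {
        Set<String> missing = new TreeSet<>(expected);
        missing.removeAll(observed);
        Set<String> extra = new TreeSet<>(observed);
        extra.removeAll(expected);
        return new KeyReconciliation(expected.size(), observed.size(), missing, extra);
    }
}
```

Equal counts do not imply a pass: one missing key and one extra key cancel out in `countDifference()`, which is why the key sets are retained as evidence.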


16. Lineage Observability

Lineage tells what produced what.

Minimum lineage for each asset publication:

output asset name/version
input asset names/versions
pipeline/job name
run ID
code version
transform version
source positions
schema versions
quality gate results
publication time
owner

Lineage enables:

  • impact analysis,
  • root cause analysis,
  • audit reconstruction,
  • schema change review,
  • backfill planning,
  • consumer notification,
  • deletion/privacy propagation.

A useful lineage graph is not just pretty.

It must answer operational questions:

If silver.case_lifecycle_events is wrong for 2026-07-01, which gold assets and reports are affected?
If field case_status changes semantics, which consumers assume the old meaning?
If backfill run B supersedes run A, which downstream assets need restatement?
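
The first question above is a downstream traversal of the lineage graph. The following is a minimal sketch, assuming a hypothetical in-memory `LineageGraph`. A real lineage store would also key edges by asset version and partition, which is omitted here:

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical asset-level lineage graph supporting downstream impact analysis. */
final class LineageGraph {
    private final Map<String, Set<String>> downstream = new HashMap<>();

    /** Records that `output` is produced from `input`. */
    void addEdge(String input, String output) {
        downstream.computeIfAbsent(input, k -> new LinkedHashSet<>()).add(output);
    }

    /** Breadth-first walk returning every asset transitively derived from `asset`. */
    Set<String> impactedBy(String asset) {
        Set<String> seen = new LinkedHashSet<>();
        Deque<String> queue = new ArrayDeque<>();
        queue.add(asset);
        while (!queue.isEmpty()) {
            String current = queue.poll();
            for (String next : downstream.getOrDefault(current, Set.of())) {
                // seen.add() returns false for already-visited assets, which also guards against cycles.
                if (!next.equals(asset) && seen.add(next)) {
                    queue.add(next);
                }
            }
        }
        return seen;
    }
}
```

The same traversal run on reversed edges answers the root-cause direction: which upstream assets could explain a bad output.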

17. Run and Publication Events

Emit structured control-plane events.

pipeline_run_requested
pipeline_run_started
pipeline_run_heartbeat
pipeline_run_succeeded
pipeline_run_failed
pipeline_run_cancelled
quality_gate_passed
quality_gate_failed
asset_publish_started
asset_published
asset_publish_failed
backfill_campaign_started
backfill_campaign_completed
slo_violation_detected

These events should not be hidden inside logs only.

They should be queryable by platform APIs.

Example:

{
  "eventType": "asset_published",
  "asset": "silver.case_current_state",
  "assetVersion": "snapshot-928102",
  "runId": "run-20260704-003",
  "publishedAt": "2026-07-04T05:00:00Z",
  "sourcePositions": {
    "case_db": "lsn:16/B374D848"
  },
  "inputCounts": {
    "case_lifecycle_events": 901233
  },
  "outputCounts": {
    "case_current_state": 120044
  }
}

18. Dashboards That Actually Help

A good dashboard follows the incident workflow.

Asset overview dashboard

Shows:

  • asset status,
  • freshness lag,
  • completeness status,
  • quality gate status,
  • latest published version,
  • latest source position,
  • affected consumers,
  • current SLO budget burn,
  • last successful run,
  • last failed run.

Runtime dashboard

Shows:

  • throughput,
  • lag decomposition,
  • error/retry/DLQ rate,
  • sink latency,
  • checkpoint health,
  • JVM memory/GC,
  • backpressure,
  • worker restarts,
  • partition skew.

Data quality dashboard

Shows:

  • critical/high violation counts,
  • rule trends,
  • source-specific failures,
  • quarantine backlog,
  • drift detection,
  • top affected fields,
  • release/change correlation.

Backfill dashboard

Shows:

  • campaign progress,
  • partitions completed,
  • validation status,
  • cost/compute usage,
  • production impact,
  • publish readiness,
  • rollback target.

Dashboards should move from symptom to cause.

A wall of charts without decision flow creates operational blindness.


19. Alert Design

Alerts should correspond to SLO risk or data loss risk.

Good alert categories:

SLO fast burn
SLO violation
source lag approaching retention limit
checkpoint not completing
critical quality gate failure
asset publish failed after successful compute
reconciliation mismatch
DLQ/quarantine backlog exceeds threshold
schema drift on critical source
state size growth abnormal
sink error budget burning

Bad alert categories:

one retry happened
one record rejected
DAG task failed but automatic retry is running
consumer lag briefly above fixed threshold during expected backfill
CPU high for one minute with no SLO impact

Alert payload should include context:

asset
pipeline
run_id
stage
current_sli
slo_target
budget_consumed
affected_consumers
last_good_version
source_position
runbook
suggested_triage_steps
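
The "SLO fast burn" category is commonly expressed as a burn rate: the observed bad-event ratio divided by the ratio the SLO allows. The following is a minimal sketch, assuming a hypothetical `SloBurnRate` helper. The two-window check and its threshold are illustrative assumptions, not values from this lesson:

```java
/** Hypothetical burn-rate helper for SLO-based alerting. */
final class SloBurnRate {
    private SloBurnRate() {}

    /**
     * Burn rate = observed error ratio / allowed error ratio.
     * A value of 1.0 consumes the error budget exactly over the SLO period.
     */
    static double burnRate(long badEvents, long totalEvents, double sloTarget) {
        if (sloTarget <= 0.0 || sloTarget >= 1.0) {
            throw new IllegalArgumentException("sloTarget must be between 0 and 1 (exclusive)");
        }
        if (totalEvents <= 0) {
            return 0.0; // no traffic, no budget consumed
        }
        double errorRatio = (double) badEvents / totalEvents;
        return errorRatio / (1.0 - sloTarget);
    }

    /** Fires only when both a short and a long window exceed the threshold. */
    static boolean isFastBurn(double shortWindowRate, double longWindowRate, double threshold) {
        return shortWindowRate > threshold && longWindowRate > threshold;
    }
}
```

For a freshness SLO, a "bad event" could be an evaluation interval in which the asset exceeded its freshness target. Requiring both windows to exceed the threshold is one way to avoid paging on short spikes.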

20. Correlation IDs and Causality

Every meaningful telemetry item should correlate to one or more identifiers.

run_id
pipeline_name
asset_name
asset_version
source_position
event_id
record_key_hash
trace_id
transform_version
schema_version
backfill_campaign_id

For a single record investigation, you want to traverse:

record key -> source event -> envelope -> transform logs -> quality result -> sink write -> asset version -> consumer read

For a batch investigation, you want:

asset version -> run manifest -> input manifests -> quality results -> reconciliation -> lineage -> publish event

For a streaming incident, you want:

consumer-visible lag -> topic partition lag -> operator throughput -> checkpoint/backpressure -> sink latency -> external dependency

This traversal only works if IDs are propagated deliberately.


21. Observability for Different Pipeline Types

File ingestion

Track:

files_discovered_total
files_imported_total
files_rejected_total
partial_files_detected_total
manifest_files_expected
manifest_files_imported
file_bytes_imported
file_age_seconds

Important dimensions:

source_system
landing_zone
file_type
business_date

API ingestion

Track:

api_requests_total
api_errors_total
rate_limit_remaining
cursor_lag_seconds
pages_fetched_total
entities_upserted_total
entities_deleted_total
token_refresh_failures_total

CDC ingestion

Track:

source_lsn_lag_seconds
snapshot_rows_scanned_total
snapshot_completed
transactions_processed_total
ddl_events_total
heartbeat_age_seconds
schema_history_errors_total

Kafka stream processing

Track:

consumer_offset_lag
records_consumed_rate
records_produced_rate
commit_latency
rebalance_total
partition_assignment_count
state_store_size
restore_duration

Flink stream processing

Track:

num_records_in_per_second
num_records_out_per_second
current_input_watermark
checkpoint_duration
checkpoint_size
backpressured_time_ratio
busy_time_ratio
idle_time_ratio
state_size
late_records

Spark batch/structured streaming

Track:

input_rows
processed_rows_per_second
batch_duration
state_operator_rows
watermark
shuffle_read_bytes
shuffle_write_bytes
files_written
query_progress

Lakehouse table

Track:

snapshot_created_total
snapshot_age_seconds
data_files_count
small_files_count
manifest_count
orphan_files_count
compaction_duration
snapshot_expiration_events

22. JVM and Runtime Signals

Java pipeline code must still expose normal JVM signals.

Track:

heap_used
non_heap_used
gc_pause_seconds
gc_count
thread_count
blocked_threads
classloader_metrics
executor_queue_depth
executor_active_threads
connection_pool_active
connection_pool_wait_time
http_client_latency
db_pool_exhaustion

Why it matters:

  • GC pauses can create Kafka consumer poll issues.
  • Thread pool saturation can look like sink latency.
  • Connection pool exhaustion can cause retry storms.
  • Memory growth can indicate unbounded dedupe/state/cache.
  • Blocking calls inside async pipelines can collapse throughput.

Pipeline observability is still application observability plus data semantics.


23. Sampling Strategy

High-volume pipelines cannot trace/log every record deeply.

Use sampling intentionally:

Always sample: errors, quarantines, critical records, publish events, quality gate failures
Probabilistic sample: successful records
Key-based sample: specific tenant/case/customer under investigation
Window sample: diagnostic mode for limited duration
Backfill sample: per partition summary + selected record traces

Diagnostic sampling must have expiration.

diagnostic_mode:
  enabled: true
  pipeline: case-cdc-to-projection
  key_filter_hashes:
    - a8f44...
  expires_at: 2026-07-04T08:00:00Z
  approved_by: platform-oncall

Never leave verbose payload logging enabled indefinitely.
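
The sampling rules and the expiring diagnostic mode can be combined into one decision function. The following is a minimal sketch, assuming a hypothetical `SamplingPolicy` class. The random draw is passed in by the caller so the decision stays deterministic in tests:

```java
import java.time.Instant;
import java.util.Set;

/** Hypothetical per-record sampling decision with an expiring diagnostic mode. */
final class SamplingPolicy {
    enum Outcome { SUCCESS, ERROR, QUARANTINED }

    private final double successSampleRate;        // e.g. 0.01 samples about 1% of successes
    private final Set<String> diagnosticKeyHashes; // hashed keys under investigation
    private final Instant diagnosticExpiresAt;     // diagnostic mode ends here

    SamplingPolicy(double successSampleRate, Set<String> diagnosticKeyHashes,
                   Instant diagnosticExpiresAt) {
        this.successSampleRate = successSampleRate;
        this.diagnosticKeyHashes = Set.copyOf(diagnosticKeyHashes);
        this.diagnosticExpiresAt = diagnosticExpiresAt;
    }

    /** `randomDraw` is a caller-supplied value in [0, 1). */
    boolean shouldSample(Outcome outcome, String keyHash, Instant now, double randomDraw) {
        if (outcome != Outcome.SUCCESS) {
            return true; // errors and quarantines are always sampled
        }
        if (now.isBefore(diagnosticExpiresAt) && diagnosticKeyHashes.contains(keyHash)) {
            return true; // key-based sample, only while diagnostic mode is active
        }
        return randomDraw < successSampleRate; // probabilistic sample of successes
    }
}
```

Because the expiry is checked on every decision, a forgotten diagnostic configuration stops matching keys on its own once `expires_at` has passed.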


24. Privacy and Security in Observability

Telemetry can leak data.

Risks:

  • raw payload in logs,
  • PII in metric labels,
  • sensitive IDs in traces,
  • unrestricted DLQ access,
  • quality sample records exposed broadly,
  • lineage revealing restricted datasets,
  • debug mode exporting payloads to third-party tools.

Controls:

hash record keys in logs
separate diagnostic store access
redact payload fields by classification
prohibit high-cardinality PII labels
set retention by sensitivity
encrypt evidence stores
link telemetry access to asset classification
review observability exports as data flows

Observability is part of the data platform attack surface.


25. Observability as Code

Define telemetry expectations with the pipeline definition.

pipeline: case-cdc-to-projection
asset: serving.case_projection
telemetry:
  required_metrics:
    - pipeline_source_lag_seconds
    - pipeline_records_in_total
    - pipeline_records_out_total
    - pipeline_records_rejected_total
    - pipeline_sink_write_duration_seconds
    - pipeline_checkpoint_duration_seconds
    - pipeline_asset_freshness_lag_seconds
  required_events:
    - pipeline_run_started
    - pipeline_run_succeeded
    - pipeline_run_failed
    - asset_published
    - quality_gate_failed
  required_manifest_fields:
    - sourcePositions
    - inputCounts
    - outputCounts
    - rejectedCounts
    - transformVersion
    - publishedSnapshotId

CI can validate instrumentation coverage.

A production pipeline should not launch without required observability.
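
The CI validation can be as simple as a set difference between what the definition requires and what a test run actually emitted. The following is a minimal sketch, assuming a hypothetical `TelemetryCoverageCheck` helper. Parsing the YAML definition and collecting emitted metric names from a test run are left to the caller:

```java
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical CI check comparing declared telemetry with what a test run emitted. */
final class TelemetryCoverageCheck {
    private TelemetryCoverageCheck() {}

    /** Returns required names that were never emitted, sorted for stable CI output. */
    static Set<String> missing(Set<String> required, Set<String> emitted) {
        Set<String> missing = new TreeSet<>(required);
        missing.removeAll(emitted);
        return missing;
    }

    /** Fails the build with a readable message when coverage is incomplete. */
    static void requireCovered(Set<String> required, Set<String> emitted) {
        Set<String> missing = missing(required, emitted);
        if (!missing.isEmpty()) {
            throw new IllegalStateException("missing required telemetry: " + missing);
        }
    }
}
```

The same check applies unchanged to `required_events` and `required_manifest_fields`, since each is a set of names.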


26. Minimal Java Observability Kit

A practical Java kit can include:

PipelineLogger
PipelineMetrics
PipelineTracer
RunManifestWriter
QualityResultWriter
LineageEmitter
PublicationEventEmitter
SloMarkerWriter

Example façade:

public interface PipelineTelemetry {
    PipelineRunScope startRun(PipelineRunStart start);
    void recordQualityResult(QualityResult result);
    void recordReconciliation(ReconciliationResult result);
    void publishAssetEvent(AssetPublicationEvent event);
    void recordSloMarker(SloMarker marker);
}

public interface PipelineRunScope extends AutoCloseable {
    String runId();
    void heartbeat(Map<String, String> sourcePositions);
    void recordInputCount(String name, long value);
    void recordOutputCount(String name, long value);
    void recordRejectedCount(String name, long value);
    void fail(Throwable error);
    void succeed(PipelineRunCompletion completion);
    @Override void close();
}

Usage:

try (PipelineRunScope run = telemetry.startRun(start)) {
    try {
        ExtractResult extracted = extractor.extract(run.runId());
        run.recordInputCount("source_rows", extracted.count());

        TransformResult transformed = transformer.transform(extracted);
        run.recordOutputCount("valid_rows", transformed.validCount());
        run.recordRejectedCount("invalid_rows", transformed.rejectedCount());

        QualityResult quality = qualityGate.evaluate(transformed);
        telemetry.recordQualityResult(quality);
        quality.throwIfBlocking();

        Publication publication = publisher.publish(transformed);
        telemetry.publishAssetEvent(publication.toEvent());

        run.succeed(publication.toCompletion());
    } catch (Throwable t) {
        // Record the structured failure while the scope is still open. A catch
        // attached to the outer try-with-resources runs after close() and
        // cannot see `run`.
        run.fail(t);
        throw t;
    }
}

This keeps observability in the runner/control boundary, not buried inside business logic.


27. Incident Workflow

A mature incident workflow uses telemetry in a predictable order.

Step 1: Identify user-visible impact

Which asset/SLO/consumer is affected?

Step 2: Locate failing boundary

source -> ingest -> transform -> quality -> sink -> publish -> consumer

Step 3: Determine data effect

late, missing, wrong, duplicate, inaccessible, private-data exposure, cost runaway

Step 4: Find last good version

asset version, run ID, source position, transform version

Step 5: Choose remediation

resume, retry, replay, backfill, quarantine, rollback, restate, publish correction

Step 6: Preserve evidence

manifests, quality results, source positions, decision logs, incident notes

Step 7: Update controls

new metric, quality rule, SLO, runbook, test, contract, capacity limit

Observability should shorten this path.


28. Anti-Patterns

Anti-pattern 1: Logs as the only evidence

Logs are not a reliable source of truth for published data.

Fix: write run manifests and publication events.

Anti-pattern 2: Green job dashboard

A green job dashboard can hide stale or wrong data.

Fix: asset-level freshness/completeness/quality status.

Anti-pattern 3: High-cardinality metric explosion

Putting record IDs or raw keys in metric labels kills metric systems.

Fix: use controlled labels and diagnostic stores.

Anti-pattern 4: No source position

Without source position, you cannot prove what data was included.

Fix: persist source offset/cursor/LSN/manifest ID.

Anti-pattern 5: No publish event

Without publish events, you cannot distinguish computed output from consumer-visible output.

Fix: explicit asset publication telemetry.

Anti-pattern 6: Trace everything forever

Full per-record tracing for high-volume streams is usually unaffordable and risky.

Fix: sampling and targeted diagnostic mode.

Anti-pattern 7: Observability afterthought

Adding telemetry after incidents leads to inconsistent signals.

Fix: observability-as-code in pipeline definitions.


29. Production Dashboard Layout

A useful top-level page for one asset:

[Asset] serving.case_projection
Owner: case-platform
Criticality: Tier 1
Latest Published Version: snapshot-928102
Latest Source Position: lsn:16/B374D848
Freshness Lag: 48s / target 120s
Completeness: pass
Quality Gate: pass
Availability Probe: pass
SLO Budget Consumed: 12% monthly
Affected Consumers: ops-dashboard, search-api, breach-alerts

Then sections:

1. SLO status
2. Source lag decomposition
3. Runtime health
4. Quality and reconciliation
5. Recent runs/publications
6. Lineage upstream/downstream
7. Recent deploys/config changes
8. Backfill/replay activity
9. Runbook and owner contact

The dashboard should answer: “Is it safe? If not, where do I go next?”


30. Review Checklist

Before promoting a pipeline to production, verify:

1. Every run has a unique run_id.
2. Every output asset version has a publication event.
3. Source positions are persisted.
4. Input/output/rejected counts are captured.
5. Quality rule results are queryable.
6. Reconciliation evidence exists for critical assets.
7. Freshness metrics are asset-level, not just job-level.
8. Lag is decomposed by source, processing, sink, and publish boundary.
9. Checkpoint/state metrics are visible for stateful jobs.
10. Watermark/late event metrics exist for event-time jobs.
11. DLQ/quarantine backlog is visible.
12. Logs are structured and do not leak sensitive payloads.
13. Metrics use safe cardinality.
14. Traces propagate run/asset/context across services.
15. Dashboards map to incident workflows.
16. Alerts are tied to SLO risk or data loss risk.
17. Observability requirements are declared as code.
18. Access to diagnostic data follows data classification.

If source positions, publication events, and quality results are missing, the pipeline is not meaningfully observable.


31. Key Takeaways

Pipeline observability is not just application observability applied to batch jobs.

It is the ability to explain data product state.

The essential shift:

from: pod/job/task health
  to: asset/data/consumer health

Use logs, metrics, and traces, but do not stop there.

A production Java pipeline should emit:

  • runtime metrics,
  • data quality metrics,
  • source position markers,
  • checkpoint/watermark signals,
  • run manifests,
  • publication events,
  • reconciliation results,
  • lineage events,
  • SLO evaluations.

The final test is simple:

When a consumer says “this data is wrong,” can you reconstruct why, how far the impact goes, and what version should replace it?

If yes, your pipeline is observable. If no, your pipeline is merely instrumented.
