Airflow DAG Design for Java Pipelines

Learn Java Data Pipeline Pattern - Part 058

Production-grade Airflow DAG design for Java data pipelines: task boundaries, Java job submission, run manifests, sensors, deferrable operators, dynamic task mapping, asset scheduling, quality gates, lineage, retries, and operational recovery.

Part 058 — Airflow DAG Design for Java Pipelines

Airflow is often misunderstood.

It is not a data processing engine.

It is not a streaming engine.

It is not where you should write millions of row transformations.

Airflow is a workflow orchestrator. Its best role in Java data platforms is to coordinate finite, observable, retryable units of work:

  • submit Java batch jobs
  • submit Spark jobs
  • coordinate file ingestion
  • wait for external availability
  • run data quality gates
  • publish table versions
  • emit lineage
  • run backfills
  • coordinate report generation

This part shows how to design Airflow DAGs for Java pipelines without turning Airflow into a fragile pile of Python glue.


1. Mental Model

A good Airflow DAG should answer:

“What bounded work must happen, in what dependency order, with what retry, validation, and publish semantics?”

A bad Airflow DAG answers:

“Where can I paste all my pipeline code?”

The distinction matters.

Airflow should own control flow

Airflow should decide:

  • when the pipeline run starts
  • which task depends on which task
  • which task can retry
  • which task blocks publication
  • where task status is recorded
  • how humans inspect and rerun work

Java should own data logic

Java/Spark/Flink jobs should decide:

  • how records are parsed
  • how domain transformations work
  • how contracts are validated
  • how sink writes are staged
  • how idempotency is enforced
  • how reconciliation is computed

The clean architecture is:

Airflow coordinates. Java processes.


2. Airflow DAG as a Control Contract

A DAG is not just a visual graph. It is a control contract.

For each DAG, define:

dagId: regulatory_case_daily_snapshot
owner: enforcement-data-platform
businessPurpose: Produce official daily case snapshot for regulatory reporting
schedule: daily
inputDatasets:
  - regulatory.raw.case_events
  - regulatory.ref.case_policy_versions
outputDatasets:
  - regulatory.gold.case_daily_snapshot
criticality: high
freshnessSlo: published by 06:00 Asia/Jakarta
failureImpact: official reporting dataset unavailable
manualRunSupported: true
backfillSupported: true

If this metadata is absent, your DAG is only code. It is not an operational asset.
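A minimal sketch of enforcing this contract in CI, assuming it is declared next to the DAG as a Python dataclass. `DagContract` and `missing_contract_fields` are hypothetical names, not an Airflow API. The field names mirror the YAML above, and a check like this could fail any DAG whose contract has empty fields.

```python
from dataclasses import dataclass, fields


@dataclass(frozen=True)
class DagContract:
    """Operational metadata a DAG should declare (mirrors the YAML above)."""
    dag_id: str
    owner: str
    business_purpose: str
    schedule: str
    input_datasets: tuple
    output_datasets: tuple
    criticality: str
    freshness_slo: str
    failure_impact: str
    manual_run_supported: bool
    backfill_supported: bool


def missing_contract_fields(contract: DagContract) -> list:
    """Return the names of empty fields; an empty list means the contract is complete."""
    missing = []
    for f in fields(contract):
        value = getattr(contract, f.name)
        # Booleans always count as present; strings and tuples must be non-empty.
        if isinstance(value, bool):
            continue
        if not value:
            missing.append(f.name)
    return missing


contract = DagContract(
    dag_id="regulatory_case_daily_snapshot",
    owner="enforcement-data-platform",
    business_purpose="Produce official daily case snapshot for regulatory reporting",
    schedule="daily",
    input_datasets=("regulatory.raw.case_events", "regulatory.ref.case_policy_versions"),
    output_datasets=("regulatory.gold.case_daily_snapshot",),
    criticality="high",
    freshness_slo="published by 06:00 Asia/Jakarta",
    failure_impact="",  # deliberately left blank to show the check firing
    manual_run_supported=True,
    backfill_supported=True,
)
print(missing_contract_fields(contract))  # ['failure_impact']
```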


3. Task Boundary Principle

A task should be a meaningful failure, retry, and evidence boundary.

Good task boundaries:

create_run_manifest
verify_source_availability
submit_java_extract_job
validate_raw_output
submit_java_transform_job
validate_staging_output
publish_dataset_version
emit_lineage
close_run_manifest

Bad task boundaries:

process_record_1
process_record_2
process_record_3

Also bad:

do_everything

A task is too coarse if:

  • you cannot tell which phase failed
  • retry repeats unnecessary expensive work
  • output cannot be inspected before publish
  • audit evidence is hidden in logs

A task is too fine if:

  • task count explodes with row count
  • Airflow metadata DB becomes the bottleneck
  • scheduler overhead dominates work
  • individual tasks are not independently meaningful

Practical rule

One Airflow task should usually represent one bounded external operation,
not one data record and not the entire pipeline.

4. Standard Java Pipeline DAG Shape

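A production Java pipeline DAG often has this structure:

create_run_manifest
  -> verify_source_availability
  -> submit_java_extract_job
  -> validate_raw_output
  -> submit_java_transform_job
  -> validate_staging_output
  -> publish_dataset_version
  -> emit_lineage
  -> close_run_manifest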

This structure creates explicit boundaries:

  • run identity
  • source readiness
  • processing execution
  • quality validation
  • publication
  • lineage
  • run closure

5. Run Manifest First

Do not start a serious pipeline without a run manifest.

A run manifest is the bridge between Airflow and the Java job.

{
  "runId": "case-daily-snapshot__2026-07-04__attempt-1",
  "dagId": "regulatory_case_daily_snapshot",
  "logicalDate": "2026-07-04",
  "inputScope": {
    "eventDateFrom": "2026-07-03",
    "eventDateTo": "2026-07-04"
  },
  "transformVersion": "case-snapshot-transform:3.8.1",
  "mode": "NORMAL",
  "requestedBy": "airflow",
  "outputTarget": "iceberg://regulatory.gold.case_daily_snapshot/dt=2026-07-03",
  "qualityProfile": "official-reporting-v2"
}

Why manifest first?

Because the Java job needs stable context:

  • run ID
  • input range
  • output target
  • transform version
  • processing mode
  • quality profile
  • lineage identity
  • backfill/correction mode

Without this, your job reads environment variables, file paths, and dates from scattered configuration.

That becomes unreproducible.
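A minimal sketch of manifest creation, assuming a pure function that the create_run_manifest task would call before writing the result to object storage. `build_run_manifest` and its parameters are illustrative. The field names follow the JSON example above, and the date arithmetic assumes the run for logical date D reads events from D-1 to D and writes partition dt=D-1, as in that example.

```python
import json
from datetime import date, timedelta


def build_run_manifest(
    pipeline_name: str,
    dag_id: str,
    logical_date: date,
    attempt: int,
    transform_version: str,
    output_table: str,
    quality_profile: str,
    mode: str = "NORMAL",
    requested_by: str = "airflow",
) -> dict:
    """Build a run manifest shaped like the JSON example above."""
    previous_day = logical_date - timedelta(days=1)
    return {
        # The same inputs always yield the same runId, which keeps retries traceable.
        "runId": f"{pipeline_name}__{logical_date.isoformat()}__attempt-{attempt}",
        "dagId": dag_id,
        "logicalDate": logical_date.isoformat(),
        "inputScope": {
            "eventDateFrom": previous_day.isoformat(),
            "eventDateTo": logical_date.isoformat(),
        },
        "transformVersion": transform_version,
        "mode": mode,
        "requestedBy": requested_by,
        "outputTarget": f"iceberg://{output_table}/dt={previous_day.isoformat()}",
        "qualityProfile": quality_profile,
    }


if __name__ == "__main__":
    manifest = build_run_manifest(
        pipeline_name="case-daily-snapshot",
        dag_id="regulatory_case_daily_snapshot",
        logical_date=date(2026, 7, 4),
        attempt=1,
        transform_version="case-snapshot-transform:3.8.1",
        output_table="regulatory.gold.case_daily_snapshot",
        quality_profile="official-reporting-v2",
    )
    print(json.dumps(manifest, indent=2))
```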


6. Java Job Contract

The Java job invoked by Airflow should expose a simple contract.

Command-line interface

java -jar case-snapshot-pipeline.jar \
  --run-manifest-uri s3://pipeline-manifests/case-daily-snapshot/2026-07-04/manifest.json

Required outputs

The Java job should write:

run-output.json
metrics.json
quality-precheck.json
lineage.json
sink-commit.json
error-report.json, if failed

Example run-output.json

{
  "runId": "case-daily-snapshot__2026-07-04__attempt-1",
  "status": "SUCCEEDED",
  "inputRecordCount": 812334,
  "outputRecordCount": 811902,
  "rejectedRecordCount": 432,
  "stagingLocation": "s3://lake/staging/case_daily_snapshot/run=...",
  "checkpoint": {
    "source": "iceberg://regulatory.silver.case_events",
    "snapshotId": "918274981274"
  },
  "startedAt": "2026-07-04T01:00:12Z",
  "finishedAt": "2026-07-04T01:18:44Z"
}

Airflow should inspect this structured output, not scrape logs.
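A sketch of how a task might read run-output.json instead of scraping logs. It assumes every input record lands either in the output or in the rejected set. That holds for the example (811902 + 432 = 812334) but not for every transform, for instance one that aggregates. `inspect_run_output` and `RunOutputError` are hypothetical names.

```python
import json


class RunOutputError(Exception):
    """Raised when run-output.json does not justify continuing the DAG."""


def inspect_run_output(raw: str) -> dict:
    """Parse run-output.json and check that its record counts are consistent."""
    output = json.loads(raw)
    if output.get("status") != "SUCCEEDED":
        raise RunOutputError(f"job status is {output.get('status')!r}")
    # Assumption: input = output + rejected for this pipeline.
    accounted = output["outputRecordCount"] + output["rejectedRecordCount"]
    if accounted != output["inputRecordCount"]:
        raise RunOutputError(
            f"input={output['inputRecordCount']} but output+rejected={accounted}"
        )
    return output
```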


7. Exit Code Contract

A Java job must return meaningful exit codes.

public enum PipelineExitCode {
    SUCCESS(0),
    CONFIGURATION_ERROR(10),
    SOURCE_NOT_READY(20),
    CONTRACT_VIOLATION(30),
    QUALITY_PRECHECK_FAILED(40),
    SINK_WRITE_FAILED(50),
    SINK_COMMIT_UNKNOWN(60),
    NON_RETRYABLE_BUG(70);

    private final int code;

    PipelineExitCode(int code) {
        this.code = code;
    }

    public int code() {
        return code;
    }
}

Airflow retry policy can then be sensible:

| Exit class | Retry? | Action |
|---|---|---|
| Source not ready | Yes | Retry/wait. |
| Temporary sink failure | Yes | Retry with idempotent staging. |
| Contract violation | No | Quarantine and notify owner. |
| Quality failed | No or manual | Block publish. |
| Commit unknown | Manual | Investigate sink state before retry. |
| Code bug | No | Fail fast. |

Do not let every failure return 1.
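A sketch of the retry table as code, mirroring the Java enum on the Python side. In a real task, non-retryable decisions could raise Airflow's `AirflowFailException`, which fails the task without consuming its remaining retries. Treating SINK_WRITE_FAILED as the "temporary sink failure" row and CONFIGURATION_ERROR as non-retryable are assumptions; `RetryDecision` and `decide` are hypothetical names.

```python
from enum import Enum
from typing import Optional


class RetryDecision(Enum):
    RETRY = "retry"                  # let Airflow's task retries handle it
    FAIL_FAST = "fail_fast"          # fail without consuming retries
    BLOCK_PUBLISH = "block_publish"  # stop before publication, keep evidence
    MANUAL = "manual"                # route to a human or an inspection task


# Keys mirror the Java PipelineExitCode values; SUCCESS (0) needs no decision.
EXIT_CODE_POLICY = {
    10: RetryDecision.FAIL_FAST,      # CONFIGURATION_ERROR (assumed non-retryable)
    20: RetryDecision.RETRY,          # SOURCE_NOT_READY
    30: RetryDecision.FAIL_FAST,      # CONTRACT_VIOLATION: quarantine and notify owner
    40: RetryDecision.BLOCK_PUBLISH,  # QUALITY_PRECHECK_FAILED
    50: RetryDecision.RETRY,          # SINK_WRITE_FAILED: safe only with idempotent staging
    60: RetryDecision.MANUAL,         # SINK_COMMIT_UNKNOWN: inspect sink state first
    70: RetryDecision.FAIL_FAST,      # NON_RETRYABLE_BUG
}


def decide(exit_code: int) -> Optional[RetryDecision]:
    """Map a Java exit code to an orchestration decision; None means success."""
    if exit_code == 0:
        return None
    # Unknown codes (including a bare 1) are treated as bugs, never blind retries.
    return EXIT_CODE_POLICY.get(exit_code, RetryDecision.FAIL_FAST)
```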


8. Airflow Should Submit, Not Compute

Bad Airflow task:

def transform_cases():
    rows = read_millions_of_rows()
    transformed = []
    for row in rows:
        transformed.append(apply_business_rules(row))
    write_output(transformed)

This abuses Airflow workers.

Better Airflow task:

def submit_java_job(run_manifest_uri: str):
    submit_kubernetes_job(
        image="registry.acme.com/case-snapshot-pipeline:3.8.1",
        args=["--run-manifest-uri", run_manifest_uri],
    )

The task launches a data-plane job.

The data-plane job does the heavy work.
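For the simplest backend (plain `java -jar` on a worker), submission can be sketched with the standard library's `subprocess` module. The task passes only the manifest URI and reads back the exit code; it never touches records. In production a KubernetesPodOperator or SparkSubmitOperator would take this role. The default command, jar name, and timeout here are assumptions.

```python
import subprocess
import sys
from typing import Sequence


def submit_java_job(
    manifest_uri: str,
    command: Sequence[str] = ("java", "-jar", "case-snapshot-pipeline.jar"),
    timeout_seconds: float = 4 * 60 * 60,
) -> int:
    """Launch the job with only the manifest URI and return its exit code."""
    completed = subprocess.run(
        [*command, "--run-manifest-uri", manifest_uri],
        check=False,  # the exit code is interpreted by the caller, not raised here
        timeout=timeout_seconds,
    )
    return completed.returncode


if __name__ == "__main__":
    # Stand-in for the JVM: a Python one-liner that exits with SOURCE_NOT_READY (20).
    fake_jvm = (sys.executable, "-c", "import sys; sys.exit(20)")
    uri = "s3://pipeline-manifests/case-daily-snapshot/2026-07-04/manifest.json"
    print(submit_java_job(uri, command=fake_jvm))
```

The returned code would then feed a policy like the exit-code table in section 7 rather than a generic "failed" state.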


9. Deployment Options for Java Jobs

Airflow can submit Java jobs through several execution backends.

| Backend | Use when | Caution |
|---|---|---|
| BashOperator calling java -jar | Simple local worker deployment | Weak isolation and dependency management. |
| DockerOperator | Containerized jobs on worker-accessible Docker | Operationally constrained in many clusters. |
| KubernetesPodOperator | Kubernetes-native job execution | Need image/version/resource discipline. |
| SparkSubmitOperator | Spark cluster manages execution | Best for Spark jobs; inspect driver/executor logs. |
| EMR/Dataproc/Glue operators | Cloud-managed Spark jobs | Cloud-specific behavior and IAM complexity. |
| Custom operator | Repeated enterprise pattern | Must implement lineage, retries, status cleanly. |

For serious platforms, Kubernetes or managed Spark submission is usually cleaner than running heavy Java directly on Airflow workers.


10. DAG Skeleton

This is a simplified conceptual skeleton.

Provider imports differ by Airflow version and provider package, so treat this as design structure, not copy-paste API reference.

from __future__ import annotations

from datetime import datetime, timedelta

from airflow.decorators import dag, task

DAG_ID = "regulatory_case_daily_snapshot"
PIPELINE_IMAGE = "registry.acme.com/case-snapshot-pipeline:3.8.1"

@dag(
    dag_id=DAG_ID,
    start_date=datetime(2026, 1, 1),
    schedule="0 1 * * *",
    catchup=True,
    max_active_runs=1,
    default_args={
        "owner": "enforcement-data-platform",
        "retries": 2,
        "retry_delay": timedelta(minutes=10),
    },
    tags=["java", "regulatory", "gold", "case"],
)
def case_daily_snapshot():

    @task
    def create_run_manifest(logical_date: str) -> str:
        manifest = {
            "dagId": DAG_ID,
            "logicalDate": logical_date,
            "pipelineName": "case-daily-snapshot",
            "transformVersion": "3.8.1",
            "mode": "NORMAL",
        }
        # write manifest to object storage or manifest service
        return write_manifest(manifest)

    @task
    def check_sources(manifest_uri: str) -> str:
        manifest = read_manifest(manifest_uri)
        # verify required upstream dataset snapshots exist
        # return same manifest URI after updating readiness evidence
        return manifest_uri

    @task
    def submit_java_job(manifest_uri: str) -> str:
        # submit Kubernetes/Spark/Batch job with manifest URI
        # wait or return external job ID depending on operator design
        return submit_job_and_wait(
            image=PIPELINE_IMAGE,
            args=["--run-manifest-uri", manifest_uri],
        )

    @task
    def run_quality_gate(job_result_uri: str) -> str:
        result = read_json(job_result_uri)
        assert_quality(result)
        return job_result_uri

    @task
    def publish_dataset(job_result_uri: str) -> str:
        result = read_json(job_result_uri)
        return publish_staging_output(result)

    @task
    def emit_lineage(published_version_uri: str) -> None:
        emit_openlineage_event(published_version_uri)

    manifest_uri = create_run_manifest("{{ ds }}")
    ready_manifest_uri = check_sources(manifest_uri)
    job_result_uri = submit_java_job(ready_manifest_uri)
    # run_quality_gate passes the job result URI through only if the gate passes
    gated_result_uri = run_quality_gate(job_result_uri)
    published_version_uri = publish_dataset(gated_result_uri)
    emit_lineage(published_version_uri)

case_daily_snapshot()

The real functions write_manifest, read_manifest, submit_job_and_wait, assert_quality, publish_staging_output, and emit_openlineage_event should live in shared platform libraries.

Do not duplicate them in every DAG.


11. Scheduling Model

Airflow scheduling is part of correctness.

A DAG may run because of:

  • cron schedule
  • manual trigger
  • backfill/catchup
  • upstream dataset/asset update
  • external event bridge
  • SLA remediation

Cron schedule

Use for time-based pipelines:

Every day at 01:00, build yesterday’s official snapshot.

Risk:

  • upstream data may not be ready
  • time schedule hides dependency on data availability

Mitigation:

  • source readiness checks
  • sensors
  • asset-aware scheduling
  • freshness guard

Asset/dataset-triggered schedule

Use when downstream work should run after upstream data updates.

When silver.case_events and reference.case_policy_versions are updated,
run gold.case_daily_snapshot.

Risk:

  • multiple updates may trigger too often
  • quality of upstream update may still be bad

Mitigation:

  • trigger consolidation
  • quality state in dataset event
  • max active runs
  • run manifest dedupe
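Run manifest dedupe can be sketched as a deterministic key over the upstream dataset versions that triggered the run: triggers that observe the same versions produce the same key, so the second one can be rejected. The function names are hypothetical, and the in-memory set stands in for a durable manifest store.

```python
import hashlib
import json


def trigger_dedupe_key(dag_id: str, upstream_versions: dict) -> str:
    """Deterministic key for 'this DAG over exactly these upstream versions'."""
    # sort_keys makes the key independent of the order in which updates arrived.
    canonical = json.dumps(
        {"dagId": dag_id, "inputs": upstream_versions},
        sort_keys=True,
        separators=(",", ":"),
    )
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()


# Illustration only: a real platform would persist keys with the run manifest.
seen = set()


def should_start_run(dag_id: str, upstream_versions: dict) -> bool:
    key = trigger_dedupe_key(dag_id, upstream_versions)
    if key in seen:
        return False
    seen.add(key)
    return True
```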

Manual trigger

Use for:

  • incident repair
  • correction replay
  • targeted backfill
  • one-off publication

Risk:

  • undocumented parameters
  • accidental duplicate outputs

Mitigation:

  • explicit parameter schema
  • run manifest validation
  • approval controls for production datasets

12. Catchup and Backfill

Airflow catchup can be useful, but dangerous.

If a daily DAG has catchup enabled and has missed 120 days, Airflow may schedule 120 runs.

That can be correct for partitioned recomputation.

It can also destroy your platform if every run launches expensive Spark jobs at once.

Design rules

Set:

max_active_runs=1

or a small number when output partitions are independent.

Use pools or concurrency limits for expensive pipelines.

Define whether backfill runs:

  • overwrite partitions
  • append new versions
  • create restatement snapshots
  • write to shadow namespace
  • suppress external notifications

Backfill manifest

{
  "mode": "BACKFILL",
  "backfillRunId": "bf_case_snapshot_2024_q4_v2",
  "dateRange": {
    "from": "2024-10-01",
    "to": "2024-12-31"
  },
  "publishMode": "SHADOW_THEN_PROMOTE",
  "suppressNotifications": true,
  "requestedBy": "data-platform-oncall",
  "approvedBy": "regulatory-reporting-owner"
}

Backfill is not just “rerun old dates”. It is a controlled operational mode.
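A sketch of expanding the backfill manifest into one child manifest per daily partition, so each child run stays traceable to the parent `backfillRunId`. The 120-day cap and the child field set are assumptions. Children should still be throttled with `max_active_runs` or a pool rather than launched at once.

```python
from datetime import date, timedelta


def expand_backfill(manifest: dict, max_days: int = 120) -> list:
    """Expand a backfill manifest into one child manifest per daily partition."""
    start = date.fromisoformat(manifest["dateRange"]["from"])
    end = date.fromisoformat(manifest["dateRange"]["to"])
    if start > end:
        raise ValueError("dateRange.from is after dateRange.to")
    days = (end - start).days + 1  # inclusive range
    if days > max_days:
        raise ValueError(f"{days} days exceeds the allowed maximum of {max_days}")
    children = []
    for offset in range(days):
        dt = (start + timedelta(days=offset)).isoformat()
        children.append({
            "mode": manifest["mode"],
            "backfillRunId": manifest["backfillRunId"],  # parent identity for audit
            "partition": f"dt={dt}",
            "publishMode": manifest["publishMode"],
            "suppressNotifications": manifest["suppressNotifications"],
        })
    return children
```

Applied to the Q4 2024 manifest above, this yields 92 children, from dt=2024-10-01 to dt=2024-12-31.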


13. Sensors and Waiting

Waiting is common:

  • wait for file arrival
  • wait for upstream table snapshot
  • wait for external API export
  • wait for Spark job completion
  • wait for quality report
  • wait for downstream consumer lag to drain

But waiting consumes resources if done badly.

Poor waiting pattern

while not source_ready():
    time.sleep(60)

This occupies a worker slot.

Better waiting pattern

Use sensors designed for Airflow, and where available, prefer deferrable operators/sensors so waiting can be offloaded from workers.

Conceptually:

wait_for_partner_file = DeferrableFileSensor(
    task_id="wait_for_partner_file",
    path="s3://landing/partner/{{ ds }}/manifest.json",
    timeout=60 * 60 * 4,
)

The exact class depends on provider support.

The design principle is stable:

Waiting should not waste compute that should execute work.
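Deferrable operators hand waiting to Airflow's triggerer, which runs triggers as asyncio coroutines so many waits share one event loop instead of each holding a worker slot. The sketch below is not Airflow's trigger API; it only illustrates with plain asyncio why cooperative waiting is cheap. `wait_until` and the fake readiness checks are hypothetical.

```python
import asyncio
import time
from typing import Callable


async def wait_until(check: Callable[[], bool], poke_interval: float, timeout: float) -> bool:
    """Poll `check` cooperatively; returns False if the timeout elapses first."""
    loop = asyncio.get_running_loop()
    deadline = loop.time() + timeout
    while loop.time() < deadline:
        if check():
            return True
        # Yields the thread to every other waiter instead of blocking it.
        await asyncio.sleep(poke_interval)
    return False


def ready_after(n_checks: int) -> Callable[[], bool]:
    """Fake readiness check that succeeds on its n-th call."""
    calls = 0

    def check() -> bool:
        nonlocal calls
        calls += 1
        return calls >= n_checks

    return check


async def main() -> list:
    # 100 concurrent waits on one thread; a time.sleep loop would hold 100 worker slots.
    checks = [ready_after(3) for _ in range(100)]
    return await asyncio.gather(*(wait_until(c, 0.01, 1.0) for c in checks))


if __name__ == "__main__":
    started = time.perf_counter()
    results = asyncio.run(main())
    print(all(results), f"{time.perf_counter() - started:.2f}s elapsed")
```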


14. Dynamic Task Mapping

Dynamic task mapping is useful when the amount of work is known at runtime.

Examples:

  • one task per detected partition
  • one task per partner file
  • one task per tenant
  • one task per independent source table

Do not use it for row-level processing.

Good dynamic mapping scope

100 partitions -> 100 mapped tasks may be fine.
100 million rows -> 100 million mapped tasks is catastrophic.

Example conceptual pattern

@task
def discover_partitions(manifest_uri: str) -> list[dict]:
    return [
        {"dt": "2026-07-01"},
        {"dt": "2026-07-02"},
        {"dt": "2026-07-03"},
    ]

@task
def submit_partition_job(partition: dict) -> str:
    return submit_java_partition_job(partition)

partitions = discover_partitions(manifest_uri)
results = submit_partition_job.expand(partition=partitions)

Guardrails

  • cap max mapped tasks
  • group small partitions
  • use pools
  • use per-partition manifest
  • preserve partition-level output evidence
  • avoid dynamic task IDs that hide lineage
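The guardrails "cap max mapped tasks" and "group small partitions" can be combined: plan batches so the mapped task count stays under a cap, then map over batches instead of raw partitions. `plan_batches` is a hypothetical helper. Its result would be the input to `.expand()`, with one batch per mapped task.

```python
import math


def plan_batches(partitions: list, max_mapped_tasks: int) -> list:
    """Group partitions so the number of mapped tasks never exceeds the cap."""
    if max_mapped_tasks < 1:
        raise ValueError("max_mapped_tasks must be >= 1")
    if not partitions:
        return []
    # Smallest batch size that keeps the batch count at or below the cap.
    batch_size = math.ceil(len(partitions) / max_mapped_tasks)
    return [partitions[i:i + batch_size] for i in range(0, len(partitions), batch_size)]
```

With 100 partitions and a cap of 8, this produces 8 batches: seven of 13 partitions and a final one of 9.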

15. Source Readiness Checks

A source is ready only when it satisfies its contract.

Not enough:

file exists

Better:

file exists
manifest exists
file size stable
checksum matches
row count matches trailer
source extract status is COMPLETED
source partition watermark >= required time

Java-style source readiness record

{
  "source": "partner-enforcement-file",
  "partition": "dt=2026-07-03",
  "ready": true,
  "evidence": {
    "manifestFound": true,
    "checksumMatched": true,
    "recordCount": 98122,
    "producerStatus": "COMPLETED"
  }
}

Airflow should not simply check existence and move on.
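A sketch of an evidence-based readiness check that produces the record above. It assumes the producer ships a manifest with `sha256`, `recordCount`, and `producerStatus` fields and that records are newline-delimited; all three are assumptions. File-size stability and watermark checks are left out for brevity.

```python
import hashlib
from typing import Optional


def check_readiness(source: str, partition: str, data: bytes, manifest: Optional[dict]) -> dict:
    """Build a readiness record from contract evidence, not just file existence."""
    manifest_found = manifest is not None
    manifest = manifest or {}
    checksum_matched = hashlib.sha256(data).hexdigest() == manifest.get("sha256")
    record_count = len(data.splitlines())  # assumes newline-delimited records
    count_matched = record_count == manifest.get("recordCount")
    producer_status = manifest.get("producerStatus")
    ready = (
        manifest_found
        and checksum_matched
        and count_matched
        and producer_status == "COMPLETED"
    )
    return {
        "source": source,
        "partition": partition,
        "ready": ready,
        "evidence": {
            "manifestFound": manifest_found,
            "checksumMatched": checksum_matched,
            "recordCount": record_count,
            "producerStatus": producer_status,
        },
    }
```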


16. Quality Gates

A quality gate is a control-flow decision based on data evidence.

Quality gate examples:

  • row count within expected range
  • required columns non-null
  • uniqueness constraints hold
  • referential checks pass
  • checksum reconciliation passes
  • sensitive data policy passes
  • freshness is acceptable
  • output grain is valid

Quality result

{
  "runId": "case-daily-snapshot__2026-07-04__attempt-1",
  "status": "FAILED",
  "checks": [
    {
      "name": "case_id_not_null",
      "severity": "BLOCKER",
      "failedCount": 0
    },
    {
      "name": "event_count_reconciliation",
      "severity": "BLOCKER",
      "expected": 812334,
      "actual": 811902,
      "difference": 432
    }
  ]
}

Do not publish official datasets if blocker checks fail.
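A sketch of the blocker rule, of the kind the `assert_quality` helper in the DAG skeleton could delegate to. It assumes a check fails when `failedCount` is positive or when `expected` differs from `actual`. Run against the example above, only `event_count_reconciliation` blocks publication. The function names are hypothetical.

```python
def blocking_failures(quality_result: dict) -> list:
    """Names of BLOCKER checks that failed; publish only if this list is empty."""
    failures = []
    for check in quality_result["checks"]:
        if check.get("severity") != "BLOCKER":
            continue  # warnings are reported elsewhere, they do not gate publish
        failed = check.get("failedCount", 0) > 0
        if "expected" in check and "actual" in check:
            failed = failed or check["expected"] != check["actual"]
        if failed:
            failures.append(check["name"])
    return failures


def assert_publishable(quality_result: dict) -> None:
    failures = blocking_failures(quality_result)
    if failures:
        raise RuntimeError(f"publish blocked by: {', '.join(failures)}")
```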


17. Publish Step

Publishing should be explicit.

The transform writes staging output.

The publish task promotes it.

transform -> staging output -> validation -> publish pointer/version

This avoids half-published datasets.

Publish patterns

| Pattern | Use when |
|---|---|
| Atomic rename | File system supports atomic rename semantics. |
| Catalog pointer update | Lakehouse table snapshot/catalog controls visibility. |
| Replace partition | Partition output is independently recomputable. |
| Versioned publish | Consumers can select dataset version. |
| View switch | Presentation layer points to new version. |

Publish record

{
  "datasetId": "regulatory.gold.case_daily_snapshot",
  "datasetVersion": "snapshot-918274981274",
  "runId": "case-daily-snapshot__2026-07-04__attempt-1",
  "publishedAt": "2026-07-04T02:00:00Z",
  "publishedBy": "airflow",
  "qualityStatus": "PASSED"
}

Publication is a business and operational boundary, not just a file write.
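For the atomic rename pattern, a sketch on a local POSIX filesystem: `os.replace` swaps a pointer file atomically, so readers see either the old version or the new one, never a partial write. Object stores such as S3 do not offer atomic rename, which is why lakehouse tables rely on catalog or snapshot commits instead. The pointer file and its JSON shape are assumptions.

```python
import json
import os
import tempfile


def publish_version(pointer_path: str, dataset_version: str, run_id: str) -> bool:
    """Point consumers at a validated version; False means it was already published."""
    if os.path.exists(pointer_path):
        with open(pointer_path, encoding="utf-8") as f:
            if json.load(f).get("datasetVersion") == dataset_version:
                return False  # idempotent: a retried publish is a no-op
    directory = os.path.dirname(os.path.abspath(pointer_path))
    # Write the new pointer in the same directory so the rename stays on one filesystem.
    fd, tmp_path = tempfile.mkstemp(dir=directory, suffix=".tmp")
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as f:
            json.dump({"datasetVersion": dataset_version, "runId": run_id}, f)
            f.flush()
            os.fsync(f.fileno())
        os.replace(tmp_path, pointer_path)  # atomic swap of old pointer for new
    except BaseException:
        if os.path.exists(tmp_path):
            os.remove(tmp_path)
        raise
    return True
```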


18. Retry Design

Airflow retries tasks. That does not automatically make the pipeline safe.

Every retried task must be idempotent.

Retry-safe task examples

| Task | Safe if |
|---|---|
| create manifest | runId is unique and create is idempotent. |
| check source | read-only. |
| submit Java job | submission has idempotency key. |
| write staging | staging path includes run attempt or is cleaned safely. |
| quality gate | read-only over stable staging output. |
| publish | publish operation is atomic and idempotent. |
| emit lineage | lineage event has stable run/job identity. |

Dangerous retry

Task failed after writing output but before reporting success.
Airflow retries.
Second attempt writes duplicate output.

Fix:

  • write to run-specific staging
  • use sink commit ledger
  • make publish idempotent
  • check existing output before writing
  • treat unknown commit separately

19. Unknown Commit Outcome

This is one of the most dangerous production states.

Example:

Java job writes output.
Network timeout occurs before it confirms commit.
Airflow marks task failed.

Was the output committed?

Maybe.

A blind retry may duplicate or corrupt the output.

Required handling

The Java job should write a commit record:

{
  "runId": "case-daily-snapshot__2026-07-04__attempt-1",
  "sink": "iceberg://regulatory.gold.case_daily_snapshot",
  "stagingLocation": "s3://lake/staging/...",
  "commitState": "COMMITTED",
  "committedSnapshotId": "918274981274"
}

If Airflow receives SINK_COMMIT_UNKNOWN, route to manual recovery or a safe commit-inspection task.
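A sketch of the commit-inspection decision. It assumes the Java process has exited, so no commit can still be in flight, and that the sink can report which run IDs it has committed (for example, run IDs recorded in table snapshot metadata). `Recovery` and `resolve_unknown_commit` are hypothetical names. The sink is treated as the source of truth and the commit record as supporting evidence.

```python
from enum import Enum
from typing import Optional


class Recovery(Enum):
    ALREADY_COMMITTED = "already_committed"  # record success, do not rewrite
    SAFE_TO_RETRY = "safe_to_retry"          # nothing landed; retry idempotently
    MANUAL = "manual"                        # evidence conflicts; page a human


def resolve_unknown_commit(
    run_id: str,
    commit_record: Optional[dict],
    sink_committed_run_ids: set,
) -> Recovery:
    """Decide what to do after SINK_COMMIT_UNKNOWN."""
    if run_id in sink_committed_run_ids:
        # The sink shows this run's commit, whatever the ledger says.
        return Recovery.ALREADY_COMMITTED
    if commit_record is not None and commit_record.get("commitState") == "COMMITTED":
        # Ledger claims a commit the sink does not show: never guess here.
        return Recovery.MANUAL
    # No commit visible anywhere; safe only with run-scoped, idempotent staging.
    return Recovery.SAFE_TO_RETRY
```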


20. XCom Discipline

Do not push large data through Airflow XCom.

Use XCom for small control metadata:

  • manifest URI
  • job ID
  • output URI
  • dataset version ID
  • quality result URI

Do not use it for:

  • records
  • large JSON payloads
  • dataframes
  • file contents
  • millions of IDs

Pattern:

XCom carries pointer, object storage carries payload.

Example:

return "s3://pipeline-runs/case-daily/2026-07-04/run-output.json"

Not:

return entire_output_dataframe
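A sketch of the pointer pattern with a size guard, using a local directory to stand in for object storage. The 4 KiB limit is an assumed team convention, not an Airflow setting, and the helper names are hypothetical.

```python
import json
import tempfile
from pathlib import Path

# Assumed team limit for control metadata; not an Airflow configuration value.
MAX_XCOM_BYTES = 4 * 1024


def store_and_point(payload: dict, run_dir: Path, name: str) -> str:
    """Write the payload to storage and return only its URI for XCom."""
    run_dir.mkdir(parents=True, exist_ok=True)
    target = (run_dir / name).resolve()
    target.write_text(json.dumps(payload), encoding="utf-8")
    # A local file:// URI stands in for an s3:// object in this sketch.
    return target.as_uri()


def check_xcom_value(value: object) -> object:
    """Reject return values that look like data rather than control metadata."""
    size = len(json.dumps(value).encode("utf-8"))
    if size > MAX_XCOM_BYTES:
        raise ValueError(f"XCom value is {size} bytes; store it and return a URI instead")
    return value


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        uri = store_and_point({"ids": list(range(100_000))}, Path(tmp) / "case-daily", "run-output.json")
        print(check_xcom_value(uri))  # the short URI passes; the payload stays in storage
```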

21. Pools, Concurrency, and Blast Radius

Airflow can launch too much work if unbounded.

Use:

  • max_active_runs
  • task concurrency
  • pools
  • queue separation
  • priority weights
  • environment separation

Example concurrency policy

pipeline: regulatory_case_daily_snapshot
maxActiveRuns: 1
pool: gold_publication_pool
maxParallelPartitions: 8
sparkClusterQueue: regulatory-critical
backfillApprovalRequired: true

Why this matters

Without concurrency control:

  • catchup can overload source systems
  • backfill can starve daily pipelines
  • retries can create storm behavior
  • multiple runs can publish conflicting outputs

Orchestration is also capacity governance.


22. DAG Parameters and Validation

Manual runs need parameters.

But parameters must be validated.

Example:

{
  "mode": "BACKFILL",
  "fromDate": "2026-01-01",
  "toDate": "2026-01-31",
  "publishMode": "SHADOW",
  "reason": "Recompute after policy rule v4 fix"
}

Validation rules:

  • fromDate <= toDate
  • date range within allowed maximum
  • production publish requires approval
  • NORMAL mode cannot override transform version manually
  • BACKFILL mode must include reason
  • CORRECTION mode must include correction ticket

Do not trust manually supplied parameters.
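Airflow's `Param` objects can validate individual fields against a JSON schema, but cross-field rules such as fromDate <= toDate are simpler to enforce in a first validation task. A sketch of the rules above follows. The 92-day limit, the `correctionTicket` and `approvedBy` field names, and treating any publishMode other than SHADOW as a production publish are all assumptions.

```python
from datetime import date

MAX_RANGE_DAYS = 92  # assumed policy limit


def validate_run_params(params: dict, environment: str) -> list:
    """Return human-readable violations; an empty list means the parameters are accepted."""
    try:
        start = date.fromisoformat(params["fromDate"])
        end = date.fromisoformat(params["toDate"])
    except (KeyError, ValueError) as exc:
        return [f"fromDate/toDate missing or not ISO dates: {exc}"]
    errors = []
    mode = params.get("mode", "NORMAL")
    if start > end:
        errors.append("fromDate must be <= toDate")
    elif (end - start).days + 1 > MAX_RANGE_DAYS:
        errors.append(f"date range exceeds {MAX_RANGE_DAYS} days")
    if mode == "NORMAL" and "transformVersion" in params:
        errors.append("NORMAL mode cannot override transformVersion")
    if mode == "BACKFILL" and not params.get("reason"):
        errors.append("BACKFILL mode requires a reason")
    if mode == "CORRECTION" and not params.get("correctionTicket"):
        errors.append("CORRECTION mode requires a correctionTicket")
    # Anything other than a pure shadow write is treated as a production publish.
    if environment == "prod" and params.get("publishMode") != "SHADOW" and not params.get("approvedBy"):
        errors.append("production publish requires approvedBy")
    return errors
```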


23. Environment and Configuration

Avoid hardcoding environment-specific values in DAG files.

Bad:

S3_BUCKET = "prod-regulatory-data-lake"
DB_PASSWORD = "..."

Better:

  • environment-specific deployment config
  • secret manager
  • Airflow connections for connection metadata
  • runtime manifest for pipeline parameters
  • image tag for transform version

Configuration layers

| Layer | Example | Should change how often? |
|---|---|---|
| Code | DAG structure | Rarely |
| Pipeline registry | dataset owners, quality profile | Occasionally |
| Runtime manifest | date range, mode, run ID | Every run |
| Secrets | credentials | Rotated |
| Infrastructure | cluster, queue, resource class | Platform-managed |

24. Observability

Airflow gives task status, not full pipeline observability.

You need additional signals.

Airflow-level metrics

  • DAG run duration
  • task duration
  • retries
  • failures by task
  • schedule delay
  • queue delay
  • SLA misses

Java job metrics

  • input records
  • output records
  • rejected records
  • throughput
  • stage duration
  • source latency
  • sink commit latency
  • memory/GC

Data quality metrics

  • failed checks
  • null ratio
  • duplicate count
  • referential violations
  • freshness
  • reconciliation delta

Dataset metrics

  • published version
  • row count
  • partition count
  • table snapshot ID
  • consumer freshness

Airflow is one lens. It is not the whole observability platform.


25. Lineage

Lineage should connect:

Airflow DAG run -> task -> Java job -> input datasets -> output datasets -> published version

OpenLineage-style concepts are useful:

  • run
  • job
  • dataset
  • input dataset
  • output dataset
  • facets for metadata

Example lineage event shape

{
  "eventType": "COMPLETE",
  "eventTime": "2026-07-04T02:00:00Z",
  "run": {
    "runId": "case-daily-snapshot__2026-07-04__attempt-1"
  },
  "job": {
    "namespace": "airflow.prod",
    "name": "regulatory_case_daily_snapshot.submit_java_job"
  },
  "inputs": [
    {
      "namespace": "iceberg.prod",
      "name": "regulatory.silver.case_events"
    }
  ],
  "outputs": [
    {
      "namespace": "iceberg.prod",
      "name": "regulatory.gold.case_daily_snapshot"
    }
  ]
}

Lineage must not be an afterthought buried in logs. It is part of production evidence.
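A sketch of building the event above with a stable run identity. The OpenLineage specification expects `run.runId` to be a UUID and also defines fields such as `producer` and `schemaURL` that this sketch omits. Here `uuid5` maps the readable pipeline runId to a deterministic UUID, so a retried emit reports the same run. `lineage_event` is a hypothetical helper, not the OpenLineage client API.

```python
import uuid
from datetime import datetime, timezone


def lineage_event(
    pipeline_run_id: str,
    job_namespace: str,
    job_name: str,
    inputs: list,
    outputs: list,
    event_type: str = "COMPLETE",
) -> dict:
    """Build an OpenLineage-style run event; inputs/outputs are (namespace, name) pairs."""
    return {
        "eventType": event_type,
        "eventTime": datetime.now(timezone.utc).isoformat(),
        # Deterministic: the same pipeline runId always maps to the same UUID.
        "run": {"runId": str(uuid.uuid5(uuid.NAMESPACE_URL, pipeline_run_id))},
        "job": {"namespace": job_namespace, "name": job_name},
        "inputs": [{"namespace": ns, "name": n} for ns, n in inputs],
        "outputs": [{"namespace": ns, "name": n} for ns, n in outputs],
    }
```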


26. Logging Contract

Java job logs should be structured.

Minimum fields:

runId
pipelineName
taskName
attempt
stage
datasetId
partition
tenantId, if applicable
traceId
correlationId
severity
errorCode

Example:

{
  "level": "INFO",
  "runId": "case-daily-snapshot__2026-07-04__attempt-1",
  "stage": "write_staging",
  "datasetId": "regulatory.gold.case_daily_snapshot",
  "partition": "dt=2026-07-03",
  "message": "Staging write complete",
  "outputRecordCount": 811902
}

Airflow task logs should show control-flow decisions.

Java logs should show data-plane execution.


27. Alerting

Alerting should be tied to impact, not just failure.

Useful alert dimensions

  • DAG failed
  • critical task failed
  • quality gate failed
  • publish delayed
  • freshness SLO breached
  • source unavailable
  • repeated retries
  • unknown commit state
  • backfill exceeded cost/time limit

Alert payload

{
  "severity": "HIGH",
  "pipeline": "regulatory_case_daily_snapshot",
  "runId": "case-daily-snapshot__2026-07-04__attempt-1",
  "failedTask": "run_quality_gate",
  "impact": "Official case daily snapshot not published",
  "owner": "enforcement-data-platform",
  "runbook": "https://runbooks.acme/regulatory-case-daily-snapshot",
  "nextAction": "Inspect quality result URI"
}

Do not alert with only:

Task failed.

That is not operationally useful.
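One way to make "Task failed."-only alerts impossible is to refuse to construct an alert without impact, owner, runbook, and next action. `PipelineAlert` is a hypothetical type, and `to_payload` emits the JSON field names from the example above.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class PipelineAlert:
    severity: str
    pipeline: str
    run_id: str
    failed_task: str
    impact: str
    owner: str
    runbook: str
    next_action: str

    def __post_init__(self) -> None:
        # Refuse alerts that lack the context an on-call engineer needs.
        for name in ("impact", "owner", "runbook", "next_action"):
            if not getattr(self, name).strip():
                raise ValueError(f"alert is missing {name}")

    def to_payload(self) -> dict:
        """Serialize with the field names used in the JSON payload above."""
        return {
            "severity": self.severity,
            "pipeline": self.pipeline,
            "runId": self.run_id,
            "failedTask": self.failed_task,
            "impact": self.impact,
            "owner": self.owner,
            "runbook": self.runbook,
            "nextAction": self.next_action,
        }
```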


28. Security and Secrets

Airflow often has broad access. Treat it carefully.

Rules:

  • do not store secrets in DAG code
  • use connections/secret backends
  • limit Airflow service account privileges
  • use separate identities for Java data-plane jobs
  • do not let every DAG write every dataset
  • propagate identity into run manifest
  • log who triggered manual runs

Identity separation

Airflow scheduler identity:
  can submit job
  can read/write run manifest
  cannot directly mutate protected gold table unless through publish service

Java job identity:
  can read specific input datasets
  can write staging area
  cannot publish official version unless publish token/approval exists

Publish service identity:
  can promote validated staging output

This prevents the orchestrator from becoming an uncontrolled superuser.


29. Multi-Tenant DAG Design

If the same pipeline runs for multiple tenants, avoid copy-pasting DAGs blindly.

Options:

  1. One DAG per tenant.
  2. One DAG with tenant parameter.
  3. Dynamic task mapping over tenants.
  4. Platform-generated DAGs from registry.

Decision matrix

| Model | Good for | Risk |
|---|---|---|
| DAG per tenant | Strong isolation and ownership | Many DAGs to manage. |
| Parameterized DAG | Simple common logic | Manual run mistakes. |
| Dynamic tenant mapping | Shared schedule, parallelism | One tenant failure may complicate run state. |
| Generated DAGs | Platform scale | Generator becomes critical system. |

Tenant isolation rules

  • tenant-specific run manifest
  • tenant-specific output namespace or partition
  • tenant-specific quality result
  • tenant-specific retry limit
  • tenant-specific access control
  • tenant-specific lineage facet

30. Backfill DAG Design

Do not overload the daily DAG with complex backfill logic unless the backfill semantics are truly identical.

Often you need a separate backfill DAG.

Backfill DAG shape

Backfill must define:

  • date range
  • transform version
  • source snapshot version
  • publish mode
  • approval
  • notification suppression
  • cost budget
  • rollback strategy

31. Airflow + Kafka Boundary

Airflow can interact with Kafka, but be careful.

Good uses:

  • emit DatasetVersionPublished after publish
  • trigger a DAG from a compacted control topic through a bridge
  • wait for consumer lag to drain after backfill
  • submit bounded replay job

Bad uses:

  • one DAG run per Kafka event
  • Airflow task as Kafka consumer loop
  • Airflow as event bus
  • storing Kafka payloads in XCom

Pattern: publish event after dataset commit

{
  "eventType": "DatasetVersionPublished",
  "datasetId": "regulatory.gold.case_daily_snapshot",
  "datasetVersion": "snapshot-918274981274",
  "runId": "case-daily-snapshot__2026-07-04__attempt-1"
}

Downstream systems react through Kafka.

Airflow does not directly call every consumer.


32. Airflow + Flink Boundary

For Flink, Airflow usually coordinates bounded operations:

  • deploy/update job
  • trigger savepoint
  • stop with savepoint
  • run bounded replay job
  • validate output after replay

Airflow should not supervise every record.

For continuous Flink jobs, use dedicated platform/job manager monitoring for runtime health, not a daily Airflow task pretending to run the stream.


33. Airflow + Spark Boundary

Spark fits Airflow well when the Spark job is finite.

Airflow controls:

  • schedule
  • parameters
  • submission
  • dependency
  • retry
  • quality gate
  • publish

Spark controls:

  • distributed read
  • transform
  • shuffle
  • aggregation
  • write
  • checkpoint/staging

Spark Java command

spark-submit \
  --class com.acme.pipeline.casegold.CaseDailySnapshotJob \
  --conf spark.sql.shuffle.partitions=800 \
  s3://artifacts/case-snapshot-pipeline-3.8.1.jar \
  --run-manifest-uri s3://pipeline-runs/case/2026-07-04/manifest.json

The DAG should not contain Spark transformation code.


34. DAG Testing

A DAG needs tests just like Java code.

Test categories:

Parse test

  • DAG imports successfully
  • task IDs are stable
  • dependencies are correct

Parameter validation test

  • invalid manual config fails
  • date range maximum enforced
  • publish mode rules enforced

Manifest test

  • created manifest contains required fields
  • logical date mapping is correct
  • backfill mode differs from normal mode

Dependency test

  • publish depends on quality gate
  • quality gate depends on transform
  • lineage emission depends on publish

Idempotency test

  • duplicate trigger event maps to same run or is rejected
  • rerun does not create duplicate manifest

Failure-path test

  • quality failure routes to quarantine
  • commit unknown routes to inspection
  • source unavailable retries/waits

Do not only test that the DAG imports.
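A dependency test can be written without importing Airflow by asserting on an edge map. In a real suite the map would be built from the parsed DAG, for example from each task's `upstream_task_ids` after loading it through `DagBag`. The hand-written `DAG_EDGES` below mirrors the skeleton from section 10 and is an assumption.

```python
from graphlib import TopologicalSorter

# Upstream dependencies per task, as the section 10 skeleton wires them.
DAG_EDGES = {
    "create_run_manifest": set(),
    "check_sources": {"create_run_manifest"},
    "submit_java_job": {"check_sources"},
    "run_quality_gate": {"submit_java_job"},
    "publish_dataset": {"run_quality_gate"},
    "emit_lineage": {"publish_dataset"},
}


def ancestors(edges: dict, task: str) -> set:
    """All tasks that must finish before `task` can run."""
    seen = set()
    stack = list(edges[task])
    while stack:
        current = stack.pop()
        if current not in seen:
            seen.add(current)
            stack.extend(edges[current])
    return seen


def test_publish_is_gated() -> None:
    # static_order raises graphlib.CycleError if the graph is not acyclic.
    order = list(TopologicalSorter(DAG_EDGES).static_order())
    assert order.index("run_quality_gate") < order.index("publish_dataset")
    assert "run_quality_gate" in ancestors(DAG_EDGES, "publish_dataset")
    assert "submit_java_job" in ancestors(DAG_EDGES, "run_quality_gate")
    assert "publish_dataset" in ancestors(DAG_EDGES, "emit_lineage")
```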


35. Operational Runbook

Every production DAG needs a runbook.

Minimum sections:

Pipeline purpose
Owner
Input datasets
Output datasets
Schedule
Freshness SLO
Quality gates
Common failures
How to rerun one partition
How to run backfill
How to inspect Java job logs
How to inspect sink commit state
How to handle unknown commit
How to replay DLQ or rejected data
How to contact upstream owner
How to disable publication safely

Airflow UI helps with operations, but only if the recovery process is designed.


36. Regulatory Enforcement Example DAG

Suppose we build the official daily case snapshot.

Inputs

regulatory.silver.case_events
regulatory.silver.case_assignments
regulatory.ref.policy_versions
regulatory.ref.regional_units

Output

regulatory.gold.case_daily_snapshot

DAG

Quality checks

case_id is not null
one row per case_id per snapshot_date
open cases count reconciles with event-derived ledger
closed cases do not have active breach clocks
regional_unit_id exists in reference table
PII fields are absent from gold reporting table
snapshot freshness <= 6 hours

Failure policy

| Failure | Action |
|---|---|
| Missing case events input | Wait/retry. |
| Missing reference data | Fail and notify reference owner. |
| Java transform failure | Retry if retryable; otherwise fail. |
| Quality failure | Quarantine, block publish. |
| Publish unknown | Inspect table snapshot before retry. |
| Lineage emit failure | Retry; if still fails, mark run degraded but do not corrupt data. |

37. Common Airflow Anti-Patterns

Anti-pattern: Business rules in DAG code

Bad:

if case["severity"] == "HIGH" and case["age"] > 30:
    case["breach"] = True

Business rules belong in versioned Java transformation code, not orchestration glue.

Anti-pattern: XCom as database

Do not pass large data through XCom.

Anti-pattern: No publish boundary

Writing directly to the final table from the transform job makes the quality gate meaningless.

Anti-pattern: Unlimited catchup

Catchup without concurrency/cost control can trigger platform incidents.

Anti-pattern: Silent skips

Skipped tasks must have business meaning. Do not skip failed quality gates to make the DAG green.

Anti-pattern: Manual rerun with different code version

A rerun must record its transform version; otherwise the output is not reproducible.

Anti-pattern: Logs as evidence

Logs are not enough. Store structured manifests, quality reports, commit records, and lineage events.


38. Production Checklist

Before promoting an Airflow DAG for a Java data pipeline, verify:

DAG design

  • DAG has owner, purpose, schedule, input, output, SLO.
  • Tasks represent meaningful retry/failure boundaries.
  • Heavy data processing is not done inside Airflow workers.
  • max_active_runs, pools, and concurrency limits are set.
  • Catchup behavior is intentional.

Java job integration

  • Java job accepts run manifest URI.
  • Java job emits structured run output.
  • Java job has meaningful exit codes.
  • Transform version is recorded.
  • Logs contain run ID and task context.

Correctness

  • Source readiness checks use contract evidence.
  • Output is staged before publish.
  • Quality gate blocks publication.
  • Publish is atomic/idempotent.
  • Unknown commit state has recovery path.

Operations

  • Runbook exists.
  • Alerts include impact and next action.
  • Manual parameters are validated.
  • Backfill mode is explicit.
  • Lineage is emitted.
  • Dataset publish event is emitted after commit.

Security

  • Secrets are not in DAG code.
  • Airflow identity is least-privileged.
  • Java job identity is scoped to required data.
  • Manual production publish requires appropriate approval.

39. Final Mental Model

Airflow is powerful when it is used as a control plane.

It becomes fragile when it becomes a data plane.

For Java data pipelines:

Airflow owns:
  schedule, dependencies, run state, retries, visibility, publish gate

Java owns:
  parsing, transformation, validation logic, sink protocol, metrics, domain correctness

Lakehouse/Kafka/Flink/Spark own:
  scalable data movement and processing

Manifest/lineage/quality systems own:
  evidence and governance

A top-tier design does not ask Airflow to do everything.

It makes every boundary explicit.


40. References

  • Apache Airflow Documentation — DAG authoring, scheduling, asset-aware scheduling, dynamic task mapping, sensors, deferrable operators.
  • Apache Airflow Providers — Kubernetes, Spark, OpenLineage provider documentation.
  • Apache Spark Documentation — spark-submit, Java APIs, Spark SQL, Structured Streaming.
  • OpenLineage Specification — run/job/dataset lineage model and facets.
  • Apache Iceberg Documentation — snapshot publication, table metadata, optimistic commits, maintenance.

41. Next Part

Next:

learn-java-data-pipeline-pattern-part-059-asset-centric-orchestration.mdx

We will move from task-centric DAGs to asset-centric orchestration:

  • dataset as first-class object
  • freshness contracts
  • dependency graph by data product
  • asset materialization
  • lineage-driven impact analysis
  • task graph vs asset graph
  • ownership and governance at dataset boundary