
Spark Java Pipeline Patterns

Learn Java Data Pipeline Pattern - Part 050

Apache Spark pipeline patterns using Java: Dataset and DataFrame API, job packaging, transformation design, partitioning, joins, aggregations, writes, testing, and operational boundaries.


Part 050 — Spark Java Pipeline Patterns

Apache Spark is a distributed analytics engine. In Java pipeline work, Spark is usually used for:

  • large batch transformations,
  • joins across big datasets,
  • historical backfill,
  • lakehouse table writes,
  • data quality profiling,
  • feature generation,
  • wide aggregations,
  • micro-batch streaming through Structured Streaming.

But Spark should not be treated as “Java collections at cluster scale”.

That mental model fails.

A better mental model:

Spark lets you describe a logical transformation plan over distributed datasets; the engine optimizes and executes that plan across a cluster.

The Java engineer’s job is not only to write transformations. It is to design:

  • clean input/output contracts,
  • deterministic transform boundaries,
  • partition-aware computation,
  • safe write semantics,
  • resource-aware execution,
  • testable transformation code,
  • operational observability.

This part focuses on production Java usage, not a toy word count example.


1. Spark Is Not a Bigger Stream API

It is tempting to see this:

List<CaseEvent> events = ...;
events.stream()
    .filter(...)
    .map(...)
    .collect(...);

and imagine Spark as:

Dataset<CaseEvent> events = ...;
events.filter(...)
    .map(...)
    .collectAsList();

That is the wrong center of gravity.

In Spark, operations build a distributed plan. Data is partitioned. Computation is lazy. Shuffles are expensive. User-defined Java lambdas are serialized and sent to executors. Execution happens away from the driver.

The central questions are:

What plan am I building?
Where will the data move?
What is the partitioning boundary?
What is the cardinality after each stage?
What can be pushed into the Spark SQL engine?
What side effects are accidentally happening on executors?
How will output be committed safely?

Spark pipeline design is plan design.


2. Spark Abstractions You Must Separate

Spark offers several major APIs.

API | Java Pipeline Use | Notes
--- | --- | ---
Dataset<Row> / DataFrame | Most production batch transformations | Optimized relational/columnar style; less Java type safety
Dataset<T> | Typed domain-ish processing | Useful, but Java encoders and lambdas can be awkward
SQL | Declarative transformations | Strong for joins/aggregations; easy to review
RDD | Low-level distributed collection | Rare in modern structured pipelines; useful for specialized control
Structured Streaming | Incremental/micro-batch stream processing | Covered in the next part

A DataFrame in Java is represented as Dataset<Row>.

This matters because the type boundary is different from normal Java domain code.

Dataset<Row> df = spark.read().parquet(inputPath);

You do not get compile-time field safety.

You need runtime schema checks, contract tests, and explicit mapping at boundaries.


3. SparkSession as the Application Boundary

Most Spark Java jobs start with SparkSession.

SparkSession spark = SparkSession.builder()
    .appName("case-sla-daily-metrics")
    .getOrCreate();

Production rule:

SparkSession belongs at the infrastructure boundary, not inside transformation logic.

Bad:

public class CaseTransform {
    public Dataset<Row> run(String inputPath) {
        SparkSession spark = SparkSession.builder().getOrCreate();
        return spark.read().parquet(inputPath).filter("status = 'OPEN'");
    }
}

Better:

public final class CaseTransform {
    public Dataset<Row> transform(Dataset<Row> events, Dataset<Row> calendar) {
        return events
            .join(calendar, events.col("business_date").equalTo(calendar.col("date")))
            .where(events.col("case_status").notEqual("DELETED"));
    }
}

The job shell owns IO. The transform owns logic.


4. Layered Job Structure

A production Spark Java job should be boring and layered.

case-sla-spark-job/
  src/main/java/com/company/pipeline/casesla/
    CaseSlaJobMain.java
    CaseSlaJobConfig.java
    CaseSlaPlanner.java
    CaseSlaInputs.java
    CaseSlaTransform.java
    CaseSlaOutputWriter.java
    CaseSlaQualityChecks.java
    CaseSlaSchemas.java
    CaseSlaMetrics.java

Separation:

Component | Responsibility
--- | ---
Main | parse args, create SparkSession, wire dependencies
Config | paths, dates, run ID, mode, resources
Planner | input/output boundary and reference versions
Inputs | read datasets and enforce schemas
Transform | pure logical transformations
QualityChecks | validation and reconciliation
OutputWriter | staged writes and publish protocol
Schemas | declared expected schemas
Metrics | run and data metrics

Avoid one giant main method. Spark code becomes unreviewable quickly when IO, SQL strings, quality checks, and publish logic are mixed.


5. Dataset<Row> Boundary Pattern

The most common production style is:

external data -> Dataset<Row> -> validated logical schema -> transforms -> output Dataset<Row>

Example:

public final class CaseSlaInputs {
    public Dataset<Row> readCaseEvents(SparkSession spark, String path) {
        Dataset<Row> raw = spark.read().parquet(path);
        SchemaAssertions.requireColumns(raw.schema(), List.of(
            "case_id",
            "event_id",
            "event_type",
            "event_time",
            "business_date",
            "owning_unit",
            "payload_version"
        ));
        return raw;
    }
}

Do not trust files just because they are Parquet.

Validate:

  • required columns,
  • column types,
  • nullability expectation,
  • enum domains,
  • timestamp timezone policy,
  • version columns,
  • uniqueness keys,
  • partition columns.

The schema is a contract boundary.
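The validation list above can be written as data rather than as scattered if-statements. The following is a minimal plain-Java sketch. It assumes the observed schema has already been flattened into a map from column name to type name. In Spark you could build that map from `StructType.fields()` and each field's `dataType().simpleString()`. `SchemaContract` and its methods are hypothetical names, not a Spark API. The sketch reports every violation at once instead of failing on the first.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical schema contract: required columns with their expected Spark SQL type names. */
final class SchemaContract {
    private final Map<String, String> requiredTypes;

    SchemaContract(Map<String, String> requiredTypes) {
        // Defensive copy; pass a LinkedHashMap if the reporting order matters.
        this.requiredTypes = new LinkedHashMap<>(requiredTypes);
    }

    /** Returns every violation; an empty list means the observed schema satisfies the contract. */
    List<String> violations(Map<String, String> observedTypes) {
        List<String> problems = new ArrayList<>();
        for (Map.Entry<String, String> required : requiredTypes.entrySet()) {
            String column = required.getKey();
            String actual = observedTypes.get(column);
            if (actual == null) {
                problems.add("missing column: " + column);
            } else if (!actual.equals(required.getValue())) {
                problems.add("type mismatch: " + column + " expected "
                    + required.getValue() + " but was " + actual);
            }
        }
        return problems;
    }

    /** Fails at the input boundary with all violations in one message. */
    void require(Map<String, String> observedTypes) {
        List<String> problems = violations(observedTypes);
        if (!problems.isEmpty()) {
            throw new IllegalStateException("Schema contract violated: " + problems);
        }
    }
}
```

Extra columns pass this check. Whether unexpected columns should fail the run is itself a contract decision.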


6. Typed Dataset Boundary Pattern

For some transformations, typed Dataset<T> can clarify intent.

public record CaseEvent(
    String caseId,
    String eventId,
    String eventType,
    Timestamp eventTime,
    Date businessDate,
    String owningUnit
) implements Serializable {}

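Convert with a bean encoder. Here CaseEventBean is assumed to be a JavaBean counterpart of the record (no-arg constructor, getters and setters), because Encoders.bean expects bean-style properties:

Encoder<CaseEventBean> encoder = Encoders.bean(CaseEventBean.class);
// as() matches columns to bean properties by name, so snake_case columns
// (case_id, event_time, ...) must first be renamed to caseId, eventTime, ...
Dataset<CaseEventBean> events = renameToBeanProperties(raw).as(encoder); // hypothetical helper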

Depending on the Spark version and encoder path, Java records may not work as smoothly with Spark encoders as ordinary JavaBeans do. A pragmatic approach is often:

Dataset<Row> for large relational transforms
small typed domain objects at edges/tests
explicit mappers for critical boundaries

Use typed datasets when they improve correctness, not because they feel more Java-like.


7. Transformation Design: Prefer Relational Primitives

Spark SQL/DataFrame operations are optimized by Spark's Catalyst query optimizer.

Prefer:

Dataset<Row> filtered = events
    .where(col("event_type").isin("CASE_OPENED", "CASE_ESCALATED"))
    .select(
        col("case_id"),
        col("event_id"),
        col("event_time"),
        col("business_date"),
        col("owning_unit")
    );

Be careful with Java UDFs for simple logic.

Bad:

spark.udf().register("normalizeStatus", (String s) -> normalize(s), DataTypes.StringType);
Dataset<Row> out = events.withColumn("status_norm", callUDF("normalizeStatus", col("status")));

If equivalent built-in expressions exist, prefer them:

Dataset<Row> out = events.withColumn(
    "status_norm",
    upper(trim(col("status")))
);

Why?

  • built-ins are visible to optimizer,
  • Java UDFs can be black boxes,
  • serialization overhead may increase,
  • null behavior can be surprising,
  • predicate pushdown may be lost.

Use UDFs when business logic truly cannot be expressed otherwise, and isolate them.


8. Avoid collect() as a Control-Flow Escape Hatch

collectAsList() moves distributed data to the driver.

Bad:

List<Row> rows = events.collectAsList();
for (Row row : rows) {
    callExternalApi(row);
}

This breaks the distributed model and can crash the driver.

Allowed cases:

  • tiny config dataset,
  • quality summary with known bounded size,
  • partition manifest list with explicit cap,
  • debugging in local tests.

Safer pattern for small reference data:

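List<Row> units = unitMapping.limit(10_001).collectAsList(); // fetch one row past the cap
if (units.size() > 10_000) throw new IllegalStateException("unit mapping exceeds 10,000 rows");

Fetching one row past the cap is what makes the check possible. A bare limit(10_000) would silently truncate an unexpectedly large dataset instead of failing.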

Driver memory should never be the hidden sink of an unbounded dataset.


9. Join Pattern: Design Before Code

Joins are where Spark jobs often become expensive or wrong.

Before writing a join, answer:

Join keys:
  Are keys stable, normalized, and unique on the reference side?

Cardinality:
  one-to-one, one-to-many, many-to-many?

Temporal semantics:
  latest reference or effective-at-event-time?

Missing reference:
  reject, quarantine, default, or preserve null?

Duplicate reference:
  fail, pick latest, aggregate, or explode?

Size:
  can one side be broadcast?

Skew:
  do some keys dominate?

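Example effective-time predicate shape. It only resolves inside a join, where both the event and reference columns are in scope:

Column effectiveAtEventTime = col("o.valid_from").leq(col("e.event_time"))
    .and(col("o.valid_to").isNull().or(col("o.valid_to").gt(col("e.event_time"))));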

But temporal joins in batch often need careful interval logic.

Common pattern:

Dataset<Row> joined = events.alias("e")
    .join(orgUnits.alias("o"),
        col("e.owning_unit").equalTo(col("o.unit_code"))
            .and(col("e.event_time").geq(col("o.valid_from")))
            .and(col("o.valid_to").isNull().or(col("e.event_time").lt(col("o.valid_to")))),
        "left");

Then explicitly route missing matches:

Dataset<Row> unresolved = joined.where(col("o.unit_id").isNull());
Dataset<Row> accepted = joined.where(col("o.unit_id").isNotNull());

Do not silently default unresolved joins to “UNKNOWN” unless the contract says that is valid.
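To make the half-open interval and the routing rule concrete, here is a plain-Java model of the same left temporal join. It is not Spark code. It assumes that a reference row is effective on `[validFrom, validTo)`, where a null `validTo` means open-ended. Record and method names are hypothetical. The model routes events with zero matches to `unresolved`. Events with more than one match, which happens when reference intervals overlap, go to `ambiguous` instead of being resolved silently.

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

final class TemporalLookup {
    record Event(String eventId, String owningUnit, Instant eventTime) {}
    record OrgUnit(String unitCode, String unitId, Instant validFrom, Instant validTo) {}
    record Routed(List<String> accepted, List<String> unresolved, List<String> ambiguous) {}

    /** Same predicate as the Spark join: validFrom <= t and (validTo is null or t < validTo). */
    static boolean effectiveAt(OrgUnit unit, Instant t) {
        return !t.isBefore(unit.validFrom()) && (unit.validTo() == null || t.isBefore(unit.validTo()));
    }

    static Routed route(List<Event> events, List<OrgUnit> units) {
        List<String> accepted = new ArrayList<>();
        List<String> unresolved = new ArrayList<>();
        List<String> ambiguous = new ArrayList<>();
        for (Event e : events) {
            List<OrgUnit> matches = new ArrayList<>();
            for (OrgUnit u : units) {
                if (u.unitCode().equals(e.owningUnit()) && effectiveAt(u, e.eventTime())) {
                    matches.add(u);
                }
            }
            if (matches.isEmpty()) {
                unresolved.add(e.eventId());              // no effective reference row
            } else if (matches.size() > 1) {
                ambiguous.add(e.eventId());               // overlapping intervals: contract violation
            } else {
                accepted.add(e.eventId() + "->" + matches.get(0).unitId());
            }
        }
        return new Routed(accepted, unresolved, ambiguous);
    }
}
```

An event stamped exactly at a boundary belongs to the newer interval only. That is the property the `geq`/`lt` pair in the Spark condition is meant to guarantee.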


10. Broadcast Join Pattern

For small reference data, broadcasting the small side avoids shuffling the large side.

Dataset<Row> enriched = events.join(
    functions.broadcast(orgUnits),
    events.col("owning_unit").equalTo(orgUnits.col("unit_code")),
    "left"
);

Use when:

  • reference data is small enough for executor memory,
  • reference side is reused heavily,
  • join key is clean,
  • no major temporal explosion.

Do not broadcast because “it is faster” without measuring.

Risks:

  • executor memory pressure,
  • stale assumption as reference grows,
  • long serialization/deserialization cost,
  • driver memory pressure, because the broadcast side is collected on the driver before it is sent to executors.

Production guard:

long refCount = orgUnits.count();
if (refCount > config.maxBroadcastRows()) {
    throw new IllegalStateException("Reference dataset too large for broadcast: " + refCount);
}

A better guard includes estimated size, not just row count.
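A sketch of a guard that enforces both limits. It assumes the caller already has a row count and an estimated size in bytes, for example from table statistics or file sizes; obtaining them is outside this sketch. `BroadcastGuard` and its limits are hypothetical job configuration, not Spark settings. Spark's automatic threshold is `spark.sql.autoBroadcastJoinThreshold`, but an explicit `broadcast()` hint is honored even above that threshold. That is why a job-level guard is worth having.

```java
/** Hypothetical pre-broadcast guard combining a row cap and an estimated byte cap. */
record BroadcastGuard(long maxRows, long maxBytes) {

    BroadcastGuard {
        if (maxRows <= 0 || maxBytes <= 0) {
            throw new IllegalArgumentException("limits must be positive");
        }
    }

    /** Throws if either limit is exceeded; the message names the limit that failed. */
    void check(String dataset, long rowCount, long estimatedBytes) {
        if (rowCount > maxRows) {
            throw new IllegalStateException(dataset + " too large for broadcast: "
                + rowCount + " rows > " + maxRows);
        }
        if (estimatedBytes > maxBytes) {
            throw new IllegalStateException(dataset + " too large for broadcast: "
                + estimatedBytes + " bytes > " + maxBytes);
        }
    }
}
```

Wide rows are why a row count alone is weak: 50,000 rows of 4 KB each already add up to roughly 200 MB.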


11. Skew Handling Pattern

Data skew occurs when some keys are much larger than others.

Symptoms:

  • one task runs much longer,
  • shuffle spill increases,
  • executor OOM,
  • job stuck at 99%,
  • uneven partition sizes.

Example:

case_id = "UNKNOWN" appears in 40% of records
owning_unit = "CENTRAL" owns 60% of events
tenant_id = "large-bank-1" dominates all others

Approaches:

Pattern | Use Case | Trade-off
--- | --- | ---
Filter bad key | invalid placeholder values | requires quarantine policy
Salt hot key | hot key in a large aggregation/join | more complex recombination
Split large tenant | tenant-isolated compute | orchestration complexity
Broadcast small side | small dimension join | memory risk
Pre-aggregate | reduce data before shuffle | only safe for associative operations
Repartition by composite key | better distribution | may change downstream layout

Salting example:

Dataset<Row> saltedEvents = events.withColumn(
    "salt",
    pmod(abs(hash(col("event_id"))), lit(16))
);

Dataset<Row> saltedRef = ref.crossJoin(
    spark.range(0, 16).toDF("salt")
);

Dataset<Row> joined = saltedEvents.join(
    saltedRef,
    saltedEvents.col("key").equalTo(saltedRef.col("key"))
        .and(saltedEvents.col("salt").equalTo(saltedRef.col("salt")))
);

Use salting carefully. It changes the shape of the plan and must be documented.
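The following plain-Java model shows why the salted join stays correct. It assumes 16 salts, as above. It uses `String.hashCode` with `Math.floorMod` in place of Spark's `hash` and `pmod`, so the bucket numbers differ from Spark's; only the property matters here. Each event gets exactly one salt and the reference row is replicated once per salt, so every event still meets exactly one copy of its reference row. Meanwhile, the hot key's rows spread across up to 16 buckets.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class SaltingModel {
    static final int SALTS = 16;

    record Event(String eventId, String key) {}

    /** Salt from a high-cardinality column, analogous to pmod(hash(event_id), 16). */
    static int salt(Event e) {
        return Math.floorMod(e.eventId().hashCode(), SALTS);
    }

    /** Rows per (key, salt) bucket: the units a shuffle would distribute. */
    static Map<String, Integer> bucketSizes(List<Event> events) {
        Map<String, Integer> sizes = new HashMap<>();
        for (Event e : events) {
            sizes.merge(e.key() + "#" + salt(e), 1, Integer::sum);
        }
        return sizes;
    }

    /** Joins salted events to a reference replicated across all salts (the crossJoin with range(0, 16)). */
    static List<String> saltedJoin(List<Event> events, Map<String, String> reference) {
        Map<String, String> replicated = new HashMap<>();
        for (Map.Entry<String, String> ref : reference.entrySet()) {
            for (int s = 0; s < SALTS; s++) {
                replicated.put(ref.getKey() + "#" + s, ref.getValue());
            }
        }
        List<String> joined = new ArrayList<>();
        for (Event e : events) {
            String value = replicated.get(e.key() + "#" + salt(e));
            if (value != null) {
                joined.add(e.eventId() + "=" + value);
            }
        }
        return joined;
    }
}
```

The cost is visible too: the reference side grows 16 times. Salting therefore only pays off when the reference side is small relative to the hot key's volume.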


12. Aggregation Pattern

Aggregations must define key, window, null policy, and duplicate policy.

Example:

Dataset<Row> metrics = events
    .where(col("event_type").equalTo("CASE_ESCALATED"))
    .groupBy(col("business_date"), col("owning_unit"))
    .agg(
        count(lit(1)).alias("escalation_count"),
        countDistinct(col("case_id")).alias("distinct_case_count"),
        max(col("event_time")).alias("last_event_time")
    );

Review questions:

  • Are duplicate events already deduped?
  • Is business_date event-derived or ingestion-derived?
  • Are null owning units excluded, grouped, or quarantined?
  • Is countDistinct acceptable at this scale?
  • Does output require zero rows for missing groups?
  • Does metric need restatement when late data arrives?

A metric is a contract, not just an aggregation expression.
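One review question, zero rows for missing groups, is easy to get wrong because `groupBy` only emits groups that have data. Below is a plain-Java model of densification. It assumes the expected groups, such as every active owning unit, come from a reference list. In Spark the equivalent is a left join from the expected grid to the metrics, followed by `coalesce(count_column, lit(0))`. All names are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class Densify {
    /**
     * Returns a count for every expected group, using 0 where the aggregation produced no row.
     * A metric for a group outside the expected list is rejected rather than silently dropped.
     */
    static Map<String, Long> withZeroRows(List<String> expectedGroups, Map<String, Long> observedCounts) {
        for (String group : observedCounts.keySet()) {
            if (!expectedGroups.contains(group)) {
                throw new IllegalStateException("metric for unexpected group: " + group);
            }
        }
        Map<String, Long> dense = new LinkedHashMap<>();
        for (String group : expectedGroups) {
            dense.put(group, observedCounts.getOrDefault(group, 0L));
        }
        return dense;
    }
}
```

Whether a missing group means "zero" or "no data yet" is a contract choice. Densifying asserts the former.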


13. Window-Like Batch Computation

Batch jobs often compute rolling windows:

7-day breach count
30-day case reopen rate
monthly escalation ratio
business-day SLA age

In batch, window semantics must still be explicit.

Example:

WindowSpec w = Window
    .partitionBy(col("case_id"))
    .orderBy(col("event_time"))
    .rowsBetween(Window.unboundedPreceding(), Window.currentRow());

Dataset<Row> withSeq = events.withColumn("event_sequence", row_number().over(w));

Window functions are powerful but can be expensive:

  • they often require sort,
  • partition skew hurts,
  • large partitions spill,
  • ordering tie-breakers must be deterministic.

Always include stable tie-breakers:

.orderBy(col("event_time"), col("source_position"), col("event_id"))

Do not order only by timestamp if multiple events can share the same timestamp.
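Why tie-breakers matter: when several rows share an `event_time`, the rank each receives depends on the order they happen to arrive in a partition, and that order can change between runs. Below is a plain-Java model that mirrors the `orderBy` above, assuming `sourcePosition` and `eventId` fields. With the full comparator, sequence numbers are identical whatever the input order.

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class EventSequencing {
    record Event(String eventId, Instant eventTime, long sourcePosition) {}

    /** Mirrors orderBy(event_time, source_position, event_id): a total order when event_id is unique. */
    static final Comparator<Event> DETERMINISTIC = Comparator
        .comparing(Event::eventTime)
        .thenComparingLong(Event::sourcePosition)
        .thenComparing(Event::eventId);

    /** Assigns 1-based sequence numbers, like row_number() within one case_id partition. */
    static Map<String, Integer> sequence(List<Event> partition) {
        List<Event> sorted = new ArrayList<>(partition);
        sorted.sort(DETERMINISTIC);
        Map<String, Integer> seq = new LinkedHashMap<>();
        for (int i = 0; i < sorted.size(); i++) {
            seq.put(sorted.get(i).eventId(), i + 1);
        }
        return seq;
    }
}
```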


14. Deduplication Pattern

Spark dedupe must match business semantics.

Naive:

Dataset<Row> deduped = events.dropDuplicates("event_id");

Better when the source can resend the same event ID with a changed payload:

WindowSpec latestByEvent = Window
    .partitionBy(col("event_id"))
    .orderBy(col("source_commit_time").desc(), col("source_position").desc());

Dataset<Row> deduped = events
    .withColumn("rn", row_number().over(latestByEvent))
    .where(col("rn").equalTo(1))
    .drop("rn");

But this encodes a rule: latest wins.

Other possible rules:

  • first wins,
  • highest source version wins,
  • exact payload duplicate only,
  • correction event supersedes original,
  • duplicate is error if payload differs.

Production dedupe requires a policy table, not just dropDuplicates.
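A minimal sketch of making the dedupe rule explicit. It assumes each candidate row carries a `sourceCommitTime`, a `sourcePosition`, and a `payloadHash`; these field names are hypothetical. The rule is passed in as a value, so it could be loaded from a policy table. `ERROR_IF_PAYLOAD_DIFFERS` turns a conflicting resend into a visible failure instead of a silent choice. Plain Java models the semantics here. In Spark, the `LATEST_WINS` branch corresponds to the `row_number()` window shown above.

```java
import java.util.Comparator;
import java.util.List;

final class DedupePolicy {
    enum Rule { LATEST_WINS, FIRST_WINS, ERROR_IF_PAYLOAD_DIFFERS }

    record Version(String eventId, long sourceCommitTime, long sourcePosition, String payloadHash) {}

    /** Source order: commit time, then position within the commit. */
    static final Comparator<Version> SOURCE_ORDER = Comparator
        .comparingLong(Version::sourceCommitTime)
        .thenComparingLong(Version::sourcePosition);

    /** Picks the surviving version among rows that share one event_id. */
    static Version resolve(List<Version> sameEventId, Rule rule) {
        if (sameEventId.isEmpty()) {
            throw new IllegalArgumentException("no versions to resolve");
        }
        return switch (rule) {
            case LATEST_WINS -> sameEventId.stream().max(SOURCE_ORDER).orElseThrow();
            case FIRST_WINS -> sameEventId.stream().min(SOURCE_ORDER).orElseThrow();
            case ERROR_IF_PAYLOAD_DIFFERS -> {
                long distinctPayloads = sameEventId.stream().map(Version::payloadHash).distinct().count();
                if (distinctPayloads > 1) {
                    throw new IllegalStateException("conflicting payloads for event " + sameEventId.get(0).eventId());
                }
                // Exact duplicates only: keep the earliest copy.
                yield sameEventId.stream().min(SOURCE_ORDER).orElseThrow();
            }
        };
    }
}
```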


15. Write Pattern: Stage Before Publish

Spark makes it easy to write output:

metrics.write().mode(SaveMode.Overwrite).parquet(outputPath);

That is not enough for production.

Prefer:

String stagingPath = config.stagingPath() + "/run_id=" + config.runId();

metrics.write()
    .mode(SaveMode.ErrorIfExists)
    .parquet(stagingPath);

qualityChecks.validate(stagingPath);
publisher.publish(stagingPath, config.finalOutputBoundary());

Why avoid direct overwrite?

  • consumers may see partial files,
  • overwrite semantics differ by filesystem/table format,
  • retry can delete valid prior output,
  • crash during write leaves ambiguous state,
  • validation should happen before publish.

Use direct table writes only when the table format and commit protocol provide the required atomicity.
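The stage, validate, publish protocol can be sketched on a local or POSIX-like filesystem with `java.nio.file`. This assumes the staging and final locations are on the same filesystem, so that a directory rename is atomic. It illustrates the protocol and is not a recipe for object stores: S3-style stores generally do not offer atomic directory rename, which is one reason table formats define their own commit protocols. `StagedPublisher` and its "at least one file" validation are hypothetical stand-ins for real quality checks.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.stream.Stream;

final class StagedPublisher {

    /** Minimal validation stand-in: staging must exist and contain at least one regular file. */
    static void validate(Path staging) throws IOException {
        if (!Files.isDirectory(staging)) {
            throw new IllegalStateException("staging output missing: " + staging);
        }
        try (Stream<Path> files = Files.list(staging)) {
            if (files.noneMatch(Files::isRegularFile)) {
                throw new IllegalStateException("staging output is empty: " + staging);
            }
        }
    }

    /**
     * Publishes a validated run directory to its final, run-scoped location with one rename.
     * ATOMIC_MOVE makes Files.move throw instead of falling back to a non-atomic copy.
     */
    static Path publish(Path staging, Path finalDir) throws IOException {
        validate(staging);
        if (Files.exists(finalDir)) {
            // Never replace prior output implicitly; a retry must inspect what is already published.
            throw new IllegalStateException("already published: " + finalDir);
        }
        Files.createDirectories(finalDir.getParent());
        return Files.move(staging, finalDir, StandardCopyOption.ATOMIC_MOVE);
    }
}
```

Consumers only ever see either no run directory or a complete one. Partial files stay in staging, where the retry and cleanup policy can deal with them.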


16. Save Modes and Their Risks

Spark DataFrameWriter supports modes such as append, overwrite, ignore, and error-if-exists.

Production interpretation:

Mode | Use Carefully For | Main Risk
--- | --- | ---
Append | immutable facts, event logs | duplicates on retry
Overwrite | owned partition/table replacement | destructive if boundary wrong
ErrorIfExists | staging run output | good retry guard, but needs cleanup policy
Ignore | rarely appropriate | can hide failures

A strong rule:

Never use overwrite unless the output boundary is explicit and validated.

For partition overwrite, configure and test semantics carefully. Static overwrite and dynamic partition overwrite can have very different blast radius depending on Spark configuration and table/catalog behavior.
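To make the blast-radius difference concrete, here is a plain-Java model of which existing partitions survive an overwrite of a partitioned file-source table. It assumes Spark's `spark.sql.sources.partitionOverwriteMode` setting, which is `static` by default and can be set to `dynamic`, for example via `spark.conf().set(...)`. It also assumes a write with no explicit static partition values. In static mode such a write replaces every existing partition; in dynamic mode only partitions present in the new data are replaced. Table formats and catalogs can behave differently, so treat this as a model to test against, not a specification.

```java
import java.util.Set;
import java.util.TreeSet;

final class OverwriteModel {
    enum Mode { STATIC, DYNAMIC }

    /** Partitions visible after overwriting with {@code written} under the given mode. */
    static Set<String> partitionsAfterOverwrite(Set<String> existing, Set<String> written, Mode mode) {
        Set<String> result = new TreeSet<>(written);
        if (mode == Mode.DYNAMIC) {
            // Untouched partitions keep their old files; only partitions in the new data are replaced.
            result.addAll(existing);
        }
        // STATIC with no partition spec: all existing partitions are removed before the write.
        return result;
    }
}
```

Under static mode, a one-day backfill erases every other day. That is exactly the case a test should cover before any overwrite reaches production.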


17. Partitioned Writes

Example:

metrics.write()
    .mode(SaveMode.ErrorIfExists) // append to a run-scoped staging path would duplicate files on retry
    .partitionBy("business_date")
    .parquet(stagingPath);

Partitioning should reflect:

  • common read filters,
  • recompute boundary,
  • correction boundary,
  • retention boundary,
  • output ownership.

Bad partitioning:

partitionBy(case_id)

This usually causes a partition explosion: one directory per case, each holding tiny files.

Better:

partitionBy(business_date)
partitionBy(tenant_id, business_date) only if tenant/date is a real operational boundary

But even tenant/date can create too many small partitions for many tenants.

Measure:

rows per partition
files per partition
average file size
largest partition size
smallest partition size

Partitioning is both physical layout and operational semantics.
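Most of the layout measurements listed above can be computed from a file listing. The sketch assumes a hypothetical listing of (partition, file size in bytes) pairs, gathered from storage after a staging write. Rows per partition must come from the data itself and is omitted here.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class PartitionLayout {
    record FileEntry(String partition, long sizeBytes) {}

    record Stats(int partitions, int files, int maxFilesPerPartition,
                 long avgFileBytes, long largestPartitionBytes, long smallestPartitionBytes) {}

    /** Summarizes (partition, file size) pairs taken from storage after a write. */
    static Stats summarize(List<FileEntry> listing) {
        if (listing.isEmpty()) {
            throw new IllegalArgumentException("empty listing");
        }
        Map<String, Long> bytesPerPartition = new TreeMap<>();
        Map<String, Integer> filesPerPartition = new TreeMap<>();
        long totalBytes = 0;
        for (FileEntry f : listing) {
            bytesPerPartition.merge(f.partition(), f.sizeBytes(), Long::sum);
            filesPerPartition.merge(f.partition(), 1, Integer::sum);
            totalBytes += f.sizeBytes();
        }
        long largest = bytesPerPartition.values().stream().mapToLong(Long::longValue).max().getAsLong();
        long smallest = bytesPerPartition.values().stream().mapToLong(Long::longValue).min().getAsLong();
        int maxFiles = filesPerPartition.values().stream().mapToInt(Integer::intValue).max().getAsInt();
        return new Stats(bytesPerPartition.size(), listing.size(), maxFiles,
            totalBytes / listing.size(), largest, smallest);
    }
}
```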


18. Repartition, Coalesce, and Shuffle Awareness

repartition causes a shuffle.

Dataset<Row> repartitioned = metrics.repartition(col("business_date"));

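coalesce reduces the number of partitions by merging existing ones, without a full shuffle. Because it is a narrow operation, a drastic coalesce can also reduce the parallelism of the upstream computation in the same stage.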

Dataset<Row> compact = metrics.coalesce(32);

Patterns:

Operation | Use When | Risk
--- | --- | ---
repartition(n) | need parallelism/distribution | expensive shuffle
repartition(col) | align by key before join/write | skew possible
coalesce(n) | reduce output file count | can create uneven partitions
no change | engine plan is sufficient | may create too many/few files

Do not randomly sprinkle repartition(200).

Ask:

Am I solving input parallelism, shuffle distribution, output file size, or join correctness?

Each has a different answer.
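For the output-file-size case, one common heuristic derives the partition count from the estimated output size and a target file size, then passes it to `coalesce(n)` or `repartition(n)`. The heuristic is an assumption here, not a Spark rule. A sketch:

```java
final class FileSizing {
    /** Partitions needed so each output file lands near targetFileBytes (ceiling division, at least 1). */
    static int outputPartitions(long estimatedOutputBytes, long targetFileBytes) {
        if (estimatedOutputBytes < 0 || targetFileBytes <= 0) {
            throw new IllegalArgumentException("size must be non-negative and target positive");
        }
        // Ceiling division written without an addition that could overflow near Long.MAX_VALUE.
        long n = estimatedOutputBytes / targetFileBytes
            + (estimatedOutputBytes % targetFileBytes == 0 ? 0 : 1);
        return (int) Math.max(1, Math.min(n, Integer.MAX_VALUE));
    }
}
```

The estimate is only as good as its input. Compressed columnar output can be far smaller than the in-memory size, so calibrate against file sizes measured in earlier runs.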


19. Caching and Persistence

Spark transformations are lazy. Each action on a reused dataset recomputes it from its lineage unless it is persisted.

Dataset<Row> normalized = normalize(events).persist(StorageLevel.MEMORY_AND_DISK());

Dataset<Row> metricsA = computeMetricA(normalized);
Dataset<Row> metricsB = computeMetricB(normalized);

// trigger actions
metricsA.count();
metricsB.count();

normalized.unpersist();

Use persistence when:

  • reused multiple times,
  • expensive to compute,
  • stable intermediate after filtering/projection,
  • enough memory/disk exists.

Avoid caching:

  • huge raw datasets used once,
  • data with many columns when only few are needed later,
  • as a blind fix for slow jobs,
  • in long-lived jobs without unpersisting afterwards.

Persistence is a resource trade-off, not a magic performance switch.


20. Explain Plans as Engineering Artifacts

Spark can explain execution plans.

metrics.explain(true);

Review the plan for:

  • unexpected Cartesian product,
  • repeated scans,
  • unpushed filters,
  • huge shuffles,
  • unnecessary sorts,
  • broadcast decisions,
  • skew symptoms,
  • wide dependencies.

For critical transforms, store plan snapshots during review or in CI so that plan regressions are visible.

A top engineer does not only ask “did it run?”. They ask:

What physical work did the engine have to perform?


21. Null Semantics

Spark SQL null semantics are not the same as Java Optional semantics.

Examples:

where(col("status").notEqual("DELETED"))

For rows with a null status, the comparison evaluates to null rather than true, so where() drops them. A business user may expect those rows to be kept.

Safer:

where(col("status").isNotNull()
    .and(col("status").notEqual("DELETED")))

Or explicitly route nulls:

Dataset<Row> invalid = events.where(col("status").isNull());
Dataset<Row> valid = events.where(col("status").isNotNull());

The data contract should define:

  • nullable fields,
  • null meaning,
  • default policy,
  • quarantine policy,
  • metric impact.

Null is not “nothing”. In pipelines, null is often an unhandled business state.
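Below is a plain-Java model of the three-valued logic behind the `notEqual` example, with `null` standing for SQL UNKNOWN. `where` keeps a row only when the predicate is TRUE. A null status therefore disappears from both `status <> 'DELETED'` and its apparent complement `status = 'DELETED'`. The method names are hypothetical. For null-safe equality, Spark's own operator is `Column.eqNullSafe`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

final class SqlNullModel {
    /** SQL '<>': UNKNOWN (null) if either side is null, otherwise a real boolean. */
    static Boolean notEqual(String left, String right) {
        if (left == null || right == null) {
            return null;
        }
        return !left.equals(right);
    }

    /** SQL '=': the same null propagation. */
    static Boolean equal(String left, String right) {
        Boolean ne = notEqual(left, right);
        return ne == null ? null : !ne;
    }

    /** WHERE keeps only rows whose predicate is TRUE; FALSE and UNKNOWN are both dropped. */
    static List<String> whereTrue(List<String> statuses, Function<String, Boolean> predicate) {
        List<String> kept = new ArrayList<>();
        for (String s : statuses) {
            if (Boolean.TRUE.equals(predicate.apply(s))) {
                kept.add(s);
            }
        }
        return kept;
    }
}
```

Three input rows yield two output rows across the two filters combined. The missing row is the unhandled business state that the contract has to name.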


22. Timezone and Timestamp Pattern

Spark jobs often fail subtly around time.

Rules:

  • set session timezone explicitly,
  • parse timestamps with known format,
  • distinguish event time, ingestion time, business date,
  • avoid host default timezone,
  • store UTC instants and derived business date separately when needed,
  • test daylight-saving and boundary cases even if your primary timezone has no DST.

Example:

SparkSession spark = SparkSession.builder()
    .appName("case-sla-daily-metrics")
    .config("spark.sql.session.timeZone", "UTC")
    .getOrCreate();

Then derive business date explicitly:

Dataset<Row> withBusinessDate = events.withColumn(
    "business_date_jakarta",
    to_date(from_utc_timestamp(col("event_time_utc"), "Asia/Jakarta"))
);

Do not let timestamps inherit accidental environment behavior.
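Here is the same derivation in plain `java.time`. It is useful for unit-testing the expected business dates of boundary timestamps independently of Spark. It assumes the Asia/Jakarta business calendar from the example, which is UTC+7 with no DST. The class name is hypothetical.

```java
import java.time.Instant;
import java.time.LocalDate;
import java.time.ZoneId;

final class BusinessDates {
    static final ZoneId JAKARTA = ZoneId.of("Asia/Jakarta");

    /** Business date = calendar date of the UTC instant as seen in the business timezone. */
    static LocalDate businessDate(Instant eventTimeUtc, ZoneId businessZone) {
        return eventTimeUtc.atZone(businessZone).toLocalDate();
    }
}
```

For example, an event at 17:00 UTC on 2026-07-02 already belongs to the 2026-07-03 Jakarta business day. A job that derives `business_date` in UTC would file it under the previous day.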


23. External Side Effects from Spark Executors

Avoid calling external services inside Spark transformations.

Bad:

Dataset<Row> enriched = events.map((MapFunction<Row, Row>) row -> { // cast resolves the Java/Scala map overload ambiguity
    String risk = riskApi.lookup(row.getAs("case_id"));
    return RowFactory.create(row.getAs("case_id"), risk);
}, rowEncoder);

Problems:

  • task retries and speculative execution duplicate calls,
  • no global rate limit,
  • partial side effects,
  • non-deterministic output,
  • API outage fails distributed tasks,
  • credentials on executors,
  • hard-to-debug latency.

Better patterns:

  • pre-extract reference data as dataset,
  • join inside Spark,
  • use separate ingestion pipeline for API source,
  • materialize external lookup table with version,
  • use controlled enrichment service outside Spark if request/response semantics are needed.

Spark transforms should be mostly pure.


24. Quality Checks in Spark

Quality checks should produce structured output, not only throw exceptions.

Example checks:

public final class CaseSlaQualityChecks {
    public List<QualityResult> validate(Dataset<Row> metrics) {
        List<QualityResult> results = new ArrayList<>();

        long negativeDurations = metrics
            .where(col("sla_age_minutes").lt(0))
            .count();

        results.add(QualityResult.count(
            "negative_sla_age_minutes",
            negativeDurations,
            negativeDurations == 0 ? QualityStatus.PASS : QualityStatus.FAIL
        ));

        return results;
    }
}

But be careful: every count() is an action and launches its own Spark job.

For many checks, compute them in one pass:

Dataset<Row> summary = metrics.agg(
    count(lit(1)).alias("rows"),
    sum(when(col("sla_age_minutes").lt(0), 1).otherwise(0)).alias("negative_sla_age"),
    sum(when(col("owning_unit").isNull(), 1).otherwise(0)).alias("missing_owner")
);

Design quality checks as Spark workloads too.


25. Reconciliation in Spark

A common reconciliation pattern:

Dataset<Row> inputSummary = input.groupBy(col("business_date"))
    .agg(count(lit(1)).alias("input_count"));

Dataset<Row> outputSummary = output.groupBy(col("business_date"))
    .agg(sum(col("case_count")).alias("output_case_count"));

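Dataset<Row> recon = inputSummary.join(outputSummary,
    inputSummary.col("business_date").equalTo(outputSummary.col("business_date")),
    "full_outer"); // an inner join would hide dates present on only one side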

But count reconciliation is rarely enough.

Add:

  • duplicate count,
  • rejected count,
  • accepted count,
  • checksum over business keys,
  • sum of measures,
  • partition completeness,
  • min/max event time,
  • latest input source position.

A robust output row should carry lineage columns:

run_id
job_version
input_snapshot_id
transform_version
published_at
source_min_event_time
source_max_event_time

Consumers should be able to explain where output came from.
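For the "checksum over business keys" item, one option is an order-independent checksum: hash each key to 64 bits and add the hashes with wrap-around. Unlike XOR, addition does not cancel out duplicated keys, so a duplicate changes the checksum. Below is a plain-Java sketch that uses FNV-1a as an assumed hash. In Spark, an analogous aggregate could be built from `xxhash64` on both sides. Its values would differ from this sketch, and you should check how your Spark version handles long overflow in `sum`, especially with ANSI mode enabled.

```java
import java.nio.charset.StandardCharsets;
import java.util.List;

final class KeyChecksum {
    private static final long FNV_OFFSET = 0xcbf29ce484222325L;
    private static final long FNV_PRIME = 0x100000001b3L;

    /** 64-bit FNV-1a over the UTF-8 bytes of a business key. */
    static long fnv1a64(String key) {
        long hash = FNV_OFFSET;
        for (byte b : key.getBytes(StandardCharsets.UTF_8)) {
            hash ^= (b & 0xff);
            hash *= FNV_PRIME;
        }
        return hash;
    }

    /** Order-independent: the sum (mod 2^64) is the same however rows are partitioned or ordered. */
    static long checksum(List<String> keys) {
        long sum = 0;
        for (String key : keys) {
            sum += fnv1a64(key);
        }
        return sum;
    }
}
```

A matching checksum is strong evidence, not proof, because collisions are possible. Keep it alongside counts and measure sums.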


26. Testing Spark Java Transforms

Keep transform methods pure enough to test locally.

Example test shape:

@Test
void computesDailyEscalationCount() {
    SparkSession spark = localSpark();

    Dataset<Row> events = spark.createDataFrame(List.of(
        RowFactory.create("c1", "CASE_ESCALATED", Date.valueOf("2026-07-03")),
        RowFactory.create("c2", "CASE_OPENED", Date.valueOf("2026-07-03")),
        RowFactory.create("c3", "CASE_ESCALATED", Date.valueOf("2026-07-03"))
    ), CaseSlaSchemas.eventsSchema());

    Dataset<Row> out = new CaseSlaTransform().dailyEscalations(events);

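    List<Row> rows = out.collectAsList(); // acceptable here: test output is tiny and bounded
    assertThat(rows).hasSize(1);
    assertThat(rows.get(0).<Long>getAs("escalation_count")).isEqualTo(2L); // explicit type avoids an ambiguous assertThat overload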
}

Test categories:

  • schema tests,
  • null behavior,
  • duplicate handling,
  • temporal boundary,
  • join missing reference,
  • aggregation correctness,
  • deterministic ordering,
  • output partition columns,
  • quality violation routing.

Do not test only happy path row counts.


27. Golden Dataset Pattern

A golden dataset is a small but realistic input/output fixture.

Structure:

src/test/resources/golden/case-sla/v1/
  input/case_events.jsonl
  input/sla_calendar.jsonl
  expected/case_sla_daily_metrics.jsonl
  manifest.yaml

Manifest:

scenario: late escalation after reassignment
business_date: 2026-07-03
expectations:
  rows: 2
  no_negative_duration: true
  quarantined_records: 1

Golden datasets should include edge cases:

  • duplicate event,
  • late event,
  • missing reference,
  • null optional field,
  • invalid enum,
  • correction event,
  • boundary timestamp,
  • multi-tenant split,
  • closed period correction.

Golden tests prevent a “small refactor” from silently changing semantics.


28. Configuration Pattern

Spark config comes from multiple layers:

application parameters
Spark submit arguments
Spark session config
cluster defaults
catalog/table config
job-specific runtime config

Separate business parameters from engine parameters.

Business parameters:

--run-id=run-20260704-010000
--business-date=2026-07-03
--input-snapshot=snap-123
--mode=REPLACE_PARTITION

Engine parameters:

--conf spark.executor.instances=20
--conf spark.executor.memory=8g
--conf spark.sql.shuffle.partitions=800

Do not let business logic read arbitrary Spark configs directly.

Use typed config:

public record CaseSlaJobConfig(
    String runId,
    LocalDate businessDate,
    String inputPath,
    String stagingPath,
    String outputPath,
    BatchMode mode,
    int maxBroadcastRows
) {}
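A sketch of building typed config from arguments. It accepts the `--key=value` form shown above and also the `--key value` form used in the spark-submit example later in this part. The field set is trimmed for brevity, for example it omits `maxBroadcastRows`. The record name, the `SHADOW` mode value, and the parsing rules are assumptions.

```java
import java.time.LocalDate;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical batch modes; only REPLACE_PARTITION appears in the text above. */
enum BatchMode { REPLACE_PARTITION, SHADOW }

/** Hypothetical trimmed-down job config, validated before any Spark work starts. */
record JobConfigSketch(String runId, LocalDate businessDate, String stagingPath, BatchMode mode) {

    static JobConfigSketch parse(String[] args) {
        Map<String, String> values = new HashMap<>();
        for (int i = 0; i < args.length; i++) {
            String arg = args[i];
            if (!arg.startsWith("--")) {
                throw new IllegalArgumentException("unexpected argument: " + arg);
            }
            int eq = arg.indexOf('=');
            if (eq > 0) {
                values.put(arg.substring(2, eq), arg.substring(eq + 1)); // --key=value
            } else if (i + 1 < args.length) {
                values.put(arg.substring(2), args[++i]);                 // --key value
            } else {
                throw new IllegalArgumentException("missing value for " + arg);
            }
        }
        return new JobConfigSketch(
            require(values, "run-id"),
            LocalDate.parse(require(values, "business-date")),
            require(values, "staging-path"),
            BatchMode.valueOf(values.getOrDefault("mode", "REPLACE_PARTITION")));
    }

    private static String require(Map<String, String> values, String key) {
        String value = values.get(key);
        if (value == null || value.isBlank()) {
            throw new IllegalArgumentException("missing required argument --" + key);
        }
        return value;
    }
}
```

Only the first `=` splits key from value, so run-scoped paths such as `.../run_id=...` survive either argument form.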

29. Packaging and Submission

A Spark Java job is commonly packaged as a fat/shaded jar or distribution artifact.

Production concerns:

  • dependency conflicts,
  • Spark/Scala binary compatibility,
  • logging bindings,
  • connector versions,
  • cloud storage libraries,
  • serialization compatibility,
  • reproducible build,
  • artifact provenance,
  • config externalization.

Example submit shape:

spark-submit \
  --class com.company.pipeline.casesla.CaseSlaJobMain \
  --master yarn \
  --deploy-mode cluster \
  --conf spark.sql.session.timeZone=UTC \
  --conf spark.sql.shuffle.partitions=800 \
  case-sla-spark-job.jar \
  --run-id run-20260704-010000 \
  --business-date 2026-07-03 \
  --input-path s3://raw/case-events/snapshot=snap-123 \
  --staging-path s3://stage/case-sla/run_id=run-20260704-010000 \
  --output-path s3://curated/case-sla

Treat submit command as deployable infrastructure, not a wiki snippet.


30. Spark Job Lifecycle

A production Spark batch run follows the same lifecycle described in Part 049.

Spark executes the heavy computation. Your pipeline architecture controls visibility and evidence.


31. Error Handling Pattern

Different failures require different responses.

Failure | Example | Response
--- | --- | ---
Config error | invalid date | fail before Spark work
Schema error | required column missing | fail input validation
Data quality error | invalid status | quarantine or fail by policy
Resource error | executor OOM | tune/split/retry after diagnosis
External storage error | staging write failed | inspect staging and retry/cleanup
Unknown publish | timeout after commit | recovery inspection
Logic bug | wrong metric formula | block publish/backfill fix

Do not hide all exceptions behind “Spark job failed”. The failure class determines safe retry.
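A sketch that encodes the table so retry decisions live in code rather than tribal knowledge. It assumes the job has already classified the failure into one of the table's rows; that classification is the hard part and is not shown. The enum and response names are hypothetical.

```java
final class FailurePolicy {
    enum FailureClass { CONFIG, SCHEMA, DATA_QUALITY, RESOURCE, EXTERNAL_STORAGE, UNKNOWN_PUBLISH, LOGIC_BUG }

    enum Response {
        FAIL_BEFORE_SPARK_WORK,
        FAIL_INPUT_VALIDATION,
        QUARANTINE_OR_FAIL_BY_POLICY,
        DIAGNOSE_THEN_TUNE_SPLIT_OR_RETRY,
        INSPECT_STAGING_THEN_RETRY_OR_CLEAN_UP,
        RECOVERY_INSPECTION,
        BLOCK_PUBLISH_AND_BACKFILL_FIX
    }

    /** One table row per case; the switch expression must cover every FailureClass. */
    static Response responseFor(FailureClass failure) {
        return switch (failure) {
            case CONFIG -> Response.FAIL_BEFORE_SPARK_WORK;
            case SCHEMA -> Response.FAIL_INPUT_VALIDATION;
            case DATA_QUALITY -> Response.QUARANTINE_OR_FAIL_BY_POLICY;
            case RESOURCE -> Response.DIAGNOSE_THEN_TUNE_SPLIT_OR_RETRY;
            case EXTERNAL_STORAGE -> Response.INSPECT_STAGING_THEN_RETRY_OR_CLEAN_UP;
            case UNKNOWN_PUBLISH -> Response.RECOVERY_INSPECTION;
            case LOGIC_BUG -> Response.BLOCK_PUBLISH_AND_BACKFILL_FIX;
        };
    }
}
```

The exhaustive switch expression has no `default` branch. As a result, adding a new failure class without deciding its response becomes a compile error rather than a silent fallback.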


32. Observability Pattern

Spark UI is useful, but not enough.

Emit pipeline-level metrics:

spark_job_run_status
input_rows_by_dataset
output_rows_by_partition
quarantined_rows_by_reason
quality_check_status
reconciliation_status
publish_status
runtime_seconds_by_stage

Emit data-level logs:

{
  "runId": "run-20260704-010000",
  "jobName": "case-sla-daily-metrics",
  "businessDate": "2026-07-03",
  "inputRows": 18203121,
  "outputRows": 3812,
  "quarantinedRows": 29,
  "status": "PUBLISHED"
}

Store:

  • Spark application ID,
  • run ID,
  • input snapshot,
  • output version,
  • commit ID,
  • quality results,
  • reconciliation summary.

The Spark application ID tells you how execution ran. The pipeline run ID tells you what truth was produced.


33. Performance Design Checklist

Before tuning blindly, inspect the shape.

Ask:

  • Is the input pruned by partition and projection?
  • Are filters pushed down?
  • Are joins using expected strategy?
  • Is one key skewed?
  • Are shuffle partitions appropriate?
  • Are output files too small?
  • Are expensive UDFs blocking optimization?
  • Is data cached only when reused?
  • Is count() called too many times for validation?
  • Are wide transformations unavoidable?
  • Are reference datasets accidentally huge?

Tuning after a bad logical design only makes the wrong plan slightly faster.


34. Security and Sensitive Data Boundary

Spark jobs often run with broad data access.

Design rules:

  • read only required columns,
  • drop sensitive fields early when not needed,
  • avoid logging row payloads,
  • classify output dataset sensitivity,
  • separate raw/staging/curated access,
  • ensure staging paths have retention and access policy,
  • mask/tokenize before broad publication,
  • record data classification in manifest.

Bad:

logger.info("bad row: {}", row);

Better:

logger.warn("invalid row: runId={}, reason={}, eventIdHash={}",
    runId,
    reason,
    hash(row.getAs("event_id"))
);

Data pipelines leak data through logs, staging, metrics labels, samples, and exception messages. Control all of them.
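The hashing helper in the log call above is not defined in this part. It should also not be confused with Spark's `functions.hash`, which builds a Column expression rather than hashing a value. One possible sketch, assumed rather than prescribed, is a keyed HMAC-SHA256 via `javax.crypto`. With a plain hash, low-entropy identifiers can be recovered by hashing candidate IDs; with a keyed hash, that requires the key as well. Key management, meaning where `logKey` comes from, is out of scope, and `logKey` is a hypothetical parameter.

```java
import java.nio.charset.StandardCharsets;
import java.security.GeneralSecurityException;
import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;

final class LogRedaction {
    private static final char[] HEX = "0123456789abcdef".toCharArray();

    /** Keyed hash for log correlation: same id and key give the same token; the raw id is never logged. */
    static String hashForLog(String id, byte[] logKey) {
        try {
            Mac mac = Mac.getInstance("HmacSHA256");
            mac.init(new SecretKeySpec(logKey, "HmacSHA256"));
            byte[] digest = mac.doFinal(id.getBytes(StandardCharsets.UTF_8));
            // The first 8 bytes (16 hex chars) are enough to correlate log lines.
            StringBuilder hex = new StringBuilder(16);
            for (int i = 0; i < 8; i++) {
                hex.append(HEX[(digest[i] >> 4) & 0xf]).append(HEX[digest[i] & 0xf]);
            }
            return hex.toString();
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException("HmacSHA256 unavailable", e);
        }
    }
}
```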


35. Case Study: Case SLA Daily Metrics Spark Job

Input datasets:

case_events_raw
sla_calendar
org_unit_history
case_priority_history

Output dataset:

case_sla_daily_metrics

High-level code shape:

public final class CaseSlaJobMain {
    public static void main(String[] args) {
        CaseSlaJobConfig config = CaseSlaJobConfig.parse(args);

        SparkSession spark = SparkSession.builder()
            .appName("case-sla-daily-metrics")
            .config("spark.sql.session.timeZone", "UTC")
            .getOrCreate();

        CaseSlaInputs inputs = new CaseSlaInputs();
        Dataset<Row> events = inputs.readEvents(spark, config.eventsPath());
        Dataset<Row> calendar = inputs.readCalendar(spark, config.calendarPath());
        Dataset<Row> orgUnits = inputs.readOrgUnits(spark, config.orgUnitPath());

        CaseSlaTransform transform = new CaseSlaTransform();
        Dataset<Row> metrics = transform.compute(events, calendar, orgUnits, config.businessDate());

        CaseSlaQualityChecks quality = new CaseSlaQualityChecks();
        QualityReport report = quality.validate(metrics);
        report.throwIfBlocking();

        new CaseSlaOutputWriter().writeStaging(metrics, config.stagingPath());
        new CaseSlaPublisher().publish(config);
    }
}

Transform sketch:

public Dataset<Row> compute(
    Dataset<Row> events,
    Dataset<Row> calendar,
    Dataset<Row> orgUnits,
    LocalDate businessDate
) {
    Dataset<Row> relevant = events
        .where(col("business_date").equalTo(lit(Date.valueOf(businessDate))))
        .where(col("event_type").isin("CASE_OPENED", "CASE_ESCALATED", "CASE_RESOLVED"));

    Dataset<Row> deduped = dedupeEvents(relevant);
    Dataset<Row> enriched = enrichOrgUnit(deduped, orgUnits);
    Dataset<Row> aged = computeSlaAge(enriched, calendar);

    return aged.groupBy(col("business_date"), col("owning_unit"), col("priority"))
        .agg(
            countDistinct(col("case_id")).alias("case_count"),
            sum(when(col("sla_breached"), 1).otherwise(0)).alias("breach_count"),
            avg(col("sla_age_minutes")).alias("avg_sla_age_minutes")
        )
        .withColumn("run_id", lit("${RUN_ID_REPLACED_BY_CALLER}"));
}

In real code, avoid hidden run ID substitution inside transform. Pass run metadata explicitly at output boundary.


36. Common Spark Java Anti-Patterns

Anti-Pattern 1: Giant Main Method

Fix: separate IO, transform, quality, publish.

Anti-Pattern 2: collectAsList() for Real Data

Fix: keep data distributed; collect only bounded summaries.

Anti-Pattern 3: Java UDF for Everything

Fix: use built-in SQL/DataFrame functions where possible.

Anti-Pattern 4: Direct Overwrite of Final Path

Fix: staging + validation + atomic publish.

Anti-Pattern 5: Join Without Cardinality Contract

Fix: validate uniqueness and route missing/duplicate references.

Anti-Pattern 6: Timestamp Defaults

Fix: set session timezone and derive business dates explicitly.

Anti-Pattern 7: Blind Repartition

Fix: understand whether you need parallelism, distribution, or file control.

Anti-Pattern 8: No Manifest

Fix: record run ID, input boundary, output boundary, Spark app ID, schema version, quality result, reconciliation result.


37. Production Review Checklist

Before merging a Spark Java pipeline, ask:

  • Is business logic separated from Spark job bootstrap?
  • Are input schemas validated?
  • Are output schemas explicitly declared?
  • Does the job use a run ID?
  • Are input and output boundaries explicit?
  • Is the transform deterministic?
  • Are reference datasets versioned or temporally scoped?
  • Are joins reviewed for cardinality and missing-reference policy?
  • Are duplicates handled by a documented rule?
  • Are null semantics explicit?
  • Is timezone configured?
  • Are UDFs justified?
  • Are collect() calls bounded?
  • Are writes staged before publish?
  • Is overwrite scoped to owned partitions only?
  • Are quality checks efficient and meaningful?
  • Are reconciliation results stored?
  • Are Spark app ID and pipeline run ID linked?
  • Is small-file output measured?
  • Does the job support shadow/backfill mode?
  • Is sensitive data controlled in logs, staging, and output?

Spark is powerful, but power without boundaries creates expensive ambiguity.


38. The Mental Model to Keep

A Spark Java pipeline has two layers:

Logical data pipeline layer:
  contracts, input boundary, transform semantics, validation, publish, evidence

Spark execution layer:
  distributed plan, partitioning, shuffle, caching, tasks, executors, storage

Top-tier engineers keep both layers visible.

They do not write Spark code as if it were normal Java collections.

They do not trust a green Spark job as proof of data correctness.

They design Spark jobs as deterministic, staged, observable, reproducible computation units inside a larger pipeline architecture.


References

  • Apache Spark documentation: Spark is a unified analytics engine for large-scale data processing with high-level APIs in Java, Scala, Python, and R.
  • Spark SQL, DataFrames, and Datasets guide: the DataFrame API is available in Java, and in Java a DataFrame is represented as a Dataset<Row>.
  • Spark JavaDoc: Dataset<T> is a strongly typed collection of domain-specific objects that can be transformed in parallel.
  • Spark Structured Streaming documentation: Structured Streaming is built on the Spark SQL engine and is covered separately in the next part.