Custom Collectors: Correctness, Associativity, and Parallel Safety
Learn Java Array, Collections, Iterator/Iterable, Stream - Part 025
Deep dive into custom Java Collectors: collector laws, identity, associativity, combiner correctness, finisher semantics, characteristics, thread confinement, parallel safety, testing strategies, and production failure modeling.
Goal: after this part, you will be able to build a custom Collector that is correct both sequentially and in parallel, understand the identity and associativity laws, choose Characteristics safely, write a combiner that does more than "just compile", and test a collector so that it does not break when the stream is partitioned, executed in parallel, or used as a downstream collector.
Custom collectors are an area of the Stream API that looks simple but is a frequent source of subtle bugs.
The signature looks lightweight:
Collector<T, A, R>
But behind it sits a formal contract:
T = input element type
A = mutable accumulation state
R = final result type
A correct collector must answer:
- how is the state created?
- how does one element enter the state?
- how are two partial states merged?
- how is the final state turned into the result?
- does the result depend on order?
- may the state be accumulated from several threads?
- is the final result the same object as the accumulation state?
When a collector is only used on sequential streams, combiner bugs often stay hidden. Once the pipeline becomes parallel, the collector is used as a groupingBy downstream, or the source is partitioned differently, the result can be wrong.
Mental model:
A custom collector is not a loop callback.
It is a reduction algebra.
1. Where This Part Sits in the Kaufman Framework
Kaufman-style simplification:
Do not start by writing Collector.of(...).
Start by defining the algebra of your aggregation.
The most important question:
Can partial results be combined without changing meaning?
If the answer is unclear, do not build a custom collector that claims to be parallel-safe.
2. When Is a Custom Collector Needed?
Many cases do not need a custom collector.
Use a built-in collector if it is enough:
Map<CustomerId, List<Order>> byCustomer = orders.stream()
.collect(Collectors.groupingBy(Order::customerId));
Use composition if it is enough:
Map<CustomerId, Long> countByCustomer = orders.stream()
.collect(Collectors.groupingBy(
Order::customerId,
Collectors.counting()
));
Use collectingAndThen if you only need a final transformation:
List<OrderDto> result = orders.stream()
.map(OrderDto::from)
.collect(Collectors.collectingAndThen(
Collectors.toList(),
List::copyOf
));
A custom collector is justified when:
- the state accumulation does not map naturally onto built-in collectors
- you need a domain-specific result object
- you need a complex merge policy
- you need diagnostics, not just the final value
- you need to combine several aggregations in one pass
- you need a reusable downstream collector
- you can define a correct combiner
Valid example:
ValidationReport report = records.stream()
.collect(ValidationReport.collector(rules));
Suspicious example:
List<OrderDto> result = orders.stream()
.collect(myCustomListCollector());
If all it does is build a list, do not write a custom collector.
Rule:
Custom collector is justified when it captures a reusable aggregation contract,
not when it merely hides a simple stream pipeline.
3. Collector Anatomy
A collector consists of four main functions, plus an optional set of characteristics.
Collector<T, A, R> collector = Collector.of(
supplier,
accumulator,
combiner,
finisher,
characteristics
);
Conceptually:
supplier -> A
accumulator -> (A, T) -> void
combiner -> (A, A) -> A
finisher -> A -> R
Sequential mental execution:
A state = supplier.get();
for (T element : source) {
accumulator.accept(state, element);
}
R result = finisher.apply(state);
Parallel mental execution:
A left = supplier.get();
A right = supplier.get();
// partition 1
accumulator.accept(left, t1);
accumulator.accept(left, t2);
// partition 2
accumulator.accept(right, t3);
accumulator.accept(right, t4);
A combined = combiner.apply(left, right);
R result = finisher.apply(combined);
Because the stream library is free to partition the input, a collector must not rely on an internal ordering that only holds in a sequential loop.
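To make the two mental executions concrete, here is a minimal sketch that runs one collector through both a sequential and a parallel stream. The class name `CollectorAnatomyDemo` and the factory `toImmutableList()` are hypothetical names introduced only for this illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collector;

final class CollectorAnatomyDemo {

    // Hypothetical collector, for illustration only: gathers strings into an immutable list.
    static Collector<String, ArrayList<String>, List<String>> toImmutableList() {
        return Collector.of(
                ArrayList::new,          // supplier: a fresh state per reduction or partition
                ArrayList::add,          // accumulator: folds one element into a state
                (left, right) -> {       // combiner: merges two partial states
                    left.addAll(right);
                    return left;
                },
                List::copyOf             // finisher: publishes an immutable result
        );
    }

    static List<String> sequential(List<String> source) {
        return source.stream().collect(toImmutableList());
    }

    static List<String> parallel(List<String> source) {
        return source.parallelStream().collect(toImmutableList());
    }

    public static void main(String[] args) {
        List<String> source = List.of("t1", "t2", "t3", "t4");
        System.out.println(sequential(source)); // [t1, t2, t3, t4]
        System.out.println(parallel(source));   // [t1, t2, t3, t4]
    }
}
```

Both runs agree because the source is ordered, the collector does not declare UNORDERED, and the combiner appends the right partition after the left one.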
4. T, A, and R: Three Types That Must Be Kept Apart
Many collector bugs start with failing to distinguish A from R.
Example:
Collector<Order, OrderReportBuilder, OrderReport>
- Order = input element
- OrderReportBuilder = mutable accumulation state
- OrderReport = immutable final result
This is healthy.
Compare:
Collector<Order, List<OrderDto>, List<OrderDto>>
Here A and R are the same. This can be valid if a mutable result is acceptable, or if the collector uses IDENTITY_FINISH.
But for a safe API boundary, it is usually better to use:
Collector<Order, List<OrderDto>, List<OrderDto>>
with finisher:
List::copyOf
or:
Collector<Order, ArrayList<OrderDto>, List<OrderDto>>
Mental model:
A is working state.
R is published state.
Do not publish working state unless you intentionally allow mutation.
5. Collector Laws: Identity and Associativity
A custom collector must satisfy two major laws:
- identity
- associativity
5.1 Identity Law
For any partial state a, combining it with an empty state must be equivalent to a.
A a = partiallyAccumulatedState();
A empty = supplier.get();
A combined = combiner.apply(a, empty);
This must be equivalent to:
a
Correct example:
(left, right) -> {
left.addAll(right);
return left;
}
For lists:
[a, b] + [] = [a, b]
[] + [a, b] = [a, b]
Incorrect example:
(left, right) -> {
left.add(null);
left.addAll(right);
return left;
}
Because:
[a, b] + [] = [a, b, null]
The identity law is broken.
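The identity law can also be checked mechanically. The sketch below is one possible way to do it, assuming the state type has a meaningful equals() (as lists do); `IdentityLawCheck` and its method names are hypothetical helpers, not part of any library.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BinaryOperator;
import java.util.function.Supplier;

final class IdentityLawCheck {

    // Assumes A has a meaningful equals(). The partial state comes from a factory
    // because a combiner is allowed to mutate its arguments.
    static <A> boolean identityHolds(Supplier<A> empty, Supplier<A> partial, BinaryOperator<A> combiner) {
        A expected = partial.get();
        A withEmptyRight = combiner.apply(partial.get(), empty.get());
        A withEmptyLeft = combiner.apply(empty.get(), partial.get());
        return expected.equals(withEmptyRight) && expected.equals(withEmptyLeft);
    }

    // A fresh partially accumulated state [a, b] on every call.
    static List<String> partial() {
        return new ArrayList<>(List.of("a", "b"));
    }

    static boolean goodCombinerHolds() {
        BinaryOperator<List<String>> good = (left, right) -> {
            left.addAll(right);
            return left;
        };
        return identityHolds(ArrayList::new, IdentityLawCheck::partial, good);
    }

    static boolean badCombinerHolds() {
        BinaryOperator<List<String>> bad = (left, right) -> {
            left.add(null); // injects an element that neither side contained
            left.addAll(right);
            return left;
        };
        return identityHolds(ArrayList::new, IdentityLawCheck::partial, bad);
    }

    public static void main(String[] args) {
        System.out.println(goodCombinerHolds()); // true
        System.out.println(badCombinerHolds());  // false
    }
}
```

The check tries the empty state on both sides, since a combiner that behaves correctly with an empty right argument can still misbehave with an empty left one.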
5.2 Associativity Law
Partitioning must not change the meaning of the result.
Sequential:
A a1 = supplier.get();
accumulator.accept(a1, t1);
accumulator.accept(a1, t2);
R r1 = finisher.apply(a1);
Split:
A a2 = supplier.get();
accumulator.accept(a2, t1);
A a3 = supplier.get();
accumulator.accept(a3, t2);
R r2 = finisher.apply(combiner.apply(a2, a3));
These must be equivalent:
r1 == r2, or r1.equals(r2), or equivalent under UNORDERED semantics
Correct example: integer sum. Java integer addition wraps on overflow, and wrapping addition is still associative, so the result stays deterministic however the input is split.
record SumBox(int[] value) {}
More commonly, though:
class LongSum {
long value;
}
Combiner:
(left, right) -> {
left.value += right.value;
return left;
}
Incorrect example: averaging by merging averages without a count.
class BadAverage {
double average;
}
Incorrect combiner:
(left, right) -> {
left.average = (left.average + right.average) / 2;
return left;
}
Why is it wrong?
average([10, 20, 30]) = 20
average(average([10]), average([20, 30])) = average(10, 25) = 17.5
The state for an average must store at least:
sum + count
Correct:
final class AverageState {
long count;
BigDecimal sum = BigDecimal.ZERO;
void add(BigDecimal value) {
count++;
sum = sum.add(value);
}
AverageState merge(AverageState other) {
count += other.count;
sum = sum.add(other.sum);
return this;
}
Optional<BigDecimal> finish(MathContext mathContext) {
if (count == 0) {
return Optional.empty();
}
return Optional.of(sum.divide(BigDecimal.valueOf(count), mathContext));
}
}
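As a sketch of how this state could be wired into a collector, the block below repeats AverageState so that it compiles on its own and adds a hypothetical factory named `averaging`. It then accumulates [10] and [20, 30] as separate partitions and merges them, which is exactly the split that breaks the average-of-averages combiner.

```java
import java.math.BigDecimal;
import java.math.MathContext;
import java.util.Optional;
import java.util.stream.Collector;
import java.util.stream.Stream;

final class AveragingDemo {

    // Same shape as AverageState above, repeated so the sketch is self-contained.
    static final class AverageState {
        long count;
        BigDecimal sum = BigDecimal.ZERO;

        void add(BigDecimal value) { count++; sum = sum.add(value); }

        AverageState merge(AverageState other) {
            count += other.count;
            sum = sum.add(other.sum);
            return this;
        }

        Optional<BigDecimal> finish(MathContext mc) {
            return count == 0
                    ? Optional.empty()
                    : Optional.of(sum.divide(BigDecimal.valueOf(count), mc));
        }
    }

    // Hypothetical factory name; the rounding policy is passed in explicitly.
    static Collector<BigDecimal, AverageState, Optional<BigDecimal>> averaging(MathContext mc) {
        return Collector.of(
                AverageState::new,
                AverageState::add,
                AverageState::merge,
                state -> state.finish(mc));
    }

    // Whole input in one sequential pass.
    static Optional<BigDecimal> wholeAverage() {
        return Stream.of(new BigDecimal("10"), new BigDecimal("20"), new BigDecimal("30"))
                .collect(averaging(MathContext.DECIMAL64));
    }

    // [10] and [20, 30] accumulated separately, then merged through the combiner.
    static Optional<BigDecimal> splitAverage() {
        var c = averaging(MathContext.DECIMAL64);
        AverageState left = c.supplier().get();
        c.accumulator().accept(left, new BigDecimal("10"));
        AverageState right = c.supplier().get();
        c.accumulator().accept(right, new BigDecimal("20"));
        c.accumulator().accept(right, new BigDecimal("30"));
        return c.finisher().apply(c.combiner().apply(left, right));
    }

    public static void main(String[] args) {
        System.out.println(wholeAverage()); // Optional[20]
        System.out.println(splitAverage()); // Optional[20]
    }
}
```

Because the state carries sum and count rather than a precomputed average, the split run agrees with the whole run, and an empty stream yields Optional.empty() instead of a division by zero.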
6. Equivalence Is Not Always equals
For ordered collectors, results usually have to be equal under equals.
List.of("a", "b").equals(List.of("a", "b")); // true
List.of("b", "a").equals(List.of("a", "b")); // false
For unordered collectors, equivalence may ignore order.
Set.of("a", "b").equals(Set.of("b", "a")); // true
But do not misread this:
Collector.Characteristics.UNORDERED
does not mean the output may be scrambled without consequence. It means the collector does not promise encounter order as part of equivalence.
If the output feeds an audit, snapshot, diff, CSV, deterministic log, or golden-file test, do not make the collector unordered unless you stabilize the order in the finisher.
Example of a deterministic finisher:
Collector<Order, ArrayList<Order>, List<Order>> sortedByIdCollector = Collector.of(
ArrayList::new,
ArrayList::add,
(left, right) -> {
left.addAll(right);
return left;
},
list -> list.stream()
.sorted(Comparator.comparing(Order::id))
.toList()
);
7. Characteristics: Do Not Add Flags Carelessly
Collector characteristics tell the stream library about the collector's properties.
Set<Collector.Characteristics> characteristics();
The main characteristics:
| Characteristic | Meaning | Risk When Misused |
|---|---|---|
| IDENTITY_FINISH | A can be treated directly as R | Wrong if the finisher transforms, copies, or wraps |
| UNORDERED | the result does not depend on encounter order | Nondeterministic output if order actually matters |
| CONCURRENT | the accumulator may be called concurrently on the same state | Data race / corruption if the state is not concurrent-safe |
7.1 IDENTITY_FINISH
Valid:
Collector<String, List<String>, List<String>> c = Collector.of(
ArrayList::new,
List::add,
(left, right) -> {
left.addAll(right);
return left;
},
Collector.Characteristics.IDENTITY_FINISH
);
Not valid:
Collector<String, List<String>, List<String>> c = Collector.of(
ArrayList::new,
List::add,
(left, right) -> {
left.addAll(right);
return left;
},
List::copyOf,
Collector.Characteristics.IDENTITY_FINISH // wrong
);
If IDENTITY_FINISH is set, the stream runtime may skip the finisher, because you have declared that A is R.
Rule:
Only use IDENTITY_FINISH when finisher is truly identity and A is safely R.
7.2 UNORDERED
Valid for a set-like result:
Collector<String, Set<String>, Set<String>> toHashSet = Collector.of(
HashSet::new,
Set::add,
(left, right) -> {
left.addAll(right);
return left;
},
Collector.Characteristics.UNORDERED,
Collector.Characteristics.IDENTITY_FINISH
);
Not valid for ordered diagnostics:
Collector<Violation, List<Violation>, List<Violation>> diagnostics = Collector.of(
ArrayList::new,
List::add,
(left, right) -> {
left.addAll(right);
return left;
},
Collector.Characteristics.UNORDERED // wrong if order matters
);
If diagnostics must follow input order, do not claim unordered.
7.3 CONCURRENT
CONCURRENT means the result container supports the accumulator being called concurrently on the same container.
It does not mean:
collector can be used in parallel
A non-concurrent collector can still be used in a parallel stream, because the runtime can create a separate state per partition and then merge them.
CONCURRENT is only valid if:
- the accumulation container is concurrent-safe
- the accumulator is safe to call simultaneously on the same state
- the collector is also UNORDERED, or the source is unordered
- the result semantics stay correct under interleaving
A careful example:
Collector<Event, ConcurrentHashMap<EventType, LongAdder>, Map<EventType, Long>> c = Collector.of(
ConcurrentHashMap::new,
(map, event) -> map
.computeIfAbsent(event.type(), ignored -> new LongAdder())
.increment(),
(left, right) -> {
right.forEach((type, count) -> left
.computeIfAbsent(type, ignored -> new LongAdder())
.add(count.sum()));
return left;
},
map -> {
Map<EventType, Long> result = new EnumMap<>(EventType.class);
map.forEach((type, count) -> result.put(type, count.sum()));
return Map.copyOf(result);
},
Collector.Characteristics.UNORDERED,
Collector.Characteristics.CONCURRENT
);
Even here, ask:
Do I actually need CONCURRENT, or is normal partition+combine enough?
Most custom collectors should not declare CONCURRENT.
8. Thread Confinement for Non-Concurrent Collectors
A non-concurrent collector does not have to be internally thread-safe.
This is valid:
Collector<Order, ArrayList<OrderDto>, List<OrderDto>> dtoCollector = Collector.of(
ArrayList::new,
(list, order) -> list.add(OrderDto.from(order)),
(left, right) -> {
left.addAll(right);
return left;
},
List::copyOf
);
ArrayList is not thread-safe, but the collector can still be used in a parallel stream because each partial result is thread-confined during accumulation.
What matters:
- do not store state in a static field
- do not share an accumulation container across supplier calls
- do not capture mutable external state
- do not return the same container from the supplier every time
Fatal anti-pattern:
static final List<OrderDto> SHARED = new ArrayList<>();
static Collector<Order, List<OrderDto>, List<OrderDto>> broken() {
return Collector.of(
() -> SHARED,
(list, order) -> list.add(OrderDto.from(order)),
(left, right) -> {
left.addAll(right);
return left;
}
);
}
Problem:
supplier must create a fresh independent state.
9. Pattern 1: Domain Report Collector
Suppose we have this input:
record Payment(String id, String customerId, BigDecimal amount, String currency) {}
We want this result:
record PaymentReport(
int count,
BigDecimal totalAmount,
Set<String> currencies,
List<String> invalidPaymentIds
) {}
Collector state:
final class PaymentReportState {
int count;
BigDecimal totalAmount = BigDecimal.ZERO;
final Set<String> currencies = new LinkedHashSet<>();
final List<String> invalidPaymentIds = new ArrayList<>();
void add(Payment payment) {
count++;
if (payment.amount() == null || payment.amount().signum() < 0) {
invalidPaymentIds.add(payment.id());
return;
}
totalAmount = totalAmount.add(payment.amount());
if (payment.currency() != null) {
currencies.add(payment.currency());
}
}
PaymentReportState merge(PaymentReportState other) {
count += other.count;
totalAmount = totalAmount.add(other.totalAmount);
currencies.addAll(other.currencies);
invalidPaymentIds.addAll(other.invalidPaymentIds);
return this;
}
PaymentReport finish() {
return new PaymentReport(
count,
totalAmount,
Set.copyOf(currencies),
List.copyOf(invalidPaymentIds)
);
}
}
Collector:
static Collector<Payment, PaymentReportState, PaymentReport> paymentReportCollector() {
return Collector.of(
PaymentReportState::new,
PaymentReportState::add,
PaymentReportState::merge,
PaymentReportState::finish
);
}
Usage:
PaymentReport report = payments.stream()
.collect(paymentReportCollector());
Why this is good:
- the accumulation state is hidden
- the final result is immutable at the boundary
- the combiner is explicit
- sequential and parallel runs are equivalent
- diagnostics are part of the aggregation
- no mutable internal list or set is exposed
Caveat:
invalidPaymentIds order follows encounter order only if stream preserves encounter order.
If the diagnostics order must be deterministic across parallel executions, sorting in the finisher is more defensible. Stream.toList() already returns an unmodifiable list, so no extra copy is needed:
invalidPaymentIds.stream().sorted().toList()
10. Pattern 2: Top-N Collector
Top-N is often written by sorting all the data:
List<Order> top = orders.stream()
.sorted(Comparator.comparing(Order::amount).reversed())
.limit(10)
.toList();
For large data, we can maintain a heap of size N instead.
State:
final class TopNState<T> {
private final int n;
private final Comparator<? super T> comparator;
private final PriorityQueue<T> heap;
TopNState(int n, Comparator<? super T> comparator) {
if (n < 0) {
throw new IllegalArgumentException("n must be >= 0");
}
this.n = n;
this.comparator = comparator;
this.heap = new PriorityQueue<>(comparator);
}
void add(T value) {
if (n == 0) {
return;
}
if (heap.size() < n) {
heap.add(value);
return;
}
T smallest = heap.peek();
if (comparator.compare(value, smallest) > 0) {
heap.poll();
heap.add(value);
}
}
TopNState<T> merge(TopNState<T> other) {
other.heap.forEach(this::add);
return this;
}
List<T> finishDescending() {
ArrayList<T> result = new ArrayList<>(heap);
result.sort(comparator.reversed());
return List.copyOf(result);
}
}
Factory:
static <T> Collector<T, ?, List<T>> topN(
int n,
Comparator<? super T> comparator
) {
Objects.requireNonNull(comparator, "comparator");
return Collector.of(
() -> new TopNState<T>(n, comparator),
TopNState::add,
TopNState::merge,
TopNState::finishDescending
);
}
Usage:
List<Order> top10 = orders.stream()
.collect(topN(10, Comparator.comparing(Order::amount)));
Correctness questions:
| Question | Answer |
|---|---|
| Fresh state? | yes, supplier creates new TopNState |
| Identity? | merging with empty heap changes nothing |
| Associative? | top N of merged partitions equals top N of whole set, assuming comparator total enough for domain |
| Mutable leak? | no, finisher returns List.copyOf |
| Parallel safe? | yes as non-concurrent collector, state is partition-confined |
| Ordered result? | explicitly sorted in finisher |
Tie-breaker warning:
Comparator.comparing(Order::amount)
If several elements have equal amounts, the output order among them may not be stable. For audits, add a tie-breaker:
Comparator.comparing(Order::amount)
.thenComparing(Order::id)
11. Pattern 3: Duplicate Diagnostic Collector
Collectors.toMap can take a merge function, but sometimes you need a report of the duplicates, not a silent merge.
Domain:
record User(String id, String email) {}
record DuplicateIndex<K, V>(
Map<K, V> uniqueValues,
Map<K, List<V>> duplicates
) {}
State:
final class DuplicateIndexState<K, V> {
private final Function<? super V, ? extends K> keyExtractor;
private final Map<K, V> unique = new LinkedHashMap<>();
private final Map<K, List<V>> duplicates = new LinkedHashMap<>();
DuplicateIndexState(Function<? super V, ? extends K> keyExtractor) {
this.keyExtractor = keyExtractor;
}
void add(V value) {
K key = keyExtractor.apply(value);
V previous = unique.putIfAbsent(key, value);
if (previous != null) {
duplicates.computeIfAbsent(key, ignored -> new ArrayList<>())
.add(value);
}
}
DuplicateIndexState<K, V> merge(DuplicateIndexState<K, V> other) {
other.unique.values().forEach(this::add);
other.duplicates.values().forEach(list -> list.forEach(this::add));
return this;
}
DuplicateIndex<K, V> finish() {
Map<K, List<V>> frozenDuplicates = new LinkedHashMap<>();
duplicates.forEach((key, values) -> frozenDuplicates.put(key, List.copyOf(values)));
return new DuplicateIndex<>(
Map.copyOf(unique),
Map.copyOf(frozenDuplicates)
);
}
}
Collector:
static <K, V> Collector<V, ?, DuplicateIndex<K, V>> indexingWithDuplicateDiagnostics(
Function<? super V, ? extends K> keyExtractor
) {
Objects.requireNonNull(keyExtractor, "keyExtractor");
return Collector.of(
() -> new DuplicateIndexState<K, V>(keyExtractor),
DuplicateIndexState::add,
DuplicateIndexState::merge,
DuplicateIndexState::finish
);
}
Usage:
DuplicateIndex<String, User> index = users.stream()
.collect(indexingWithDuplicateDiagnostics(User::email));
Important nuance:
This collector's duplicate ordering is encounter-order sensitive.
Do not declare UNORDERED unless output order is irrelevant or normalized.
Also note the semantic policy:
uniqueValues keeps first seen value.
duplicates contains later conflicting values.
This policy must be documented because another team might expect duplicates to include the first value too.
Alternative result shape:
record DuplicateIndex<K, V>(
Map<K, V> uniqueValues,
Map<K, List<V>> duplicateGroupsIncludingOriginal
) {}
There is no universally correct shape. There is only explicit policy.
12. Pattern 4: Validation Collector
Validation pipelines often need to accumulate all errors instead of fail-fast.
Domain:
record Violation(String path, String code, String message) {}
record ValidationReport(List<Violation> violations) {
boolean isValid() {
return violations.isEmpty();
}
}
Rule interface:
@FunctionalInterface
interface Rule<T> {
Optional<Violation> validate(T value);
}
State:
final class ValidationState<T> {
private final List<Rule<T>> rules;
private final List<Violation> violations = new ArrayList<>();
ValidationState(List<Rule<T>> rules) {
this.rules = List.copyOf(rules);
}
void add(T value) {
for (Rule<T> rule : rules) {
rule.validate(value).ifPresent(violations::add);
}
}
ValidationState<T> merge(ValidationState<T> other) {
violations.addAll(other.violations);
return this;
}
ValidationReport finish() {
return new ValidationReport(List.copyOf(violations));
}
}
Collector:
static <T> Collector<T, ?, ValidationReport> validating(List<Rule<T>> rules) {
Objects.requireNonNull(rules, "rules");
return Collector.of(
() -> new ValidationState<T>(rules),
ValidationState::add,
ValidationState::merge,
ValidationState::finish
);
}
Usage:
ValidationReport report = customers.stream()
.collect(validating(List.of(
customer -> customer.email() == null
? Optional.of(new Violation("email", "required", "Email is required"))
: Optional.empty(),
customer -> customer.status() == null
? Optional.of(new Violation("status", "required", "Status is required"))
: Optional.empty()
)));
Production considerations:
- if rules are expensive, avoid repeated allocation inside add
- if the report must be deterministic, preserve encounter order or sort in the finisher
- if validation is fail-fast, a collector is the wrong abstraction
- if validation needs external IO, a stream collector is usually the wrong place
13. Combiner Design
Combiner is the most commonly broken part.
13.1 Correct Combiner Shape
Typical mutable merge:
(left, right) -> {
left.addAll(right);
return left;
}
Important:
Once a state has been passed to the combiner and is not the object returned from it, the stream runtime never uses it again, and no state is passed to the accumulator after it has been combined.
Therefore mutating left and returning it is valid.
13.2 Bad Combiner: Ignores Right
(left, right) -> left
This passes sequential tests because combiner may never run.
Parallel result loses data.
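The following sketch shows why a sequential test cannot catch this bug. It drives the collector functions by hand over two partitions, so the outcome does not depend on how a parallel stream happens to split. `IgnoredRightDemo` and its helper names are hypothetical and exist only for this illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BinaryOperator;
import java.util.stream.Collector;

final class IgnoredRightDemo {

    static final List<String> INPUT = List.of("a", "b", "c", "d");

    // Builds a list collector around whichever combiner is under test.
    static Collector<String, List<String>, List<String>> listCollector(BinaryOperator<List<String>> combiner) {
        return Collector.of(ArrayList::new, List::add, combiner);
    }

    static Collector<String, List<String>, List<String>> broken() {
        return listCollector((left, right) -> left); // silently ignores right
    }

    static Collector<String, List<String>, List<String>> correct() {
        return listCollector((left, right) -> {
            left.addAll(right);
            return left;
        });
    }

    // Accumulates input[0, split) and input[split, size) separately, then combines.
    static List<String> collectSplit(int split, Collector<String, List<String>, List<String>> c) {
        List<String> left = c.supplier().get();
        INPUT.subList(0, split).forEach(v -> c.accumulator().accept(left, v));
        List<String> right = c.supplier().get();
        INPUT.subList(split, INPUT.size()).forEach(v -> c.accumulator().accept(right, v));
        return c.finisher().apply(c.combiner().apply(left, right));
    }

    public static void main(String[] args) {
        System.out.println(INPUT.stream().collect(broken())); // [a, b, c, d] (combiner never runs)
        System.out.println(collectSplit(2, broken()));        // [a, b] (right partition lost)
        System.out.println(collectSplit(2, correct()));       // [a, b, c, d]
    }
}
```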
13.3 Bad Combiner: Returns New but Drops State
(left, right) -> new ArrayList<>()
Identity and associativity both broken.
13.4 Bad Combiner: Mutates Both and Returns One
(left, right) -> {
left.addAll(right);
right.clear();
return left;
}
Usually unnecessary. It may work under current constraints, but it creates no benefit and makes reasoning harder.
13.5 Bad Combiner: Side Effect to External State
List<String> global = new ArrayList<>();
(left, right) -> {
global.add("combined");
left.addAll(right);
return left;
}
Collector behavior should not depend on external side effects.
14. Finisher Design
Finisher transforms accumulation state into published result.
Common finisher responsibilities:
- immutable copy
- sorting result
- validation of final invariants
- conversion to domain object
- removing internal helper state
- computing derived values
Example:
state -> new Report(
state.count,
state.total,
List.copyOf(state.violations)
)
Avoid returning mutable accumulation state unless intended.
Anti-pattern:
state -> state.violations
This leaks the mutable internal list.
Better:
state -> List.copyOf(state.violations)
Do not put resource cleanup only in finisher if exceptions during accumulation can bypass it. Collector is not a general resource lifecycle abstraction.
Bad:
Collector<Row, WriterState, Path> c = Collector.of(
WriterState::openFile,
WriterState::write,
WriterState::merge,
WriterState::closeAndReturnPath
);
Problem:
If accumulator throws, finisher may not run.
Use explicit try-with-resources outside the stream for resource lifecycle.
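A minimal sketch of that alternative follows, assuming a hypothetical `Row` record and a simple comma-separated line format. The point is only that the writer is opened and closed by try-with-resources, so it is closed even when writing a row throws.

```java
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;

final class ExplicitResourceScopeDemo {

    // Hypothetical row type, for illustration only.
    record Row(String id, String value) {}

    // The resource scope is explicit: the writer is closed on success and on failure.
    static Path writeRows(List<Row> rows, Path target) throws IOException {
        try (BufferedWriter writer = Files.newBufferedWriter(target, StandardCharsets.UTF_8)) {
            for (Row row : rows) {
                writer.write(row.id() + "," + row.value());
                writer.newLine();
            }
        }
        return target;
    }

    // Writes two rows to a temporary file, reads them back, and cleans up.
    static List<String> roundTrip() {
        try {
            Path target = Files.createTempFile("rows", ".csv");
            try {
                writeRows(List.of(new Row("r1", "a"), new Row("r2", "b")), target);
                return Files.readAllLines(target, StandardCharsets.UTF_8);
            } finally {
                Files.deleteIfExists(target);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(roundTrip()); // [r1,a, r2,b]
    }
}
```

If the rows come from a stream, the stream can still do the mapping and filtering; only the write loop and the resource ownership stay outside the collector.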
15. Collector vs Stream.collect(supplier, accumulator, combiner)
There are two collect forms.
Collector object:
R result = stream.collect(myCollector);
Three-arg collect:
R result = stream.collect(
supplier,
accumulator,
combiner
);
Three-arg collect is useful for simple mutable accumulation where A == R and no finisher/characteristics are needed.
Example:
ArrayList<OrderDto> result = orders.stream()
.collect(
ArrayList::new,
(list, order) -> list.add(OrderDto.from(order)),
ArrayList::addAll
);
But if you need:
- immutable final result
- reusable operation
- downstream collector
- characteristics
- named domain abstraction
- nontrivial finisher
prefer Collector.
16. Custom Collector as Downstream Collector
Custom collector becomes more powerful when used downstream.
Example:
Map<CustomerId, ValidationReport> reportByCustomer = orders.stream()
.collect(Collectors.groupingBy(
Order::customerId,
validating(orderRules)
));
This requires your collector to be correct not only alone but also when many instances are created as downstream accumulators.
Common bug:
static <T> Collector<T, ?, ValidationReport> brokenValidating(List<Rule<T>> rules) {
ValidationState<T> shared = new ValidationState<>(rules);
return Collector.of(
() -> shared,
ValidationState::add,
ValidationState::merge,
ValidationState::finish
);
}
Every group receives same state. Catastrophic.
Correct:
() -> new ValidationState<>(rules)
Rule:
Supplier must produce a fresh state for every independent reduction.
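The effect of a shared state under groupingBy can be reproduced without any parallelism. The sketch below uses plain string lists instead of ValidationState to stay self-contained; `SharedDownstreamStateDemo` and its method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collector;
import java.util.stream.Collectors;
import java.util.stream.Stream;

final class SharedDownstreamStateDemo {

    // Broken: every supplier call hands out the same list.
    static Collector<String, List<String>, List<String>> sharedState() {
        List<String> shared = new ArrayList<>();
        return Collector.of(() -> shared, List::add, (left, right) -> {
            left.addAll(right);
            return left;
        });
    }

    // Correct: every supplier call creates an independent list.
    static Collector<String, List<String>, List<String>> freshState() {
        return Collector.of(ArrayList::new, List::add, (left, right) -> {
            left.addAll(right);
            return left;
        });
    }

    // Groups four sample words by length using the given downstream collector.
    static Map<Integer, List<String>> groupByLength(Collector<String, List<String>, List<String>> downstream) {
        return Stream.of("a", "bb", "c", "dd")
                .collect(Collectors.groupingBy(String::length, downstream));
    }

    public static void main(String[] args) {
        System.out.println(groupByLength(freshState()));  // {1=[a, c], 2=[bb, dd]}
        System.out.println(groupByLength(sharedState())); // {1=[a, bb, c, dd], 2=[a, bb, c, dd]}
    }
}
```

With the shared supplier, both keys point at the same list object, so each group appears to contain every element.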
17. Parallel Safety Does Not Mean Faster
A collector can be parallel-safe and still slower in parallel.
Reasons:
- small input
- expensive combiner
- poor splitting source
- high allocation pressure
- ordering constraints
- cache misses
- common ForkJoinPool contention
- downstream collector overhead
Example: Top-N collector combiner cost can be non-trivial:
merge partial heap by feeding every right element into left heap
If many partitions exist, combiner overhead may dominate.
Decision rule:
Correctness first.
Parallel safety second.
Parallel performance only after measurement.
Do not add parallelStream() just because collector has combiner.
18. Testing Custom Collectors
A custom collector needs more than one happy-path test.
18.1 Sequential Test
@Test
void collectsSequentially() {
PaymentReport report = payments.stream()
.collect(paymentReportCollector());
assertThat(report.count()).isEqualTo(3);
}
18.2 Parallel Equivalence Test
@Test
void sequentialAndParallelResultsAreEquivalent() {
PaymentReport sequential = payments.stream()
.collect(paymentReportCollector());
PaymentReport parallel = payments.parallelStream()
.collect(paymentReportCollector());
assertThat(parallel).isEqualTo(sequential);
}
If output order is unspecified, compare with normalized result.
assertThat(new HashSet<>(parallel.invalidPaymentIds()))
.isEqualTo(new HashSet<>(sequential.invalidPaymentIds()));
But for audit-grade result, prefer deterministic ordering.
18.3 Manual Partition Test
This catches combiner bugs directly.
@Test
void combinerProducesSameResultAsWholeAccumulation() {
Collector<Payment, PaymentReportState, PaymentReport> collector = paymentReportCollector();
PaymentReportState whole = collector.supplier().get();
payments.forEach(p -> collector.accumulator().accept(whole, p));
PaymentReport expected = collector.finisher().apply(whole);
PaymentReportState left = collector.supplier().get();
payments.subList(0, 2).forEach(p -> collector.accumulator().accept(left, p));
PaymentReportState right = collector.supplier().get();
payments.subList(2, payments.size()).forEach(p -> collector.accumulator().accept(right, p));
PaymentReport actual = collector.finisher().apply(
collector.combiner().apply(left, right)
);
assertThat(actual).isEqualTo(expected);
}
18.4 Empty Input Test
@Test
void handlesEmptyInput() {
PaymentReport report = Stream.<Payment>empty()
.collect(paymentReportCollector());
assertThat(report.count()).isZero();
assertThat(report.invalidPaymentIds()).isEmpty();
}
18.5 Single Element Test
@Test
void handlesSingleElement() {
Payment payment = new Payment("p1", "c1", BigDecimal.TEN, "USD");
PaymentReport report = Stream.of(payment)
.collect(paymentReportCollector());
assertThat(report.count()).isEqualTo(1);
}
18.6 Random Partition Property Test
Conceptual helper:
static <T, A, R> R collectInTwoPartitions(
List<T> input,
int split,
Collector<T, A, R> collector
) {
A left = collector.supplier().get();
for (T value : input.subList(0, split)) {
collector.accumulator().accept(left, value);
}
A right = collector.supplier().get();
for (T value : input.subList(split, input.size())) {
collector.accumulator().accept(right, value);
}
A combined = collector.combiner().apply(left, right);
return collector.finisher().apply(combined);
}
Test all split points:
for (int split = 0; split <= input.size(); split++) {
R actual = collectInTwoPartitions(input, split, collector);
assertThat(actual).isEqualTo(expected);
}
This is often more valuable than only calling parallelStream().
19. Failure Catalogue
19.1 Shared Mutable Supplier
ArrayList<T> shared = new ArrayList<>();
Collector<T, List<T>, List<T>> broken = Collector.of(
() -> shared,
List::add,
(left, right) -> {
left.addAll(right);
return left;
}
);
Failure:
- cross-call contamination
- downstream grouping corruption
- data races in parallel
19.2 Broken Combiner
(left, right) -> left
Failure:
- parallel result silently drops partition data
19.3 Wrong IDENTITY_FINISH
Collector.of(
ArrayList::new,
List::add,
(left, right) -> { left.addAll(right); return left; },
List::copyOf,
Collector.Characteristics.IDENTITY_FINISH
);
Failure:
- finisher may be skipped
- mutable result may escape
- ClassCastException is possible, depending on the types of A and R
19.4 External Side Effects
AtomicInteger count = new AtomicInteger();
Collector<Order, List<Order>, List<Order>> broken = Collector.of(
ArrayList::new,
(list, order) -> {
count.incrementAndGet();
list.add(order);
},
(left, right) -> {
left.addAll(right);
return left;
}
);
Failure:
- count semantics separate from reduction
- retry/partition/optimization assumptions leak
- harder to reason
19.5 Order-Dependent Combiner
(left, right) -> {
right.addAll(left);
return right;
}
This puts the right partition's elements before the left partition's, so the final order depends on the shape of the merge tree.
Failure:
- nondeterministic output order in parallel
- test flakiness
19.6 Non-Associative Math
Floating point addition is not mathematically associative.
double total = values.parallelStream()
.collect(customDoubleSum());
Potential issue:
sequential and parallel may differ in low-order bits
For financial values, prefer BigDecimal with explicit scale/rounding policy.
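A small sketch of the underlying effect: the same three values, grouped two different ways, the way two different partitionings would group them. `FloatingPointGroupingDemo` is a hypothetical name; the values were chosen because they expose the difference with plain double arithmetic.

```java
import java.math.BigDecimal;

final class FloatingPointGroupingDemo {

    // Grouping as a left-to-right sequential sum would do it.
    static double leftGrouped() {
        return (0.1 + 0.2) + 0.3;
    }

    // Grouping as a split after the first element would do it.
    static double rightGrouped() {
        return 0.1 + (0.2 + 0.3);
    }

    // Decimal sums built from string literals are exact, so grouping does not matter.
    static boolean bigDecimalGroupingAgrees() {
        BigDecimal a = new BigDecimal("0.1");
        BigDecimal b = new BigDecimal("0.2");
        BigDecimal c = new BigDecimal("0.3");
        return a.add(b).add(c).equals(a.add(b.add(c)));
    }

    public static void main(String[] args) {
        System.out.println(leftGrouped());              // 0.6000000000000001
        System.out.println(rightGrouped());             // 0.6
        System.out.println(bigDecimalGroupingAgrees()); // true
    }
}
```

A double-summing collector can still be a valid collector; its tests just need a tolerance instead of exact equality between sequential and parallel results.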
19.7 Resource Lifecycle in Collector
Collector<Row, CsvWriter, Path> writerCollector = ...
Failure:
- finisher is not guaranteed to run on accumulation exception
- combiner semantics for open files are often undefined
- resource ownership unclear
Use explicit resource scope.
20. Decision Matrix
| Need | Recommended Approach |
|---|---|
| collect into list | toList, toCollection, or Stream.toList() |
| collect immutable list | Stream.toList() or collectingAndThen(..., List::copyOf) depending mutability/null policy needed |
| group by key | groupingBy |
| index by unique key | toMap with explicit merge policy |
| report duplicates | custom collector or explicit loop |
| top N | custom collector or explicit heap loop |
| validation report | custom collector if reusable, loop if local/simple |
| stateful intermediate operation | consider Gatherer, not Collector |
| resource-backed writing | explicit try-with-resources loop |
| fail-fast validation | stream short-circuit or loop, not collector |
| parallel-safe mutable reduction | collector with correct combiner; rarely CONCURRENT |
21. Collector Design Checklist
Before publishing a custom collector, answer:
1. What is T?
2. What is A?
3. What is R?
4. Does supplier create a fresh independent A?
5. Does accumulator mutate only its A?
6. Does combiner preserve all information from both states?
7. Does combining with empty state change nothing?
8. Does arbitrary partitioning produce equivalent result?
9. Does finisher leak mutable working state?
10. Are characteristics minimal and truthful?
11. Is result order specified, unspecified, or normalized?
12. Does it work as downstream collector?
13. Does it pass sequential vs parallel equivalence tests?
14. Does it handle empty input?
15. Is custom collector simpler than an explicit loop?
Minimal characteristics rule:
When in doubt, declare fewer characteristics.
Correct but less optimized is better than incorrectly optimized.
22. Baeldung-Style Practical Summary
Use custom collectors sparingly, but do not fear them. They are excellent when a domain aggregation is reused often and has a clear result type.
Good custom collector:
fresh state + local mutation + correct combiner + safe finisher + minimal characteristics
Bad custom collector:
shared state + ignored combiner + leaked mutable result + fake CONCURRENT + accidental order dependency
Most production bugs happen because the code is tested only as a sequential stream. A collector is correct only when the same logical result can be produced after arbitrary partitioning and combining.
23. Exercises
Exercise 1 — Fix the Combiner
Given:
static Collector<String, StringBuilder, String> joiningWithPipe() {
return Collector.of(
StringBuilder::new,
(builder, value) -> {
if (!builder.isEmpty()) {
builder.append('|');
}
builder.append(value);
},
(left, right) -> {
left.append(right);
return left;
},
StringBuilder::toString
);
}
Problem:
Combiner misses delimiter between left and right.
Fix it.
Expected behavior:
Stream.of("a", "b", "c").collect(joiningWithPipe())
// "a|b|c"
Exercise 2 — Build toImmutableEnumSet
Create collector:
static <E extends Enum<E>> Collector<E, ?, Set<E>> toImmutableEnumSet(Class<E> enumType)
Requirements:
- use EnumSet.noneOf(enumType) internally
- merge with addAll
- return an immutable copy or an unmodifiable view with clear semantics
- handle empty input
Exercise 3 — Test All Split Points
Write a helper that compares normal sequential collection against manual two-partition collection for every possible split.
Exercise 4 — Refactor Loop to Collector
Given a loop that builds:
record ImportReport(int totalRows, int acceptedRows, List<Violation> violations) {}
Write a custom collector and prove its combiner is correct.
Exercise 5 — Decide Not to Use Collector
Find one case where an explicit loop is better than custom collector. Explain why.
24. References
- Java SE 25 Collector API: https://docs.oracle.com/en/java/javase/25/docs/api/java.base/java/util/stream/Collector.html
- Java SE 25 Collectors API: https://docs.oracle.com/en/java/javase/25/docs/api/java.base/java/util/stream/Collectors.html
- Java SE 25 Stream API: https://docs.oracle.com/en/java/javase/25/docs/api/java.base/java/util/stream/Stream.html
- Java SE 25 Stream package summary: https://docs.oracle.com/en/java/javase/25/docs/api/java.base/java/util/stream/package-summary.html
25. What You Should Be Able to Do Now
After this part, you should be able to:
- explain T, A, and R in a collector
- write a supplier that is fresh and independent
- distinguish the accumulator from the combiner
- prove the identity law
- prove the associativity law
- avoid shared mutable state
- choose characteristics conservatively
- make the final result immutable at the boundary
- test a collector with manual partitioning
- decide when an explicit loop is better
- read someone else's custom collector and find combiner bugs
Key invariant:
A collector is correct when sequential accumulation and arbitrary partitioned accumulation produce equivalent final results.