Learn Java Kafka In Action Part 005 Java Client Architecture
title: Learn Java Kafka in Action - Part 005 description: Deep dive into Java Kafka client architecture: producer, consumer, admin client, networking, metadata, threading, serializers, interceptors, lifecycle, and production-safe usage patterns. series: learn-java-kafka-in-action seriesTitle: Learn Java Kafka in Action order: 5 partTitle: Java Client Architecture tags:
- java
- kafka
- kafka-client
- producer
- consumer
- admin-client
- distributed-systems
- series date: 2026-07-01
Part 005 — Java Client Architecture
1. Tujuan Part Ini
Di part sebelumnya kita melihat Kafka dari sisi broker: topic, partition, replication, ISR, dan KRaft control plane. Mulai part ini kita pindah ke sisi aplikasi Java.
Tujuan part ini adalah memahami arsitektur internal Java Kafka client agar kita tidak memperlakukan KafkaProducer, KafkaConsumer, dan AdminClient sebagai black box.
Setelah part ini, kita harus bisa menjawab:
- Apa yang benar-benar terjadi ketika aplikasi Java memanggil
producer.send(record)? - Kenapa producer umumnya boleh dipakai lintas thread, sedangkan consumer tidak boleh dipakai sembarang lintas thread?
- Kenapa metadata cluster bisa stale dan bagaimana client menyegarkannya?
- Apa hubungan serializer, partitioner, accumulator, sender thread, broker connection, callback, dan future?
- Kenapa
consumer.poll()bukan sekadar mengambil data, tetapi juga menjalankan bagian penting dari protokol consumer? - Bagaimana lifecycle client yang benar dalam service produksi?
- Kapan harus memakai AdminClient dan kapan tidak?
Part ini belum membahas tuning throughput secara penuh. Itu akan masuk Part 007. Di sini kita fokus pada struktur kerja client dan invariant penggunaannya.
2. Mental Model Ringkas
Kafka Java client bukan wrapper HTTP sederhana. Kafka client adalah stateful network client yang menyimpan metadata cluster, membuka koneksi TCP ke broker, mengelola request/response, melakukan retry, mem-buffer record, dan berpartisipasi dalam protokol cluster.
Untuk consumer, modelnya berbeda.
Perbedaan kritis:
| Client | Model Dominan | Threading Rule | Risiko Utama |
|---|---|---|---|
KafkaProducer | Async buffered sender | Producer instance thread-safe | Callback blocking, buffer exhaustion, wrong close/flush |
KafkaConsumer | Poll-driven state machine | Consumer instance tidak untuk shared concurrent access | Poll starvation, rebalance bug, unsafe worker handoff |
AdminClient | Async control-plane request client | Dipakai untuk operasi administratif | Mengubah cluster dari aplikasi runtime tanpa guardrail |
3. Kaufman Deconstruction
Kita pecah skill “menggunakan Kafka Java client” menjadi sub-skill yang bisa dilatih.
| Sub-skill | Yang Harus Dikuasai | Failure Signal |
|---|---|---|
| Producer lifecycle | Create once, reuse, flush/close properly | Latency spike, buffer exhaustion, lost on shutdown |
| Producer async model | Future, callback, sender thread | Callback lambat, deadlock, unobserved send failure |
| Serialization | Key/value serializer, schema-aware serialization | Bad payload, schema mismatch, poison record |
| Partition routing | Key, partitioner, metadata | Hot partition, ordering broken |
| Consumer poll loop | Poll, process, commit, heartbeat/coordinator | Lag naik, rebalance storm, duplicate processing |
| Consumer threading | One consumer per poll thread or safe handoff | ConcurrentModification, commit race, offset disorder |
| Admin operations | Create topic, inspect configs, describe cluster | Unsafe auto-create, wrong RF/partition count |
| Observability | Client metrics, logs, callback error path | Silent failure, no lag visibility |
Di tahap ini kita belum mengejar semua konfigurasi. Kita mengejar self-correction: mampu melihat gejala dan tahu bagian client mana yang mungkin salah.
4. Java Client Dependency Baseline
Untuk aplikasi Java murni, dependency minimum biasanya:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>${kafka.clients.version}</version>
</dependency>
Untuk production project, versi client harus dipilih sadar:
- Client Kafka biasanya kompatibel lintas versi broker dalam rentang tertentu, tetapi fitur baru membutuhkan broker dan client yang mendukung.
- Jangan upgrade client hanya karena library lain menarik dependency transitive.
- Lock versi client melalui dependency management.
- Jalankan integration test terhadap cluster target, bukan hanya unit test.
Contoh dependency management:
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>${kafka.clients.version}</version>
</dependency>
</dependencies>
</dependencyManagement>
Dalam organisasi besar, versi client adalah keputusan platform. Jangan biarkan setiap service memilih versi sendiri tanpa compatibility matrix.
5. Producer Architecture
KafkaProducer terlihat sederhana:
producer.send(new ProducerRecord<>(topic, key, value));
Tetapi secara internal ada beberapa tahap.
5.1 Application Thread
Application thread melakukan:
- Validasi record.
- Serialization.
- Partition selection.
- Append ke buffer.
- Return
Future<RecordMetadata>.
Artinya send() bukan selalu operasi non-blocking sempurna. Ia bisa blocking jika:
- Metadata topic belum tersedia.
- Buffer penuh.
- Serializer lambat.
- Max block time tercapai.
- Interceptor melakukan pekerjaan berat.
Invariant:
Jangan menaruh serialization berat, I/O eksternal, atau logic bisnis mahal di jalur
send().
5.2 Serializer
Serializer mengubah object Java menjadi bytes. Kafka broker tidak peduli struktur object; broker menyimpan bytes.
public interface Serializer<T> extends Closeable {
byte[] serialize(String topic, T data);
}
Risiko serializer:
| Risiko | Contoh | Dampak |
|---|---|---|
| Non-deterministic serialization | Field order berubah tanpa schema | Consumer gagal membaca |
| Hidden schema change | Rename field tanpa kompatibilitas | Poison event |
| Heavy CPU serialization | JSON reflection berlebihan | Producer latency naik |
| External dependency | Serializer memanggil service lain | Send path tidak stabil |
Rule:
Serializer harus deterministic, cepat, side-effect-free, dan schema-aware untuk data kontrak antar-service.
5.3 Partitioner
Partitioner menentukan partition target ketika record tidak menetapkan partition eksplisit.
Input utama:
- Topic.
- Key bytes.
- Value bytes.
- Metadata cluster.
- Jumlah partition topic.
Key design sangat penting karena partition adalah batas ordering. Kalau semua order milik orderId=123 harus diproses berurutan, key biasanya harus mengandung orderId atau aggregate id yang setara.
Anti-pattern:
new ProducerRecord<>("order-events", UUID.randomUUID().toString(), event);
Jika key random dipakai untuk event yang sebenarnya perlu ordering per order, ordering per aggregate akan pecah.
5.4 Record Accumulator dan Buffer Pool
Producer tidak selalu mengirim satu record sebagai satu network request. Record ditampung dulu per topic-partition dalam batch.
Parameter yang terkait, tetapi akan dibahas detail di Part 007:
| Config | Fungsi |
|---|---|
buffer.memory | Total memory producer untuk buffering |
batch.size | Target ukuran batch per partition |
linger.ms | Waktu menunggu agar batch terisi |
compression.type | Compression untuk batch |
max.block.ms | Batas blocking saat metadata/buffer tidak tersedia |
Jika producer lebih cepat dari broker/network, buffer bisa penuh. Saat itu send() dapat blocking dan akhirnya gagal.
5.5 Sender Network Thread
Producer memiliki background sender thread yang:
- Mengambil batch siap kirim.
- Mengelompokkan request per broker.
- Mengirim ProduceRequest.
- Menerima response.
- Menangani retry.
- Menyelesaikan callback/future.
Implikasi penting:
- Callback dieksekusi pada jalur internal producer; jangan blocking di callback.
- Jika callback lambat, completion record lain bisa ikut tertahan.
- Jangan memanggil operasi berat, HTTP call, DB write, atau synchronous wait panjang dari callback.
Callback yang baik:
producer.send(record, (metadata, exception) -> {
if (exception != null) {
metrics.increment("kafka.producer.send.error", "topic", record.topic());
log.warn("Failed to publish event topic={} key={}", record.topic(), record.key(), exception);
return;
}
metrics.increment("kafka.producer.send.success", "topic", metadata.topic());
});
Callback yang buruk:
producer.send(record, (metadata, exception) -> {
auditRepository.save(...); // DB I/O di callback
httpClient.post(...); // External I/O di callback
anotherProducer.flush(); // Bisa memperburuk blocking
});
6. Producer Lifecycle Pattern
Producer adalah resource mahal. Jangan create-close per message.
Buruk:
public void publish(OrderEvent event) {
try (KafkaProducer<String, OrderEvent> producer = new KafkaProducer<>(props)) {
producer.send(new ProducerRecord<>("order-events", event.orderId(), event));
}
}
Masalah:
- Membuka koneksi berulang.
- Metadata fetch berulang.
- Batching tidak efektif.
- Throughput hancur.
- Shutdown path rawan kehilangan record jika tidak flush dengan benar.
Lebih baik:
public final class OrderEventPublisher implements AutoCloseable {
private final KafkaProducer<String, OrderEvent> producer;
private final String topic;
public OrderEventPublisher(Properties props, String topic) {
this.producer = new KafkaProducer<>(props);
this.topic = topic;
}
public CompletableFuture<RecordMetadata> publish(OrderEvent event) {
ProducerRecord<String, OrderEvent> record = new ProducerRecord<>(
topic,
event.orderId(),
event
);
CompletableFuture<RecordMetadata> result = new CompletableFuture<>();
producer.send(record, (metadata, exception) -> {
if (exception != null) {
result.completeExceptionally(exception);
} else {
result.complete(metadata);
}
});
return result;
}
@Override
public void close() {
producer.flush();
producer.close();
}
}
Design note:
- Satu producer per logical config profile biasanya cukup.
- Pisahkan producer untuk workload dengan SLA berbeda jika config-nya harus berbeda.
- Jangan campur critical event dan noisy telemetry event dalam producer yang sama jika backpressure telemetry bisa mengganggu critical path.
7. Consumer Architecture
Consumer tidak sama dengan producer. Consumer adalah state machine yang dikendalikan oleh poll().
while (running.get()) {
ConsumerRecords<String, OrderEvent> records = consumer.poll(Duration.ofMillis(500));
for (ConsumerRecord<String, OrderEvent> record : records) {
process(record);
}
consumer.commitSync();
}
poll() bukan hanya mengambil record. Dalam model consumer, poll() juga terkait dengan:
- Network I/O.
- Fetch request.
- Group coordination.
- Partition assignment.
- Rebalance callback.
- Heartbeat/progress semantics, tergantung protocol dan versi.
- Timeout detection.
Jika aplikasi berhenti memanggil poll() terlalu lama, consumer bisa dianggap tidak sehat atau melanggar max.poll.interval.ms.
8. Consumer Threading Rule
Rule produksi:
Satu
KafkaConsumersebaiknya dimiliki oleh satu poll thread. Jangan akses instance consumer yang sama secara concurrent dari banyak thread.
Kenapa?
Consumer menyimpan state internal:
- Subscription.
- Assignment partition.
- Position offset.
- In-flight fetch.
- Pending commit.
- Rebalance state.
- Coordinator connection.
Concurrent access tanpa koordinasi bisa merusak asumsi lifecycle.
8.1 Pattern A — One Consumer per Thread
Cocok untuk:
- Processing relatif cepat.
- Satu record diproses inline.
- Commit mudah dijaga.
- Throughput ditingkatkan dengan menambah consumer instance sampai batas partition.
8.2 Pattern B — Poll Thread + Worker Pool
Cocok untuk:
- Processing mahal.
- I/O downstream lambat.
- Perlu parallelism lebih besar dari jumlah partition.
Tetapi pattern ini berbahaya jika offset commit tidak didesain benar.
Contoh bug:
Partition 0 records: offset 10, 11, 12
Worker finishes: 12 first, then 10, while 11 still running
Consumer commits offset 13
Process crashes
Offset 11 hilang secara logical karena sudah dilewati commit
Rule:
Jika processing parallel per partition, commit hanya boleh maju sampai offset contiguous yang sudah selesai.
8.3 Pause/Resume untuk Backpressure
Consumer bisa mengambil data lebih cepat daripada worker menyelesaikan pekerjaan. Gunakan bounded queue dan pause()/resume().
if (workQueue.remainingCapacity() < LOW_WATERMARK) {
consumer.pause(consumer.assignment());
}
if (workQueue.remainingCapacity() > HIGH_WATERMARK) {
consumer.resume(consumer.assignment());
}
Namun poll tetap harus dipanggil agar consumer tetap hidup.
while (running.get()) {
ConsumerRecords<K, V> records = consumer.poll(Duration.ofMillis(200));
dispatch(records);
maybePauseOrResume();
maybeCommitCompletedOffsets();
}
9. Consumer Lifecycle
Lifecycle aman:
public final class OrderEventConsumer implements Runnable, AutoCloseable {
private final KafkaConsumer<String, OrderEvent> consumer;
private final AtomicBoolean running = new AtomicBoolean(true);
public OrderEventConsumer(Properties props, String topic) {
this.consumer = new KafkaConsumer<>(props);
this.consumer.subscribe(List.of(topic), new ConsumerRebalanceListener() {
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
commitCurrentProgress();
}
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
log.info("Assigned partitions {}", partitions);
}
});
}
@Override
public void run() {
try {
while (running.get()) {
ConsumerRecords<String, OrderEvent> records = consumer.poll(Duration.ofMillis(500));
for (ConsumerRecord<String, OrderEvent> record : records) {
process(record);
}
consumer.commitSync();
}
} catch (WakeupException e) {
if (running.get()) {
throw e;
}
} finally {
try {
commitCurrentProgress();
} finally {
consumer.close();
}
}
}
@Override
public void close() {
running.set(false);
consumer.wakeup();
}
private void process(ConsumerRecord<String, OrderEvent> record) {
// domain processing here
}
private void commitCurrentProgress() {
try {
consumer.commitSync();
} catch (Exception e) {
log.warn("Failed to commit offsets during shutdown/rebalance", e);
}
}
}
Key points:
- Shutdown dari thread lain dilakukan dengan
wakeup(). - Jangan langsung
close()dari thread lain saat poll loop aktif tanpa koordinasi. - Commit saat partition revoked mengurangi duplicate setelah rebalance.
- Processing harus punya idempotency karena commit bisa gagal setelah side effect berhasil.
10. Metadata Lifecycle
Kafka client tidak selalu tahu cluster penuh sejak awal. bootstrap.servers hanya titik masuk.
bootstrap.servers bukan daftar semua broker wajib, tetapi harus cukup untuk menemukan cluster.
Risiko:
| Masalah | Gejala | Penyebab Umum |
|---|---|---|
| Wrong advertised listener | Client connect gagal ke broker leader | Broker mengiklankan host yang tidak bisa dijangkau client |
| Metadata stale | LEADER_NOT_AVAILABLE, retry | Topic baru, leader pindah, broker restart |
| Auto-create topic tidak sengaja | Topic muncul dengan default buruk | Producer kirim ke typo topic |
| Topic authorization denied | Send/fetch gagal | ACL tidak lengkap |
Rule platform:
Untuk production, topic penting sebaiknya dibuat eksplisit melalui IaC/Admin pipeline, bukan mengandalkan auto-create dari producer.
11. AdminClient Architecture
AdminClient digunakan untuk operasi administratif:
- Create topic.
- Describe topic.
- Describe cluster.
- Alter configs.
- List offsets.
- Inspect consumer groups.
- Manage ACL, tergantung environment.
Contoh create topic eksplisit:
try (AdminClient admin = AdminClient.create(adminProps)) {
NewTopic topic = new NewTopic("order-events", 12, (short) 3)
.configs(Map.of(
"cleanup.policy", "delete",
"retention.ms", String.valueOf(Duration.ofDays(7).toMillis())
));
admin.createTopics(List.of(topic)).all().get(30, TimeUnit.SECONDS);
}
Namun jangan jadikan setiap microservice bebas membuat topic sendiri saat startup tanpa kontrol.
Masalah umum:
- Race condition antar service.
- Config topic tidak konsisten.
- Partition count berubah tanpa impact analysis.
- Retention salah.
- ACL belum siap.
Lebih baik:
Topic Lifecycle:
Architecture Decision -> Schema Review -> IaC Change -> Platform Approval -> Deployment -> Runtime Validation
AdminClient boleh digunakan aplikasi untuk introspection ringan atau health readiness tertentu, tetapi perubahan cluster harus dijaga guardrail.
12. Interceptor
Producer dan consumer mendukung interceptor.
Use case:
- Inject tracing header.
- Metrics tambahan.
- Audit metadata.
- Header normalization.
Jangan gunakan interceptor untuk:
- Logic bisnis utama.
- External I/O blocking.
- Mutasi payload kompleks.
- Authorization custom yang seharusnya ada di broker/platform.
Contoh producer interceptor concept:
public class TraceProducerInterceptor implements ProducerInterceptor<String, OrderEvent> {
@Override
public ProducerRecord<String, OrderEvent> onSend(ProducerRecord<String, OrderEvent> record) {
Headers headers = record.headers();
headers.add("trace-id", currentTraceId().getBytes(StandardCharsets.UTF_8));
return record;
}
@Override
public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
// metrics only, no blocking I/O
}
@Override
public void close() {}
@Override
public void configure(Map<String, ?> configs) {}
}
13. Header Design in Client Layer
Kafka record punya key, value, timestamp, dan headers.
Header cocok untuk metadata teknis:
| Header | Fungsi |
|---|---|
trace-id | Distributed tracing |
correlation-id | Menghubungkan command/event/reply |
causation-id | Event penyebab langsung |
event-id | Deduplication/idempotency |
schema-version | Debugging/compatibility aid |
tenant-id | Routing/security context, jika sesuai |
Jangan menaruh semua data bisnis di header. Header bukan pengganti schema event.
14. Error Surfaces di Java Client
Error producer bisa muncul di beberapa tempat:
| Lokasi | Contoh | Cara Tangani |
|---|---|---|
| Constructor | Config invalid, serializer class tidak ada | Fail fast startup |
send() immediate | Serialization error, buffer timeout, metadata timeout | Return failure ke caller |
| Callback/future | Broker reject, timeout, authorization, unknown topic | Metrics + retry policy higher layer |
flush()/close() | Pending send gagal | Shutdown logging dan alert |
Error consumer:
| Lokasi | Contoh | Cara Tangani |
|---|---|---|
poll() | Deserialization error, authorization, wakeup | Classify retryable/non-retryable |
| Processing | DB failure, validation error | Retry/DLQ/idempotency |
| Commit | Commit failed, rebalance race | Retry or accept duplicate possibility |
| Rebalance listener | Commit during revoke fails | Log, metrics, idempotent processing |
Important:
Kafka client retry menyelesaikan sebagian problem network/protocol. Ia tidak menyelesaikan correctness bisnis.
15. Deserialization Boundary
Consumer deserialization sering diremehkan. Jika deserializer melempar exception di poll(), aplikasi bahkan belum mendapatkan ConsumerRecord untuk dikirim ke DLQ secara mudah.
Pattern yang lebih aman untuk sistem besar:
- Consume sebagai bytes atau schema-aware envelope.
- Tangani deserialization dalam layer aplikasi yang bisa menghasilkan DLQ record lengkap.
- Simpan original bytes/header untuk forensic jika policy memungkinkan.
- Pisahkan invalid schema, invalid domain, dan downstream failure.
Contoh wrapper:
public sealed interface DecodeResult<T> permits DecodeResult.Valid, DecodeResult.Invalid {
record Valid<T>(T value) implements DecodeResult<T> {}
record Invalid<T>(byte[] raw, String reason, Exception cause) implements DecodeResult<T> {}
}
16. Client Config Baseline
Producer baseline untuk event bisnis penting:
bootstrap.servers=kafka-1:9092,kafka-2:9092,kafka-3:9092
client.id=order-service-producer
key.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=com.example.kafka.OrderEventSerializer
acks=all
enable.idempotence=true
retries=2147483647
delivery.timeout.ms=120000
request.timeout.ms=30000
linger.ms=5
batch.size=32768
compression.type=zstd
max.in.flight.requests.per.connection=5
Consumer baseline:
bootstrap.servers=kafka-1:9092,kafka-2:9092,kafka-3:9092
client.id=order-service-consumer-1
group.id=order-projection-service
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=com.example.kafka.OrderEventDeserializer
enable.auto.commit=false
auto.offset.reset=earliest
max.poll.records=500
max.poll.interval.ms=300000
fetch.min.bytes=1
isolation.level=read_committed
Catatan:
isolation.level=read_committedrelevan jika membaca topic dengan transactional producer.auto.offset.reset=earliestbukan selalu benar; untuk service baru yang harus membangun state dari awal, benar. Untuk alert consumer non-critical, mungkinlatestlebih cocok.client.idharus informatif agar metrics/log broker bisa ditelusuri.
17. Client Identity
Gunakan client.id dan group.id sebagai bagian observability.
Contoh naming:
client.id = <service-name>-<role>-<instance-id>
group.id = <service-name>-<consumer-purpose>
Contoh:
client.id = order-service-producer-pod-7c9d
group.id = order-projection-service
Jangan gunakan group id generik:
group.id = consumer-group
client.id = app
Dalam incident, nama generik memperlambat diagnosis.
18. Health Check dan Readiness
Health check Kafka client sering salah.
Bad readiness:
return producer != null;
Lebih baik readiness memeriksa hal yang relevan:
- Bisa resolve metadata topic wajib.
- Producer tidak sedang stuck buffer exhaustion.
- Consumer assignment sudah siap jika service harus consume.
- Authorization topic tersedia.
- Schema registry tersedia jika serializer butuh.
Namun jangan membuat health check terlalu mahal sampai menambah beban control plane.
AdminClient readiness example:
public boolean canDescribeRequiredTopic(AdminClient admin, String topic) {
try {
admin.describeTopics(List.of(topic))
.allTopicNames()
.get(3, TimeUnit.SECONDS);
return true;
} catch (Exception e) {
return false;
}
}
19. Logging Strategy
Log client harus membantu incident, bukan membanjiri.
Log saat producer send gagal:
level=WARN
message="Kafka publish failed"
topic=order-events
key=ORD-123
clientId=order-service-producer-pod-7c9d
exceptionClass=TimeoutException
Jangan log full payload untuk event sensitif. Untuk regulatory/audit system, payload bisa mengandung data pribadi atau rahasia enforcement.
Gunakan:
- Event id.
- Aggregate id.
- Topic.
- Partition.
- Offset, untuk consumer.
- Schema id/version.
- Trace id.
20. Metrics yang Wajib Dipahami
Producer metrics:
| Metric Concept | Arti |
|---|---|
| Record send rate | Laju record keluar |
| Record error rate | Gagal publish |
| Request latency | Waktu request ke broker |
| Batch size average | Efektivitas batching |
| Buffer available bytes | Sisa buffer |
| Record retry rate | Retry broker/network |
Consumer metrics:
| Metric Concept | Arti |
|---|---|
| Records consumed rate | Laju konsumsi |
| Fetch latency | Waktu fetch |
| Commit latency | Waktu commit |
| Assigned partitions | Jumlah partition assigned |
| Consumer lag | Jarak offset end vs committed/processed |
| Rebalance count/time | Stabilitas group |
Rule:
Jangan deploy Kafka client tanpa dashboard minimal untuk error rate, latency, lag, retry, dan rebalance.
21. Common Architecture Mistakes
21.1 Create Producer per Request
Gejala:
- Latency tinggi.
- Connection churn.
- Throughput rendah.
- Broker log noisy.
Fix:
- Reuse producer.
- Close saat service shutdown.
- Pisahkan producer hanya jika config/SLA berbeda.
21.2 Blocking Callback
Gejala:
- Send completion lambat.
- Memory naik.
- Throughput drop.
Fix:
- Callback hanya metrics/log ringan.
- Kalau perlu side effect, kirim ke executor terpisah dengan bounded queue.
21.3 Shared Consumer Across Threads
Gejala:
- Race condition.
- Commit offset salah.
- Rebalance weird.
Fix:
- One consumer per thread.
- Poll thread + worker pool dengan completion tracker.
- Gunakan
wakeup()untuk shutdown.
21.4 Auto-Commit untuk Side Effect Non-Idempotent
Gejala:
- Data hilang secara logical.
- Offset maju sebelum DB write selesai.
Fix:
- Disable auto commit.
- Commit setelah side effect berhasil.
- Tambahkan idempotency di downstream.
21.5 Topic Creation di Runtime Tanpa Governance
Gejala:
- Topic typo.
- Retention default salah.
- RF salah.
- Partition count salah.
Fix:
- Topic IaC.
- Naming convention.
- Schema and ACL review.
22. Reference Mini-Architecture
Client architecture decision:
- Producer dipakai oleh publisher component, bukan langsung tersebar di semua domain service.
- Consumer punya poll loop sendiri.
- Serialization melalui schema-aware layer.
- Observability dari client diekspos sebagai first-class signal.
- Topic lifecycle tidak dibuat ad hoc dari business code.
23. Practice: Build a Client Skeleton
23.1 Task
Buat mini project dengan tiga class:
KafkaProducerFactoryOrderEventPublisherOrderEventConsumerRunner
Target:
- Producer reusable.
- Callback mengubah
FuturemenjadiCompletableFuture. - Consumer memakai manual commit.
- Shutdown memakai
wakeup(). - Semua record log memakai topic/key/partition/offset.
23.2 Acceptance Criteria
Producer:
- Tidak create-close per message.
client.idjelas.- Send failure tidak silent.
flush()saat shutdown.
Consumer:
enable.auto.commit=false.- Commit setelah processing.
WakeupExceptionditangani benar.- Tidak ada shared consumer concurrent access.
Observability:
- Counter success/failure publish.
- Counter records processed.
- Timer processing latency.
- Gauge assigned partitions.
24. Design Review Checklist
Gunakan checklist ini saat mereview service Java Kafka.
Producer
- Apakah producer instance direuse?
- Apakah callback tidak blocking?
- Apakah send failure diamati?
- Apakah serializer deterministic dan schema-aware?
- Apakah key sesuai ordering boundary?
- Apakah config critical producer memakai
acks=alldan idempotence jika perlu? - Apakah shutdown melakukan flush/close?
Consumer
- Apakah satu consumer hanya dimiliki satu poll thread?
- Apakah auto commit disabled untuk processing dengan side effect?
- Apakah commit dilakukan setelah processing berhasil?
- Apakah rebalance listener mengamankan progress?
- Apakah processing idempotent?
- Apakah poison event strategy jelas?
- Apakah lag dan rebalance termonitor?
Admin/Platform
- Apakah topic dibuat melalui governance/IaC?
- Apakah AdminClient runtime tidak mengubah cluster tanpa guardrail?
- Apakah
client.iddangroup.idpunya naming convention? - Apakah ACL dan schema dependency dicek sebelum production?
25. Interview-Level Questions
- Mengapa
KafkaProducer.send()bisa blocking walaupun API-nya asynchronous? - Kenapa
KafkaProducerbisa direuse lintas thread, tetapiKafkaConsumertidak boleh dipakai sembarang lintas thread? - Apa yang dilakukan
consumer.poll()selain mengambil record? - Bagaimana cara shutdown consumer dari thread lain dengan aman?
- Kenapa callback producer tidak boleh melakukan DB call?
- Apa beda
client.iddangroup.id? - Kenapa topic auto-create berbahaya di production?
- Bagaimana metadata broker didapat dari
bootstrap.servers? - Di mana deserialization error sebaiknya ditangani?
- Bagaimana desain poll thread + worker pool agar commit offset tidak melewati record yang belum selesai?
26. Ringkasan
Java Kafka client adalah stateful distributed-system client, bukan library I/O sederhana.
Producer bekerja dengan model async buffered sender: application thread melakukan serialization dan append ke accumulator; sender thread mengirim batch ke broker dan menyelesaikan callback. Karena itu producer cocok direuse, tetapi callback harus ringan.
Consumer bekerja dengan poll-driven state machine. poll() menggerakkan fetch, group coordination, assignment, dan progress. Karena consumer menyimpan banyak state internal, satu instance consumer harus dimiliki satu poll thread atau disinkronisasi sangat hati-hati. Untuk sistem produksi, pattern paling aman adalah one consumer per thread atau poll thread dengan worker pool yang punya offset completion tracker.
AdminClient berguna untuk control-plane operation, tetapi perubahan cluster harus dikelola dengan governance. Aplikasi bisnis sebaiknya tidak bebas membuat atau mengubah topic produksi tanpa review.
Part berikutnya akan masuk ke Producer Reliability and Delivery Semantics: acks, retries, idempotence, timeout, ordering, duplicates, transactional producer, dan producer fencing.
27. Referensi
- Apache Kafka Documentation — Java client, producer, consumer, configuration.
- Apache Kafka Producer Configs.
- Apache Kafka Consumer Configs.
- Apache Kafka Javadocs —
KafkaProducer,KafkaConsumer. - Confluent Documentation — Java client overview, delivery semantics, producer/consumer configuration.
You just completed lesson 05 in start here. Use the series map if you want to review the broader track, or continue directly into the next lesson while the context is still warm.
Keep the momentum while the lesson is still fresh. Move backward for review or continue forward into the next concept.