Java RabbitMQ Client Architecture: Connection, Channel, Threading, Lifecycle
Learn Java RabbitMQ, RabbitMQ Streams, Patterns, and Deployment In Action - Part 004
Production-grade RabbitMQ Java client architecture: ConnectionFactory, Connection, Channel, threading, lifecycle, recovery, heartbeat, blocked connection handling, and safe shutdown.
1. Goal of This Part
This part covers the architecture of the RabbitMQ Java client in a production service. We are not yet focusing on business patterns such as retry, DLQ, outbox, or stream processing. The focus is on basic questions that often cause serious bugs:
- how many connections to create;
- when to create a new channel;
- whether a channel may be used across threads;
- how to manage producer and consumer lifecycles;
- how connection recovery works;
- how heartbeats, network failures, and blocked connections affect the service;
- how to shut down correctly without losing messages or leaving ambiguous state.
In Kaufman's framework, this part is the "remove practice barriers" phase. We build a stable client architecture so that the next exercises (publisher confirms, manual ack, retry, streams, benchmarking) are not derailed by an incorrect client lifecycle.
2. Java Client Mental Model
The RabbitMQ Java client exposes the core AMQP 0-9-1 concepts. The practical hierarchy:
| Object | Meaning | Expensive? | Threading guidance |
|---|---|---|---|
| ConnectionFactory | Factory/config holder | No | Can be a singleton/config bean |
| Connection | TCP connection to the broker | Relatively expensive | Usually only a few per app instance |
| Channel | Virtual AMQP session on top of a connection | Cheaper than a connection | Do not share casually across threads |
| Consumer | Callback/subscription on a channel | Tied to its channel | Lifecycle must be explicit |
Basic rule:
A connection is the transport. A channel is a session/protocol lane. Producer and consumer operations run on a channel. Do not design as if a channel were a stateless utility object.
3. Client Dependency and Version
A typical Maven dependency:
```xml
<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.26.0</version>
</dependency>
```
In a real project, do not hardcode the version in each service module. Use dependency management or an internal BOM so that:
- security patches can be rolled out centrally;
- recovery/metrics behavior stays consistent;
- the integration test matrix is clear;
- wrapper libraries can evolve without dependency drift.
4. ConnectionFactory: Transport and Auth Configuration
ConnectionFactory holds the broker connection configuration.
Example:
```java
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("rabbitmq.internal");
factory.setPort(5672);
factory.setVirtualHost("/commerce");
factory.setUsername("order-service");
factory.setPassword(System.getenv("RABBITMQ_PASSWORD"));
factory.setConnectionTimeout(10_000);      // milliseconds
factory.setRequestedHeartbeat(30);         // seconds
factory.setAutomaticRecoveryEnabled(true);
factory.setTopologyRecoveryEnabled(true);
factory.setNetworkRecoveryInterval(5_000); // milliseconds between recovery attempts
```
Notes:
- credentials must come from a secret manager, not from a plain config file;
- a virtual host is an isolation boundary;
- the heartbeat must be tuned to the network and to any load balancer in the path;
- automatic recovery helps, but it is not a substitute for idempotency;
- topology recovery is useful, but the service still needs startup validation.
4.1 URI-based configuration
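```java
// setUri(String) throws URISyntaxException, NoSuchAlgorithmException, KeyManagementException.
// Java has no ${...} interpolation; percent-encode the password if it contains reserved characters.
String password = System.getenv("RABBITMQ_PASSWORD");
factory.setUri("amqps://order-service:" + password + "@rabbitmq.internal:5671/%2Fcommerce");
```
In a URI the vhost is the path segment after the host, so the / in the vhost name /commerce must be percent-encoded as %2F.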
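Building the URI by string concatenation breaks as soon as the password contains @, : or /. The sketch below is one way to avoid that. It applies RFC 3986 percent-encoding (every byte outside A-Z a-z 0-9 - . _ ~) to the user, password, and vhost. AmqpUris is a hypothetical helper, not part of the client library.

```java
import java.nio.charset.StandardCharsets;

/** Hypothetical helper: builds an amqps:// URI with RFC 3986 percent-encoding. */
final class AmqpUris {
    private AmqpUris() {}

    /** Percent-encodes every UTF-8 byte outside the unreserved set A-Z a-z 0-9 - . _ ~ */
    static String percentEncode(String value) {
        StringBuilder out = new StringBuilder();
        for (byte b : value.getBytes(StandardCharsets.UTF_8)) {
            int c = b & 0xFF;
            boolean unreserved = (c >= 'A' && c <= 'Z') || (c >= 'a' && c <= 'z')
                    || (c >= '0' && c <= '9') || c == '-' || c == '.' || c == '_' || c == '~';
            if (unreserved) {
                out.append((char) c);
            } else {
                out.append('%').append(String.format("%02X", c));
            }
        }
        return out.toString();
    }

    /** The vhost is encoded too, so "/commerce" becomes "%2Fcommerce". */
    static String amqps(String user, String password, String host, int port, String vhost) {
        return "amqps://" + percentEncode(user) + ":" + percentEncode(password)
                + "@" + host + ":" + port + "/" + percentEncode(vhost);
    }
}
```

Usage, assuming the variables come from your secret source: factory.setUri(AmqpUris.amqps("order-service", password, "rabbitmq.internal", 5671, "/commerce")).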
4.2 TLS
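In production, TLS is usually enabled. The no-argument factory.useSslProtocol() trusts every server certificate. That is convenient for local development but unsafe in production. Pass an SSLContext that trusts only your CA, and enable hostname verification:
```java
factory.useSslProtocol(sslContext);    // SSLContext initialized with the trusted CA(s)
factory.enableHostnameVerification();  // check the broker certificate against the host name
factory.setPort(5671);                 // conventional AMQPS port
```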
In regulated environments, TLS is not a cosmetic checklist item. Questions that must be answered:
- which CA is trusted;
- how certificate rotation is done;
- whether mutual TLS is used;
- whether hostname verification is enabled;
- how secrets/certificates are injected into the container;
- what the runbook is when a certificate expires.
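ConnectionFactory.useSslProtocol(SSLContext) takes an SSLContext, and that context can be built with JDK APIs alone. The sketch below assumes the CA certificates arrive as a PKCS12 truststore file, for example one mounted from a secret. TlsContexts and its methods are hypothetical names. For mutual TLS, pass KeyManagers built from the client key store to ctx.init instead of null.

```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.security.GeneralSecurityException;
import java.security.KeyStore;
import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManagerFactory;

/** Hypothetical helper: an SSLContext that trusts only the CAs in a given truststore. */
final class TlsContexts {
    private TlsContexts() {}

    /** Loads a PKCS12 truststore, e.g. mounted into the container from a secret. */
    static KeyStore loadTrustStore(Path path, char[] password) throws IOException, GeneralSecurityException {
        KeyStore ks = KeyStore.getInstance("PKCS12");
        try (InputStream in = Files.newInputStream(path)) {
            ks.load(in, password);
        }
        return ks;
    }

    /** trustStore == null falls back to the JDK default trust store (cacerts). */
    static SSLContext trustingOnly(KeyStore trustStore) throws GeneralSecurityException {
        TrustManagerFactory tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
        tmf.init(trustStore);
        SSLContext ctx = SSLContext.getInstance("TLS");
        ctx.init(null, tmf.getTrustManagers(), null); // no client key managers: server-auth TLS only
        return ctx;
    }
}
```

The result is then passed to factory.useSslProtocol(...), followed by factory.enableHostnameVerification(). Rotation becomes a matter of replacing the truststore file and rebuilding the context.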
Security details are covered in Part 034.
5. Connection: TCP Transport and Failure Boundary
A Connection represents a TCP connection to the broker. One connection can multiplex many channels.
5.1 How many connections per service instance?
A reasonable default:
- one connection for publishers;
- one connection for consumers;
- optionally, one connection for admin/topology/health;
- optionally, a dedicated connection for a high-throughput publisher.
Why not one connection per publish?
- the TCP handshake is expensive;
- authentication is expensive;
- the broker's connection count grows;
- heartbeat/thread/resource overhead increases;
- throughput is poor.
Why not always use one connection for everything?
- a blocked publisher can affect consumers that share its transport;
- high-volume publishing can hurt consumer latency;
- operational isolation is weaker;
- producer and consumer shutdown follow different lifecycles.
Practical pattern: use one long-lived, named connection per role (publisher, consumer, and optionally admin), and open each channel on the connection that matches its role.
5.2 Connection naming
Set a client-provided connection name so the connection is identifiable in the RabbitMQ management UI:
```java
Connection connection = factory.newConnection("order-service.publisher.us-east-1.instance-7");
```
A connection name speeds up incident response because it shows:
- which connection is blocked;
- which service opens too many channels;
- which instance does not reconnect;
- which workload's publish rate is rising.
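The naming convention in the example (service.role.region.instance) is easy to keep consistent with a tiny helper. ConnectionNames and its Role enum are hypothetical. The sketch assumes the instance id comes from the environment, for example the pod or host name.

```java
import java.util.Locale;
import java.util.Objects;

/** Hypothetical helper: client-provided connection names of the form service.role.region.instance. */
final class ConnectionNames {
    enum Role { PUBLISHER, CONSUMER, ADMIN }

    private ConnectionNames() {}

    static String of(String service, Role role, String region, String instanceId) {
        Objects.requireNonNull(service, "service");
        Objects.requireNonNull(role, "role");
        Objects.requireNonNull(region, "region");
        Objects.requireNonNull(instanceId, "instanceId");
        // Lower-case role keeps names greppable and sortable in the management UI.
        return String.join(".", service, role.name().toLowerCase(Locale.ROOT), region, instanceId);
    }
}
```

Usage: factory.newConnection(ConnectionNames.of("order-service", ConnectionNames.Role.PUBLISHER, region, instanceId)). The same call with Role.CONSUMER names the consumer connection.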
6. Channel: an AMQP Session, Not a Thread-Safe Utility
A Channel is a virtual session on top of a connection. Most AMQP operations are performed on a channel: exchangeDeclare, queueDeclare, queueBind, basicPublish, basicConsume, basicAck, basicNack, confirmSelect, basicQos.
6.1 Channel lifecycle
```java
try (Connection connection = factory.newConnection("demo")) {
    try (Channel channel = connection.createChannel()) {
        channel.exchangeDeclare("ex.demo", BuiltinExchangeType.DIRECT, true);
        channel.basicPublish("ex.demo", "demo.key", null, "hello".getBytes(StandardCharsets.UTF_8));
    }
}
```
This is fine for a demo, but a production service does not open and close a connection and a channel for every message.
6.2 Channel per thread / channel per worker
Production guidelines:
- do not share a channel for concurrent publishing from many threads without explicit serialization;
- use one channel per publisher thread, or a channel pool;
- a consumer channel is usually dedicated to its subscription;
- ack on the channel that received the delivery, or through a design that guarantees serialization.
Why?
AMQP traffic is a sequence of frames. Concurrent operations on the same channel can interleave frames incorrectly on the wire and interfere with publisher confirms. In addition, delivery tags are scoped per channel, so acking on the wrong channel is a bug.
6.3 Delivery tag scoped to channel
A delivery tag is not a global message id. It is only meaningful on the channel on which the delivery was received.
Bad:
```java
// Message delivered on consumerChannelA
long tag = delivery.getEnvelope().getDeliveryTag();
// Later, the wrong channel is used
publisherChannel.basicAck(tag, false); // wrong: unknown tag here (PRECONDITION_FAILED closes the channel), or it acks an unrelated delivery
```
Correct:
```java
consumerChannel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
```
If processing moves to a worker thread, the ack must still go to the receiving channel and must not run concurrently with other operations on that channel.
7. Threading Model: Consumer Callback dan Worker Pool
The RabbitMQ Java client reads frames from the broker and runs consumer callbacks on a consumer work pool associated with the connection. Callbacks for the same channel are dispatched one at a time, in order.
A production consumer must therefore not do unbounded heavy work on the callback thread: while one callback runs, further deliveries on that channel wait. Use a worker pool with a bounded queue.
7.1 Simple callback consumer
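```java
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
    long tag = delivery.getEnvelope().getDeliveryTag();
    try {
        process(delivery);                 // application handler
        channel.basicAck(tag, false);      // ack only after processing succeeded
    } catch (Exception e) {
        // requeue=false: dead-letter (if a DLX is configured) instead of redelivering
        // a poison message forever; requeue=true only for known-transient failures
        channel.basicNack(tag, false, false);
    }
};
channel.basicQos(50);                      // at most 50 unacked deliveries for this consumer
channel.basicConsume("q.order.commands", false, deliverCallback, consumerTag -> {}); // autoAck=false
```
This is enough for lightweight processing. For heavy processing, blocking I/O, or external calls, separate delivery reception from processing.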
7.2 Worker pool with bounded executor
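```java
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.client.Delivery;
import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

public final class RabbitConsumerWorker implements AutoCloseable {
    /** Hypothetical marker for transient failures (e.g. a DB timeout) worth redelivering. */
    public static class RetryableException extends RuntimeException {
        public RetryableException(String message, Throwable cause) { super(message, cause); }
    }

    private enum Outcome { ACK, REQUEUE, REJECT }

    private final Channel channel;
    private final ExecutorService workers;
    private final Semaphore inFlight;

    public RabbitConsumerWorker(Channel channel, int maxInFlight, int workerThreads) {
        this.channel = channel;
        // newFixedThreadPool has an unbounded task queue; the semaphore is what bounds in-flight work.
        this.workers = Executors.newFixedThreadPool(workerThreads);
        this.inFlight = new Semaphore(maxInFlight);
    }

    public DeliverCallback callback() {
        return (consumerTag, delivery) -> {
            long tag = delivery.getEnvelope().getDeliveryTag();
            if (!inFlight.tryAcquire()) {
                // This should rarely happen if prefetch is aligned with maxInFlight.
                settle(tag, Outcome.REQUEUE);
                return;
            }
            try {
                workers.submit(() -> {
                    try {
                        settle(tag, process(delivery));
                    } finally {
                        inFlight.release();
                    }
                });
            } catch (RejectedExecutionException e) { // pool already shut down
                inFlight.release();
                settle(tag, Outcome.REQUEUE);
            }
        };
    }

    private Outcome process(Delivery delivery) {
        try {
            handle(delivery);
            return Outcome.ACK;
        } catch (RetryableException e) {
            return Outcome.REQUEUE;   // transient: let the broker redeliver
        } catch (Exception e) {
            return Outcome.REJECT;    // permanent: dead-letter via DLX, if configured
        }
    }

    private void handle(Delivery delivery) {
        // business logic here
    }

    private void settle(long tag, Outcome outcome) {
        synchronized (channel) { // a Channel must not be used concurrently
            try {
                switch (outcome) {
                    case ACK -> channel.basicAck(tag, false);
                    case REQUEUE -> channel.basicNack(tag, false, true);
                    case REJECT -> channel.basicNack(tag, false, false);
                }
            } catch (IOException e) {
                // Channel likely closed: unacked deliveries are redelivered, so handlers must be idempotent.
            }
        }
    }

    @Override
    public void close() throws InterruptedException {
        workers.shutdown();                                    // stop taking new work
        if (!workers.awaitTermination(30, TimeUnit.SECONDS)) { // let in-flight work settle
            workers.shutdownNow();
        }
    }
}
```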
Important notes:
- this example uses synchronized (channel) to prevent concurrent ack/nack on the same channel;
- a cleaner design is one consumer channel per worker, or a dedicated ack dispatcher thread;
- prefetch must be aligned with worker capacity;
- a bounded executor is safer than an unbounded queue; here the semaphore caps in-flight work even though the thread pool's own queue is unbounded.
7.3 Ack dispatcher pattern
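For better throughput and safety, use a single-threaded ack dispatcher per channel. Worker threads submit their ack/nack decisions to the dispatcher, and the dispatcher is the only thread that calls the channel.
With this design:
- workers never call the channel directly;
- ack/nack calls are serialized;
- each delivery tag is settled on the channel that received it;
- shutdown is easier to control.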
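Below is a minimal sketch of such a dispatcher. It uses a hypothetical AckSink seam so the example runs without a broker. In a service, AckSink would wrap channel.basicAck(tag, false) and channel.basicNack(tag, false, requeue) for the one channel that received the deliveries. Worker threads only enqueue decisions, and a single dedicated thread performs every channel call.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Sketch: single-threaded ack/nack dispatcher for ONE consumer channel. */
final class AckDispatcher implements AutoCloseable {
    /** Hypothetical seam; production code wraps basicAck/basicNack of the receiving channel. */
    interface AckSink {
        void ack(long deliveryTag) throws Exception;
        void nack(long deliveryTag, boolean requeue) throws Exception;
    }

    private interface Op { void call() throws Exception; }

    private final AckSink sink;
    // Exactly one thread ever touches the channel, so ack/nack frames are serialized.
    private final ExecutorService dispatcher = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, "rabbit-ack-dispatcher");
        t.setDaemon(true);
        return t;
    });

    AckDispatcher(AckSink sink) {
        this.sink = sink;
    }

    /** Called from worker threads after the business transaction committed. */
    void ack(long deliveryTag) {
        dispatcher.execute(() -> run(() -> sink.ack(deliveryTag)));
    }

    /** requeue=true for transient failures, false to dead-letter poison messages. */
    void nack(long deliveryTag, boolean requeue) {
        dispatcher.execute(() -> run(() -> sink.nack(deliveryTag, requeue)));
    }

    private static void run(Op op) {
        try {
            op.call();
        } catch (Exception e) {
            // Channel probably closed: unacked deliveries will be redelivered,
            // so handlers must be idempotent. Emit a metric/log here.
        }
    }

    /** Executes every queued decision, then stops. Call before closing the channel. */
    @Override
    public void close() throws InterruptedException {
        dispatcher.shutdown();
        dispatcher.awaitTermination(10, TimeUnit.SECONDS);
    }
}
```

One dispatcher per consumer channel gives the delivery-tag scope and the serialization boundary the same owner. After close(), further ack/nack calls are rejected by the executor, so stop the workers first.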
8. Publisher Architecture
A production-grade producer usually needs:
- a long-lived connection;
- a channel pool or one channel per publishing thread;
- publisher confirms;
- a bounded local queue;
- timeout handling;
- a returned-message listener for mandatory publishes;
- metrics.
8.1 Naive publisher
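```java
// Anti-pattern: one channel per message, no confirms, no mandatory flag, no backpressure.
public void publish(byte[] body) throws IOException, TimeoutException {
    Channel channel = connection.createChannel();
    channel.basicPublish("ex.order.events", "commerce.order.created", null, body);
    channel.close(); // Channel.close() also throws java.util.concurrent.TimeoutException
}
```
Problems:
- creating and closing a channel per message is expensive;
- no publisher confirm;
- no mandatory handling;
- no backpressure;
- weak error semantics.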
8.2 Better publisher skeleton
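```java
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import java.io.IOException;
import java.util.concurrent.TimeoutException;

public final class ConfirmingPublisher implements AutoCloseable {
    private final Channel channel;

    public ConfirmingPublisher(Connection connection) throws IOException {
        this.channel = connection.createChannel();
        this.channel.confirmSelect(); // enable publisher confirms on this channel
        this.channel.addReturnListener(returned -> {
            // emit metric + persist failure + alert if critical
            System.err.printf("Unroutable message: exchange=%s routingKey=%s replyText=%s%n",
                    returned.getExchange(),
                    returned.getRoutingKey(),
                    returned.getReplyText());
        });
    }

    // synchronized: a single channel must not be used by several publishing threads at once.
    public synchronized void publish(String exchange, String routingKey, AMQP.BasicProperties props, byte[] body)
            throws IOException, InterruptedException, TimeoutException {
        channel.basicPublish(exchange, routingKey, true, props, body); // mandatory=true
        // waitForConfirms throws TimeoutException if no confirm arrives in time and
        // returns false if the broker nacked. An unroutable mandatory message is returned
        // AND then acked, so only the return listener detects routing failures.
        if (!channel.waitForConfirms(5_000)) {
            throw new IOException("Publish was nacked by the broker");
        }
    }

    @Override
    public void close() throws IOException, TimeoutException {
        if (channel.isOpen()) {
            channel.close();
        }
    }
}
```
This is still simple. At high throughput, calling waitForConfirms per message is too slow. Async confirms and batching are covered in Part 005 and Part 026.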
9. Consumer Architecture
A production-grade consumer needs:
- a dedicated channel;
- manual ack;
- a configured prefetch;
- bounded worker capacity;
- idempotent processing;
- an explicit nack/reject strategy;
- a cancellation callback;
- a shutdown drain;
- metrics.
9.1 Consumer skeleton
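```java
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.client.Delivery;
import java.io.IOException;

public final class CommandConsumer implements AutoCloseable {
    /** Hypothetical marker for transient failures (e.g. a DB timeout) worth redelivering. */
    public static class RetryableException extends RuntimeException {
        public RetryableException(String message, Throwable cause) { super(message, cause); }
    }

    private final Channel channel;
    private final String queue;
    private volatile String consumerTag;

    public CommandConsumer(Connection connection, String queue, int prefetch) throws IOException {
        this.channel = connection.createChannel(); // dedicated channel for this consumer
        this.queue = queue;
        this.channel.basicQos(prefetch);            // set before basicConsume
    }

    public void start() throws IOException {
        DeliverCallback onDelivery = (tag, delivery) -> {
            long deliveryTag = delivery.getEnvelope().getDeliveryTag();
            try {
                handle(delivery);
                channel.basicAck(deliveryTag, false);         // only after the side effect committed
            } catch (RetryableException e) {
                channel.basicNack(deliveryTag, false, true);  // transient: requeue
            } catch (Exception e) {
                channel.basicNack(deliveryTag, false, false); // permanent: dead-letter via DLX, if configured
            }
        };
        CancelCallback onCancel = tag -> {
            // broker cancelled the consumer (e.g. queue deleted): emit metric, degrade readiness
        };
        this.consumerTag = channel.basicConsume(queue, false, onDelivery, onCancel); // autoAck=false
    }

    private void handle(Delivery delivery) {
        // parse, validate, apply idempotency, execute business transaction
    }

    @Override
    public void close() throws Exception {
        if (channel.isOpen()) {
            if (consumerTag != null) {
                channel.basicCancel(consumerTag); // stop new deliveries first
            }
            channel.close();                      // unacked deliveries are requeued by the broker
        }
    }
}
```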
9.2 The consumer invariant
Ack only after business side effect is safely committed.
If message processing writes to a database, the order is usually:
- receive the delivery;
- validate the message;
- check idempotency;
- execute the business transaction;
- commit the transaction;
- ack the delivery.
If the service crashes after the commit but before the ack, the message can be redelivered. That is why idempotency is mandatory.
10. Prefetch as Capacity Contract
basicQos(prefetch) limits how many unacknowledged deliveries may be outstanding. With the single-argument form, RabbitMQ applies the limit to each consumer on the channel.
Prefetch is not just performance tuning. It is a contract between the broker and the consumer:
“This consumer can hold N messages that have not been acked yet.”
10.1 Example
Given:
- worker threads = 10;
- average processing time = 200 ms;
- p95 processing time = 2 s;
- the external dependency is sometimes slow;
a prefetch of 1000 is a poor choice, because one instance can hold 1000 unfinished messages. If the instance dies, all 1000 are redelivered. When the external dependency slows down, the broker keeps up to 1000 messages parked on this instance instead of leaving them for other consumers.
A more reasonable starting prefetch:
```text
prefetch = workerThreads * smallMultiplier
         = 10 * 2
         = 20
```
Then measure throughput, latency, redelivery, memory, and queue depth.
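Little's law (work in the system = throughput × time in the system) makes the numbers above concrete. Assuming a steady state with every worker busy, 10 workers at 200 ms average give about 50 messages/s per instance. Prefetch 20 then buffers roughly 0.4 s of work, while prefetch 1000 buffers roughly 20 s. PrefetchMath is a hypothetical helper used only for this arithmetic.

```java
/** Hypothetical helper: back-of-envelope prefetch arithmetic via Little's law. */
final class PrefetchMath {
    private PrefetchMath() {}

    /** Steady-state throughput of one instance: workers / average processing time. */
    static double throughputPerSecond(int workerThreads, double avgProcessingSeconds) {
        return workerThreads / avgProcessingSeconds;
    }

    /** Roughly how long the last prefetched message waits before a worker picks it up. */
    static double bufferedSeconds(int prefetch, int workerThreads, double avgProcessingSeconds) {
        return prefetch / throughputPerSecond(workerThreads, avgProcessingSeconds);
    }
}
```

Suppose the dependency degrades so that every message takes the 2 s p95 time. Throughput drops to 5 messages/s, and the same 1000 prefetched messages represent about 200 s of queued work, all of it redelivered if the instance dies.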
10.2 Prefetch and fairness
A prefetch that is too high lets one consumer grab too many messages while other consumers sit idle. A prefetch that is too low can reduce throughput because of round-trips and idle workers.
Rule of thumb:
- CPU-bound work: prefetch close to the worker count;
- I/O-bound work: prefetch can be higher, but bounded;
- strict ordering: prefetch 1 or single active consumer;
- large payloads: low prefetch;
- slow external dependency: low prefetch + circuit breaker.
11. Heartbeat and Network Failure
Heartbeats help the client and the broker detect broken TCP connections that were not closed cleanly.
If the heartbeat timeout is too short:
- false positives during GC pauses or network jitter;
- frequent disconnects;
- recovery storms.
If the heartbeat timeout is too long:
- slow failure detection;
- messages stay stuck longer;
- slow failover.
A common starting point is 30 to 60 seconds. Then validate it in the real environment.
11.1 Load balancer concern
If the connection passes through a load balancer, firewall, or NAT:
- its idle timeout must be longer than the interval between heartbeat frames;
- TCP keepalive can be relevant;
- connection draining during deploys must be tested;
- broker and client logs must be correlated.
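RabbitMQ sends heartbeat frames roughly every heartbeat timeout / 2 seconds, and it considers the peer unreachable after two missed heartbeats. That gives a simple sanity check for the first bullet, sketched below. HeartbeatCheck and the safety factor are hypothetical, and the right margin depends on your network.

```java
/** Hypothetical check: does an idle connection survive an intermediary's idle timeout? */
final class HeartbeatCheck {
    private HeartbeatCheck() {}

    /** Heartbeat frames are sent about every timeout / 2 seconds. */
    static double heartbeatIntervalSeconds(int heartbeatTimeoutSeconds) {
        return heartbeatTimeoutSeconds / 2.0;
    }

    /**
     * True if heartbeat traffic arrives well before the intermediary drops the idle connection.
     * safetyFactor (e.g. 2.0) leaves room for jitter and GC pauses.
     */
    static boolean survivesIdleTimeout(int heartbeatTimeoutSeconds, int idleTimeoutSeconds, double safetyFactor) {
        if (heartbeatTimeoutSeconds <= 0) {
            return false; // heartbeats disabled: no AMQP-level traffic keeps an idle connection alive
        }
        return idleTimeoutSeconds > heartbeatIntervalSeconds(heartbeatTimeoutSeconds) * safetyFactor;
    }
}
```

For example, a 30 s heartbeat sends frames about every 15 s, which comfortably fits a 60 s idle timeout with a 2x margin. A 60 s heartbeat behind a 45 s idle timeout does not.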
12. Automatic Recovery
The RabbitMQ Java client supports automatic connection recovery. When a connection fails, the client reconnects and reopens its channels. If topology recovery is enabled, it also re-declares the exchanges, queues, bindings, and consumers that were declared through that connection.
12.1 What recovery does not solve
Automatic recovery does not make publish exactly-once. It does not remove duplicates. It does not guarantee external side effects are atomic with ack. It does not know business idempotency.
Failure cases still exist:
| Failure | What can happen | Required design |
|---|---|---|
| Connection lost after publish before confirm | Producer uncertain | confirm tracking + idempotent message id |
| Consumer commits DB then connection lost before ack | Message redelivered | idempotent consumer |
| Topology changed externally during outage | Recovery mismatch | topology governance + startup validation |
| Broker blocked due to resource alarm | Publish stalls/fails | blocked listener + backpressure |
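Confirm tracking, from the first row, can be sketched with a sorted map keyed by publish sequence number. The sorted map handles the multiple flag on acks and nacks. The wiring described in the comments (getNextPublishSeqNo, addConfirmListener) uses the Java client's API. ConfirmTracker itself is a hypothetical helper. It stores the messageId so that uncertain publishes can be retried idempotently.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;

/**
 * Hypothetical helper: outstanding publisher confirms keyed by channel sequence number.
 * Wiring: call track(channel.getNextPublishSeqNo(), messageId) right before basicPublish,
 * and forward (deliveryTag, multiple) from channel.addConfirmListener(ack, nack) to onAck/onNack.
 */
final class ConfirmTracker {
    private final ConcurrentNavigableMap<Long, String> outstanding = new ConcurrentSkipListMap<>();

    void track(long seqNo, String messageId) {
        outstanding.put(seqNo, messageId);
    }

    /** Message ids the broker has taken responsibility for. */
    List<String> onAck(long seqNo, boolean multiple) {
        return settle(seqNo, multiple);
    }

    /** Message ids the broker refused; the caller decides whether to republish. */
    List<String> onNack(long seqNo, boolean multiple) {
        return settle(seqNo, multiple);
    }

    /** Publishes whose outcome is unknown if the channel closes right now. */
    int outstandingCount() {
        return outstanding.size();
    }

    private List<String> settle(long seqNo, boolean multiple) {
        List<String> ids = new ArrayList<>();
        if (multiple) {
            // multiple=true settles every sequence number <= seqNo
            ConcurrentNavigableMap<Long, String> head = outstanding.headMap(seqNo, true);
            ids.addAll(head.values());
            head.clear();
        } else {
            String id = outstanding.remove(seqNo);
            if (id != null) {
                ids.add(id);
            }
        }
        return ids;
    }
}
```

Entries still outstanding when the connection is lost are exactly the "producer uncertain" case. Republish them with the same messageId and let consumers deduplicate.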
12.2 Recovery listener
Add recovery listeners for observability:
```java
Connection connection = factory.newConnection("order-service.consumer");
if (connection instanceof Recoverable recoverable) { // true when automatic recovery is enabled
    recoverable.addRecoveryListener(new RecoveryListener() {
        @Override
        public void handleRecoveryStarted(Recoverable r) {
            // metric: rabbitmq.recovery.started
        }

        @Override
        public void handleRecovery(Recoverable r) {
            // metric: rabbitmq.recovery.completed
        }
    });
}
```
13. Topology Recovery: Helpful but Dangerous if Misunderstood
Topology recovery can redeclare topology after reconnect. This is convenient, but it can hide governance problems.
13.1 Good use
- local/dev environment;
- service-owned queue and binding;
- temporary queue;
- consumers that self-declare private topology.
13.2 Risky use
- shared exchange managed by platform;
- topology arguments changed by migration;
- service has permissions to declare too much;
- multiple versions of service declare incompatible queue args.
Example risk:
Service v1 declares:
```java
channel.queueDeclare("q.billing.order-events", true, false, false, Map.of(
        "x-queue-type", "classic"
));
```
Service v2 declares:
```java
channel.queueDeclare("q.billing.order-events", true, false, false, Map.of(
        "x-queue-type", "quorum"
));
```
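The second declaration fails with a PRECONDITION_FAILED channel error (inequivalent arg 'x-queue-type'), because redeclaring an existing queue cannot change its type. The same failure then hits startup and topology recovery.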
Rule:
Use topology recovery intentionally. For shared production topology, prefer GitOps/operator-managed topology or strict startup validation.
14. Blocked Connection Handling
When a resource alarm fires (memory or free disk space runs low), RabbitMQ blocks connections that publish. The Java client can register a BlockedListener to be notified.
```java
connection.addBlockedListener(new BlockedListener() {
    @Override
    public void handleBlocked(String reason) {
        // stop accepting new publish requests, trip readiness, emit alert
        System.err.println("RabbitMQ connection blocked: " + reason);
    }

    @Override
    public void handleUnblocked() {
        // resume publish cautiously
        System.out.println("RabbitMQ connection unblocked");
    }
});
```
Production behavior:
- pause or shed new publish workload;
- return 503 for synchronous API if command publish is required;
- buffer only within strict bounded limit;
- alert on sustained blocked connection;
- do not build unbounded in-memory publish queue.
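Taken together, these rules amount to admission control in front of the publish path. The sketch below uses a hypothetical PublishGate. The BlockedListener's handleBlocked/handleUnblocked would call block()/unblock(), and a synchronous API would answer 503 whenever tryAccept() returns false. The buffer limit is an assumed configuration value.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical admission gate for the publish path. */
final class PublishGate {
    private final AtomicBoolean blocked = new AtomicBoolean(false);
    private final Semaphore slots; // hard cap on publishes buffered or awaiting confirm

    PublishGate(int maxBufferedPublishes) {
        this.slots = new Semaphore(maxBufferedPublishes);
    }

    /** Called from BlockedListener.handleBlocked. */
    void block() { blocked.set(true); }

    /** Called from BlockedListener.handleUnblocked. */
    void unblock() { blocked.set(false); }

    /** False means shed the request now (e.g. HTTP 503) instead of queueing it in memory. */
    boolean tryAccept() {
        return !blocked.get() && slots.tryAcquire();
    }

    /** Call once the publish is confirmed or has definitively failed. */
    void release() {
        slots.release();
    }
}
```

The semaphore bounds memory even when the broker is healthy but slow to confirm. The blocked flag sheds load immediately during a resource alarm.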
14.1 Separate publisher and consumer connections
If the publisher connection is blocked, you usually do not want the consumer lifecycle coupled to it. Separate connections help isolate failure domains.
15. Resource Ownership in a Java Service
A production service should have explicit ownership for RabbitMQ resources.
Suggested modules:
| Component | Responsibility |
|---|---|
| RabbitConnectionFactoryProvider | Build configured factory |
| RabbitConnectionManager | Own long-lived connections |
| TopologyValidator | Passive declare / validate required topology |
| PublisherManager | Own publishing channels and confirm strategy |
| ConsumerManager | Start/stop consumers |
| AckDispatcher | Serialize ack/nack if worker pool used |
| RabbitHealthIndicator | Connection/channel/topology health |
| RabbitMetricsBinder | Publish/consume/recovery metrics |
16. Startup Lifecycle
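Recommended startup order: bring components up in dependency order, validate topology before any consumer starts, and report readiness only after everything is running.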
Why topology before consumers?
- fail fast if dependency missing;
- avoid consumer starting on wrong queue;
- avoid publish path becoming healthy when route invalid;
- make readiness meaningful.
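One plausible concrete order, following the component list in section 15, is: validate config, open connections, validate topology, start publishers, start consumers, then report ready. The sketch below shows a hypothetical StartupSequence that runs such steps in order and flips readiness only when every step succeeds. The step names and lambdas are placeholders for the real components.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical ordered startup with fail-fast and a readiness flag. */
final class StartupSequence {
    interface Step { void run() throws Exception; }

    private final List<String> names = new ArrayList<>();
    private final List<Step> steps = new ArrayList<>();
    private final AtomicBoolean ready = new AtomicBoolean(false);

    StartupSequence then(String name, Step step) {
        names.add(name);
        steps.add(step);
        return this;
    }

    /** Runs steps in order; the first failure aborts startup and readiness stays false. */
    void start() {
        for (int i = 0; i < steps.size(); i++) {
            try {
                steps.get(i).run();
            } catch (Exception e) {
                throw new IllegalStateException("startup step failed: " + names.get(i), e);
            }
        }
        ready.set(true);
    }

    /** Backs the readiness probe. */
    boolean isReady() {
        return ready.get();
    }
}
```

Usage sketch: new StartupSequence().then("validate-topology", () -> validator.validate(channel)).then("start-consumers", consumer::start).start(), where validator, channel, and consumer stand for the service's own objects.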
16.1 Readiness vs liveness
| Signal | Meaning | RabbitMQ relationship |
|---|---|---|
| Liveness | Process should be restarted if false | JVM deadlock, fatal stuck state |
| Readiness | Service can handle traffic | RabbitMQ publish/consume dependency healthy |
Do not make liveness fail just because RabbitMQ is temporarily down unless the process cannot recover. Prefer readiness degradation.
17. Shutdown Lifecycle
Shutdown must prevent new work, drain in-flight work, and close channels cleanly.
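Recommended order: stop accepting new work (fail readiness, cancel consumers, stop taking publish requests), drain in-flight work and outstanding confirms, then close channels and finally connections.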
17.1 Consumer shutdown decision
For each in-flight message:
| State | Shutdown action |
|---|---|
| Not started | Let broker redeliver after cancel/close |
| Processing not committed | Nack/requeue or close channel to redeliver |
| Business commit done | Ack if possible; rely on idempotency if not |
| Poison classified | Nack requeue=false to DLQ |
17.2 Avoid this
```java
System.exit(0); // without cancelling consumers, flushing confirms, or closing channels
```
Abrupt shutdown is sometimes unavoidable, but your normal deployment path should not create duplicate storms or ambiguous publish state.
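Draining requires two things: a way to stop intake, and a way to wait until in-flight work reaches zero. The sketch below uses a hypothetical InFlightTracker. The consumer callback calls tryBegin() before handing a delivery to a worker, and nacks with requeue=true if it returns false. Workers call end() after the ack or nack. Shutdown calls stopAccepting(), cancels consumers, waits in awaitDrained(), and only then closes channels. The API is an assumption, not a client-library type.

```java
import java.util.concurrent.TimeUnit;

/** Hypothetical in-flight counter supporting "stop intake, then drain". */
final class InFlightTracker {
    private final Object lock = new Object();
    private boolean accepting = true;
    private int inFlight;

    /** Returns false once shutdown started; the caller should nack with requeue=true. */
    boolean tryBegin() {
        synchronized (lock) {
            if (!accepting) {
                return false;
            }
            inFlight++;
            return true;
        }
    }

    /** Call after the delivery was acked or nacked. */
    void end() {
        synchronized (lock) {
            inFlight--;
            if (inFlight == 0) {
                lock.notifyAll();
            }
        }
    }

    void stopAccepting() {
        synchronized (lock) {
            accepting = false;
        }
    }

    /** Waits until nothing is in flight; returns false if the timeout elapsed first. */
    boolean awaitDrained(long timeout, TimeUnit unit) throws InterruptedException {
        long deadline = System.nanoTime() + unit.toNanos(timeout);
        synchronized (lock) {
            while (inFlight > 0) {
                long remaining = deadline - System.nanoTime();
                if (remaining <= 0) {
                    return false;
                }
                TimeUnit.NANOSECONDS.timedWait(lock, remaining);
            }
            return true;
        }
    }
}
```

If awaitDrained returns false, closing the channel still hands unacked deliveries back to the broker. The cost is duplicates, which idempotent handlers absorb.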
18. Health Checks
A RabbitMQ health check should not simply be “connection object is not null”.
Better levels:
18.1 Basic connectivity
- connection open;
- channel can be created;
- heartbeat not failed.
18.2 Topology dependency
- required exchange exists;
- required queue exists;
- service has permissions;
- passive declare succeeds.
18.3 Functional publish path
For a critical publish-only service, optionally publish a synthetic message to a dedicated health exchange/queue. Do not publish fake business messages.
18.4 Consumer health
- consumer tag active;
- channel open;
- recent deliveries or queue idle expected;
- in-flight below threshold;
- redelivery/DLQ not exploding.
19. Metrics
At minimum, instrument client-side metrics:
Publisher metrics
| Metric | Why it matters |
|---|---|
| publish attempts | input rate |
| publish confirms | broker acceptance rate |
| confirm latency | broker/disk/quorum pressure signal |
| publish failures | immediate client errors |
| returned messages | routing failures |
| blocked duration | broker resource alarm |
| local buffer depth | app backpressure |
Consumer metrics
| Metric | Why it matters |
|---|---|
| deliveries | input rate |
| ack count | successful processing |
| nack/reject count | failure handling |
| processing latency | business handler health |
| in-flight count | capacity pressure |
| redelivery count | duplicates/retry loops |
| consumer cancellations | broker/topology issue |
| worker queue depth | local saturation |
Connection metrics
| Metric | Why it matters |
|---|---|
| connection open/closed | availability |
| recovery started/completed | network/broker instability |
| channel count | resource leak |
| heartbeat failures | transport instability |
| blocked/unblocked | broker pressure |
20. Error Handling Boundary
Not all exceptions mean the same thing.
| Error location | Example | Correct response |
|---|---|---|
| Connection open | auth fail, DNS fail | fail readiness, retry with backoff |
| Channel operation | precondition failed | fail fast; topology mismatch |
| Publish | IO exception | uncertain publish; use confirm/idempotency |
| Return listener | unroutable | routing failure; alert/fail command |
| Consumer processing | validation error | reject/DLQ, no retry |
| Consumer processing | transient DB timeout | retry/nack/requeue with budget |
| Ack | channel closed | message may redeliver; idempotency required |
A top-tier design never catches Exception and blindly basicAcks.
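The consumer rows of the table can be encoded as an explicit classification step rather than a blanket catch. In this sketch, FailureClassifier is hypothetical. Treating java.util.concurrent.TimeoutException and java.sql.SQLTransientException (anywhere in the cause chain) as transient is an assumption. The attempt number must come from your retry design, because the redelivered flag on a requeued delivery is only a boolean.

```java
import java.sql.SQLTransientException;
import java.util.concurrent.TimeoutException;

/** Hypothetical mapping from handler failure to a settlement decision. */
final class FailureClassifier {
    enum Decision { REQUEUE, DEAD_LETTER }

    private final int maxAttempts; // retry budget per message

    FailureClassifier(int maxAttempts) {
        this.maxAttempts = maxAttempts;
    }

    /** attempt is 1 for the first delivery. */
    Decision classify(Throwable error, int attempt) {
        if (isTransient(error) && attempt < maxAttempts) {
            return Decision.REQUEUE;     // e.g. transient DB timeout: retry within budget
        }
        return Decision.DEAD_LETTER;     // validation error or budget exhausted: no retry
    }

    private static boolean isTransient(Throwable error) {
        for (Throwable t = error; t != null; t = t.getCause()) {
            if (t instanceof TimeoutException || t instanceof SQLTransientException) {
                return true;
            }
        }
        return false;
    }
}
```

REQUEUE maps to basicNack(tag, false, true) and DEAD_LETTER to basicNack(tag, false, false). Only a successful commit leads to basicAck.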
21. Channel Pooling for Publishers
For multi-threaded publishers, use a channel pool or one channel per publishing thread.
21.1 Simple bounded channel pool
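```java
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import java.io.IOException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public final class ChannelPool implements AutoCloseable {
    private final Connection connection;
    private final BlockingQueue<Channel> channels;

    public ChannelPool(Connection connection, int size) throws IOException {
        this.connection = connection;
        this.channels = new ArrayBlockingQueue<>(size);
        for (int i = 0; i < size; i++) {
            channels.add(newConfirmChannel());
        }
    }

    private Channel newConfirmChannel() throws IOException {
        Channel ch = connection.createChannel();
        ch.confirmSelect(); // every pooled channel publishes with confirms
        return ch;
    }

    public <T> T withChannel(ChannelOperation<T> operation, long timeoutMillis) throws Exception {
        // Bounded wait instead of take(): callers get backpressure instead of hanging forever.
        Channel channel = channels.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        if (channel == null) {
            throw new TimeoutException("No publisher channel available within " + timeoutMillis + " ms");
        }
        try {
            return operation.execute(channel); // only this thread uses the channel until it is returned
        } finally {
            if (channel.isOpen()) {
                channels.offer(channel);
            } else {
                try {
                    channels.offer(newConfirmChannel()); // replace a channel closed by a channel error
                } catch (IOException | RuntimeException e) {
                    // Connection probably down: the pool is one channel short.
                    // Mark it degraded, emit a metric, and refill after recovery.
                }
            }
        }
    }

    @Override
    public void close() throws Exception {
        for (Channel ch : channels) {
            if (ch.isOpen()) {
                ch.close();
            }
        }
    }

    @FunctionalInterface
    public interface ChannelOperation<T> {
        T execute(Channel channel) throws Exception;
    }
}
```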
21.2 Pool sizing
Start small:
```java
int publisherChannelPoolSize = Math.min(Runtime.getRuntime().availableProcessors(), 8);
```
Then measure:
- publish throughput;
- confirm latency;
- blocked time;
- CPU;
- broker channel count;
- memory allocation.
More channels is not always better. It can increase broker/client overhead and make confirms harder to reason about.
22. Consumer Channel Strategy
Common options:
22.1 One channel per queue consumer
Simple and safe.
```text
q.order.commands -> channel A -> worker pool A
q.payment.events -> channel B -> worker pool B
```
Good default.
22.2 One channel per worker
Useful when:
- strict channel serialization desired;
- each worker consumes independently;
- simpler ack ownership;
- higher parallelism needed.
Trade-off:
- more channels;
- more consumers;
- prefetch distribution must be tuned.
22.3 One channel for many queues
Possible, but often not ideal because:
- delivery tags share channel scope;
- QoS interaction can surprise;
- failure closes all consumers on that channel;
- observability less clean.
Rule:
Prefer isolation by workload. Shared channel optimization should be justified by measurement, not habit.
23. Topology Declaration in Java
Example topology declaration:
```java
public final class CommerceTopology {
    public void declare(Channel channel) throws IOException {
        channel.exchangeDeclare("ex.commerce.order.events", BuiltinExchangeType.TOPIC, true);
        channel.exchangeDeclare("ex.commerce.order.dlx", BuiltinExchangeType.DIRECT, true);

        Map<String, Object> queueArgs = Map.of(
                "x-dead-letter-exchange", "ex.commerce.order.dlx",
                "x-dead-letter-routing-key", "commerce.order-events.failed"
        );
        channel.queueDeclare("q.billing.order-events", true, false, false, queueArgs);
        channel.queueBind("q.billing.order-events", "ex.commerce.order.events", "commerce.order.created");
        channel.queueBind("q.billing.order-events", "ex.commerce.order.events", "commerce.order.paid");

        channel.queueDeclare("q.billing.order-events.dlq", true, false, false, Map.of());
        channel.queueBind(
                "q.billing.order-events.dlq",
                "ex.commerce.order.dlx",
                "commerce.order-events.failed"
        );
    }
}
```
Production note:
- declaration must be idempotent with identical arguments;
- mismatched durable/arguments causes precondition failure;
- shared topology often belongs in infra repo/operator, not service code;
- passive validation is safer for shared objects.
24. Passive Validation Example
```java
public final class TopologyValidator {
    public void validate(Channel channel) throws IOException {
        channel.exchangeDeclarePassive("ex.commerce.order.events");
        channel.exchangeDeclarePassive("ex.commerce.order.dlx");
        channel.queueDeclarePassive("q.billing.order-events");
        channel.queueDeclarePassive("q.billing.order-events.dlq");
    }
}
```
If a passive declare fails because the object does not exist, the broker closes the channel with a 404 NOT_FOUND channel error. A production validator should therefore use a fresh channel for validation and close it afterwards.
```java
try (Channel validationChannel = connection.createChannel()) {
    validator.validate(validationChannel);
}
```
25. Message Properties Builder
Standardize message metadata.
```java
AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
        .contentType("application/json")
        .contentEncoding("utf-8")
        .deliveryMode(2) // persistent
        .messageId(UUID.randomUUID().toString())
        .correlationId(correlationId)
        .type("commerce.order.created")
        .timestamp(Date.from(Instant.now()))
        .headers(Map.of( // Map.of rejects null values, so causationId must be non-null here
                "schema", "commerce.order.created.v1",
                "producer", "order-service",
                "causationId", causationId
        ))
        .build();
```
Important:
- deliveryMode(2) marks the message persistent;
- messageId supports idempotency/deduplication;
- correlationId supports tracing a request/workflow;
- type helps consumer dispatch;
- headers should not become an ungoverned payload.
26. Configuration Model
Represent RabbitMQ config explicitly.
```java
public record RabbitMqConfig(
        String host,
        int port,
        String virtualHost,
        String username,
        String password,
        int connectionTimeoutMillis,
        int heartbeatSeconds,
        boolean automaticRecovery,
        boolean topologyRecovery,
        int networkRecoveryIntervalMillis,
        int publisherChannelPoolSize,
        int consumerPrefetch
) {}
```
Validation:
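```java
public void validate(RabbitMqConfig cfg) {
    if (cfg.host() == null || cfg.host().isBlank()) throw new IllegalArgumentException("host required");
    if (cfg.port() <= 0 || cfg.port() > 65_535) throw new IllegalArgumentException("port invalid");
    if (cfg.virtualHost() == null || cfg.virtualHost().isBlank()) throw new IllegalArgumentException("virtualHost required");
    // 0 would disable heartbeats; very low values cause false positives (see section 11)
    if (cfg.heartbeatSeconds() < 5) throw new IllegalArgumentException("heartbeat too low");
    if (cfg.consumerPrefetch() <= 0) throw new IllegalArgumentException("prefetch required");
    if (cfg.publisherChannelPoolSize() <= 0) throw new IllegalArgumentException("publisherChannelPoolSize must be positive");
}
```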
Config should be treated as production control surface, not incidental property bag.
27. Minimal Production Wrapper
A practical internal wrapper can hide low-level lifecycle without hiding semantics.
```java
public interface MessagePublisher {
    void publish(PublishRequest request) throws PublishException;
}

public record PublishRequest(
        String exchange,
        String routingKey,
        String messageId,
        String correlationId,
        String type,
        Map<String, Object> headers,
        byte[] body,
        boolean mandatory
) {}
```
But avoid a wrapper that hides critical choices:
Bad:
```java
rabbit.send("some-data");
```
Better:
```java
publisher.publish(new PublishRequest(
        "ex.commerce.order.events",
        "commerce.order.created",
        messageId,
        correlationId,
        "commerce.order.created",
        headers,
        body,
        true
));
```
A good wrapper standardizes metadata, confirms, metrics, and error handling while keeping exchange/routing semantics visible.
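A wrapper can also reject incomplete requests before they reach the broker. Below is a sketch of the PublishRequest record with a compact constructor. Which fields are mandatory is an assumed policy: messageId and type are required, correlationId is optional, and the empty default exchange is disallowed so that every publish names its exchange.

```java
import java.util.Map;
import java.util.Objects;

/** Variant of PublishRequest that validates at construction (assumed policy). */
record PublishRequest(
        String exchange,
        String routingKey,
        String messageId,
        String correlationId,
        String type,
        Map<String, Object> headers,
        byte[] body,
        boolean mandatory
) {
    PublishRequest {
        requireText(exchange, "exchange");     // "" would be the default exchange
        requireText(routingKey, "routingKey");
        requireText(messageId, "messageId");   // consumers deduplicate on it
        requireText(type, "type");
        Objects.requireNonNull(body, "body");
        // Immutable copy; Map.copyOf rejects null keys and values.
        headers = headers == null ? Map.of() : Map.copyOf(headers);
    }

    private static void requireText(String value, String field) {
        if (value == null || value.isBlank()) {
            throw new IllegalArgumentException(field + " is required");
        }
    }
}
```

Failing at construction keeps invalid requests out of metrics, retries, and confirm tracking entirely.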
28. Failure Matrix for Java Client Lifecycle
| Moment | Failure | Observable symptom | Correct mental model |
|---|---|---|---|
| Startup | Broker unreachable | connection timeout | service not ready |
| Startup | Queue arg mismatch | precondition failed, channel closed | topology incompatible |
| Publish | Channel closes before confirm | IOException/timeout | publish state uncertain |
| Publish | Message returned | return listener fires | routing failure, not broker storage failure |
| Runtime | Broker memory alarm | connection blocked | apply backpressure |
| Consume | Handler throws validation error | business exception | reject/DLQ, do not infinite retry |
| Consume | DB commit succeeds, ack fails | channel closed | duplicate likely, idempotency required |
| Recovery | Reconnect succeeds | recovery listener fires | topology/consumers may resume |
| Shutdown | Consumer killed mid-processing | redelivery later | must tolerate duplicate |
29. Testing Client Lifecycle
29.1 Unit tests
Test pure components:
- routing key builder;
- message property builder;
- retry classification;
- idempotency decision;
- topology object model;
- config validation.
29.2 Integration tests
Use real RabbitMQ, not mocks, for:
- exchange/queue/binding declaration;
- mandatory publish returns;
- publisher confirms;
- manual ack/nack;
- prefetch behavior;
- DLX routing;
- reconnect behavior.
29.3 Failure tests
Simulate:
- broker restart;
- close channel externally;
- invalid queue declaration;
- network interruption;
- consumer throws exception;
- worker pool saturation;
- blocked publish path if possible.
30. Common Mistakes
30.1 Creating connection per operation
Symptom:
- high latency;
- broker connection churn;
- CPU overhead;
- connection limit issues.
Fix:
- long-lived connections;
- channel reuse/pool.
30.2 Sharing one channel across all publisher threads
Symptom:
- random protocol errors;
- confirm confusion;
- throughput instability.
Fix:
- channel per thread or bounded channel pool.
30.3 Auto-ack for business-critical consumer
Symptom:
- message lost when handler crashes after delivery;
- no redelivery.
Fix:
- manual ack after successful commit.
30.4 Unbounded worker queue
Symptom:
- JVM memory grows;
- OOM under traffic spike;
- shutdown impossible.
Fix:
- prefetch + bounded executor + backpressure.
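A bounded executor turns overload into a visible rejection instead of heap growth. The sketch below assumes a hypothetical BoundedWorkers helper and a single consumer feeding the pool. Because basicQos(int) caps unacked deliveries per consumer, sizing the pool so that prefetch <= threads + queue capacity means the broker cannot push more work than the pool can hold, as long as each delivery is acked only when its task finishes.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical helper for a fixed-size pool with a bounded work queue.
final class BoundedWorkers {
    static ThreadPoolExecutor create(int threads, int queueCapacity) {
        return new ThreadPoolExecutor(
                threads, threads,                        // fixed number of worker threads
                0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(queueCapacity), // bounded: memory cannot grow without limit
                new ThreadPoolExecutor.AbortPolicy());   // reject loudly when full
    }

    // True if every delivery the broker may have unacked at once has a slot in the pool.
    static boolean fits(int prefetch, int threads, int queueCapacity) {
        return prefetch <= threads + queueCapacity;
    }
}
```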
30.5 Blind automatic recovery trust
Symptom:
- duplicate side effects;
- hidden reconnect loops;
- recovered stale topology.
Fix:
- recovery metrics;
- idempotency;
- topology validation;
- explicit failure tests.
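When a channel closes with unacked deliveries, the broker requeues them, and the consumer sees them again after recovery. Duplicates therefore have to be handled by the application. Below is a minimal in-memory sketch of a messageId-based guard with hypothetical names. It only remembers a bounded window and forgets everything on restart, so it does not cover the kill-before-ack experiment in 31.3. A durable variant would record the messageId in the same database transaction as the business side effect.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical in-memory idempotency guard keyed by messageId, keeping at most maxEntries IDs.
final class IdempotencyGuard {
    private final Map<String, Boolean> seen;

    IdempotencyGuard(int maxEntries) {
        // Insertion-ordered map that evicts the oldest ID once the window is full.
        this.seen = new LinkedHashMap<>(16, 0.75f, false) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, Boolean> eldest) {
                return size() > maxEntries;
            }
        };
    }

    // Returns true only the first time a messageId is seen within the retained window.
    synchronized boolean firstTime(String messageId) {
        return seen.putIfAbsent(messageId, Boolean.TRUE) == null;
    }
}
```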
30.6 Ack on wrong channel
Symptom:
- unknown delivery tag;
- channel closed;
- redelivery storm.
Fix:
- keep ack tied to delivery channel;
- use ack dispatcher per channel.
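Delivery tags are scoped to the channel that delivered the message, so tag 1 on two channels refers to two different messages. The sketch below shows a per-channel AckDispatcher with hypothetical names. Each delivery carries the dispatcher of its own channel, and all acks for a channel are serialized on one thread. In the real client, the LongConsumer would wrap channel.basicAck(tag, false) and handle its IOException.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.LongConsumer;

// Hypothetical: one dispatcher per channel, so every ack for that channel runs on one thread.
final class AckDispatcher implements AutoCloseable {
    private final String channelId;
    private final LongConsumer ackOnChannel; // e.g. wraps channel.basicAck(tag, false)
    private final ExecutorService ackThread = Executors.newSingleThreadExecutor();

    AckDispatcher(String channelId, LongConsumer ackOnChannel) {
        this.channelId = channelId;
        this.ackOnChannel = ackOnChannel;
    }

    String channelId() { return channelId; }

    void ack(long deliveryTag) {
        ackThread.execute(() -> ackOnChannel.accept(deliveryTag));
    }

    // Lets queued acks finish, then stops the thread; call before closing the channel.
    @Override
    public void close() {
        ackThread.shutdown();
        try {
            ackThread.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

// A delivery carries the dispatcher of the channel it arrived on; its tag means nothing elsewhere.
record Delivery(AckDispatcher origin, long deliveryTag, byte[] body) {
    void ack() { origin.ack(deliveryTag); }
}
```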
31. Practice: Build a Production-Ready Client Skeleton
31.1 Target
Build a small Java module:
rabbitmq-client-core/
  RabbitMqConfig.java
  RabbitMqConnectionManager.java
  TopologyValidator.java
  ConfirmingPublisher.java
  ConsumerRunner.java
  AckDispatcher.java
  RabbitMetrics.java
31.2 Requirements
- one publisher connection;
- one consumer connection;
- connection names;
- heartbeat configured;
- automatic recovery enabled;
- topology validation on startup;
- publisher channel with confirms;
- mandatory publish return listener;
- consumer manual ack;
- prefetch configured;
- shutdown hook drains in-flight work;
- metrics/logs for publish, ack, nack, return, and recovery events.
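Several of these requirements can be enforced at startup, before any connection is opened. The sketch below shows a hypothetical RabbitMqConfig record and its validation. The heartbeat and prefetch rules reject 0 because, in RabbitMQ, a heartbeat of 0 disables heartbeats and a prefetch count of 0 means unlimited.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical config record mirroring the requirements list; field names are illustrative.
record RabbitMqConfig(
        String publisherConnectionName,
        String consumerConnectionName,
        int heartbeatSeconds,
        boolean automaticRecovery,
        int prefetch,
        int workerThreads,
        int workerQueueCapacity) {

    // Collects every problem at once so startup fails with one complete message.
    List<String> validate() {
        List<String> problems = new ArrayList<>();
        if (isBlank(publisherConnectionName) || isBlank(consumerConnectionName))
            problems.add("connection names are required");
        if (publisherConnectionName != null && publisherConnectionName.equals(consumerConnectionName))
            problems.add("publisher and consumer connections should have distinct names");
        if (heartbeatSeconds <= 0)
            problems.add("heartbeat must be explicitly configured (> 0 seconds)");
        if (!automaticRecovery)
            problems.add("automatic recovery must be enabled");
        if (prefetch <= 0)
            problems.add("prefetch must be explicitly configured (> 0)");
        if (prefetch > workerThreads + workerQueueCapacity)
            problems.add("prefetch exceeds worker capacity (threads + queue)");
        return problems;
    }

    private static boolean isBlank(String s) {
        return s == null || s.isBlank();
    }
}
```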
31.3 Experiments
Run these experiments:
- Start service with broker down.
- Start broker, observe recovery.
- Publish to a missing binding with mandatory=true.
- Consume a message and throw a validation exception.
- Consume a message, commit the local side effect, and kill the app before the ack.
- Restart the app and verify the duplicate handling strategy.
- Increase prefetch to 1000 and artificially slow down the handler.
- Compare memory usage and redelivery behavior.
31.4 Expected learning
You should see that:
- connection recovery is not business recovery;
- manual ack is a correctness boundary;
- prefetch is backpressure;
- channel ownership matters;
- publish success is not consumer success;
- shutdown design affects duplicate behavior.
32. Internal Engineering Standard Draft
Use this as a draft standard for RabbitMQ Java services:
# RabbitMQ Java Client Standard
## Connections
- Services must not create a RabbitMQ connection per message or per request.
- Services should use separate publisher and consumer connections for non-trivial workloads.
- Client-provided connection names are required.
## Channels
- Channels must not be shared across concurrent publisher threads unless access is serialized.
- Consumers should use dedicated channels per queue/workload.
- Acknowledgements must be issued on the channel that received the delivery.
## Publishing
- Critical messages must use publisher confirms.
- Critical messages should use mandatory publish or alternate exchange capture.
- Publish requests must include messageId, correlationId, type, contentType, and schema metadata.
## Consuming
- Business-critical consumers must use manual acknowledgements.
- Ack must occur only after durable business side effects are committed.
- Consumers must be idempotent.
- Prefetch must be explicitly configured and justified.
## Recovery
- Automatic recovery must be observable through metrics/logs.
- Recovery is not a replacement for idempotency.
- Shared topology should be validated or managed declaratively.
## Shutdown
- Services must mark readiness false before stopping consumers.
- In-flight messages must be drained, acked, nacked, or allowed to redeliver intentionally.
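The shutdown rules translate into a fixed order. The sketch below uses a hypothetical ShutdownSteps interface. Consumers are cancelled before draining, so acks for in-flight work can still be sent on the open channels. Channels are closed last, so whatever remains unacked is requeued by the broker rather than lost.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical hooks; real implementations would wrap a readiness flag, Channel.basicCancel,
// the worker pool, and Channel/Connection close.
interface ShutdownSteps {
    void markNotReady();
    void cancelConsumers();
    boolean drainWorkers(long timeout, TimeUnit unit);
    void closeChannelsAndConnections();
}

final class GracefulShutdown {
    // Returns true if in-flight work finished in time; false means some deliveries stay unacked
    // and will be redelivered once the channel closes.
    boolean run(ShutdownSteps steps, long drainTimeout, TimeUnit unit) {
        steps.markNotReady();                                     // 1. readiness false: stop new traffic first
        steps.cancelConsumers();                                  // 2. broker stops pushing new deliveries
        boolean drained = steps.drainWorkers(drainTimeout, unit); // 3. finish and ack/nack while channels are open
        steps.closeChannelsAndConnections();                      // 4. anything still unacked returns to the queue
        return drained;
    }

    // One way to implement step 3 for a worker pool.
    static boolean drain(ExecutorService workers, long timeout, TimeUnit unit) {
        workers.shutdown(); // rejects new tasks; already queued tasks still run
        try {
            return workers.awaitTermination(timeout, unit);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```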
33. Self-Correction Questions
Answer these before moving on:
- What is the difference between ConnectionFactory, Connection, and Channel?
- Why is a connection per publish an anti-pattern?
- Why must a channel not be casually shared across threads?
- What does it mean that a delivery tag is scoped per channel?
- When should the ack happen in a consumer that writes to a database?
- What does automatic recovery solve, and what does it not solve?
- Why should publishers and consumers be able to use separate connections?
- What should the service do when a blocked connection occurs?
- What is the safe shutdown order for a consumer?
- Why is prefetch a capacity contract?
34. Summary
A correct Java RabbitMQ client architecture starts with ownership:
- ConnectionFactory holds configuration;
- Connection is an expensive, long-lived transport;
- Channel is an AMQP session that must have a clearly defined owner;
- producers need confirms, mandatory handling, and backpressure;
- consumers need manual ack, prefetch, idempotency, and shutdown discipline;
- recovery helps the transport, but it does not guarantee business correctness;
- metrics and lifecycle management are part of the design, not an ops afterthought.
The next part covers Producer In Action: publishing semantics, message properties, the mandatory flag, publisher confirms, confirm batching, timeout handling, and producer-side backpressure.
References
- RabbitMQ Java Client API Guide: https://www.rabbitmq.com/client-libraries/java-api-guide
- RabbitMQ Documentation — Connections: https://www.rabbitmq.com/docs/connections
- RabbitMQ Documentation — Channels: https://www.rabbitmq.com/docs/channels
- RabbitMQ Documentation — Consumers: https://www.rabbitmq.com/docs/consumers
- RabbitMQ Documentation — Consumer Acknowledgements and Publisher Confirms: https://www.rabbitmq.com/docs/confirms
- RabbitMQ Documentation — Queues and Prefetch/Consumer Overload: https://www.rabbitmq.com/docs/queues
- RabbitMQ Documentation — Publishers: https://www.rabbitmq.com/docs/publishers
- RabbitMQ Java Client Current API: https://rabbitmq.github.io/rabbitmq-java-client/api/current/