
Java RabbitMQ Client Architecture: Connection, Channel, Threading, Lifecycle

Learn Java RabbitMQ, RabbitMQ Streams, Patterns, and Deployment In Action - Part 004

Production-grade RabbitMQ Java client architecture: ConnectionFactory, Connection, Channel, threading, lifecycle, recovery, heartbeat, blocked connection handling, and safe shutdown.


Part 004 — Java RabbitMQ Client Architecture: Connection, Channel, Threading, Lifecycle

1. Goals of This Part

This part covers the architecture of the RabbitMQ Java client for a production service. We do not yet focus on business patterns such as retry, DLQ, outbox, or stream processing. The focus is on basic questions that often cause serious bugs:

  1. how many connections to create;
  2. when to create a new channel;
  3. whether a channel may be used across threads;
  4. how to manage the producer and consumer lifecycle;
  5. how connection recovery works;
  6. how heartbeats, network failures, and blocked connections affect the service;
  7. how to shut down correctly so that messages are not lost and no ambiguous state is left behind.

In Kaufman's framework, this part is the "remove practice barriers" phase. We build a stable client architecture so that the later exercises (publisher confirms, manual ack, retry, streams, benchmarking) are not derailed by a broken client lifecycle.


2. Java Client Mental Model

The RabbitMQ Java client exposes the main AMQP 0-9-1 concepts. The practical hierarchy:

| Object | Meaning | Expensive? | Threading guidance |
| --- | --- | --- | --- |
| ConnectionFactory | Factory/config holder | Not expensive | Can be a singleton/config bean |
| Connection | TCP connection to the broker | Relatively expensive | Usually few per app instance |
| Channel | Virtual AMQP session on top of a connection | Cheaper than a connection | Do not share carelessly across threads |
| Consumer | Callback/subscription on a channel | Tied to its channel | Lifecycle must be explicit |

Basic rule:

A Connection is the transport. A Channel is a session/protocol lane. Producer and consumer operations run on a channel. Do not design as if a channel were a stateless utility object.


3. Dependency and Client Version

A typical Maven dependency:

<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.26.0</version>
</dependency>

In a real project, do not hardcode the version in each service module. Use dependency management or an internal BOM so that:

  • security patches can be rolled out centrally;
  • recovery and metrics behavior stays consistent;
  • the integration test matrix is clear;
  • a wrapper library can evolve without dependency drift.

4. ConnectionFactory: Transport and Auth Configuration

ConnectionFactory holds the configuration for broker connections.

Example:

ConnectionFactory factory = new ConnectionFactory();
factory.setHost("rabbitmq.internal");
factory.setPort(5672);
factory.setVirtualHost("/commerce");
factory.setUsername("order-service");
factory.setPassword(System.getenv("RABBITMQ_PASSWORD"));
factory.setConnectionTimeout(10_000);
factory.setRequestedHeartbeat(30);
factory.setAutomaticRecoveryEnabled(true);
factory.setTopologyRecoveryEnabled(true);
factory.setNetworkRecoveryInterval(5_000);

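Notes:

  • credentials should come from a secret manager, not a plain-text config file;
  • the virtual host is an isolation boundary;
  • the heartbeat must be tuned to the network and any load balancer in the path;
  • automatic recovery helps, but it is not a substitute for idempotency;
  • topology recovery is useful, but the service still needs startup validation.

4.1 URI-based configuration

// Java does not interpolate "${password}" inside string literals, so build the URI explicitly.
// encodedPassword is the percent-encoded password.
factory.setUri("amqps://order-service:" + encodedPassword + "@rabbitmq.internal:5671/%2Fcommerce");

When using a URI, make sure a / in the vhost name is percent-encoded as %2F (here the vhost /commerce becomes %2Fcommerce).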
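The encoding rule can be sketched with the JDK alone. The helper below is a hypothetical illustration: the class name AmqpUris and its methods are assumptions, not part of the RabbitMQ client. It percent-encodes the user, password, and vhost before assembling the string that would be handed to factory.setUri(...).

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;

// Hypothetical helper, not part of amqp-client: builds an AMQPS URI string
// with every variable component percent-encoded.
final class AmqpUris {
    private AmqpUris() {}

    // URLEncoder targets HTML forms, so it emits '+' for spaces;
    // URIs need "%20" instead.
    static String encode(String raw) {
        return URLEncoder.encode(raw, StandardCharsets.UTF_8).replace("+", "%20");
    }

    static String amqps(String user, String password, String host, int port, String vhost) {
        return "amqps://" + encode(user) + ":" + encode(password)
            + "@" + host + ":" + port + "/" + encode(vhost);
    }
}
```

With this sketch, AmqpUris.encode("/commerce") yields %2Fcommerce, which matches the vhost segment in the URI above.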

4.2 TLS

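Production deployments usually use TLS:

// sslContext is a javax.net.ssl.SSLContext built from your own trust store (and key store for mutual TLS).
factory.useSslProtocol(sslContext);
factory.enableHostnameVerification();

The no-argument factory.useSslProtocol() trusts every server certificate and is only suitable for development and QA.

In a regulated environment, TLS is not a cosmetic checklist item. Questions that must be answered:

  • which CAs are trusted;
  • how certificate rotation is performed;
  • whether mutual TLS is used;
  • whether hostname verification is enabled;
  • how secrets and certificates are injected into the container;
  • what the runbook is when a certificate expires.

Security details are covered in Part 034.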


5. Connection: TCP Transport and Failure Boundary

A Connection represents a TCP connection to the broker. One connection can multiplex many channels.

5.1 How many connections per service instance?

A sensible default:

  • one connection for publishers;
  • one connection for consumers;
  • optionally, one connection for admin/topology/health;
  • optionally, a dedicated connection for a high-throughput publisher.

Why not one connection per publish?

  • the TCP handshake is expensive;
  • authentication is expensive;
  • the broker's connection count grows;
  • heartbeat, thread, and resource overhead increase;
  • throughput suffers.

Why not always a single connection for everything?

  • a blocked publisher can affect consumers if they share a transport;
  • high-volume publishing can hurt consumer latency;
  • operational isolation is weaker;
  • producer and consumer shutdown follow different lifecycles.

Practical pattern: each service instance keeps one long-lived publisher connection and one long-lived consumer connection, and each connection multiplexes its own channels.

5.2 Connection naming

Give each connection a client-provided name so that it is identifiable in the RabbitMQ management UI:

Connection connection = factory.newConnection("order-service.publisher.us-east-1.instance-7");

Connection names speed up incident response:

  • which connection is blocked;
  • which service is opening too many channels;
  • which instance is failing to reconnect;
  • which workload has a rising publish rate.
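A naming convention only helps if every instance applies it the same way. The sketch below shows one way to centralize it. The service.role.region.instance layout mirrors the example name above, but the ConnectionNames class and its Role enum are hypothetical and not required by the client.

```java
import java.util.Locale;

// Hypothetical naming helper: the service.role.region.instance convention
// mirrors the example name above and is an assumption, not a client requirement.
final class ConnectionNames {
    enum Role { PUBLISHER, CONSUMER, ADMIN }

    private ConnectionNames() {}

    static String of(String service, Role role, String region, String instance) {
        for (String part : new String[] {service, region, instance}) {
            if (part == null || part.isBlank()) {
                throw new IllegalArgumentException("connection name parts must be non-blank");
            }
        }
        return String.join(".", service, role.name().toLowerCase(Locale.ROOT), region, instance);
    }
}
```

The resulting string would be passed as the argument to factory.newConnection(...), one name per role, so that publisher and consumer connections can be told apart at a glance.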

6. Channel: an AMQP Session, Not a Thread-Safe Utility

A Channel is a virtual session on top of a connection. Most AMQP operations are performed on a channel:

  • exchangeDeclare;
  • queueDeclare;
  • queueBind;
  • basicPublish;
  • basicConsume;
  • basicAck;
  • basicNack;
  • confirmSelect;
  • basicQos.

6.1 Channel lifecycle

try (Connection connection = factory.newConnection("demo")) {
    try (Channel channel = connection.createChannel()) {
        channel.exchangeDeclare("ex.demo", BuiltinExchangeType.DIRECT, true);
        channel.basicPublish("ex.demo", "demo.key", null, "hello".getBytes(StandardCharsets.UTF_8));
    }
}

This is fine for a demo, but a production service does not open and close a connection and channel per message.

6.2 Channel per thread / channel per worker

Production guidelines:

  • do not share a channel for concurrent publishing from multiple threads without explicit serialization;
  • use one channel per publisher thread, or a channel pool;
  • a consumer channel is usually dedicated to its subscription;
  • acks should be sent on the channel that received the delivery, or through a design that guarantees serialization.

Why?

AMQP is frame-based. Concurrent operations on the same channel can interleave frames incorrectly on the wire and interfere with publisher confirms. In addition, delivery tags are scoped per channel; acking on the wrong channel is a bug, and the broker responds to an unknown delivery tag by closing the channel with a PRECONDITION_FAILED error.

6.3 Delivery tag scoped to channel

A delivery tag is not a global message id. It is only meaningful on the channel on which the delivery was received.

Bad:

// Message delivered on consumerChannelA
long tag = delivery.getEnvelope().getDeliveryTag();

// Later, wrong channel used
publisherChannel.basicAck(tag, false); // wrong model

Correct:

consumerChannel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);

If processing moves to a worker thread, the ack path must still be designed to be safe.


7. Threading Model: Consumer Callback and Worker Pool

The RabbitMQ Java client receives frames from the broker and runs consumer callbacks on an executor associated with the connection.

A production consumer must not do unbounded heavy work on the callback thread if that stalls delivery handling. Use a worker pool with a bounded queue.

7.1 Simple callback consumer

DeliverCallback deliverCallback = (consumerTag, delivery) -> {
    try {
        process(delivery);
        channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
    } catch (Exception e) {
        channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, true);
    }
};

channel.basicQos(50);
channel.basicConsume("q.order.commands", false, deliverCallback, consumerTag -> {});

This is sufficient for lightweight processing. For heavy processing, blocking I/O, or external calls, it is better to separate delivery reception from processing.

7.2 Worker pool with bounded executor

public final class RabbitConsumerWorker implements AutoCloseable {
    private final Channel channel;
    private final ExecutorService workers;
    private final Semaphore inFlight;

    public RabbitConsumerWorker(Channel channel, int maxInFlight, int workerThreads) {
        this.channel = channel;
        this.workers = Executors.newFixedThreadPool(workerThreads);
        this.inFlight = new Semaphore(maxInFlight);
    }

    public DeliverCallback callback() {
        return (consumerTag, delivery) -> {
            if (!inFlight.tryAcquire()) {
                // This should rarely happen if prefetch is aligned with maxInFlight.
                channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, true);
                return;
            }

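            workers.submit(() -> {
                long tag = delivery.getEnvelope().getDeliveryTag();
                try {
                    process(delivery);
                    settle(tag, true, false);
                } catch (RetryableException e) {
                    settle(tag, false, true);   // transient failure: requeue
                } catch (Exception e) {
                    settle(tag, false, false);  // permanent failure: dead-letter or drop
                } finally {
                    inFlight.release();
                }
            });
        };
    }

    // Serializes ack/nack on the shared channel. The worker task is a Runnable and
    // cannot throw the checked IOException, so it is handled here.
    private void settle(long tag, boolean ack, boolean requeue) {
        synchronized (channel) {
            try {
                if (ack) {
                    channel.basicAck(tag, false);
                } else {
                    channel.basicNack(tag, false, requeue);
                }
            } catch (IOException | AlreadyClosedException e) {
                // The channel is gone; the broker redelivers unacknowledged messages.
            }
        }
    }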

    private void process(Delivery delivery) {
        // business logic here
    }

    @Override
    public void close() {
        workers.shutdown();
    }
}

Important notes:

  • this example uses synchronized (channel) to prevent concurrent ack/nack on the same channel;
  • a cleaner design is one consumer channel per worker, or a dedicated ack dispatcher thread;
  • prefetch must be aligned with worker capacity;
  • a bounded executor is safer than an unbounded queue.

7.3 Ack dispatcher pattern

For better throughput and safety, use a single-threaded ack dispatcher per channel: workers hand their ack/nack decisions to one thread that makes every call on that channel.

With this design:

  • workers never call the channel directly;
  • ack/nack calls are serialized;
  • delivery tags are still settled on the correct channel;
  • shutdown is easier to control.
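A minimal sketch of the ack dispatcher idea, kept independent of the client library so it can be read in isolation. AckSink is a hypothetical stand-in for the two Channel methods involved; in a real service its implementation would delegate to channel.basicAck and channel.basicNack on the consumer's own channel.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for the two Channel methods the dispatcher needs.
interface AckSink {
    void ack(long deliveryTag) throws Exception;
    void nack(long deliveryTag, boolean requeue) throws Exception;
}

// Owns every ack/nack call for one channel and runs them on a single thread.
final class AckDispatcher implements AutoCloseable {
    private final AckSink sink;
    private final ExecutorService single = Executors.newSingleThreadExecutor();

    AckDispatcher(AckSink sink) {
        this.sink = sink;
    }

    // Safe to call from any worker thread; the call is only enqueued here.
    void ack(long deliveryTag) {
        single.execute(() -> run(() -> sink.ack(deliveryTag)));
    }

    void nack(long deliveryTag, boolean requeue) {
        single.execute(() -> run(() -> sink.nack(deliveryTag, requeue)));
    }

    private interface Settle { void apply() throws Exception; }

    private void run(Settle settle) {
        try {
            settle.apply();
        } catch (Exception e) {
            // Channel is probably closed; unacked deliveries will be redelivered.
            System.err.println("ack/nack failed: " + e);
        }
    }

    // Stops accepting new decisions, then waits for queued ones to be sent.
    @Override
    public void close() {
        single.shutdown();
        try {
            if (!single.awaitTermination(5, TimeUnit.SECONDS)) {
                single.shutdownNow();
            }
        } catch (InterruptedException e) {
            single.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }
}
```

The executor's internal queue is unbounded, which is acceptable here only because the number of pending decisions cannot exceed the number of unacknowledged deliveries, and that is capped by prefetch. The five-second drain timeout is an arbitrary illustrative value.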

8. Publisher Architecture

A production-grade producer usually needs:

  • long-lived connection;
  • channel pool atau channel per publishing thread;
  • publisher confirms;
  • bounded local queue;
  • timeout handling;
  • returned-message listener untuk mandatory publish;
  • metrics.

8.1 Naive publisher

public void publish(byte[] body) throws IOException, TimeoutException {
    Channel channel = connection.createChannel();
    channel.basicPublish("ex.order.events", "commerce.order.created", null, body);
    channel.close(); // close() also declares TimeoutException
}

Problems:

  • creating and closing a channel per message is expensive;
  • no publisher confirms;
  • no handling of mandatory/unroutable messages;
  • no backpressure;
  • weak error semantics.

8.2 Better publisher skeleton

public final class ConfirmingPublisher implements AutoCloseable {
    private final Channel channel;

    public ConfirmingPublisher(Connection connection) throws IOException {
        this.channel = connection.createChannel();
        this.channel.confirmSelect();
        this.channel.addReturnListener(returned -> {
            // emit metric + persist failure + alert if critical
            System.err.printf("Unroutable message: exchange=%s routingKey=%s replyText=%s%n",
                returned.getExchange(),
                returned.getRoutingKey(),
                returned.getReplyText());
        });
    }

    public void publish(String exchange, String routingKey, AMQP.BasicProperties props, byte[] body)
        throws IOException, InterruptedException, TimeoutException {

        channel.basicPublish(exchange, routingKey, true, props, body);

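        // waitForConfirms returns false when the broker nacked a message;
        // a timeout surfaces as TimeoutException instead.
        boolean confirmed = channel.waitForConfirms(5_000);
        if (!confirmed) {
            throw new IOException("Publish was nacked by the broker");
        }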
    }

    @Override
    public void close() throws Exception {
        channel.close();
    }
}

This is still simplistic. At high throughput, calling waitForConfirms per message is too slow. Async confirms and batching are covered in Part 005 and Part 026.


9. Consumer Architecture

A production-grade consumer needs:

  • dedicated channel;
  • manual ack;
  • configured prefetch;
  • bounded worker capacity;
  • idempotent processing;
  • explicit nack/reject strategy;
  • cancellation callback;
  • shutdown drain;
  • metrics.

9.1 Consumer skeleton

public final class CommandConsumer implements AutoCloseable {
    private final Channel channel;
    private final String queue;
    private String consumerTag;

    public CommandConsumer(Connection connection, String queue, int prefetch) throws IOException {
        this.channel = connection.createChannel();
        this.queue = queue;
        this.channel.basicQos(prefetch);
    }

    public void start() throws IOException {
        DeliverCallback onDelivery = (tag, delivery) -> {
            long deliveryTag = delivery.getEnvelope().getDeliveryTag();
            try {
                handle(delivery);
                channel.basicAck(deliveryTag, false);
            } catch (RetryableException e) {
                channel.basicNack(deliveryTag, false, true);
            } catch (Exception e) {
                channel.basicNack(deliveryTag, false, false);
            }
        };

        CancelCallback onCancel = tag -> {
            // emit metric and trigger service health degradation if needed
        };

        this.consumerTag = channel.basicConsume(queue, false, onDelivery, onCancel);
    }

    private void handle(Delivery delivery) {
        // parse, validate, apply idempotency, execute business transaction
    }

    @Override
    public void close() throws Exception {
        if (consumerTag != null && channel.isOpen()) {
            channel.basicCancel(consumerTag);
        }
        channel.close();
    }
}

9.2 The consumer invariant

Ack only after business side effect is safely committed.

If message processing writes to a database, the usual order is:

  1. receive delivery;
  2. validate message;
  3. check idempotency;
  4. execute business transaction;
  5. commit transaction;
  6. ack delivery.

If the service crashes after the commit but before the ack, the message can be redelivered. That is why idempotency is mandatory.
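The "check idempotency, then commit, then ack" order can be illustrated with a small guard keyed by message id. This is a sketch under a loud assumption: the in-memory set below stands in for a durable store (for example a table with a unique constraint on the message id) that is written in the same transaction as the business change. IdempotentHandler is a hypothetical name.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Sketch of an idempotency guard keyed by message id.
// Assumption: the in-memory set stands in for a durable store that is
// updated in the SAME database transaction as the business change.
final class IdempotentHandler {
    private final Set<String> processed = ConcurrentHashMap.newKeySet();
    private final Runnable businessChange;

    IdempotentHandler(Runnable businessChange) {
        this.businessChange = businessChange;
    }

    // Returns true if the side effect ran, false if the message was a duplicate.
    // Either way the caller may ack afterwards.
    boolean handle(String messageId) {
        if (!processed.add(messageId)) {
            return false; // already applied: skip the side effect, still ack
        }
        try {
            businessChange.run();
            return true;
        } catch (RuntimeException e) {
            processed.remove(messageId); // undo the marker so a redelivery can retry
            throw e;
        }
    }
}
```

A redelivered message with the same id returns false and is simply acked, which is what makes the "commit succeeded but ack was lost" case harmless.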


10. Prefetch as Capacity Contract

basicQos(prefetch) limits the number of unacknowledged deliveries that may be outstanding.

Prefetch is not just performance tuning. It is a contract between broker and consumer:

"This consumer can hold N unacknowledged messages."

10.1 Example

Given:

  • worker threads = 10;
  • average processing time = 200 ms;
  • p95 processing time = 2 s;
  • an external dependency that is sometimes slow;

a prefetch of 1000 is a poor choice because a single instance can hold 1000 unfinished messages. If the instance dies, 1000 messages are redelivered. If the external dependency slows down, the broker still pushes up to 1000 messages to an instance that cannot keep up.

A more sensible starting prefetch:

prefetch = workerThreads * smallMultiplier
         = 10 * 2
         = 20

Then measure throughput, latency, redelivery, memory, and queue depth.
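The arithmetic above can be captured in a small helper so the starting value is derived rather than guessed. PrefetchSizing is a hypothetical name, and drainMillis is a deliberately rough model that assumes every handler takes the same time and all workers stay busy.

```java
// Back-of-the-envelope helpers for choosing a starting prefetch.
final class PrefetchSizing {
    private PrefetchSizing() {}

    // Starting point from the text: workerThreads * smallMultiplier.
    static int initialPrefetch(int workerThreads, int smallMultiplier) {
        if (workerThreads <= 0 || smallMultiplier <= 0) {
            throw new IllegalArgumentException("workerThreads and smallMultiplier must be positive");
        }
        return Math.multiplyExact(workerThreads, smallMultiplier);
    }

    // Rough time for one instance to work through a full prefetch window,
    // assuming every handler takes processingMillis and all workers stay busy.
    static long drainMillis(int prefetch, int workerThreads, long processingMillis) {
        long rounds = (prefetch + workerThreads - 1) / workerThreads; // ceiling division
        return rounds * processingMillis;
    }
}
```

With the numbers from the example, a prefetch of 20 at the 200 ms average drains in about 400 ms, while a prefetch of 1000 at the 2 s p95 would take about 200 s. That gap is the redelivery and latency exposure a single slow instance can create.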

10.2 Prefetch and fairness

A prefetch that is too high lets one consumer grab too many messages while other consumers sit idle. A prefetch that is too low can reduce throughput because of round trips and idle workers.

Rules of thumb:

  • CPU-bound work: prefetch close to the worker count;
  • I/O-bound work: prefetch can be higher, but bounded;
  • strict ordering: prefetch 1 or single active consumer;
  • large, memory-heavy payloads: low prefetch;
  • slow external dependency: low prefetch plus a circuit breaker.

11. Heartbeat and Network Failure

Heartbeats help the client and the broker detect a broken TCP connection that was not closed cleanly.

If the heartbeat timeout is too short:

  • false positives during GC pauses or network jitter;
  • connections drop frequently;
  • recovery storms.

If the heartbeat timeout is too long:

  • slow failure detection;
  • messages stay stuck longer;
  • slow failover.

A common starting point is 30 to 60 seconds, validated against the real environment.

11.1 Load balancer concern

If the connection passes through a load balancer, firewall, or NAT:

  • the idle timeout must be longer than the heartbeat interval;
  • TCP keepalive may be relevant;
  • connection draining during deploys must be tested;
  • broker and client logs must be correlated.
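The relationship between heartbeat and intermediary idle timeout can be checked at startup. The sketch below is hypothetical (HeartbeatCheck is not a client class) and rests on one assumption taken from RabbitMQ's heartbeat guidance: frames are sent at roughly half the negotiated heartbeat timeout. The check itself is deliberately stricter and requires the idle timeout to exceed the full heartbeat timeout.

```java
// Sanity check for heartbeat versus intermediary idle timeout.
// Assumption: heartbeat frames are sent at roughly half the negotiated
// heartbeat timeout, so the wire is rarely silent for much longer than that.
final class HeartbeatCheck {
    private HeartbeatCheck() {}

    static int approxFrameIntervalSeconds(int heartbeatTimeoutSeconds) {
        return Math.max(1, heartbeatTimeoutSeconds / 2);
    }

    // True when the load balancer / firewall idle timeout leaves headroom.
    // Requiring it to exceed the full heartbeat timeout is stricter than the
    // frame interval and tolerates one delayed frame.
    static boolean idleTimeoutIsSafe(int heartbeatTimeoutSeconds, int idleTimeoutSeconds) {
        if (heartbeatTimeoutSeconds <= 0) {
            return false; // heartbeats disabled: nothing keeps the connection active
        }
        return idleTimeoutSeconds > heartbeatTimeoutSeconds;
    }
}
```

A configuration validator could call idleTimeoutIsSafe with the configured heartbeat and the known idle timeout of the network path, and refuse to start on a mismatch rather than discovering it through silent connection drops.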

12. Automatic Recovery

The RabbitMQ Java client supports automatic connection recovery. When a connection fails, the client tries to reconnect. If topology recovery is enabled, the client also tries to restore the channels, exchanges, queues, bindings, and consumers that were declared through that connection.

12.1 What recovery does not solve

Automatic recovery does not make publish exactly-once. It does not remove duplicates. It does not guarantee external side effects are atomic with ack. It does not know business idempotency.

Failure cases still exist:

| Failure | What can happen | Required design |
| --- | --- | --- |
| Connection lost after publish, before confirm | Producer uncertain | confirm tracking + idempotent message id |
| Consumer commits DB, then connection lost before ack | Message redelivered | idempotent consumer |
| Topology changed externally during outage | Recovery mismatch | topology governance + startup validation |
| Broker blocked due to resource alarm | Publish stalls/fails | blocked listener + backpressure |

12.2 Recovery listener

Add recovery listeners for observability:

Connection connection = factory.newConnection("order-service.consumer");

if (connection instanceof Recoverable recoverable) {
    recoverable.addRecoveryListener(new RecoveryListener() {
        @Override
        public void handleRecoveryStarted(Recoverable recoverable) {
            // metric: rabbitmq.recovery.started
        }

        @Override
        public void handleRecovery(Recoverable recoverable) {
            // metric: rabbitmq.recovery.completed
        }
    });
}

13. Topology Recovery: Helpful but Dangerous if Misunderstood

Topology recovery can redeclare topology after reconnect. This is convenient, but it can hide governance problems.

13.1 Good use

  • local/dev environment;
  • service-owned queue and binding;
  • temporary queue;
  • consumers that self-declare private topology.

13.2 Risky use

  • shared exchange managed by platform;
  • topology arguments changed by migration;
  • service has permissions to declare too much;
  • multiple versions of service declare incompatible queue args.

Example risk:

Service v1 declares:

queueDeclare("q.billing.order-events", true, false, false, Map.of(
    "x-queue-type", "classic"
));

Service v2 declares:

queueDeclare("q.billing.order-events", true, false, false, Map.of(
    "x-queue-type", "quorum"
));

This mismatch can fail declaration and disrupt startup/recovery.

Rule:

Use topology recovery intentionally. For shared production topology, prefer GitOps/operator-managed topology or strict startup validation.


14. Blocked Connection Handling

RabbitMQ can block publishers when broker resources such as memory or disk are under pressure. Java client can register BlockedListener.

connection.addBlockedListener(new BlockedListener() {
    @Override
    public void handleBlocked(String reason) {
        // stop accepting new publish requests, trip readiness, emit alert
        System.err.println("RabbitMQ connection blocked: " + reason);
    }

    @Override
    public void handleUnblocked() {
        // resume publish cautiously
        System.out.println("RabbitMQ connection unblocked");
    }
});

Production behavior:

  • pause or shed new publish workload;
  • return 503 for synchronous API if command publish is required;
  • buffer only within strict bounded limit;
  • alert on sustained blocked connection;
  • do not build unbounded in-memory publish queue.
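One way to turn the blocked and unblocked callbacks into backpressure is a small gate in front of the publish path. This is a sketch, not the client's API: PublishGate is a hypothetical class whose onBlocked and onUnblocked methods are meant to be called from the BlockedListener callbacks, and whose buffer is strictly bounded.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical gate placed in front of the publish path.
final class PublishGate {
    private final AtomicBoolean blocked = new AtomicBoolean(false);
    private final BlockingQueue<byte[]> buffer;

    PublishGate(int maxBuffered) {
        this.buffer = new ArrayBlockingQueue<>(maxBuffered);
    }

    void onBlocked()   { blocked.set(true); }
    void onUnblocked() { blocked.set(false); }

    boolean isBlocked() { return blocked.get(); }

    // Returns false when the caller should shed load (for example answer 503):
    // the connection is blocked or the bounded buffer is already full.
    boolean tryAccept(byte[] body) {
        if (blocked.get()) {
            return false;
        }
        return buffer.offer(body); // never blocks, never grows past maxBuffered
    }

    // Called by the publishing thread; null means nothing is waiting.
    byte[] poll() {
        return buffer.poll();
    }
}
```

An HTTP handler would call tryAccept and map false to a 503, while isBlocked can feed the readiness probe. Rejecting outright while blocked is one policy choice; buffering a small amount during short blocks is another, as long as the limit stays fixed.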

14.1 Separate publisher and consumer connections

If publisher connection is blocked, you often do not want consumer lifecycle coupled to it. Separate connections help isolate failure domains.


15. Resource Ownership in a Java Service

A production service should have explicit ownership for RabbitMQ resources.

Suggested modules:

| Component | Responsibility |
| --- | --- |
| RabbitConnectionFactoryProvider | Build configured factory |
| RabbitConnectionManager | Own long-lived connections |
| TopologyValidator | Passive declare / validate required topology |
| PublisherManager | Own publishing channels and confirm strategy |
| ConsumerManager | Start/stop consumers |
| AckDispatcher | Serialize ack/nack if worker pool used |
| RabbitHealthIndicator | Connection/channel/topology health |
| RabbitMetricsBinder | Publish/consume/recovery metrics |

16. Startup Lifecycle

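Recommended startup order: load and validate configuration, open connections, validate (or declare) topology, start publishers, start consumers, and only then report ready.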

Why topology before consumers?

  • fail fast if dependency missing;
  • avoid consumer starting on wrong queue;
  • avoid publish path becoming healthy when route invalid;
  • make readiness meaningful.
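The "fail fast, readiness last" idea can be sketched as an ordered list of steps that is unwound in reverse if any step fails. StartupSequence and its Step interface are hypothetical; in a real service the steps would wrap components such as the connection manager, topology validator, and consumer manager.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Runs named startup steps in order. If one fails, the steps that already
// started are stopped in reverse order and readiness is never reported.
final class StartupSequence {
    interface Step {
        String name();
        void start() throws Exception;
        void stop();
    }

    private final List<Step> steps = new ArrayList<>();
    private final Deque<Step> started = new ArrayDeque<>();
    private volatile boolean ready;

    StartupSequence add(Step step) {
        steps.add(step);
        return this;
    }

    boolean isReady() { return ready; }

    void start() {
        for (Step step : steps) {
            try {
                step.start();
                started.push(step);
            } catch (Exception e) {
                stop();
                throw new IllegalStateException("startup failed at step: " + step.name(), e);
            }
        }
        ready = true; // only after every step succeeded
    }

    void stop() {
        ready = false;
        while (!started.isEmpty()) {
            started.pop().stop(); // reverse order of start
        }
    }
}
```

Registering the topology step before the consumer step means a missing queue aborts startup before any consumer subscribes, and isReady() can back the readiness probe directly.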

16.1 Readiness vs liveness

| Signal | Meaning | RabbitMQ relationship |
| --- | --- | --- |
| Liveness | Process should be restarted if false | JVM deadlock, fatal stuck state |
| Readiness | Service can handle traffic | RabbitMQ publish/consume dependency healthy |

Do not make liveness fail just because RabbitMQ is temporarily down unless the process cannot recover. Prefer readiness degradation.


17. Shutdown Lifecycle

Shutdown must prevent new work, drain in-flight work, and close channels cleanly.

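Recommended order: stop accepting new work, cancel consumers, drain in-flight handlers, settle outstanding acks and publisher confirms, close channels, and finally close connections.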

17.1 Consumer shutdown decision

For each in-flight message:

| State | Shutdown action |
| --- | --- |
| Not started | Let broker redeliver after cancel/close |
| Processing not committed | Nack/requeue or close channel to redeliver |
| Business commit done | Ack if possible; rely on idempotency if not |
| Poison classified | Nack requeue=false to DLQ |

17.2 Avoid this

System.exit(0); // without cancelling consumers, flushing confirms, or closing channels

Abrupt shutdown is sometimes unavoidable, but your normal deployment path should not create duplicate storms or ambiguous publish state.
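A graceful stop for a consumer that hands work to a worker pool can be sketched with the JDK executor API. The two Runnable hooks are hypothetical placeholders: in a real service stopIntake would call channel.basicCancel(consumerTag), and closeResources would close channels and then connections.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

// Sketch of a graceful stop: stop intake, drain workers, then close resources.
final class GracefulShutdown {
    private GracefulShutdown() {}

    // Returns true if all in-flight work finished within the timeout.
    static boolean run(Runnable stopIntake,
                       ExecutorService workers,
                       long drainTimeoutMillis,
                       Runnable closeResources) {
        stopIntake.run();          // 1. no new deliveries
        workers.shutdown();        // 2. no new tasks; running ones continue and ack
        boolean drained = false;
        try {
            drained = workers.awaitTermination(drainTimeoutMillis, TimeUnit.MILLISECONDS);
            if (!drained) {
                workers.shutdownNow(); // 3. give up; unacked messages get redelivered
            }
        } catch (InterruptedException e) {
            workers.shutdownNow();
            Thread.currentThread().interrupt();
        } finally {
            closeResources.run();  // 4. channels first, then connections
        }
        return drained;
    }
}
```

Closing channels only after the workers finish matters because acks are sent on those channels. The drain timeout should stay below the platform's termination grace period so this path completes before the process is killed.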


18. Health Checks

A RabbitMQ health check should not simply be “connection object is not null”.

Better levels:

18.1 Basic connectivity

  • connection open;
  • channel can be created;
  • heartbeat not failed.

18.2 Topology dependency

  • required exchange exists;
  • required queue exists;
  • service has permissions;
  • passive declare succeeds.

18.3 Functional publish path

For critical publish-only service, optionally publish synthetic message to a health exchange/queue. Do not publish fake business messages.

18.4 Consumer health

  • consumer tag active;
  • channel open;
  • recent deliveries or queue idle expected;
  • in-flight below threshold;
  • redelivery/DLQ not exploding.

19. Metrics

At minimum, instrument client-side metrics:

Publisher metrics

| Metric | Why it matters |
| --- | --- |
| publish attempts | input rate |
| publish confirms | broker acceptance rate |
| confirm latency | broker/disk/quorum pressure signal |
| publish failures | immediate client errors |
| returned messages | routing failures |
| blocked duration | broker resource alarm |
| local buffer depth | app backpressure |

Consumer metrics

| Metric | Why it matters |
| --- | --- |
| deliveries | input rate |
| ack count | successful processing |
| nack/reject count | failure handling |
| processing latency | business handler health |
| in-flight count | capacity pressure |
| redelivery count | duplicates/retry loops |
| consumer cancellations | broker/topology issue |
| worker queue depth | local saturation |

Connection metrics

| Metric | Why it matters |
| --- | --- |
| connection open/closed | availability |
| recovery started/completed | network/broker instability |
| channel count | resource leak |
| heartbeat failures | transport instability |
| blocked/unblocked | broker pressure |

20. Error Handling Boundary

Not all exceptions mean the same thing.

| Error location | Example | Correct response |
| --- | --- | --- |
| Connection open | auth fail, DNS fail | fail readiness, retry with backoff |
| Channel operation | precondition failed | fail fast; topology mismatch |
| Publish | IO exception | uncertain publish; use confirm/idempotency |
| Return listener | unroutable | routing failure; alert/fail command |
| Consumer processing | validation error | reject/DLQ, no retry |
| Consumer processing | transient DB timeout | retry/nack/requeue with budget |
| Ack | channel closed | message may redeliver; idempotency required |

A top-tier design never catches Exception and blindly basicAcks.
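Making the settlement decision explicit is one way to avoid the blind catch-and-ack anti-pattern. The classifier below is a sketch: the mapping from exception type to decision is an illustrative assumption, and the attempt number is assumed to be tracked by the application, since the handler code shown earlier does not carry one.

```java
import java.io.IOException;
import java.util.concurrent.TimeoutException;

// Maps a handler outcome to an explicit settlement decision.
// The exception-to-decision mapping is illustrative; each service defines its own.
final class FailureClassifier {
    enum Decision { ACK, NACK_REQUEUE, REJECT_TO_DLQ }

    private final int maxAttempts;

    FailureClassifier(int maxAttempts) {
        this.maxAttempts = maxAttempts;
    }

    // failure == null means the handler completed and committed.
    Decision classify(Throwable failure, int attempt) {
        if (failure == null) {
            return Decision.ACK;
        }
        boolean transientFailure =
            failure instanceof TimeoutException || failure instanceof IOException;
        if (transientFailure && attempt < maxAttempts) {
            return Decision.NACK_REQUEUE;  // retry within the budget
        }
        return Decision.REJECT_TO_DLQ;     // validation errors, exhausted retries
    }
}
```

Because the classifier is a pure function, it belongs in the unit-test layer: every row of the error table can become an assertion without a broker.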


21. Channel Pooling for Publishers

For multi-threaded publishers, use a channel pool or one channel per publishing thread.

21.1 Simple bounded channel pool

public final class ChannelPool implements AutoCloseable {
    private final BlockingQueue<Channel> channels;

    public ChannelPool(Connection connection, int size) throws IOException {
        this.channels = new ArrayBlockingQueue<>(size);
        for (int i = 0; i < size; i++) {
            Channel ch = connection.createChannel();
            ch.confirmSelect();
            channels.add(ch);
        }
    }

    public <T> T withChannel(ChannelOperation<T> operation) throws Exception {
        Channel channel = channels.take();
        try {
            return operation.execute(channel);
        } finally {
            if (channel.isOpen()) {
                channels.offer(channel);
            } else {
                // production code should recreate or mark pool degraded
            }
        }
    }

    @Override
    public void close() throws Exception {
        for (Channel ch : channels) {
            if (ch.isOpen()) {
                ch.close();
            }
        }
    }

    @FunctionalInterface
    public interface ChannelOperation<T> {
        T execute(Channel channel) throws Exception;
    }
}

21.2 Pool sizing

Start small:

publisherChannelPoolSize = min(availableProcessors, 8)

Then measure:

  • publish throughput;
  • confirm latency;
  • blocked time;
  • CPU;
  • broker channel count;
  • memory allocation.

More channels is not always better. It can increase broker/client overhead and make confirms harder to reason about.


22. Consumer Channel Strategy

Common options:

22.1 One channel per queue consumer

Simple and safe.

q.order.commands -> channel A -> worker pool A
q.payment.events -> channel B -> worker pool B

Good default.

22.2 One channel per worker

Useful when:

  • strict channel serialization desired;
  • each worker consumes independently;
  • simpler ack ownership;
  • higher parallelism needed.

Trade-off:

  • more channels;
  • more consumers;
  • prefetch distribution must be tuned.

22.3 One channel for many queues

Possible, but often not ideal because:

  • delivery tags share channel scope;
  • QoS interaction can surprise;
  • failure closes all consumers on that channel;
  • observability less clean.

Rule:

Prefer isolation by workload. Shared channel optimization should be justified by measurement, not habit.


23. Topology Declaration in Java

Example topology declaration:

public final class CommerceTopology {
    public void declare(Channel channel) throws IOException {
        channel.exchangeDeclare("ex.commerce.order.events", BuiltinExchangeType.TOPIC, true);
        channel.exchangeDeclare("ex.commerce.order.dlx", BuiltinExchangeType.DIRECT, true);

        Map<String, Object> queueArgs = Map.of(
            "x-dead-letter-exchange", "ex.commerce.order.dlx",
            "x-dead-letter-routing-key", "commerce.order-events.failed"
        );

        channel.queueDeclare("q.billing.order-events", true, false, false, queueArgs);
        channel.queueBind("q.billing.order-events", "ex.commerce.order.events", "commerce.order.created");
        channel.queueBind("q.billing.order-events", "ex.commerce.order.events", "commerce.order.paid");

        channel.queueDeclare("q.billing.order-events.dlq", true, false, false, Map.of());
        channel.queueBind(
            "q.billing.order-events.dlq",
            "ex.commerce.order.dlx",
            "commerce.order-events.failed"
        );
    }
}

Production note:

  • declaration must be idempotent with identical arguments;
  • mismatched durable/arguments causes precondition failure;
  • shared topology often belongs in infra repo/operator, not service code;
  • passive validation is safer for shared objects.

24. Passive Validation Example

public final class TopologyValidator {
    public void validate(Channel channel) throws IOException {
        channel.exchangeDeclarePassive("ex.commerce.order.events");
        channel.exchangeDeclarePassive("ex.commerce.order.dlx");
        channel.queueDeclarePassive("q.billing.order-events");
        channel.queueDeclarePassive("q.billing.order-events.dlq");
    }
}

If a passive declare fails, the broker closes the channel with a channel-level exception (for example 404 NOT_FOUND). A production validator should therefore use a fresh channel for validation and close it after use.

try (Channel validationChannel = connection.createChannel()) {
    validator.validate(validationChannel);
}

25. Message Properties Builder

Standardize message metadata.

AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
    .contentType("application/json")
    .contentEncoding("utf-8")
    .deliveryMode(2) // persistent
    .messageId(UUID.randomUUID().toString())
    .correlationId(correlationId)
    .type("commerce.order.created")
    .timestamp(Date.from(Instant.now()))
    .headers(Map.of(
        "schema", "commerce.order.created.v1",
        "producer", "order-service",
        "causationId", causationId
    ))
    .build();

Important:

  • deliveryMode(2) marks message persistent;
  • messageId supports idempotency/dedup;
  • correlationId supports tracing request/workflow;
  • type helps consumer dispatch;
  • headers should not become ungoverned payload.

26. Configuration Model

Represent RabbitMQ config explicitly.

public record RabbitMqConfig(
    String host,
    int port,
    String virtualHost,
    String username,
    String password,
    int connectionTimeoutMillis,
    int heartbeatSeconds,
    boolean automaticRecovery,
    boolean topologyRecovery,
    int networkRecoveryIntervalMillis,
    int publisherChannelPoolSize,
    int consumerPrefetch
) {}

Validation:

public void validate(RabbitMqConfig cfg) {
    if (cfg.host() == null || cfg.host().isBlank()) throw new IllegalArgumentException("host required");
    if (cfg.port() <= 0) throw new IllegalArgumentException("port invalid");
    if (cfg.heartbeatSeconds() < 5) throw new IllegalArgumentException("heartbeat too low");
    if (cfg.consumerPrefetch() <= 0) throw new IllegalArgumentException("prefetch required");
}

Config should be treated as production control surface, not incidental property bag.


27. Minimal Production Wrapper

A practical internal wrapper can hide low-level lifecycle without hiding semantics.

public interface MessagePublisher {
    void publish(PublishRequest request) throws PublishException;
}

public record PublishRequest(
    String exchange,
    String routingKey,
    String messageId,
    String correlationId,
    String type,
    Map<String, Object> headers,
    byte[] body,
    boolean mandatory
) {}

But avoid wrapper that hides critical choices:

Bad:

rabbit.send("some-data");

Better:

publisher.publish(new PublishRequest(
    "ex.commerce.order.events",
    "commerce.order.created",
    messageId,
    correlationId,
    "commerce.order.created",
    headers,
    body,
    true
));

A good wrapper standardizes metadata, confirms, metrics, and error handling while keeping exchange/routing semantics visible.


28. Failure Matrix for Java Client Lifecycle

| Moment | Failure | Observable symptom | Correct mental model |
| --- | --- | --- | --- |
| Startup | Broker unreachable | connection timeout | service not ready |
| Startup | Queue arg mismatch | precondition failed, channel closed | topology incompatible |
| Publish | Channel closes before confirm | IOException/timeout | publish state uncertain |
| Publish | Message returned | return listener fires | routing failure, not broker storage failure |
| Runtime | Broker memory alarm | connection blocked | apply backpressure |
| Consume | Handler throws validation error | business exception | reject/DLQ, do not infinite retry |
| Consume | DB commit succeeds, ack fails | channel closed | duplicate likely, idempotency required |
| Recovery | Reconnect succeeds | recovery listener fires | topology/consumers may resume |
| Shutdown | Consumer killed mid-processing | redelivery later | must tolerate duplicate |

29. Testing Client Lifecycle

29.1 Unit tests

Test pure components:

  • routing key builder;
  • message property builder;
  • retry classification;
  • idempotency decision;
  • topology object model;
  • config validation.
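
As an illustration of a pure component that is easy to unit test, here is a small routing key builder. `RoutingKeys` is a hypothetical helper, and the three-segment `domain.entity.event` convention is inferred from the example key `commerce.order.created`; adjust it to whatever naming standard your team uses.

```java
import java.util.Locale;
import java.util.regex.Pattern;

// Hypothetical helper: builds keys shaped like "commerce.order.created".
final class RoutingKeys {
    // Lowercase letters, digits, underscore and hyphen only.
    private static final Pattern SEGMENT = Pattern.compile("[a-z0-9_-]+");

    private RoutingKeys() {}

    static String of(String domain, String entity, String event) {
        return segment(domain) + "." + segment(entity) + "." + segment(event);
    }

    private static String segment(String raw) {
        if (raw == null) {
            throw new IllegalArgumentException("segment required");
        }
        String s = raw.trim().toLowerCase(Locale.ROOT);
        // A '.' would add an extra word to the key, and '*' or '#' are
        // wildcard characters in topic binding patterns, so reject them here.
        if (!SEGMENT.matcher(s).matches()) {
            throw new IllegalArgumentException("invalid segment: " + raw);
        }
        return s;
    }
}
```

A unit test can then assert that `RoutingKeys.of("Commerce", " order ", "created")` yields `commerce.order.created` and that a segment such as `a.b` is rejected, without starting a broker.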

29.2 Integration tests

Use real RabbitMQ, not mocks, for:

  • exchange/queue/binding declaration;
  • mandatory publish returns;
  • publisher confirms;
  • manual ack/nack;
  • prefetch behavior;
  • DLX routing;
  • reconnect behavior.

29.3 Failure tests

Simulate:

  • broker restart;
  • close channel externally;
  • invalid queue declaration;
  • network interruption;
  • consumer throws exception;
  • worker pool saturation;
  • blocked publish path if possible.

30. Common Mistakes

30.1 Creating connection per operation

Symptom:

  • high latency;
  • broker connection churn;
  • CPU overhead;
  • connection limit issues.

Fix:

  • long-lived connections;
  • channel reuse/pool.

30.2 Sharing one channel across all publisher threads

Symptom:

  • intermittent protocol errors from frames interleaved on the wire;
  • confirm confusion;
  • throughput instability.

Fix:

  • channel per thread or bounded channel pool.
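
A bounded pool can be sketched without any broker dependency. `BoundedPool` below is a hypothetical generic class; in a real service `T` would be a `Channel` created from the publisher connection, and the pool would also need to replace channels that have been closed, which this sketch leaves out.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;

// Hypothetical generic pool; kept generic so the sketch runs without a broker.
final class BoundedPool<T> {
    private final BlockingQueue<T> idle;

    BoundedPool(int size, Supplier<T> factory) {
        if (size <= 0) {
            throw new IllegalArgumentException("size must be positive");
        }
        idle = new ArrayBlockingQueue<>(size);
        for (int i = 0; i < size; i++) {
            idle.add(factory.get());
        }
    }

    // Blocks up to the timeout. Running out of items is backpressure on the
    // caller, so it surfaces as an exception instead of creating more items.
    T borrow(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException {
        T item = idle.poll(timeout, unit);
        if (item == null) {
            throw new TimeoutException("no pooled item available");
        }
        return item;
    }

    // Call exactly once per successful borrow, typically in a finally block.
    void giveBack(T item) {
        if (!idle.offer(item)) {
            throw new IllegalStateException("returned more items than were borrowed");
        }
    }

    int idleCount() { return idle.size(); }
}
```

Because each borrowed item is held by exactly one thread until it is given back, access to a pooled channel is serialized without sharing one channel across all publisher threads.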

30.3 Auto-ack for business-critical consumer

Symptom:

  • message lost when handler crashes after delivery;
  • no redelivery.

Fix:

  • manual ack after successful commit.

30.4 Unbounded worker queue

Symptom:

  • JVM memory grows;
  • OOM under traffic spike;
  • graceful shutdown never finishes because the backlog cannot drain.

Fix:

  • prefetch + bounded executor + backpressure.
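
The executor half of that fix can be sketched with the JDK alone. `BoundedWorkers` is a hypothetical factory name, and `CallerRunsPolicy` is only one possible saturation policy: it makes the submitting thread run the task itself when the queue is full. The assumption here is that the submitter is the consumer callback thread, so slowing it down delays further deliveries instead of growing memory.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical factory for a handler pool with fixed workers and a bounded queue.
final class BoundedWorkers {
    private BoundedWorkers() {}

    static ThreadPoolExecutor create(int workers, int queueCapacity) {
        return new ThreadPoolExecutor(
                workers, workers,                         // fixed pool size
                0L, TimeUnit.MILLISECONDS,                // no idle timeout needed
                new ArrayBlockingQueue<>(queueCapacity),  // at most queueCapacity waiting tasks
                // When the queue is full, the submitting thread runs the task
                // itself, which slows the submitter instead of buffering more work.
                new ThreadPoolExecutor.CallerRunsPolicy());
    }
}
```

A reasonable starting point is to keep prefetch close to `workers + queueCapacity`, so the broker never hands the process more unacked messages than it has room to hold. Note that `CallerRunsPolicy` silently discards tasks submitted after the executor has been shut down, so consumers should be cancelled before the pool is stopped.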

30.5 Blind trust in automatic recovery

Symptom:

  • duplicate side effects;
  • hidden reconnect loops;
  • recovered stale topology.

Fix:

  • recovery metrics;
  • idempotency;
  • topology validation;
  • explicit failure tests.

30.6 Ack on wrong channel

Symptom:

  • unknown delivery tag;
  • channel closed;
  • redelivery storm.

Fix:

  • keep ack tied to delivery channel;
  • use ack dispatcher per channel.
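
An ack dispatcher can be sketched as a single-threaded executor bound to one channel. `AckTarget` is a hypothetical interface standing in for the two `Channel` operations involved; with the real client it would delegate to `basicAck(deliveryTag, false)` and `basicNack(deliveryTag, false, requeue)` on the channel that received the delivery. The error handling is a placeholder.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for the ack/nack operations of one consuming channel.
interface AckTarget {
    void ack(long deliveryTag) throws Exception;
    void nack(long deliveryTag, boolean requeue) throws Exception;
}

// One dispatcher per consuming channel: worker threads hand results back here,
// and a single thread issues every ack/nack against that one channel.
final class AckDispatcher implements AutoCloseable {
    private final AckTarget target;
    private final ExecutorService single = Executors.newSingleThreadExecutor();

    AckDispatcher(AckTarget target) { this.target = target; }

    void ack(long deliveryTag) {
        single.execute(() -> {
            try { target.ack(deliveryTag); } catch (Exception e) { onFailure(deliveryTag, e); }
        });
    }

    void nack(long deliveryTag, boolean requeue) {
        single.execute(() -> {
            try { target.nack(deliveryTag, requeue); } catch (Exception e) { onFailure(deliveryTag, e); }
        });
    }

    private void onFailure(long deliveryTag, Exception e) {
        // Placeholder: a failed ack usually means the channel is gone, in which
        // case the broker redelivers and the consumer must be idempotent.
        System.err.println("ack/nack failed for tag " + deliveryTag + ": " + e);
    }

    // Stops accepting new acks and waits briefly for queued ones to be sent.
    @Override
    public void close() {
        single.shutdown();
        try {
            single.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

Workers never touch the channel directly; they only call `ack` or `nack` on the dispatcher that belongs to the delivery's channel, which keeps delivery tags and channel ownership together.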

31. Practice: Build a Production-Ready Client Skeleton

31.1 Target

Build a small Java module:

rabbitmq-client-core/
  RabbitMqConfig.java
  RabbitMqConnectionManager.java
  TopologyValidator.java
  ConfirmingPublisher.java
  ConsumerRunner.java
  AckDispatcher.java
  RabbitMetrics.java

31.2 Requirements

  • one publisher connection;
  • one consumer connection;
  • connection names;
  • heartbeat configured;
  • automatic recovery enabled;
  • topology validation on startup;
  • publisher channel with confirms;
  • mandatory publish return listener;
  • consumer manual ack;
  • prefetch configured;
  • shutdown hook drains in-flight work;
  • metrics and logs for publish, ack, nack, return, recovery.
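
For the "drains in-flight work" requirement, one possible building block is an in-flight counter with a bounded wait. `InFlightTracker` is a hypothetical class: the consumer would call `tryStart()` before handling a delivery and `finish()` after the ack or nack has been issued, while the shutdown path calls `drain()`.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical in-flight tracker for graceful consumer shutdown.
final class InFlightTracker {
    private int inFlight = 0;
    private boolean accepting = true;

    // Returns false once shutdown has begun; the caller should then leave the
    // message unacked so the broker can redeliver it later.
    synchronized boolean tryStart() {
        if (!accepting) {
            return false;
        }
        inFlight++;
        return true;
    }

    synchronized void finish() {
        inFlight--;
        if (inFlight == 0) {
            notifyAll();
        }
    }

    // Stops new work, then waits until in-flight work reaches zero or the
    // timeout expires. Returns true if everything drained in time.
    synchronized boolean drain(long timeoutMillis) throws InterruptedException {
        accepting = false;
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        while (inFlight > 0) {
            long remainingMs = TimeUnit.NANOSECONDS.toMillis(deadline - System.nanoTime());
            if (remainingMs <= 0) {
                return false;
            }
            wait(remainingMs);
        }
        return true;
    }
}
```

A shutdown hook registered with `Runtime.getRuntime().addShutdownHook(...)` could mark readiness false, cancel consumers, call `drain` with a timeout, and only then close channels and connections. If `drain` returns false, the remaining messages stay unacked and are redelivered, which is why consumers still need to be idempotent.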

31.3 Experiments

Run these experiments:

  1. Start service with broker down.
  2. Start broker, observe recovery.
  3. Publish to missing binding with mandatory=true.
  4. Consume message and throw validation exception.
  5. Consume message, commit local side effect, kill app before ack.
  6. Restart app and verify duplicate handling strategy.
  7. Increase prefetch to 1000 and slow handler artificially.
  8. Compare memory and redelivery behavior.

31.4 Expected learning

You should see that:

  • connection recovery is not business recovery;
  • manual ack is a correctness boundary;
  • prefetch is backpressure;
  • channel ownership matters;
  • publish success is not consumer success;
  • shutdown design affects duplicate behavior.

32. Internal Engineering Standard Draft

Use this as a draft standard for RabbitMQ Java services:

# RabbitMQ Java Client Standard

## Connections
- Services must not create a RabbitMQ connection per message or per request.
- Services should use separate publisher and consumer connections for non-trivial workloads.
- Client-provided connection names are required.

## Channels
- Channels must not be shared across concurrent publisher threads unless access is serialized.
- Consumers should use dedicated channels per queue/workload.
- Acknowledgements must be issued on the channel that received the delivery.

## Publishing
- Critical messages must use publisher confirms.
- Critical messages should use mandatory publish or alternate exchange capture.
- Publish requests must include messageId, correlationId, type, contentType, and schema metadata.

## Consuming
- Business-critical consumers must use manual acknowledgements.
- Ack must occur only after durable business side effects are committed.
- Consumers must be idempotent.
- Prefetch must be explicitly configured and justified.

## Recovery
- Automatic recovery must be observable through metrics/logs.
- Recovery is not a replacement for idempotency.
- Shared topology should be validated or managed declaratively.

## Shutdown
- Services must mark readiness false before stopping consumers.
- In-flight messages must be drained, acked, nacked, or allowed to redeliver intentionally.

33. Self-Correction Questions

Answer before moving on:

  1. What is the difference between ConnectionFactory, Connection, and Channel?
  2. Why is a connection per publish an anti-pattern?
  3. Why must a channel not be used carelessly across threads?
  4. What does it mean that a delivery tag is scoped per channel?
  5. When should a consumer that writes to a database ack?
  6. What does automatic recovery solve, and what does it not solve?
  7. Why should publishers and consumers be able to use separate connections?
  8. What should happen when a blocked connection occurs?
  9. What is a safe shutdown order for a consumer?
  10. Why is prefetch a capacity contract?

34. Summary

A correct Java RabbitMQ client architecture starts with ownership:

  • ConnectionFactory holds configuration;
  • Connection is an expensive, long-lived transport;
  • Channel is an AMQP session that needs a clear owner;
  • producers need confirms, mandatory handling, and backpressure;
  • consumers need manual ack, prefetch, idempotency, and shutdown discipline;
  • recovery helps the transport, but does not solve business correctness;
  • metrics and lifecycle management are part of the design, not an ops afterthought.

The next part covers Producer In Action: publishing semantics, message properties, mandatory flag, publisher confirms, confirm batching, timeout handling, and producer-side backpressure.

