Java Stream Client: Environment, Producer, Consumer, Offset Tracking
Learn Java RabbitMQ, RabbitMQ Streams, Patterns, and Deployment In Action - Part 020
Production-oriented guide to the RabbitMQ Stream Java Client, covering Environment lifecycle, stream creation, producer confirms, message building, deduplication, consumers, offset specifications, server-side and external offset tracking, flow control, shutdown, testing, and service design.
The RabbitMQ Stream Java Client is the stream-native way to create streams, publish messages, consume messages, and manage offsets from Java.
If the AMQP Java client is centered around Connection, Channel, basicPublish, and basicConsume, the Stream Java Client is centered around:
- Environment: long-lived entry point to a broker/cluster;
- Producer: appends messages to a stream;
- Consumer: reads messages from a stream;
- OffsetSpecification: chooses where consumption starts;
- offset tracking: decides where the consumer can resume after restart.
This part is intentionally practical. We will build Java service skeletons that are safe enough to evolve into production code.
1. Kaufman Deconstruction
To use the Stream Java Client well, decompose the skill into ten capabilities:
- Environment lifecycle: create one long-lived environment per service process and close it cleanly.
- Stream topology management: create streams deliberately; avoid accidental test-style creation in production paths.
- Producer correctness: use confirms, bounded in-flight messages, durable message identity, and optional deduplication.
- Message contract mapping: encode the domain envelope into Stream Message properties/application properties/body.
- Consumer start position: understand first, last, next, offset, and timestamp.
- Offset tracking: choose server-side automatic, server-side manual, or external offset storage.
- Processing correctness: commit the business effect before storing the offset.
- Flow control: avoid unbounded processing and call processed() when using flow strategies that require it.
- Recovery behavior: reason about producer reconnect, consumer re-subscription, and duplicate processing.
- Observability/testing: instrument publishes, confirmations, handler latency, offset lag, and replay correctness.
The practical standard:
A stream consumer is not correct because it "received" a message. It is correct only when business state and offset progress have a safe relationship.
2. Maven/Gradle Dependency Boundary
Use the official RabbitMQ Stream Java Client.
Maven pattern:
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>stream-client</artifactId>
<version>${rabbitmq.stream.client.version}</version>
</dependency>
Gradle pattern:
dependencies {
implementation("com.rabbitmq:stream-client:${rabbitmqStreamClientVersion}")
}
Do not hardcode an old version in learning material. In a production repository, pin the version in dependency management and upgrade using release notes plus integration tests.
Recommended project modules:
messaging-stream-contracts
messaging-stream-client
order-event-producer
order-projection-consumer
stream-test-support
Keep message contract code separated from producer/consumer lifecycle code.
3. Local Broker Setup for Stream Development
For local development, the stream plugin must be enabled.
Example:
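# Advertise localhost so a client running on the host can follow the broker's connection hints.
docker run -it --rm --name rabbitmq \
-p 5672:5672 \
-p 15672:15672 \
-p 5552:5552 \
-e RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS='-rabbitmq_stream advertised_host localhost' \
rabbitmq:4-management
# In a second terminal, once the broker has started:
docker exec rabbitmq rabbitmq-plugins enable rabbitmq_stream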
For a real environment, do not stop at "plugin enabled". You also need:
- stream listener port exposed;
- TLS plan;
- advertised host/port correctness;
- vhost/user/permission model;
- retention policy;
- disk sizing;
- metrics and alerts;
- replica placement.
The Java Stream Client connects using a rabbitmq-stream:// URI for plaintext and rabbitmq-stream+tls:// for TLS.
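The URI carries credentials and the virtual host, so those components need percent-encoding (the default vhost `/` becomes `%2F`, equivalent to the `%2f` used in this guide). The following is a minimal sketch of a URI builder using only the JDK; `StreamUris` and its method names are hypothetical helpers for illustration, not part of the client library.

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;

// Hypothetical helper: assembles a stream URI with percent-encoded components.
final class StreamUris {

    private StreamUris() {}

    // URLEncoder targets form encoding, so its "+" for spaces is rewritten to "%20".
    static String encode(String value) {
        return URLEncoder.encode(value, StandardCharsets.UTF_8).replace("+", "%20");
    }

    static String build(boolean tls, String user, String password,
                        String host, int port, String vhost) {
        String scheme = tls ? "rabbitmq-stream+tls" : "rabbitmq-stream";
        return scheme + "://" + encode(user) + ":" + encode(password)
                + "@" + host + ":" + port + "/" + encode(vhost);
    }
}
```

Building the URI in one place keeps credentials with special characters from silently producing an unparsable connection string.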
4. Environment: The Long-Lived Entry Point
The Environment is the primary entry point. Treat it like a service-level resource.
import com.rabbitmq.stream.Environment;
public final class StreamEnvironmentProvider implements AutoCloseable {
private final Environment environment;
public StreamEnvironmentProvider(String uri) {
this.environment = Environment.builder()
.uri(uri)
.build();
}
public Environment environment() {
return environment;
}
@Override
public void close() {
environment.close();
}
}
Usage:
try (StreamEnvironmentProvider provider =
new StreamEnvironmentProvider("rabbitmq-stream://app:secret@rabbitmq-0:5552/%2f")) {
Environment env = provider.environment();
// create producers and consumers
}
Production invariant:
Do not create an Environment per message, per request, or per handler invocation.
One service process usually creates one environment during startup and closes it during shutdown.
5. Cluster Connection Strategy
A single URI is fine for local development. In production, provide multiple entry points.
Environment environment = Environment.builder()
.uris(List.of(
"rabbitmq-stream://stream-user:secret@rabbitmq-0.messaging:5552/%2f",
"rabbitmq-stream://stream-user:secret@rabbitmq-1.messaging:5552/%2f",
"rabbitmq-stream://stream-user:secret@rabbitmq-2.messaging:5552/%2f"
))
.build();
Design notes:
| Concern | Rule |
|---|---|
| initial connection | use multiple broker URIs or stable entry point |
| leader/replica locality | let client use stream metadata where possible |
| Kubernetes | ensure advertised host/port are reachable from clients |
| TLS | certificate SANs must match names returned by broker hints |
| load balancer | useful, but metadata/hints must still be understood |
Connection problems in stream deployments often come from the broker returning hostnames that the client cannot resolve or certificates that do not match advertised names.
6. Stream Creation: Application-Owned or Platform-Owned?
The Stream Java Client can create streams.
environment.streamCreator()
.stream("order.events.v1.stream")
.create();
But production topology should not be accidentally created by arbitrary application startup unless that is an explicit platform decision.
| Model | Fit | Risk |
|---|---|---|
| app creates stream on startup | small product teams, local/dev, controlled ownership | accidental drift, hidden retention defaults |
| platform IaC creates stream | regulated/prod environments | slower change cycle but safer governance |
| CI/CD topology migration | mature platform teams | requires strong review/test process |
Recommended approach:
- local/dev: app can create missing streams;
- staging/prod: topology managed by IaC/operator/migration with explicit retention and ownership;
- application startup validates expected stream exists and settings are acceptable.
Pseudo-validation:
public final class StreamTopologyValidator {
private final Environment environment;
public StreamTopologyValidator(Environment environment) {
this.environment = environment;
}
public void validateRequiredStreams() {
// Use management API, Stream API, or platform-provided metadata checks.
// Fail fast if required stream is missing or retention does not match contract.
}
}
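Do not let retention default by accident. Set size-based or age-based retention explicitly (the streamCreator() builder exposes maxLengthBytes, maxSegmentSizeBytes, and maxAge) or enforce it through a broker policy.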
7. Producer: The Append Path
A producer appends messages to one stream.
import com.rabbitmq.stream.Environment;
import com.rabbitmq.stream.Producer;
Producer producer = environment.producerBuilder()
.stream("order.events.v1.stream")
.build();
Close it during shutdown:
producer.close();
Production invariant:
A producer is not just an API object. It is a reliability boundary around confirms, retry, deduplication, in-flight limits, and observability.
8. Building Stream Messages
RabbitMQ Stream messages use a message abstraction with properties, application properties, annotations, and body.
Example mapping from domain envelope:
import com.rabbitmq.stream.Message;
import com.rabbitmq.stream.Producer;
import java.nio.charset.StandardCharsets;
import java.util.UUID;
public final class StreamMessageFactory {
private final Producer producer;
private final JsonCodec jsonCodec;
public StreamMessageFactory(Producer producer, JsonCodec jsonCodec) {
this.producer = producer;
this.jsonCodec = jsonCodec;
}
public Message orderEvent(OrderEventEnvelope envelope) {
byte[] body = jsonCodec.toJsonBytes(envelope.payload());
return producer.messageBuilder()
.properties()
.messageId(envelope.eventId())
.correlationId(envelope.correlationId())
.contentType("application/json")
.messageBuilder()
.applicationProperties()
.entry("eventType", envelope.eventType())
.entry("schemaVersion", envelope.schemaVersion())
.entry("aggregateId", envelope.aggregateId())
.entry("aggregateVersion", envelope.aggregateVersion())
.entry("producer", envelope.producer())
.entry("partitionKey", envelope.partitionKey())
.messageBuilder()
.addData(body)
.build();
}
}
Keep the mapping deterministic. Avoid placing critical business state only in headers/properties if consumers will persist/replay using only body.
Recommended mapping:
| Domain field | Stream field |
|---|---|
| event ID | properties.messageId and envelope body |
| correlation ID | properties.correlationId and envelope body |
| content type | properties.contentType |
| event type | application property + envelope body |
| schema version | application property + envelope body |
| aggregate ID | application property + envelope body |
| partition key | application property + envelope body |
| payload | body |
Duplicating key routing/trace fields in both properties and body is acceptable when it improves operations and consumer usability.
9. Producer Confirms
Stream publishing is asynchronous. The confirmation handler is invoked later with the outcome: confirmed by the broker, or failed with a status code (for example, when the client gives up waiting for a confirm).
Minimal example:
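import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
CountDownLatch confirms = new CountDownLatch(1);
// Exception (not Throwable) so the enclosing method only needs "throws Exception".
AtomicReference<Exception> failure = new AtomicReference<>();
producer.send(message, confirmationStatus -> {
    if (!confirmationStatus.isConfirmed()) {
        failure.set(new IllegalStateException(
            "Message was not confirmed: " + confirmationStatus.getCode()
        ));
    }
    confirms.countDown();
});
// Block until the confirm callback has run, or give up after 5 seconds.
if (!confirms.await(5, TimeUnit.SECONDS)) {
    throw new TimeoutException("Timed out waiting for stream publish confirm");
}
if (failure.get() != null) {
    throw failure.get();
}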
That is fine for tests, but not for high-throughput production because it blocks per message.
Production producer should:
- limit in-flight messages;
- handle confirms asynchronously;
- avoid heavy work in confirm callback;
- record publish latency;
- retry only when safe;
- deduplicate when ambiguity exists;
- drain on shutdown.
10. Bounded In-Flight Producer
A simple bounded producer wrapper:
import com.rabbitmq.stream.Message;
import com.rabbitmq.stream.Producer;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
public final class BoundedStreamPublisher implements AutoCloseable {
    private final Producer producer;
    private final Semaphore inFlight;
    private final Duration acquireTimeout;
    public BoundedStreamPublisher(
        Producer producer,
        int maxInFlight,
        Duration acquireTimeout
    ) {
        this.producer = producer;
        this.inFlight = new Semaphore(maxInFlight);
        this.acquireTimeout = acquireTimeout;
    }
    public CompletableFuture<Void> publish(Message message) {
        // Wait for a free in-flight slot; this is the application-level backpressure point.
        boolean acquired;
        try {
            acquired = inFlight.tryAcquire(
                acquireTimeout.toMillis(),
                TimeUnit.MILLISECONDS
            );
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return CompletableFuture.failedFuture(e);
        }
        if (!acquired) {
            return CompletableFuture.failedFuture(
                new IllegalStateException("stream publisher backpressure timeout")
            );
        }
        CompletableFuture<Void> result = new CompletableFuture<>();
        // Guard so the permit is released exactly once, whichever path finishes first.
        AtomicBoolean released = new AtomicBoolean();
        try {
            producer.send(message, confirmation -> {
                try {
                    if (confirmation.isConfirmed()) {
                        result.complete(null);
                    } else {
                        result.completeExceptionally(
                            new IllegalStateException(
                                "stream publish rejected: " + confirmation.getCode()
                            )
                        );
                    }
                } finally {
                    if (released.compareAndSet(false, true)) {
                        inFlight.release();
                    }
                }
            });
        } catch (RuntimeException e) {
            // send() itself failed, so the callback may never run: free the permit here.
            if (released.compareAndSet(false, true)) {
                inFlight.release();
            }
            result.completeExceptionally(e);
        }
        return result;
    }
    @Override
    public void close() {
        producer.close();
    }
}
Why bounded in-flight matters:
- prevents unbounded heap growth;
- turns broker/disk pressure into application-level backpressure;
- makes shutdown drainable;
- gives clear metrics.
Metrics:
| Metric | Meaning |
|---|---|
| stream.publisher.inflight | current outstanding messages |
| stream.publisher.confirm.latency | publish-to-confirm duration |
| stream.publisher.confirm.failed | negative confirms/errors |
| stream.publisher.backpressure.timeout | producer cannot acquire capacity |
| stream.publisher.message.bytes | payload size distribution |
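As a rough illustration of how the in-flight and confirm-latency metrics can be captured cheaply, here is a sketch of a recorder that is called once before send and once from the confirm callback. `PublisherMetrics` and its methods are hypothetical names; a real service would more likely delegate to its metrics library. The clock is injected so the recorder can be tested without real time passing.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.LongSupplier;

// Hypothetical recorder for publisher metrics; not part of the stream client.
final class PublisherMetrics {
    private final LongSupplier nanoClock;
    private final AtomicInteger inFlight = new AtomicInteger();
    private final LongAdder confirmed = new LongAdder();
    private final LongAdder failed = new LongAdder();
    private final LongAdder totalConfirmNanos = new LongAdder();

    PublisherMetrics(LongSupplier nanoClock) {
        this.nanoClock = nanoClock;
    }

    // Call just before send; the returned value is passed back on confirm.
    long onSend() {
        inFlight.incrementAndGet();
        return nanoClock.getAsLong();
    }

    // Call from the confirm callback; only counters are touched, so it stays cheap.
    void onConfirm(long startNanos, boolean ok) {
        inFlight.decrementAndGet();
        totalConfirmNanos.add(nanoClock.getAsLong() - startNanos);
        (ok ? confirmed : failed).increment();
    }

    int inFlight() { return inFlight.get(); }
    long confirmedCount() { return confirmed.sum(); }
    long failedCount() { return failed.sum(); }

    double meanConfirmMillis() {
        long n = confirmed.sum() + failed.sum();
        return n == 0 ? 0.0 : totalConfirmNanos.sum() / 1_000_000.0 / n;
    }
}
```

In production the clock would be `System::nanoTime`; a mean is used here only to keep the sketch short, and a histogram is usually more informative for latency.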
11. Producer Deduplication
Stream producer deduplication helps when a message is persisted but the producer does not receive the confirm due to network failure.
Conceptually, deduplication depends on:
- producer name;
- monotonically increasing publishing ID;
- single producer instance/thread discipline for a given name.
Example pattern:
Producer producer = environment.producerBuilder()
.name("order-service-outbox-relay-0")
.stream("order.events.v1.stream")
.build();
Design rules:
| Rule | Why |
|---|---|
| producer name must be stable | broker tracks published IDs per producer identity |
| do not share same producer name across active instances | IDs can collide or move backward |
| preserve publish order for a named producer | dedup assumes monotonic sequence discipline |
| still keep event ID idempotency | dedup does not replace consumer correctness |
A good producer name includes a stable shard identity:
order-outbox-relay-shard-00
order-outbox-relay-shard-01
order-outbox-relay-shard-02
Bad producer names:
producer
order-service
${random-uuid-on-every-start}
Random names defeat dedup across restarts. Shared names across active instances corrupt the monotonic identity model.
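To make the identity model concrete, the sketch below is a deliberately simplified in-memory model of deduplication: it remembers the highest publishing ID per producer name and filters anything at or below it. This is an illustration of the concept under that assumption, not the broker's implementation, and `DedupModel` is a hypothetical name.

```java
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;

// Simplified model of per-producer deduplication; illustration only.
final class DedupModel {
    // Highest publishing ID accepted so far, per producer name.
    private final Map<String, Long> lastIdByProducer = new HashMap<>();

    // Stable name derived from a shard index, e.g. "order-outbox-relay-shard-00".
    static String producerName(String base, int shard) {
        return String.format(Locale.ROOT, "%s-shard-%02d", base, shard);
    }

    // Returns true if the message would be appended, false if filtered as a duplicate.
    boolean offer(String producerName, long publishingId) {
        Long last = lastIdByProducer.get(producerName);
        if (last != null && publishingId <= last) {
            return false;
        }
        lastIdByProducer.put(producerName, publishingId);
        return true;
    }
}
```

Resending ID 2 under the same name is filtered, while the same ID under a fresh random name is accepted again, which is why a name that changes on every start gives no protection across restarts.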
12. Outbox Relay to Stream
A production service usually should not publish directly to a stream inside the same transaction where it writes business state. Use outbox when database state and stream event must be consistent.
Outbox relay publishing loop:
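public final class StreamOutboxRelay {
    private final OutboxRepository outboxRepository;
    private final StreamMessageFactory messageFactory;
    private final BoundedStreamPublisher publisher;
    // Assumed executor: runs outbox updates off the confirm callback thread.
    private final Executor dbExecutor;
    public StreamOutboxRelay(
        OutboxRepository outboxRepository,
        StreamMessageFactory messageFactory,
        BoundedStreamPublisher publisher,
        Executor dbExecutor
    ) {
        this.outboxRepository = outboxRepository;
        this.messageFactory = messageFactory;
        this.publisher = publisher;
        this.dbExecutor = dbExecutor;
    }
    public void runBatch(int batchSize) {
        List<OutboxRecord> records = outboxRepository.lockNextBatch(batchSize);
        for (OutboxRecord record : records) {
            Message message = messageFactory.orderEvent(record.toEnvelope());
            // Database work happens on dbExecutor, never inside the confirm callback.
            publisher.publish(message).whenCompleteAsync((ignored, ex) -> {
                if (ex == null) {
                    outboxRepository.markPublished(record.id());
                } else {
                    outboxRepository.markPublishFailed(record.id(), String.valueOf(ex.getMessage()));
                }
            }, dbExecutor);
        }
    }
}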
The exact transaction and locking details depend on your database model. The invariant is stable:
Mark outbox row published only after stream publish confirm.
If the process crashes after confirm but before marking published, it may republish later. That is why producer deduplication and event ID idempotency matter.
13. Consumer: Reading From a Stream
Minimal consumer:
import com.rabbitmq.stream.Consumer;
import com.rabbitmq.stream.OffsetSpecification;
Consumer consumer = environment.consumerBuilder()
.stream("order.events.v1.stream")
.offset(OffsetSpecification.first())
.messageHandler((context, message) -> {
byte[] body = message.getBodyAsBinary();
// process body
})
.build();
Close it during shutdown:
consumer.close();
By default, stream consumption starts at next if no offset is specified. This means the consumer waits for future records and does not replay existing records.
Be explicit. Never rely on default start position in production.
14. Offset Specifications
| Specification | Meaning | Use |
|---|---|---|
| OffsetSpecification.first() | first retained message | full replay/rebuild |
| OffsetSpecification.last() | last chunk | tail near latest data |
| OffsetSpecification.next() | only future records | live-only consumer |
| OffsetSpecification.offset(n) | exact offset | resume/reprocess from known point |
| OffsetSpecification.timestamp(t) | first records near timestamp | incident replay/backfill |
Example:
Consumer consumer = environment.consumerBuilder()
.stream("order.events.v1.stream")
.offset(OffsetSpecification.timestamp(incidentStart.toEpochMilli()))
.messageHandler((context, message) -> {
// filter message timestamp if exact boundary matters
})
.build();
A timestamp-based start can deliver records slightly older than the requested timestamp, because the broker starts delivery at a chunk boundary rather than at an individual record. Application code should filter if the exact event-time boundary matters.
15. Server-Side Automatic Offset Tracking
Server-side offset tracking stores consumer progress in RabbitMQ. To use it, give the consumer a stable name.
Consumer consumer = environment.consumerBuilder()
.stream("order.events.v1.stream")
.name("order-read-model")
.autoTrackingStrategy()
.messageCountBeforeStorage(10_000)
.flushInterval(Duration.ofSeconds(5))
.builder()
.messageHandler((context, message) -> {
processMessage(message);
})
.build();
Pros:
- simple;
- no separate offset database;
- good for consumers where duplicate replay is acceptable;
- integrates with stream-native restart behavior.
Cons:
- not atomic with your business database;
- storing by message count/time may checkpoint before some asynchronous processing is complete if used carelessly;
- consumer name collision creates confusing progress behavior.
Use automatic tracking when processing is synchronous and idempotent, or when occasional duplicate/loss-risk is acceptable for non-critical consumers.
For critical stateful projections, manual or external tracking is usually clearer.
16. Server-Side Manual Offset Tracking
Manual tracking gives application code control over when to store offsets.
Consumer consumer = environment.consumerBuilder()
.stream("order.events.v1.stream")
.name("order-read-model")
.manualTrackingStrategy()
.checkInterval(Duration.ofSeconds(10))
.builder()
.messageHandler((context, message) -> {
OrderEvent event = decode(message);
projectionRepository.applyIdempotently(event);
// Store only after business effect is durable.
context.storeOffset();
})
.build();
This is safer than automatic tracking when you want the handler to decide the checkpoint boundary.
However, it is still not atomic with a business database. If the process crashes after database commit but before storeOffset(), the message may be replayed. That is acceptable if the handler is idempotent.
Correctness rule:
business commit first
then offset store
duplicates possible
skips avoided
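The rule can be checked with a small in-memory simulation. The sketch below replays a five-record stream with a simulated crash, once with "commit then store" and once with the unsafe reverse order. `CheckpointOrderDemo` and its fields are hypothetical and stand in for a real database and offset store.

```java
import java.util.LinkedHashSet;
import java.util.Set;

// In-memory simulation of checkpoint ordering; illustration only.
final class CheckpointOrderDemo {

    // State that survives a simulated crash.
    static final class Durable {
        final Set<Long> applied = new LinkedHashSet<>(); // idempotent business effects, keyed by offset
        long storedOffset = -1;                          // last checkpointed offset
        int handlerCalls = 0;
    }

    // Processes offsets from storedOffset + 1 up to streamLength - 1.
    // A crash is simulated by returning at crashAt; pass -1 for no crash.
    static void run(Durable d, long streamLength, boolean commitFirst, long crashAt) {
        for (long offset = d.storedOffset + 1; offset < streamLength; offset++) {
            d.handlerCalls++;
            if (commitFirst) {
                d.applied.add(offset);          // business commit
                if (offset == crashAt) return;  // crash before the offset store
                d.storedOffset = offset;
            } else {
                d.storedOffset = offset;        // offset store first (unsafe)
                if (offset == crashAt) return;  // crash before the business commit
                d.applied.add(offset);
            }
        }
    }
}
```

With commit-first, the crashed record is handled twice after restart but every record ends up applied; with store-first, the crashed record is never applied.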
17. External Offset Tracking With a Database
When stream processing updates a database, store the checkpoint in the same database transaction.
Schema:
CREATE TABLE stream_checkpoint (
stream_name varchar(255) NOT NULL,
consumer_name varchar(255) NOT NULL,
partition_name varchar(255) NOT NULL DEFAULT '',
last_offset bigint NOT NULL,
updated_at timestamp NOT NULL DEFAULT current_timestamp,
PRIMARY KEY (stream_name, consumer_name, partition_name)
);
Consumer startup:
long lastOffset = checkpointRepository
.findLastOffset("order.events.v1.stream", "order-read-model")
.orElse(-1L);
OffsetSpecification start = lastOffset < 0
? OffsetSpecification.first()
: OffsetSpecification.offset(lastOffset + 1);
Consumer with external offset:
Consumer consumer = environment.consumerBuilder()
.stream("order.events.v1.stream")
.offset(start)
.noTrackingStrategy()
.messageHandler((context, message) -> {
OrderEvent event = decode(message);
long offset = context.offset();
projectionRepository.inTransaction(() -> {
projectionRepository.applyIdempotently(event);
checkpointRepository.store(
"order.events.v1.stream",
"order-read-model",
"",
offset
);
});
})
.build();
Why this is strong:
- projection state and checkpoint move together;
- restart position reflects durable business state;
- duplicate risk is still handled by idempotent apply;
- skip risk is minimized.
Caveat:
- if processing is asynchronous/concurrent, you cannot simply store the highest offset that completed. You need gap tracking.
18. External Offset Tracking With Subscription Listener
A more stream-native external tracking pattern uses a subscription listener.
Consumer consumer = environment.consumerBuilder()
.stream("order.events.v1.stream")
.noTrackingStrategy()
.subscriptionListener(subscriptionContext -> {
long lastOffset = checkpointRepository
.findLastOffset("order.events.v1.stream", "order-read-model")
.orElse(-1L);
subscriptionContext.offsetSpecification(
lastOffset < 0
? OffsetSpecification.first()
: OffsetSpecification.offset(lastOffset + 1)
);
})
.messageHandler((context, message) -> {
OrderEvent event = decode(message);
long offset = context.offset();
projectionRepository.inTransaction(() -> {
projectionRepository.applyIdempotently(event);
checkpointRepository.store(
"order.events.v1.stream",
"order-read-model",
"",
offset
);
});
})
.build();
This is useful when the consumer can re-subscribe after topology changes and you want the latest external checkpoint to control restart behavior.
19. Idempotent Projection Handler
Example projection table:
CREATE TABLE order_projection (
order_id varchar(64) PRIMARY KEY,
aggregate_version bigint NOT NULL,
status varchar(64) NOT NULL,
total_amount numeric(19, 2),
updated_at timestamp NOT NULL
);
CREATE TABLE processed_event (
consumer_name varchar(255) NOT NULL,
event_id varchar(128) NOT NULL,
processed_at timestamp NOT NULL DEFAULT current_timestamp,
PRIMARY KEY (consumer_name, event_id)
);
Handler logic:
public void applyIdempotently(OrderEvent event) {
if (processedEventRepository.exists("order-read-model", event.eventId())) {
return;
}
OrderProjection current = projectionRepository.find(event.orderId());
if (current != null && current.aggregateVersion() >= event.aggregateVersion()) {
processedEventRepository.insert("order-read-model", event.eventId());
return;
}
projectionRepository.upsert(
event.orderId(),
event.aggregateVersion(),
event.status(),
event.totalAmount(),
event.occurredAt()
);
processedEventRepository.insert("order-read-model", event.eventId());
}
This protects against:
- duplicate stream delivery;
- replay;
- producer retry ambiguity;
- old/stale events;
- consumer restart after DB commit before offset storage.
20. Consumer Flow Control
Stream consumers receive data in chunks and use credits. If handler work is asynchronous, you must ensure the client does not overrun your process.
Example using a flow strategy that requires processed():
Consumer consumer = environment.consumerBuilder()
.stream("order.events.v1.stream")
.flow()
.strategy(ConsumerFlowStrategy.creditWhenHalfMessagesProcessed())
.builder()
.messageHandler((context, message) -> {
workerPool.submit(() -> {
try {
processMessage(message);
} finally {
context.processed();
}
});
})
.build();
If you use a strategy that depends on processed(), call it exactly once for each message after actual processing is complete.
Wrong:
.messageHandler((context, message) -> {
workerPool.submit(() -> processMessage(message));
context.processed(); // too early
})
The wrong version tells the client there is capacity before the actual work finishes.
21. Concurrency and Offset Gaps
If a consumer processes messages concurrently, offsets can complete out of order.
If you store offset 11 while offset 10 is still incomplete, a crash can skip offset 10.
Solutions:
Option A — Serial Processing
Simplest and safest.
.messageHandler((context, message) -> {
processMessage(message);
storeOffset(context.offset());
})
Option B — Partition by Key and Process Serially Per Key
Allows concurrency across independent keys, serial order within key.
orderId O-1 -> lane 1
orderId O-2 -> lane 2
orderId O-3 -> lane 3
Still hard to checkpoint globally if completion is out of order.
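One possible way to implement the lane mapping is a fixed array of single-threaded executors indexed by a hash of the key. The sketch below assumes string keys and a hypothetical `KeyedLanes` class; it covers only ordering per key and does not solve the global checkpoint problem.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical keyed dispatcher: same key always lands on the same single-threaded lane.
final class KeyedLanes implements AutoCloseable {
    private final ExecutorService[] lanes;

    KeyedLanes(int laneCount) {
        lanes = new ExecutorService[laneCount];
        for (int i = 0; i < laneCount; i++) {
            lanes[i] = Executors.newSingleThreadExecutor();
        }
    }

    // floorMod keeps the index non-negative even when hashCode() is negative.
    int laneFor(String key) {
        return Math.floorMod(key.hashCode(), lanes.length);
    }

    // Tasks for one key run in submission order because each lane has one thread.
    Future<?> submit(String key, Runnable task) {
        return lanes[laneFor(key)].submit(task);
    }

    @Override
    public void close() {
        for (ExecutorService lane : lanes) {
            lane.shutdown();
        }
    }
}
```

Two different keys may share a lane, which costs some parallelism but never breaks per-key ordering.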
Option C — Gap-Aware Offset Tracker
Maintain completed offset set and advance checkpoint only through contiguous completion.
public final class ContiguousOffsetTracker {
private long highestStored;
private final NavigableSet<Long> completed = new TreeSet<>();
public ContiguousOffsetTracker(long highestStored) {
this.highestStored = highestStored;
}
public synchronized OptionalLong markCompleted(long offset) {
completed.add(offset);
long candidate = highestStored + 1;
boolean advanced = false;
while (completed.remove(candidate)) {
highestStored = candidate;
candidate++;
advanced = true;
}
return advanced ? OptionalLong.of(highestStored) : OptionalLong.empty();
}
}
Use this only when you have strong tests. Many systems are better off serializing processing per consumer and scaling via partitions/super streams.
22. Single Active Consumer Preview
Single active consumer lets several consumer instances share a stream and name, while only one active instance receives messages at a time. This helps maintain order and continuity.
Consumer consumer = environment.consumerBuilder()
.stream("order.events.v1.stream")
.name("order-read-model")
.singleActiveConsumer()
.manualTrackingStrategy()
.builder()
.messageHandler((context, message) -> {
processMessage(message);
context.storeOffset();
})
.build();
Use when:
- order matters;
- exactly one active instance should process the stream;
- standby instances should take over on crash;
- throughput of one active consumer is enough.
If throughput is not enough, use Super Streams later rather than unsafe concurrency inside one ordered lane.
23. Error Handling in Stream Consumers
There is no queue-style per-message DLX flow for stream processing. A consumer must decide what to do with a bad record.
Failure classes:
| Failure | Example | Strategy |
|---|---|---|
| transient infrastructure | DB timeout | retry locally with budget |
| downstream unavailable | projection DB down | stop consumer or pause processing |
| invalid schema | unknown required field | park error and advance only if policy allows |
| poison business data | impossible state transition | park + alert + manual decision |
| unsafe side effect | external API duplicate risk | ledger + idempotency key |
Example parking table:
CREATE TABLE stream_poison_event (
stream_name varchar(255) NOT NULL,
consumer_name varchar(255) NOT NULL,
offset_value bigint NOT NULL,
event_id varchar(128),
reason text NOT NULL,
payload text NOT NULL,
created_at timestamp NOT NULL DEFAULT current_timestamp,
PRIMARY KEY (stream_name, consumer_name, offset_value)
);
Decision:
- If event must be fixed before progress, stop and alert.
- If event can be skipped after parking, persist parking record and store offset.
- If event is transient, retry with bounded policy before stopping.
Do not silently swallow stream consumer errors.
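The decision list above can be made explicit in code so that it is reviewed rather than implied by scattered catch blocks. This is a sketch under the assumption that failures are already classified; the enum values and `FailurePolicy` are hypothetical names, and the only policy switch modeled is whether invalid-schema records may be skipped after parking.

```java
import java.util.EnumMap;
import java.util.Map;

enum FailureClass { TRANSIENT_INFRASTRUCTURE, DOWNSTREAM_UNAVAILABLE, INVALID_SCHEMA, POISON_BUSINESS_DATA }

enum Action { RETRY_THEN_STOP, STOP_AND_ALERT, PARK_AND_ADVANCE }

// Hypothetical policy table mapping a failure class to a consumer action.
final class FailurePolicy {
    private final Map<FailureClass, Action> actions = new EnumMap<>(FailureClass.class);

    FailurePolicy(boolean skipInvalidSchemaAfterParking) {
        actions.put(FailureClass.TRANSIENT_INFRASTRUCTURE, Action.RETRY_THEN_STOP);
        actions.put(FailureClass.DOWNSTREAM_UNAVAILABLE, Action.STOP_AND_ALERT);
        actions.put(FailureClass.INVALID_SCHEMA,
                skipInvalidSchemaAfterParking ? Action.PARK_AND_ADVANCE : Action.STOP_AND_ALERT);
        actions.put(FailureClass.POISON_BUSINESS_DATA, Action.STOP_AND_ALERT);
    }

    Action decide(FailureClass failure) {
        return actions.get(failure);
    }

    // The offset may move past a failed record only when it has been parked by policy.
    boolean mayStoreOffset(Action action) {
        return action == Action.PARK_AND_ADVANCE;
    }
}
```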
24. Retry Policy for Stream Consumer Handler
A bounded local retry pattern:
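public void processWithRetry(Message message) {
    int maxAttempts = 3;
    Duration delay = Duration.ofMillis(200);
    for (int attempt = 1; attempt <= maxAttempts; attempt++) {
        try {
            processMessage(message);
            return;
        } catch (TransientDependencyException ex) {
            if (attempt == maxAttempts) {
                throw ex; // budget exhausted: let the caller stop or park
            }
            try {
                // Linear backoff: 200 ms after the first failure, 400 ms after the second.
                Thread.sleep(delay.multipliedBy(attempt).toMillis());
            } catch (InterruptedException interrupted) {
                Thread.currentThread().interrupt();
                throw ex; // shutting down: give up without advancing the offset
            }
        }
    }
}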
When local retry fails, prefer stopping the consumer over advancing offset if correctness requires that message.
For high-throughput consumers, do not block stream client threads for long sleeps. Dispatch to controlled worker pools or use pause/restart patterns at service level.
25. Graceful Shutdown
A stream service should shut down in this order:
- stop accepting new external work;
- stop/close consumers or pause intake;
- wait for in-flight handlers to complete within deadline;
- commit final offsets/checkpoints;
- drain producer in-flight confirms;
- close producers;
- close environment;
- emit shutdown metrics/logs.
Example skeleton:
public final class OrderProjectionStreamApp implements AutoCloseable {
private final Environment environment;
private final Consumer consumer;
private final ExecutorService workers;
@Override
public void close() {
consumer.close();
workers.shutdown();
try {
if (!workers.awaitTermination(30, TimeUnit.SECONDS)) {
workers.shutdownNow();
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
workers.shutdownNow();
} finally {
environment.close();
}
}
}
Shutdown rule:
A fast shutdown that corrupts offset/business state is not graceful. It is just quick data loss.
26. Observability
Minimum producer metrics:
| Metric | Tags |
|---|---|
| rabbitmq.stream.publish.count | stream, producer, result |
| rabbitmq.stream.publish.confirm.latency | stream, producer |
| rabbitmq.stream.publish.inflight | stream, producer |
| rabbitmq.stream.publish.bytes | stream, eventType |
| rabbitmq.stream.publish.backpressure | stream, producer |
Minimum consumer metrics:
| Metric | Tags |
|---|---|
| rabbitmq.stream.consume.count | stream, consumer, eventType, result |
| rabbitmq.stream.consume.latency | stream, consumer, eventType |
| rabbitmq.stream.consumer.offset | stream, consumer |
| rabbitmq.stream.consumer.lag | stream, consumer |
| rabbitmq.stream.consumer.error.count | stream, consumer, errorClass |
| rabbitmq.stream.consumer.checkpoint.latency | stream, consumer |
Structured log fields:
{
"stream": "order.events.v1.stream",
"consumer": "order-read-model",
"offset": 123456,
"eventId": "01JZ8M8RZ9N9B5SHGBAJ7H7W2Q",
"eventType": "OrderSubmitted",
"aggregateId": "O-1001",
"aggregateVersion": 12,
"correlationId": "quote-req-77",
"result": "processed"
}
Logs without offset are weak for stream debugging.
27. Health Checks
Readiness should fail when:
- environment cannot connect;
- required stream does not exist;
- producer cannot publish/confirm within budget;
- consumer lag is beyond safe threshold for critical projections;
- downstream dependency for consumer is unavailable;
- disk/retention risk is known through broker metrics.
Liveness should be conservative. Do not restart a process just because downstream DB is temporarily slow if restart will amplify replay. Prefer readiness false plus controlled backoff.
Health categories:
| Check | Liveness | Readiness |
|---|---|---|
| JVM not deadlocked | yes | yes |
| stream broker reachable | maybe | yes |
| publish confirm probe | no | yes |
| consumer lag too high | no | yes/alert |
| downstream DB unavailable | no | yes |
| poison event encountered | no | depends on policy |
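A readiness decision along these lines might be sketched as a pure function over already-collected probe results. `ReadinessCheck`, its parameters, and the lag threshold are hypothetical; how each input is obtained (broker probe, stream stats, database ping) is left to the service.

```java
// Hypothetical readiness evaluation over probe results gathered elsewhere.
final class ReadinessCheck {
    private final long maxLag;

    ReadinessCheck(long maxLag) {
        this.maxLag = maxLag;
    }

    // Lag in records between the end of the stream and the consumer's progress.
    static long lag(long lastStreamOffset, long lastProcessedOffset) {
        return Math.max(0L, lastStreamOffset - lastProcessedOffset);
    }

    boolean ready(boolean brokerReachable,
                  boolean requiredStreamsExist,
                  boolean publishProbeOk,
                  boolean downstreamAvailable,
                  long currentLag) {
        return brokerReachable
                && requiredStreamsExist
                && publishProbeOk
                && downstreamAvailable
                && currentLag <= maxLag;
    }
}
```

Keeping the evaluation free of I/O makes the thresholds easy to unit test and keeps liveness logic separate.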
28. Testing Strategy
28.1 Unit Tests
Test:
- envelope-to-message mapping;
- message-to-envelope decoding;
- idempotent projection logic;
- checkpoint update logic;
- contiguous offset tracker;
- poison classification.
28.2 Integration Tests
Use real RabbitMQ with stream plugin.
Test:
- create stream;
- publish N records;
- consume from first;
- store offset;
- restart consumer;
- verify resume;
- replay from first into fresh projection;
- verify deterministic result.
28.3 Failure Tests
Test crash points:
| Crash point | Expected outcome |
|---|---|
| after DB commit before offset store | duplicate replay, idempotent result |
| after offset store before DB commit | should be impossible in design |
| after publish confirm before outbox mark | republish possible, dedup/idempotency handles |
| broker restart during consume | re-subscribe/resume |
| consumer down past lag SLO | alert |
28.4 Replay Test
A replay test proves your consumer is actually stream-safe.
build projection A from live run
build projection B from replay from first
compare domain-relevant state
If replay produces different business state, your handler depends on hidden time, side effects, or non-deterministic ordering.
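A replay comparison of this kind can be sketched without a broker by folding two event sequences through the same projection logic: a "live" sequence containing duplicates, and a clean replay. The `ReplayCheck` class and its `Event` type are hypothetical simplifications of the order projection used in this guide.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// In-memory replay comparison; illustration only.
final class ReplayCheck {

    static final class Event {
        final String orderId;
        final long version;
        final String status;

        Event(String orderId, long version, String status) {
            this.orderId = orderId;
            this.version = version;
            this.status = status;
        }
    }

    // Deterministic, idempotent fold: keep the status of the highest version per order.
    static Map<String, String> project(List<Event> events) {
        Map<String, Long> versions = new HashMap<>();
        Map<String, String> statusByOrder = new HashMap<>();
        for (Event e : events) {
            Long seen = versions.get(e.orderId);
            if (seen != null && seen >= e.version) {
                continue; // duplicate or stale event
            }
            versions.put(e.orderId, e.version);
            statusByOrder.put(e.orderId, e.status);
        }
        return statusByOrder;
    }
}
```

If the two projections differ, the difference points at state that depends on delivery history rather than on the events themselves.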
29. Minimal End-to-End Example
This example is intentionally compact. It is not a full production framework, but it shows the control points.
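import com.rabbitmq.stream.Consumer;
import com.rabbitmq.stream.Environment;
import com.rabbitmq.stream.Message;
import com.rabbitmq.stream.OffsetSpecification;
import com.rabbitmq.stream.Producer;
import java.nio.charset.StandardCharsets;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
public final class OrderEventStreamExample {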
public static void main(String[] args) throws Exception {
Environment environment = Environment.builder()
.uri("rabbitmq-stream://guest:guest@localhost:5552/%2f")
.build();
String stream = "order.events.v1.stream";
environment.streamCreator()
.stream(stream)
.create();
Producer producer = environment.producerBuilder()
.name("order-demo-producer")
.stream(stream)
.build();
CountDownLatch confirmed = new CountDownLatch(1);
Message message = producer.messageBuilder()
.properties()
.messageId(UUID.randomUUID())
.correlationId(UUID.randomUUID())
.contentType("application/json")
.messageBuilder()
.applicationProperties()
.entry("eventType", "OrderSubmitted")
.entry("schemaVersion", 1)
.entry("aggregateId", "O-1001")
.entry("aggregateVersion", 1L)
.messageBuilder()
.addData("{\"orderId\":\"O-1001\",\"status\":\"SUBMITTED\"}"
.getBytes(StandardCharsets.UTF_8))
.build();
producer.send(message, status -> {
if (!status.isConfirmed()) {
System.err.println("Publish failed: " + status.getCode());
}
confirmed.countDown();
});
if (!confirmed.await(10, TimeUnit.SECONDS)) {
throw new TimeoutException("publish confirm timeout");
}
CountDownLatch consumed = new CountDownLatch(1);
Consumer consumer = environment.consumerBuilder()
.stream(stream)
.offset(OffsetSpecification.first())
.messageHandler((context, msg) -> {
System.out.println("offset=" + context.offset());
System.out.println(new String(msg.getBodyAsBinary(), StandardCharsets.UTF_8));
consumed.countDown();
})
.build();
consumed.await(10, TimeUnit.SECONDS);
consumer.close();
producer.close();
environment.close();
}
}
Do not copy this directly into production. Use it to verify API understanding.
30. Production Service Blueprint
Recommended classes:
StreamClientProperties
StreamEnvironmentFactory
StreamTopologyValidator
OrderEventMessageFactory
BoundedStreamPublisher
OrderOutboxRelay
OrderEventDecoder
OrderProjectionConsumer
StreamCheckpointRepository
StreamPoisonEventRepository
StreamMetrics
Class responsibilities:
| Class | Responsibility |
|---|---|
| StreamEnvironmentFactory | create/close environment |
| StreamTopologyValidator | fail fast on missing/wrong streams |
| OrderEventMessageFactory | domain envelope to stream message |
| BoundedStreamPublisher | in-flight, confirms, backpressure |
| OrderOutboxRelay | database outbox to stream with confirm |
| OrderProjectionConsumer | stream consume loop |
| StreamCheckpointRepository | external offset storage |
| StreamPoisonEventRepository | park invalid records |
| StreamMetrics | counters/timers/gauges |
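To make the BoundedStreamPublisher responsibility concrete, here is one possible shape for it. This is a sketch under assumptions, not the client's API: AsyncSender is a hypothetical seam that a real adapter would implement by calling Producer.send with a confirmation handler, and the confirm is reduced to a boolean. The in-flight bound is a plain Semaphore.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;

/** Hypothetical seam over an asynchronous send with a confirm callback. */
interface AsyncSender<M> {
    void send(M message, Consumer<Boolean> onConfirm);
}

final class BoundedStreamPublisher<M> {
    private final AsyncSender<M> sender;
    private final Semaphore permits;
    private final AtomicLong failed = new AtomicLong();

    BoundedStreamPublisher(AsyncSender<M> sender, int maxInFlight) {
        this.sender = sender;
        this.permits = new Semaphore(maxInFlight);
    }

    /** Returns false when no in-flight slot frees up in time, so the caller can back off. */
    boolean publish(M message, long timeout, TimeUnit unit) {
        try {
            if (!permits.tryAcquire(timeout, unit)) {
                return false;
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return false;
        }
        try {
            sender.send(message, confirmed -> {
                if (!confirmed) {
                    failed.incrementAndGet(); // feed this into metrics or a retry path
                }
                permits.release(); // the slot is free once the outcome is known
            });
        } catch (RuntimeException e) {
            permits.release(); // send never started, so no confirm will release the slot
            throw e;
        }
        return true;
    }

    long failedCount() {
        return failed.get();
    }

    int availablePermits() {
        return permits.availablePermits();
    }
}
```

The design choice here is that backpressure reaches the caller as a false return value instead of an unbounded queue. An outbox relay can then leave the row unmarked and retry on its next pass.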
31. Common Java Client Mistakes
31.1 Creating Environment Too Often
Bad:
public void publish(OrderEvent event) {
Environment env = Environment.builder().build();
Producer p = env.producerBuilder().stream("order.events.v1.stream").build();
p.send(toMessage(event), status -> {});
env.close();
}
Each call opens new broker connections, and env.close() can run before the confirm arrives, so the publish is both expensive and unverifiable.
31.2 Ignoring Confirms
Bad:
producer.send(message, status -> {});
If you do not observe confirm status, you do not know whether publish succeeded.
31.3 Storing Offset Before Business Commit
Bad:
.messageHandler((context, message) -> {
context.storeOffset();
projectionRepository.apply(decode(message));
})
A crash after the offset is stored but before the projection is applied means the record is skipped on restart.
31.4 Consumer Name Collision
Bad:
.name("consumer")
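Logical consumers that share a name also share one server-side stored offset per stream, so each one overwrites the other's position and may resume from an offset it never reached.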
Use:
order-read-model
fraud-feature-builder
audit-exporter
31.5 Unsafe Replay Side Effects
Bad:
.messageHandler((context, message) -> {
emailClient.sendWelcomeEmail(decode(message).customerEmail());
context.storeOffset();
})
Replay will send the email again unless the call is protected by a side-effect ledger.
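A side-effect ledger can be as small as a set of keys that records which effect already ran for which event. The class below is an in-memory sketch of that idea. SideEffectLedger and runOnce are hypothetical names, and a production version would presumably back the set with a durable table that has a unique constraint on the key.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/** In-memory stand-in for a durable ledger keyed by effect name and event ID. */
final class SideEffectLedger {
    private final Set<String> done = ConcurrentHashMap.newKeySet();

    /** Runs the effect only the first time this key is seen; returns whether it ran. */
    boolean runOnce(String effectName, String eventId, Runnable effect) {
        String key = effectName + ":" + eventId;
        if (!done.add(key)) {
            return false; // already recorded: this is a replay or a duplicate delivery
        }
        try {
            effect.run();
            return true;
        } catch (RuntimeException e) {
            done.remove(key); // the effect did not happen, so allow a later retry
            throw e;
        }
    }
}
```

In the handler above, the email call would be wrapped as ledger.runOnce("welcome-email", eventId, ...), where eventId is a stable identifier taken from the message and not the stream offset of a particular run.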
32. Decision Framework: Offset Strategy
| Situation | Recommended strategy |
|---|---|
| simple logging/metrics consumer | server-side automatic |
| synchronous idempotent handler | server-side manual |
| database projection | external DB checkpoint in same transaction |
| asynchronous handler with ordering requirement | serial processing or gap-aware tracker |
| super stream consumer group | server-side tracking or partition-aware external tracking |
| replay/debug tool | no persistent tracking or separate tool name |
| unsafe side-effect consumer | avoid replay; use side-effect ledger |
Do not choose automatic tracking just because it is convenient. Choose the strategy that matches failure semantics.
33. Operational Runbook
Symptom: Publish confirm latency increases
Likely causes:
- disk pressure;
- replica synchronization pressure;
- large messages;
- broker under load;
- network latency.
Actions:
- check broker disk and stream metrics;
- inspect message size distribution;
- check replica health;
- reduce producer rate or apply backpressure;
- scale/partition if sustained.
Symptom: Consumer lag grows
Likely causes:
- handler slow;
- downstream DB slow;
- consumer stopped;
- poison event loop;
- insufficient parallelism/partitioning.
Actions:
- check consumer process health;
- check handler latency by event type;
- check downstream dependency;
- inspect error logs with offset/event ID;
- decide scale, stop, park, or replay.
Symptom: Consumer restarts from unexpected offset
Likely causes:
- wrong consumer name;
- stored server-side offset overrides configured offset;
- external checkpoint stale;
- multiple instances sharing name unexpectedly;
- retention removed the segment that contained the stored offset.
Actions:
- identify consumer name;
- inspect stored offset/checkpoint;
- compare with oldest retained offset;
- confirm whether consumer uses server-side or external tracking;
- restart with explicit offset only after understanding override rules.
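The comparison between a checkpoint and the oldest retained offset can be written down as a small pure function, which also makes the "retention removed data" case explicit. This is a sketch with hypothetical names (StartDecision, StartOffsetResolver). It assumes the checkpoint holds the last processed offset and that the oldest retained offset has been obtained separately, for example from the broker's stream statistics.

```java
import java.util.OptionalLong;

/** Outcome of comparing a checkpoint with what the stream still retains. */
record StartDecision(long startOffset, boolean dataLost) {}

final class StartOffsetResolver {
    /**
     * @param checkpoint     last processed offset, empty if none was stored
     * @param oldestRetained first offset still available in the stream
     */
    static StartDecision resolve(OptionalLong checkpoint, long oldestRetained) {
        if (checkpoint.isEmpty()) {
            // Fresh consumer: start at the oldest record that still exists.
            return new StartDecision(oldestRetained, false);
        }
        long resume = checkpoint.getAsLong() + 1;
        if (resume < oldestRetained) {
            // Retention removed records this consumer never processed.
            return new StartDecision(oldestRetained, true);
        }
        return new StartDecision(resume, false);
    }
}
```

When dataLost is true, the consumer should raise an alert and not resume silently, because the projection is now missing events that no replay can recover.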
34. Practice Plan
Practice 1 — Hello Stream With Explicit Offset
- Create stream.
- Publish 100 records.
- Consume from first.
- Consume from next and verify no old records are delivered.
- Consume from offset(50).
Practice 2 — Confirm-Aware Producer
- Implement bounded publisher.
- Inject broker restart during publish.
- Verify no unbounded memory growth.
- Record failed/timeout confirms.
Practice 3 — Manual Offset Tracking
- Consume records.
- Commit projection.
- Store offset manually.
- Crash after DB commit but before offset store.
- Verify idempotent replay.
Practice 4 — External Checkpoint
- Store projection and checkpoint in one transaction.
- Restart consumer.
- Verify resume from checkpoint + 1.
Practice 5 — Replay Safety
- Build projection from live consume.
- Drop projection DB.
- Rebuild from stream first.
- Compare results.
35. Self-Correction Rubric
You can use the Stream Java Client well when you can explain and implement:
| Skill | Evidence |
|---|---|
| environment lifecycle | one long-lived env, clean shutdown |
| producer confirms | bounded in-flight, async confirms, metrics |
| dedup model | stable producer name and monotonic identity discipline |
| message mapping | properties/application properties/body mapping is explicit |
| offset specification | every consumer start position is intentional |
| server-side tracking | stable consumer names and checkpoint timing understood |
| external tracking | DB state and checkpoint can commit atomically |
| replay safety | handler is idempotent and side effects are guarded |
| flow control | no unbounded worker queue; processed() used correctly |
| failure testing | crash points produce duplicate-safe behavior, not skips |
36. Closing Model
The Stream Java Client gives you a powerful API, but the API is not the hard part.
The hard part is the correctness relationship:
append event safely
observe confirm
consume from explicit position
apply business effect idempotently
store offset only after durable effect
monitor lag and retention risk
If you preserve that relationship, RabbitMQ Streams become a strong building block for replayable Java systems.
If you break that relationship, streams can skip data, duplicate side effects, or silently drift from business truth.
References
- RabbitMQ Stream Java Client Reference: https://rabbitmq.github.io/rabbitmq-stream-java-client/stable/htmlsingle/
- RabbitMQ Documentation — Stream Plugin: https://www.rabbitmq.com/docs/stream
- RabbitMQ Documentation — Streams and Super Streams: https://www.rabbitmq.com/docs/streams
- RabbitMQ Tutorial — Java Stream Hello World: https://www.rabbitmq.com/tutorials/tutorial-one-java-stream
- RabbitMQ Tutorial — Java Stream Offset Tracking: https://www.rabbitmq.com/tutorials/tutorial-two-java-stream
- RabbitMQ Blog — Offset Tracking with RabbitMQ Streams: https://www.rabbitmq.com/blog/2021/09/13/rabbitmq-streams-offset-tracking