
Java Stream Client: Environment, Producer, Consumer, Offset Tracking

Learn Java RabbitMQ, RabbitMQ Streams, Patterns, and Deployment In Action - Part 020

Production-oriented guide to the RabbitMQ Stream Java Client, covering Environment lifecycle, stream creation, producer confirms, message building, deduplication, consumers, offset specifications, server-side and external offset tracking, flow control, shutdown, testing, and service design.



The RabbitMQ Stream Java Client is the stream-native way to create streams, publish messages, consume messages, and manage offsets from Java.

If the AMQP Java client is centered around Connection, Channel, basicPublish, and basicConsume, the Stream Java Client is centered around:

  • Environment — long-lived entry point to a broker/cluster;
  • Producer — append messages to a stream;
  • Consumer — read messages from a stream;
  • OffsetSpecification — choose where consumption starts;
  • offset tracking — decide where the consumer can resume after restart.

This part is intentionally practical. We will build Java service skeletons that are safe enough to evolve into production code.


1. Kaufman Deconstruction

To use the Stream Java Client well, decompose the skill into ten capabilities:

  1. Environment lifecycle — create one long-lived environment per service process and close it cleanly.
  2. Stream topology management — create streams deliberately, avoid accidental test-style creation in production paths.
  3. Producer correctness — use confirms, bounded in-flight messages, durable message identity, and optional deduplication.
  4. Message contract mapping — encode domain envelope into Stream Message properties/application properties/body.
  5. Consumer start position — understand first, last, next, offset, and timestamp.
  6. Offset tracking — choose server-side automatic, server-side manual, or external offset storage.
  7. Processing correctness — commit business effect before storing offset.
  8. Flow control — avoid unbounded processing and call processed() when using flow strategies that require it.
  9. Recovery behavior — reason about producer reconnect, consumer re-subscription, and duplicate processing.
  10. Observability/testing — instrument publishes, confirmations, handler latency, offset lag, and replay correctness.

The practical standard:

A stream consumer is not correct because it "received" a message. It is correct only when business state and offset progress have a safe relationship.


2. Maven/Gradle Dependency Boundary

Use the official RabbitMQ Stream Java Client.

Maven pattern:

<dependency>
  <groupId>com.rabbitmq</groupId>
  <artifactId>stream-client</artifactId>
  <version>${rabbitmq.stream.client.version}</version>
</dependency>

Gradle pattern:

dependencies {
    implementation("com.rabbitmq:stream-client:${rabbitmqStreamClientVersion}")
}

Do not hardcode an old version in learning material. In a production repository, pin the version in dependency management and upgrade using release notes plus integration tests.

Recommended project modules:

messaging-stream-contracts
messaging-stream-client
order-event-producer
order-projection-consumer
stream-test-support

Keep message contract code separated from producer/consumer lifecycle code.


3. Local Broker Setup for Stream Development

For local development, the stream plugin must be enabled.

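Example (the first command stays in the foreground, so run the second one from another terminal once the broker has started):

docker run -it --rm --name rabbitmq \
  -p 5672:5672 \
  -p 15672:15672 \
  -p 5552:5552 \
  rabbitmq:4-management

docker exec rabbitmq rabbitmq-plugins enable rabbitmq_stream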

For a real environment, do not stop at "plugin enabled". You also need:

  • stream listener port exposed;
  • TLS plan;
  • advertised host/port correctness;
  • vhost/user/permission model;
  • retention policy;
  • disk sizing;
  • metrics and alerts;
  • replica placement.

The Java Stream Client connects using a rabbitmq-stream:// URI for plaintext and rabbitmq-stream+tls:// for TLS.


4. Environment: The Long-Lived Entry Point

The Environment is the primary entry point. Treat it like a service-level resource.

import com.rabbitmq.stream.Environment;

public final class StreamEnvironmentProvider implements AutoCloseable {
    private final Environment environment;

    public StreamEnvironmentProvider(String uri) {
        this.environment = Environment.builder()
            .uri(uri)
            .build();
    }

    public Environment environment() {
        return environment;
    }

    @Override
    public void close() {
        environment.close();
    }
}

Usage:

try (StreamEnvironmentProvider provider =
         new StreamEnvironmentProvider("rabbitmq-stream://app:secret@rabbitmq-0:5552/%2f")) {
    Environment env = provider.environment();
    // create producers and consumers
}

Production invariant:

Do not create an Environment per message, per request, or per handler invocation.

One service process usually creates one environment during startup and closes it during shutdown.


5. Cluster Connection Strategy

A single URI is fine for local development. In production, provide multiple entry points.

Environment environment = Environment.builder()
    .uris(List.of(
        "rabbitmq-stream://stream-user:secret@rabbitmq-0.messaging:5552/%2f",
        "rabbitmq-stream://stream-user:secret@rabbitmq-1.messaging:5552/%2f",
        "rabbitmq-stream://stream-user:secret@rabbitmq-2.messaging:5552/%2f"
    ))
    .build();

Design notes:

| Concern | Rule |
| --- | --- |
| initial connection | use multiple broker URIs or a stable entry point |
| leader/replica locality | let the client use stream metadata where possible |
| Kubernetes | ensure advertised host/port are reachable from clients |
| TLS | certificate SANs must match names returned by broker hints |
| load balancer | useful, but metadata/hints must still be understood |

Connection problems in stream deployments often come from the broker returning hostnames that the client cannot resolve or certificates that do not match advertised names.


6. Stream Creation: Application-Owned or Platform-Owned?

The Stream Java Client can create streams.

environment.streamCreator()
    .stream("order.events.v1.stream")
    .create();

But production topology should not be accidentally created by arbitrary application startup unless that is an explicit platform decision.

| Model | Fit | Risk |
| --- | --- | --- |
| app creates stream on startup | small product teams, local/dev, controlled ownership | accidental drift, hidden retention defaults |
| platform IaC creates stream | regulated/prod environments | slower change cycle but safer governance |
| CI/CD topology migration | mature platform teams | requires strong review/test process |

Recommended approach:

  • local/dev: app can create missing streams;
  • staging/prod: topology managed by IaC/operator/migration with explicit retention and ownership;
  • application startup validates expected stream exists and settings are acceptable.

Pseudo-validation:

public final class StreamTopologyValidator {
    private final Environment environment;

    public StreamTopologyValidator(Environment environment) {
        this.environment = environment;
    }

    public void validateRequiredStreams() {
        // Use management API, Stream API, or platform-provided metadata checks.
        // Fail fast if required stream is missing or retention does not match contract.
    }
}

Do not let retention default by accident.
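One way to make the fail-fast check concrete is to compare what the platform reports against a small retention contract. The sketch below is only an illustration: `RetentionContract`, `StreamSettings`, and `RetentionCheck` are hypothetical names, and how the actual settings are fetched (management API, IaC metadata) is assumed to happen elsewhere.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical contract: the minimum retention the consumers of a stream rely on. */
record RetentionContract(String stream, Duration minimumMaxAge, long minimumMaxLengthBytes) {}

/** Hypothetical snapshot of reported settings; a null field means "no limit configured". */
record StreamSettings(String stream, Duration maxAge, Long maxLengthBytes) {}

final class RetentionCheck {
    private RetentionCheck() {}

    /** Returns readable violations; an empty list means the stream is acceptable. */
    static List<String> violations(RetentionContract expected, StreamSettings actual) {
        List<String> problems = new ArrayList<>();
        if (actual == null) {
            problems.add("stream " + expected.stream() + " does not exist");
            return problems;
        }
        if (actual.maxAge() == null && actual.maxLengthBytes() == null) {
            // Neither limit was set explicitly, so retention was left to defaults.
            problems.add("stream " + expected.stream() + " has no explicit retention");
            return problems;
        }
        if (actual.maxAge() != null && actual.maxAge().compareTo(expected.minimumMaxAge()) < 0) {
            problems.add("max-age " + actual.maxAge() + " is below " + expected.minimumMaxAge());
        }
        if (actual.maxLengthBytes() != null
                && actual.maxLengthBytes() < expected.minimumMaxLengthBytes()) {
            problems.add("max-length-bytes " + actual.maxLengthBytes()
                + " is below " + expected.minimumMaxLengthBytes());
        }
        return problems;
    }
}
```

A startup validator could throw when the returned list is not empty, which keeps the "missing stream" and "wrong retention" cases in one readable error message.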


7. Producer: The Append Path

A producer appends messages to one stream.

import com.rabbitmq.stream.Environment;
import com.rabbitmq.stream.Producer;

Producer producer = environment.producerBuilder()
    .stream("order.events.v1.stream")
    .build();

Close it during shutdown:

producer.close();

Production invariant:

A producer is not just an API object. It is a reliability boundary around confirms, retry, deduplication, in-flight limits, and observability.


8. Building Stream Messages

RabbitMQ Stream messages use a message abstraction with properties, application properties, annotations, and body.

Example mapping from domain envelope:

import com.rabbitmq.stream.Message;
import com.rabbitmq.stream.Producer;

// JsonCodec and OrderEventEnvelope are application types, not part of the client library.

public final class StreamMessageFactory {
    private final Producer producer;
    private final JsonCodec jsonCodec;

    public StreamMessageFactory(Producer producer, JsonCodec jsonCodec) {
        this.producer = producer;
        this.jsonCodec = jsonCodec;
    }

    public Message orderEvent(OrderEventEnvelope envelope) {
        byte[] body = jsonCodec.toJsonBytes(envelope.payload());

        return producer.messageBuilder()
            .properties()
                .messageId(envelope.eventId())
                .correlationId(envelope.correlationId())
                .contentType("application/json")
                .messageBuilder()
            .applicationProperties()
                .entry("eventType", envelope.eventType())
                .entry("schemaVersion", envelope.schemaVersion())
                .entry("aggregateId", envelope.aggregateId())
                .entry("aggregateVersion", envelope.aggregateVersion())
                .entry("producer", envelope.producer())
                .entry("partitionKey", envelope.partitionKey())
                .messageBuilder()
            .addData(body)
            .build();
    }
}

Keep the mapping deterministic. Avoid placing critical business state only in headers/properties if consumers will persist/replay using only body.

Recommended mapping:

| Domain field | Stream field |
| --- | --- |
| event ID | properties.messageId and envelope body |
| correlation ID | properties.correlationId and envelope body |
| content type | properties.contentType |
| event type | application property + envelope body |
| schema version | application property + envelope body |
| aggregate ID | application property + envelope body |
| partition key | application property + envelope body |
| payload | body |

Duplicating key routing/trace fields in both properties and body is acceptable when it improves operations and consumer usability.


9. Producer Confirms

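Stream publishing is asynchronous. The confirmation handler is invoked once per message with the outcome: either the broker confirmed the message, or it was not confirmed and the status carries an error code (for example when the client gives up waiting for the confirm).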

Minimal example:

CountDownLatch confirms = new CountDownLatch(1);
AtomicReference<Throwable> failure = new AtomicReference<>();

producer.send(message, confirmationStatus -> {
    if (!confirmationStatus.isConfirmed()) {
        failure.set(new IllegalStateException(
            "Message was not confirmed: " + confirmationStatus.getCode()
        ));
    }
    confirms.countDown();
});

if (!confirms.await(5, TimeUnit.SECONDS)) {
    throw new TimeoutException("Timed out waiting for stream publish confirm");
}

if (failure.get() != null) {
    throw failure.get();
}

That is fine for tests, but not for high-throughput production because it blocks per message.

Production producer should:

  • limit in-flight messages;
  • handle confirms asynchronously;
  • avoid heavy work in confirm callback;
  • record publish latency;
  • retry only when safe;
  • deduplicate when ambiguity exists;
  • drain on shutdown.

10. Bounded In-Flight Producer

A simple bounded producer wrapper:

import com.rabbitmq.stream.Message;
import com.rabbitmq.stream.Producer;

import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

public final class BoundedStreamPublisher implements AutoCloseable {
    private final Producer producer;
    private final Semaphore inFlight;
    private final Duration acquireTimeout;

    public BoundedStreamPublisher(
        Producer producer,
        int maxInFlight,
        Duration acquireTimeout
    ) {
        this.producer = producer;
        this.inFlight = new Semaphore(maxInFlight);
        this.acquireTimeout = acquireTimeout;
    }

    public CompletableFuture<Void> publish(Message message) {
        boolean acquired;
        try {
            acquired = inFlight.tryAcquire(
                acquireTimeout.toMillis(),
                TimeUnit.MILLISECONDS
            );
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return CompletableFuture.failedFuture(e);
        }

        if (!acquired) {
            return CompletableFuture.failedFuture(
                new IllegalStateException("stream publisher backpressure timeout")
            );
        }

        CompletableFuture<Void> result = new CompletableFuture<>();

        producer.send(message, confirmation -> {
            try {
                if (confirmation.isConfirmed()) {
                    result.complete(null);
                } else {
                    result.completeExceptionally(
                        new IllegalStateException(
                            "stream publish rejected: " + confirmation.getCode()
                        )
                    );
                }
            } finally {
                inFlight.release();
            }
        });

        return result;
    }

    @Override
    public void close() {
        producer.close();
    }
}

Why bounded in-flight matters:

  • prevents unbounded heap growth;
  • turns broker/disk pressure into application-level backpressure;
  • makes shutdown drainable;
  • gives clear metrics.
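The "drainable shutdown" point can be sketched with the same permit idea used by the bounded publisher. This is a minimal illustration using only `java.util.concurrent`; `InFlightGate` is a hypothetical helper, not something the stream client provides.

```java
import java.time.Duration;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

/** Tracks outstanding publishes with permits so that shutdown can wait for them. */
final class InFlightGate {
    private final int maxInFlight;
    private final Semaphore permits;

    InFlightGate(int maxInFlight) {
        this.maxInFlight = maxInFlight;
        this.permits = new Semaphore(maxInFlight);
    }

    /** Call before send; false means the caller should treat it as backpressure. */
    boolean tryEnter(Duration timeout) {
        return acquire(1, timeout);
    }

    /** Call from the confirm callback, for positive and negative confirms alike. */
    void exit() {
        permits.release();
    }

    /**
     * Waits until every outstanding publish has been confirmed by taking all permits.
     * On success the permits stay held, so no new publish can enter afterwards.
     */
    boolean drain(Duration timeout) {
        return acquire(maxInFlight, timeout);
    }

    private boolean acquire(int count, Duration timeout) {
        try {
            return permits.tryAcquire(count, timeout.toMillis(), TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return false;
        }
    }
}
```

In a shutdown path, a `false` result from `drain` is a signal worth logging: some publishes never reached a confirmed or failed state within the deadline, so the corresponding outbox rows should stay unmarked.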

Metrics:

| Metric | Meaning |
| --- | --- |
| stream.publisher.inflight | current outstanding messages |
| stream.publisher.confirm.latency | publish-to-confirm duration |
| stream.publisher.confirm.failed | negative confirms/errors |
| stream.publisher.backpressure.timeout | producer cannot acquire capacity |
| stream.publisher.message.bytes | payload size distribution |

11. Producer Deduplication

Stream producer deduplication helps when a message is persisted but the producer does not receive the confirm due to network failure.

Conceptually, deduplication depends on:

  • producer name;
  • monotonically increasing publishing ID;
  • single producer instance/thread discipline for a given name.

Example pattern:

Producer producer = environment.producerBuilder()
    .name("order-service-outbox-relay-0")
    .stream("order.events.v1.stream")
    .build();

Design rules:

| Rule | Why |
| --- | --- |
| producer name must be stable | broker tracks published IDs per producer identity |
| do not share same producer name across active instances | IDs can collide or move backward |
| preserve publish order for a named producer | dedup assumes monotonic sequence discipline |
| still keep event ID idempotency | dedup does not replace consumer correctness |

A good producer name includes a stable shard identity:

order-outbox-relay-shard-00
order-outbox-relay-shard-01
order-outbox-relay-shard-02

Bad producer names:

producer
order-service
${random-uuid-on-every-start}

Random names defeat dedup across restarts. Shared names across active instances corrupt the monotonic identity model.
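A small helper can keep producer names derived from configuration rather than generated at runtime. The sketch below assumes the shard index comes from stable deployment configuration (for example a StatefulSet ordinal); `ProducerNames` is a hypothetical name used only for illustration.

```java
import java.util.Locale;

final class ProducerNames {
    private ProducerNames() {}

    /** Builds a name that is identical across restarts and unique per shard. */
    static String forShard(String relayName, int shardIndex) {
        if (relayName == null || relayName.isBlank()) {
            throw new IllegalArgumentException("relayName must not be blank");
        }
        if (shardIndex < 0) {
            throw new IllegalArgumentException("shardIndex must not be negative");
        }
        // Locale.ROOT keeps the digits ASCII regardless of the JVM default locale.
        return String.format(Locale.ROOT, "%s-shard-%02d", relayName, shardIndex);
    }
}
```

The resulting string would be passed to `producerBuilder().name(...)`. Because nothing in it is random or time-based, a restarted relay presents the same identity to the broker.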


12. Outbox Relay to Stream

A production service usually should not publish directly to a stream inside the same transaction where it writes business state, because the broker cannot take part in the database transaction. Use the outbox pattern when database state and stream events must stay consistent.

Outbox relay publishing loop:

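import com.rabbitmq.stream.Message;

import java.util.List;

public final class StreamOutboxRelay {
    private final OutboxRepository outboxRepository;
    private final StreamMessageFactory messageFactory;
    private final BoundedStreamPublisher publisher;

    public StreamOutboxRelay(
        OutboxRepository outboxRepository,
        StreamMessageFactory messageFactory,
        BoundedStreamPublisher publisher
    ) {
        this.outboxRepository = outboxRepository;
        this.messageFactory = messageFactory;
        this.publisher = publisher;
    }

    public void runBatch(int batchSize) {
        List<OutboxRecord> records = outboxRepository.lockNextBatch(batchSize);

        for (OutboxRecord record : records) {
            Message message = messageFactory.orderEvent(record.toEnvelope());

            // Mark the row from the confirm outcome, never right after send().
            publisher.publish(message)
                .thenRun(() -> outboxRepository.markPublished(record.id()))
                .exceptionally(ex -> {
                    outboxRepository.markPublishFailed(record.id(), ex.getMessage());
                    return null;
                });
        }
    }
}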

The exact transaction and locking details depend on your database model. The invariant is stable:

Mark outbox row published only after stream publish confirm.

If the process crashes after confirm but before marking published, it may republish later. That is why producer deduplication and event ID idempotency matter.


13. Consumer: Reading From a Stream

Minimal consumer:

import com.rabbitmq.stream.Consumer;
import com.rabbitmq.stream.OffsetSpecification;

Consumer consumer = environment.consumerBuilder()
    .stream("order.events.v1.stream")
    .offset(OffsetSpecification.first())
    .messageHandler((context, message) -> {
        byte[] body = message.getBodyAsBinary();
        // process body
    })
    .build();

Close it during shutdown:

consumer.close();

By default, stream consumption starts at next if no offset is specified. This means the consumer waits for future records and does not replay existing records.

Be explicit. Never rely on default start position in production.


14. Offset Specifications

| Specification | Meaning | Use |
| --- | --- | --- |
| OffsetSpecification.first() | first retained message | full replay/rebuild |
| OffsetSpecification.last() | last chunk | tail near latest data |
| OffsetSpecification.next() | only future records | live-only consumer |
| OffsetSpecification.offset(n) | exact offset | resume/reprocess from known point |
| OffsetSpecification.timestamp(t) | first records near timestamp | incident replay/backfill |

Example:

Consumer consumer = environment.consumerBuilder()
    .stream("order.events.v1.stream")
    .offset(OffsetSpecification.timestamp(incidentStart.toEpochMilli()))
    .messageHandler((context, message) -> {
        // filter message timestamp if exact boundary matters
    })
    .build();

Timestamp attach can return records slightly before the requested timestamp because streams use chunks/segment boundaries. Application code should filter if exact event-time boundary matters.
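The application-side filter can be as small as a half-open time window. This sketch assumes the handler has some millisecond timestamp to test, such as the one exposed by the handler context or an event time carried in the payload; `ReplayWindow` is a hypothetical helper.

```java
import java.time.Instant;

/** Half-open window [start, endExclusive) used to drop records outside a replay range. */
final class ReplayWindow {
    private final long startMillis;
    private final long endMillisExclusive;

    ReplayWindow(Instant start, Instant endExclusive) {
        if (!endExclusive.isAfter(start)) {
            throw new IllegalArgumentException("endExclusive must be after start");
        }
        this.startMillis = start.toEpochMilli();
        this.endMillisExclusive = endExclusive.toEpochMilli();
    }

    /** True when the record belongs to the replay range and should be processed. */
    boolean contains(long timestampMillis) {
        return timestampMillis >= startMillis && timestampMillis < endMillisExclusive;
    }
}
```

Inside the message handler, records for which `contains` returns false before the window would simply be skipped, and the first record at or past the end could be used as the signal to stop the backfill.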


15. Server-Side Automatic Offset Tracking

Server-side offset tracking stores consumer progress in RabbitMQ. To use it, give the consumer a stable name.

Consumer consumer = environment.consumerBuilder()
    .stream("order.events.v1.stream")
    .name("order-read-model")
    .autoTrackingStrategy()
        .messageCountBeforeStorage(10_000)
        .flushInterval(Duration.ofSeconds(5))
        .builder()
    .messageHandler((context, message) -> {
        processMessage(message);
    })
    .build();

Pros:

  • simple;
  • no separate offset database;
  • good for consumers where duplicate replay is acceptable;
  • integrates with stream-native restart behavior.

Cons:

  • not atomic with your business database;
  • storing by message count/time may checkpoint before some asynchronous processing is complete if used carelessly;
  • consumer name collision creates confusing progress behavior.

Use automatic tracking when processing is synchronous and idempotent, or when occasional duplicates (and, with asynchronous handlers, the risk of a skipped message) are acceptable for non-critical consumers.

For critical stateful projections, manual or external tracking is usually clearer.


16. Server-Side Manual Offset Tracking

Manual tracking gives application code control over when to store offsets.

Consumer consumer = environment.consumerBuilder()
    .stream("order.events.v1.stream")
    .name("order-read-model")
    .manualTrackingStrategy()
        .checkInterval(Duration.ofSeconds(10))
        .builder()
    .messageHandler((context, message) -> {
        OrderEvent event = decode(message);
        projectionRepository.applyIdempotently(event);

        // Store only after business effect is durable.
        context.storeOffset();
    })
    .build();

This is safer than automatic tracking when you want the handler to decide the checkpoint boundary.

However, it is still not atomic with a business database. If the process crashes after database commit but before storeOffset(), the message may be replayed. That is acceptable if the handler is idempotent.

Correctness rule:

business commit first
then offset store
duplicates possible
skips avoided
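The rule can be checked with a tiny in-memory simulation. Everything here is a stand-in: a set plays the business database, a counter plays the stored offset, and the "crash" is just an early return between the two steps.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** In-memory stand-in for a consumer that commits business state before storing the offset. */
final class CommitThenStoreSimulation {
    private final Set<Long> applied = new HashSet<>();       // plays the business database
    private final List<Long> deliveries = new ArrayList<>(); // every delivery, duplicates included
    private long storedOffset = -1;                           // plays the stored consumer offset

    /**
     * Consumes from storedOffset + 1 up to lastOffset. When crashAfterCommitOf matches an
     * offset, the run stops after the business commit and before the offset store.
     * Returns true when the simulated crash happened.
     */
    boolean run(long lastOffset, long crashAfterCommitOf) {
        for (long offset = storedOffset + 1; offset <= lastOffset; offset++) {
            deliveries.add(offset);
            applied.add(offset);              // business commit first (idempotent by construction)
            if (offset == crashAfterCommitOf) {
                return true;                  // crash: the offset was never stored
            }
            storedOffset = offset;            // then offset store
        }
        return false;
    }

    long storedOffset() { return storedOffset; }
    List<Long> deliveries() { return List.copyOf(deliveries); }
    int appliedCount() { return applied.size(); }
}
```

Running it with a crash at offset 2 and then restarting shows offset 2 delivered twice while every offset is applied once, which is the "duplicates possible, skips avoided" outcome. Swapping the two steps in the loop would instead lose the crashed offset.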

17. External Offset Tracking With a Database

When stream processing updates a database, store the checkpoint in the same database transaction.

Schema:

CREATE TABLE stream_checkpoint (
    stream_name     varchar(255) NOT NULL,
    consumer_name   varchar(255) NOT NULL,
    partition_name  varchar(255) NOT NULL DEFAULT '',
    last_offset     bigint NOT NULL,
    updated_at      timestamp NOT NULL DEFAULT current_timestamp,
    PRIMARY KEY (stream_name, consumer_name, partition_name)
);

Consumer startup:

long lastOffset = checkpointRepository
    .findLastOffset("order.events.v1.stream", "order-read-model")
    .orElse(-1L);

OffsetSpecification start = lastOffset < 0
    ? OffsetSpecification.first()
    : OffsetSpecification.offset(lastOffset + 1);

Consumer with external offset:

Consumer consumer = environment.consumerBuilder()
    .stream("order.events.v1.stream")
    .offset(start)
    .noTrackingStrategy()
    .messageHandler((context, message) -> {
        OrderEvent event = decode(message);
        long offset = context.offset();

        projectionRepository.inTransaction(() -> {
            projectionRepository.applyIdempotently(event);
            checkpointRepository.store(
                "order.events.v1.stream",
                "order-read-model",
                "",
                offset
            );
        });
    })
    .build();

Why this is strong:

  • projection state and checkpoint move together;
  • restart position reflects durable business state;
  • duplicate risk is still handled by idempotent apply;
  • skip risk is minimized.

Caveat:

  • if processing is asynchronous/concurrent, you cannot simply store the highest offset that completed. You need gap tracking.

18. External Offset Tracking With Subscription Listener

A more stream-native external tracking pattern uses a subscription listener.

Consumer consumer = environment.consumerBuilder()
    .stream("order.events.v1.stream")
    .noTrackingStrategy()
    .subscriptionListener(subscriptionContext -> {
        long lastOffset = checkpointRepository
            .findLastOffset("order.events.v1.stream", "order-read-model")
            .orElse(-1L);

        subscriptionContext.offsetSpecification(
            lastOffset < 0
                ? OffsetSpecification.first()
                : OffsetSpecification.offset(lastOffset + 1)
        );
    })
    .messageHandler((context, message) -> {
        OrderEvent event = decode(message);
        long offset = context.offset();

        projectionRepository.inTransaction(() -> {
            projectionRepository.applyIdempotently(event);
            checkpointRepository.store(
                "order.events.v1.stream",
                "order-read-model",
                "",
                offset
            );
        });
    })
    .build();

This is useful when the consumer can re-subscribe after topology changes and you want the latest external checkpoint to control restart behavior.


19. Idempotent Projection Handler

Example projection table:

CREATE TABLE order_projection (
    order_id           varchar(64) PRIMARY KEY,
    aggregate_version  bigint NOT NULL,
    status             varchar(64) NOT NULL,
    total_amount       numeric(19, 2),
    updated_at         timestamp NOT NULL
);

CREATE TABLE processed_event (
    consumer_name varchar(255) NOT NULL,
    event_id      varchar(128) NOT NULL,
    processed_at  timestamp NOT NULL DEFAULT current_timestamp,
    PRIMARY KEY (consumer_name, event_id)
);

Handler logic:

public void applyIdempotently(OrderEvent event) {
    if (processedEventRepository.exists("order-read-model", event.eventId())) {
        return;
    }

    OrderProjection current = projectionRepository.find(event.orderId());

    if (current != null && current.aggregateVersion() >= event.aggregateVersion()) {
        processedEventRepository.insert("order-read-model", event.eventId());
        return;
    }

    projectionRepository.upsert(
        event.orderId(),
        event.aggregateVersion(),
        event.status(),
        event.totalAmount(),
        event.occurredAt()
    );

    processedEventRepository.insert("order-read-model", event.eventId());
}

This protects against:

  • duplicate stream delivery;
  • replay;
  • producer retry ambiguity;
  • old/stale events;
  • consumer restart after DB commit before offset storage.
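For unit tests, the same decision logic can be exercised without a database. The following is an in-memory sketch under simplifying assumptions: the event and row types are reduced to a few fields, and the class name `InMemoryOrderProjection` is hypothetical.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

final class InMemoryOrderProjection {
    record OrderEvent(String eventId, String orderId, long aggregateVersion, String status) {}
    record Row(long aggregateVersion, String status) {}

    private final Set<String> processedEventIds = new HashSet<>();
    private final Map<String, Row> rows = new HashMap<>();

    /** Returns true only when the event changed the projection. */
    boolean applyIdempotently(OrderEvent event) {
        if (!processedEventIds.add(event.eventId())) {
            return false; // duplicate delivery or replay of an already processed event
        }
        Row current = rows.get(event.orderId());
        if (current != null && current.aggregateVersion() >= event.aggregateVersion()) {
            return false; // stale event: remembered as processed, state left untouched
        }
        rows.put(event.orderId(), new Row(event.aggregateVersion(), event.status()));
        return true;
    }

    /** Returns the projected status, or null when the order is unknown. */
    String statusOf(String orderId) {
        Row row = rows.get(orderId);
        return row == null ? null : row.status();
    }
}
```

The database-backed handler needs the same two guards inside one transaction; the in-memory version only makes the duplicate and stale-event cases cheap to test.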

20. Consumer Flow Control

Stream consumers receive data in chunks and use credits. If handler work is asynchronous, you must ensure the client does not overrun your process.

Example using a flow strategy that requires processed():

Consumer consumer = environment.consumerBuilder()
    .stream("order.events.v1.stream")
    .flow()
        .strategy(ConsumerFlowStrategy.creditWhenHalfMessagesProcessed())
        .builder()
    .messageHandler((context, message) -> {
        workerPool.submit(() -> {
            try {
                processMessage(message);
            } finally {
                context.processed();
            }
        });
    })
    .build();

If you use a strategy that depends on processed(), call it exactly once for each message after actual processing is complete.

Wrong:

.messageHandler((context, message) -> {
    workerPool.submit(() -> processMessage(message));
    context.processed(); // too early
})

The wrong version tells the client there is capacity before the actual work finishes.
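When several code paths can finish a message (success, failure, timeout), a once-only wrapper is one way to keep the "exactly once per message" rule honest. This is a generic sketch, not part of the stream client; `OnceOnly` is a hypothetical helper that could wrap `context::processed`.

```java
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;

/** Runs the wrapped action on the first call and ignores every later call. */
final class OnceOnly implements Runnable {
    private final Runnable delegate;
    private final AtomicBoolean done = new AtomicBoolean(false);

    OnceOnly(Runnable delegate) {
        this.delegate = Objects.requireNonNull(delegate, "delegate");
    }

    @Override
    public void run() {
        // compareAndSet lets exactly one caller win, even across threads.
        if (done.compareAndSet(false, true)) {
            delegate.run();
        }
    }
}
```

A handler would create `new OnceOnly(context::processed)` per message and call `run()` from the worker's `finally` block and from any timeout path, so the credit signal is sent once no matter which path fires first.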


21. Concurrency and Offset Gaps

If a consumer processes messages concurrently, offsets can complete out of order.

If you store offset 11 while offset 10 is still incomplete, a crash can skip offset 10.

Solutions:

Option A — Serial Processing

Simplest and safest.

.messageHandler((context, message) -> {
    processMessage(message);
    storeOffset(context.offset());
})

Option B — Partition by Key and Process Serially Per Key

Allows concurrency across independent keys, serial order within key.

orderId O-1 -> lane 1
orderId O-2 -> lane 2
orderId O-3 -> lane 3

Still hard to checkpoint globally if completion is out of order.
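A minimal sketch of the lane idea, assuming a fixed lane count and string keys: each lane is a single-threaded executor, and the key's hash picks the lane. `KeyedLanes` is a hypothetical class, and checkpointing is deliberately left out because it is the hard part described above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/** Routes work to single-threaded lanes so that one key is always processed in order. */
final class KeyedLanes implements AutoCloseable {
    private final List<ExecutorService> lanes = new ArrayList<>();

    KeyedLanes(int laneCount) {
        if (laneCount <= 0) {
            throw new IllegalArgumentException("laneCount must be positive");
        }
        for (int i = 0; i < laneCount; i++) {
            lanes.add(Executors.newSingleThreadExecutor());
        }
    }

    /** floorMod keeps the index non-negative even for negative hash codes. */
    int laneFor(String key) {
        return Math.floorMod(key.hashCode(), lanes.size());
    }

    /** Tasks submitted for the same key run one after another on the same lane. */
    Future<?> submit(String key, Runnable task) {
        return lanes.get(laneFor(key)).submit(task);
    }

    @Override
    public void close() {
        lanes.forEach(ExecutorService::shutdown);
    }
}
```

The lane count must stay fixed while messages are in flight; changing it remaps keys to different lanes and can reorder work for a key.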

Option C — Gap-Aware Offset Tracker

Maintain completed offset set and advance checkpoint only through contiguous completion.

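import java.util.NavigableSet;
import java.util.OptionalLong;
import java.util.TreeSet;

// Assumes every offset after highestStored is eventually reported as completed.
public final class ContiguousOffsetTracker {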
    private long highestStored;
    private final NavigableSet<Long> completed = new TreeSet<>();

    public ContiguousOffsetTracker(long highestStored) {
        this.highestStored = highestStored;
    }

    public synchronized OptionalLong markCompleted(long offset) {
        completed.add(offset);

        long candidate = highestStored + 1;
        boolean advanced = false;

        while (completed.remove(candidate)) {
            highestStored = candidate;
            candidate++;
            advanced = true;
        }

        return advanced ? OptionalLong.of(highestStored) : OptionalLong.empty();
    }
}

Use this only when you have strong tests. Many systems are better off serializing processing per consumer and scaling via partitions/super streams.
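A variant worth considering tracks what is still in flight instead of what has completed, so it does not depend on the consumer seeing every consecutive offset. This is a sketch under one stated assumption: `dispatched` is called on the delivery thread, in delivery order, before the message is handed to a worker. `InFlightOffsetTracker` is a hypothetical name.

```java
import java.util.OptionalLong;
import java.util.TreeSet;

/** Advances the checkpoint to just below the oldest message that is still being processed. */
final class InFlightOffsetTracker {
    private final TreeSet<Long> inFlight = new TreeSet<>();
    private long highestCompleted;
    private long lastCheckpoint;

    InFlightOffsetTracker(long lastStoredOffset) {
        this.highestCompleted = lastStoredOffset;
        this.lastCheckpoint = lastStoredOffset;
    }

    /** Register an offset before its processing starts. */
    synchronized void dispatched(long offset) {
        inFlight.add(offset);
    }

    /** Report durable completion; returns a new checkpoint when it can move forward. */
    synchronized OptionalLong completed(long offset) {
        if (!inFlight.remove(offset)) {
            throw new IllegalStateException("offset was not dispatched: " + offset);
        }
        highestCompleted = Math.max(highestCompleted, offset);
        // Everything below the oldest in-flight offset is done; with nothing in flight,
        // everything up to the highest completed offset is done.
        long safe = inFlight.isEmpty() ? highestCompleted : inFlight.first() - 1;
        if (safe > lastCheckpoint) {
            lastCheckpoint = safe;
            return OptionalLong.of(safe);
        }
        return OptionalLong.empty();
    }
}
```

The returned value means "everything at or below this offset is done", so resuming at the value plus one restarts at the oldest unfinished message. The same caution applies as for the contiguous tracker: test the crash and restart paths before relying on it.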


22. Single Active Consumer Preview

Single active consumer lets several consumer instances share a stream and name, while only one active instance receives messages at a time. This helps maintain order and continuity.

Consumer consumer = environment.consumerBuilder()
    .stream("order.events.v1.stream")
    .name("order-read-model")
    .singleActiveConsumer()
    .manualTrackingStrategy()
        .builder()
    .messageHandler((context, message) -> {
        processMessage(message);
        context.storeOffset();
    })
    .build();

Use when:

  • order matters;
  • exactly one active instance should process the stream;
  • standby instances should take over on crash;
  • throughput of one active consumer is enough.

If throughput is not enough, use Super Streams later rather than unsafe concurrency inside one ordered lane.


23. Error Handling in Stream Consumers

There is no queue-style per-message DLX flow for stream processing. A consumer must decide what to do with a bad record.

Failure classes:

| Failure | Example | Strategy |
| --- | --- | --- |
| transient infrastructure | DB timeout | retry locally with budget |
| downstream unavailable | projection DB down | stop consumer or pause processing |
| invalid schema | unknown required field | park error and advance only if policy allows |
| poison business data | impossible state transition | park + alert + manual decision |
| unsafe side effect | external API duplicate risk | ledger + idempotency key |

Example parking table:

CREATE TABLE stream_poison_event (
    stream_name      varchar(255) NOT NULL,
    consumer_name    varchar(255) NOT NULL,
    offset_value     bigint NOT NULL,
    event_id         varchar(128),
    reason           text NOT NULL,
    payload          text NOT NULL,
    created_at       timestamp NOT NULL DEFAULT current_timestamp,
    PRIMARY KEY (stream_name, consumer_name, offset_value)
);

Decision:

  • If the event must be fixed before progress can continue, stop and alert.
  • If the event can be skipped after parking, persist the parking record and then store the offset.
  • If the failure is transient, retry with a bounded policy before stopping.

Do not silently swallow stream consumer errors.
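Making the decision explicit in code helps keep it out of scattered catch blocks. The sketch below encodes a simplified version of the policy; the enum values and the `FailurePolicy` class are hypothetical, and a real policy would likely carry more context (event type, consumer criticality).

```java
enum FailureClass { TRANSIENT_INFRASTRUCTURE, DOWNSTREAM_UNAVAILABLE, INVALID_SCHEMA, POISON_BUSINESS_DATA }

enum ConsumerAction { RETRY_WITH_BUDGET, STOP_AND_ALERT, PARK_AND_ADVANCE }

final class FailurePolicy {
    private final boolean skipAfterParkingAllowed;

    FailurePolicy(boolean skipAfterParkingAllowed) {
        this.skipAfterParkingAllowed = skipAfterParkingAllowed;
    }

    /** Maps a classified failure to the single action the consumer should take. */
    ConsumerAction decide(FailureClass failure, boolean retryBudgetLeft) {
        switch (failure) {
            case TRANSIENT_INFRASTRUCTURE:
                return retryBudgetLeft ? ConsumerAction.RETRY_WITH_BUDGET : ConsumerAction.STOP_AND_ALERT;
            case DOWNSTREAM_UNAVAILABLE:
                return ConsumerAction.STOP_AND_ALERT;
            case INVALID_SCHEMA:
            case POISON_BUSINESS_DATA:
                // Advancing past a bad record is a policy decision, never a default.
                return skipAfterParkingAllowed ? ConsumerAction.PARK_AND_ADVANCE : ConsumerAction.STOP_AND_ALERT;
            default:
                throw new IllegalArgumentException("unclassified failure: " + failure);
        }
    }
}
```

Every branch ends in a visible action, so there is no path where an error is swallowed and the offset advances by accident.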


24. Retry Policy for Stream Consumer Handler

A bounded local retry pattern:

public void processWithRetry(Message message) {
    int maxAttempts = 3;
    Duration delay = Duration.ofMillis(200);

    for (int attempt = 1; attempt <= maxAttempts; attempt++) {
        try {
            processMessage(message);
            return;
        } catch (TransientDependencyException ex) {
            if (attempt == maxAttempts) {
                throw ex;
            }
            sleep(delay.multipliedBy(attempt));
        }
    }
}

When local retry fails, prefer stopping the consumer over advancing offset if correctness requires that message.

For high-throughput consumers, do not block stream client threads for long sleeps. Dispatch to controlled worker pools or use pause/restart patterns at service level.
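The retry loop becomes easier to test, and easier to move off the client thread, when the waiting step is injected. This sketch keeps the same linear backoff; `BoundedRetry` and `TransientFailure` are hypothetical stand-ins, and the sleeper is any function that waits for the given duration.

```java
import java.time.Duration;
import java.util.function.Consumer;

final class BoundedRetry {
    /** Hypothetical marker for failures that are worth retrying. */
    static class TransientFailure extends RuntimeException {
        TransientFailure(String message) {
            super(message);
        }
    }

    private final int maxAttempts;
    private final Duration baseDelay;
    private final Consumer<Duration> sleeper;

    BoundedRetry(int maxAttempts, Duration baseDelay, Consumer<Duration> sleeper) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        this.maxAttempts = maxAttempts;
        this.baseDelay = baseDelay;
        this.sleeper = sleeper;
    }

    /** Runs the task and returns the number of attempts used; rethrows after the last attempt. */
    int run(Runnable task) {
        for (int attempt = 1; ; attempt++) {
            try {
                task.run();
                return attempt;
            } catch (TransientFailure ex) {
                if (attempt >= maxAttempts) {
                    throw ex; // budget exhausted: let the caller stop or park
                }
                sleeper.accept(baseDelay.multipliedBy(attempt)); // linear backoff
            }
        }
    }
}
```

In tests the sleeper can simply record the requested delays; in production it could be a real sleep on a worker thread, which keeps long waits away from the stream client's own threads.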


25. Graceful Shutdown

A stream service should shut down in this order:

  1. stop accepting new external work;
  2. stop/close consumers or pause intake;
  3. wait for in-flight handlers to complete within deadline;
  4. commit final offsets/checkpoints;
  5. drain producer in-flight confirms;
  6. close producers;
  7. close environment;
  8. emit shutdown metrics/logs.

Example skeleton:

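import com.rabbitmq.stream.Consumer;
import com.rabbitmq.stream.Environment;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

public final class OrderProjectionStreamApp implements AutoCloseable {
    private final Environment environment;
    private final Consumer consumer;
    private final ExecutorService workers;

    public OrderProjectionStreamApp(
        Environment environment,
        Consumer consumer,
        ExecutorService workers
    ) {
        this.environment = environment;
        this.consumer = consumer;
        this.workers = workers;
    }

    @Override
    public void close() {
        consumer.close();   // stop intake first
        workers.shutdown(); // let in-flight handlers finish
        try {
            if (!workers.awaitTermination(30, TimeUnit.SECONDS)) {
                workers.shutdownNow();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            workers.shutdownNow();
        } finally {
            environment.close(); // always last
        }
    }
}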

Shutdown rule:

A fast shutdown that corrupts offset/business state is not graceful. It is just quick data loss.
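The shutdown order is easier to keep correct when it is written down as data rather than as nested try blocks. The following sketch runs named steps in registration order and keeps going when one fails; `OrderedShutdown` is a hypothetical helper and the step names are illustrative.

```java
import java.util.ArrayList;
import java.util.List;

final class OrderedShutdown {
    private record Step(String name, Runnable action) {}

    private final List<Step> steps = new ArrayList<>();

    /** Registers a step; steps run in the order they were added. */
    OrderedShutdown step(String name, Runnable action) {
        steps.add(new Step(name, action));
        return this;
    }

    /** Runs every step; a failing step is recorded and does not prevent later steps. */
    List<String> run() {
        List<String> failures = new ArrayList<>();
        for (Step step : steps) {
            try {
                step.action().run();
            } catch (RuntimeException ex) {
                failures.add(step.name() + ": " + ex.getMessage());
            }
        }
        return failures;
    }
}
```

A service could register "close consumers", "await workers", "drain producer", "close producers", and "close environment" in that order, then log the returned failures as part of its shutdown metrics.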


26. Observability

Minimum producer metrics:

| Metric | Tags |
| --- | --- |
| rabbitmq.stream.publish.count | stream, producer, result |
| rabbitmq.stream.publish.confirm.latency | stream, producer |
| rabbitmq.stream.publish.inflight | stream, producer |
| rabbitmq.stream.publish.bytes | stream, eventType |
| rabbitmq.stream.publish.backpressure | stream, producer |

Minimum consumer metrics:

| Metric | Tags |
| --- | --- |
| rabbitmq.stream.consume.count | stream, consumer, eventType, result |
| rabbitmq.stream.consume.latency | stream, consumer, eventType |
| rabbitmq.stream.consumer.offset | stream, consumer |
| rabbitmq.stream.consumer.lag | stream, consumer |
| rabbitmq.stream.consumer.error.count | stream, consumer, errorClass |
| rabbitmq.stream.consumer.checkpoint.latency | stream, consumer |

Structured log fields:

{
  "stream": "order.events.v1.stream",
  "consumer": "order-read-model",
  "offset": 123456,
  "eventId": "01JZ8M8RZ9N9B5SHGBAJ7H7W2Q",
  "eventType": "OrderSubmitted",
  "aggregateId": "O-1001",
  "aggregateVersion": 12,
  "correlationId": "quote-req-77",
  "result": "processed"
}

Logs without offset are weak for stream debugging.


27. Health Checks

Readiness should fail when:

  • environment cannot connect;
  • required stream does not exist;
  • producer cannot publish/confirm within budget;
  • consumer lag is beyond safe threshold for critical projections;
  • downstream dependency for consumer is unavailable;
  • disk/retention risk is known through broker metrics.

Liveness should be conservative. Do not restart a process just because the downstream DB is temporarily slow if a restart would amplify replay. Prefer failing readiness plus controlled backoff.

Health categories:

| Check | Liveness | Readiness |
| --- | --- | --- |
| JVM not deadlocked | yes | yes |
| stream broker reachable | maybe | yes |
| publish confirm probe | no | yes |
| consumer lag too high | no | yes/alert |
| downstream DB unavailable | no | yes |
| poison event encountered | no | depends on policy |
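
The split between liveness and readiness can be sketched as two pure functions over probe results. This is a minimal illustration, not a health framework: `StreamHealthInputs`, `StreamHealth`, and every field name are hypothetical, and each flag is assumed to come from a real probe or metric elsewhere in the service. The "maybe" for broker reachability is resolved here as "not a liveness condition", which is one possible policy.

```java
// Hypothetical probe results; names are illustrative, not a client API.
record StreamHealthInputs(
        boolean jvmDeadlocked,
        boolean brokerReachable,
        boolean confirmProbeOk,
        long consumerLag,
        long maxSafeLag,
        boolean downstreamDbUp) {}

final class StreamHealth {
    private StreamHealth() {}

    // Liveness: only conditions that a process restart can actually fix.
    static boolean live(StreamHealthInputs in) {
        return !in.jvmDeadlocked();
    }

    // Readiness: everything the service needs to do useful work right now.
    static boolean ready(StreamHealthInputs in) {
        return !in.jvmDeadlocked()
            && in.brokerReachable()
            && in.confirmProbeOk()
            && in.consumerLag() <= in.maxSafeLag()
            && in.downstreamDbUp();
    }
}
```

With the downstream DB unavailable, `ready` returns false while `live` stays true, so the orchestrator stops routing work to the instance without restarting it into a replay.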

28. Testing Strategy

28.1 Unit Tests

Test:

  • envelope-to-message mapping;
  • message-to-envelope decoding;
  • idempotent projection logic;
  • checkpoint update logic;
  • contiguous offset tracker;
  • poison classification.
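
As one example of a unit-testable piece, a contiguous offset tracker can be sketched with the standard library alone. The class below is a hypothetical helper, not part of the Stream Java Client. It assumes offsets are registered in delivery order before their handlers run, and it reports the highest offset below which every dispatched record has completed.

```java
import java.util.TreeMap;

// Hypothetical helper: tracks out-of-order handler completions and exposes
// the highest offset that is safe to checkpoint.
final class ContiguousOffsetTracker {
    // offset -> true once its handler has completed durably
    private final TreeMap<Long, Boolean> inFlight = new TreeMap<>();
    private long committable = -1; // -1 means nothing is safe to store yet

    // Call in delivery order, before handing the record to a worker.
    synchronized void dispatched(long offset) {
        inFlight.put(offset, false);
    }

    // Call after the business effect for this offset is durable.
    synchronized void completed(long offset) {
        if (inFlight.replace(offset, true) == null) {
            return; // unknown or already released offset
        }
        // Release the completed prefix; stop at the first unfinished offset.
        while (!inFlight.isEmpty() && inFlight.firstEntry().getValue()) {
            committable = inFlight.pollFirstEntry().getKey();
        }
    }

    synchronized long committable() {
        return committable;
    }
}
```

If offsets 10, 11, and 12 are dispatched and 12 completes first, nothing is committable until 10 completes. Once 11 also completes, the committable offset jumps to 12.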

28.2 Integration Tests

Use a real RabbitMQ broker with the stream plugin enabled.

Test:

  1. create stream;
  2. publish N records;
  3. consume from first;
  4. store offset;
  5. restart consumer;
  6. verify resume;
  7. replay from first into fresh projection;
  8. verify deterministic result.

28.3 Failure Tests

Test crash points:

| Crash point | Expected outcome |
| --- | --- |
| after DB commit before offset store | duplicate replay, idempotent result |
| after offset store before DB commit | should be impossible in design |
| after publish confirm before outbox mark | republish possible, dedup/idempotency handles |
| broker restart during consume | re-subscribe/resume |
| consumer down past lag SLO | alert |

28.4 Replay Test

A replay test proves your consumer is actually stream-safe.

build projection A from live run
build projection B from replay from first
compare domain-relevant state

If replay produces different business state, your handler depends on hidden time, side effects, or non-deterministic ordering.
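
The comparison can be sketched without a broker by treating the stream as an ordered list. Everything below is hypothetical and for illustration only: `OrderEvent` mirrors the `aggregateId` and `aggregateVersion` fields used elsewhere in this part, and `OrderStatusProjection` assumes the aggregate version is a reliable ordering key per order.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical event shape for illustration only.
record OrderEvent(String aggregateId, long aggregateVersion, String status) {}

final class OrderStatusProjection {
    private final Map<String, OrderEvent> latest = new HashMap<>();

    // Idempotent apply: an event at or below the stored version is ignored,
    // so a duplicate delivery cannot change the state.
    void apply(OrderEvent event) {
        latest.merge(event.aggregateId(), event, (current, incoming) ->
            incoming.aggregateVersion() > current.aggregateVersion() ? incoming : current);
    }

    // Immutable snapshot of the domain-relevant state.
    Map<String, OrderEvent> state() {
        return Map.copyOf(latest);
    }

    // Builds a fresh projection by applying the events in stream order.
    static OrderStatusProjection build(List<OrderEvent> events) {
        OrderStatusProjection projection = new OrderStatusProjection();
        events.forEach(projection::apply);
        return projection;
    }
}
```

A test can build projection A from the list, re-apply one event to simulate a duplicate after a crash, build projection B from the same list, and assert that both `state()` maps are equal.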


29. Minimal End-to-End Example

This example is intentionally compact. It is not a full production framework, but it shows the control points.

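import com.rabbitmq.stream.Consumer;
import com.rabbitmq.stream.Environment;
import com.rabbitmq.stream.Message;
import com.rabbitmq.stream.OffsetSpecification;
import com.rabbitmq.stream.Producer;
import java.nio.charset.StandardCharsets;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;

public final class OrderEventStreamExample {
    public static void main(String[] args) throws Exception {
        // try-with-resources closes the environment even if a step below throws.
        try (Environment environment = Environment.builder()
                .uri("rabbitmq-stream://guest:guest@localhost:5552/%2f")
                .build()) {

            String stream = "order.events.v1.stream";

            environment.streamCreator()
                .stream(stream)
                .create();

            Producer producer = environment.producerBuilder()
                .name("order-demo-producer")
                .stream(stream)
                .build();

            CountDownLatch confirmed = new CountDownLatch(1);
            AtomicBoolean publishOk = new AtomicBoolean(false);

            Message message = producer.messageBuilder()
                .properties()
                    .messageId(UUID.randomUUID())
                    .correlationId(UUID.randomUUID())
                    .contentType("application/json")
                    .messageBuilder()
                .applicationProperties()
                    .entry("eventType", "OrderSubmitted")
                    .entry("schemaVersion", 1)
                    .entry("aggregateId", "O-1001")
                    .entry("aggregateVersion", 1L)
                    .messageBuilder()
                .addData("{\"orderId\":\"O-1001\",\"status\":\"SUBMITTED\"}"
                    .getBytes(StandardCharsets.UTF_8))
                .build();

            // The confirm callback is the only place that tells us the publish outcome.
            producer.send(message, status -> {
                publishOk.set(status.isConfirmed());
                if (!status.isConfirmed()) {
                    System.err.println("Publish failed: " + status.getCode());
                }
                confirmed.countDown();
            });

            if (!confirmed.await(10, TimeUnit.SECONDS)) {
                throw new TimeoutException("publish confirm timeout");
            }
            if (!publishOk.get()) {
                throw new IllegalStateException("publish was not confirmed");
            }

            CountDownLatch consumed = new CountDownLatch(1);

            // Start from the first retained record so the position is explicit.
            Consumer consumer = environment.consumerBuilder()
                .stream(stream)
                .offset(OffsetSpecification.first())
                .messageHandler((context, msg) -> {
                    System.out.println("offset=" + context.offset());
                    System.out.println(new String(msg.getBodyAsBinary(), StandardCharsets.UTF_8));
                    consumed.countDown();
                })
                .build();

            if (!consumed.await(10, TimeUnit.SECONDS)) {
                throw new TimeoutException("consume timeout");
            }

            consumer.close();
            producer.close();
        }
    }
}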

Do not copy this directly into production. Use it to verify API understanding.


30. Production Service Blueprint

Recommended classes:

StreamClientProperties
StreamEnvironmentFactory
StreamTopologyValidator
OrderEventMessageFactory
BoundedStreamPublisher
OrderOutboxRelay
OrderEventDecoder
OrderProjectionConsumer
StreamCheckpointRepository
StreamPoisonEventRepository
StreamMetrics

Class responsibilities:

| Class | Responsibility |
| --- | --- |
| StreamEnvironmentFactory | create/close environment |
| StreamTopologyValidator | fail fast on missing/wrong streams |
| OrderEventMessageFactory | domain envelope to stream message |
| BoundedStreamPublisher | in-flight, confirms, backpressure |
| OrderOutboxRelay | database outbox to stream with confirm |
| OrderProjectionConsumer | stream consume loop |
| StreamCheckpointRepository | external offset storage |
| StreamPoisonEventRepository | park invalid records |
| StreamMetrics | counters/timers/gauges |

31. Common Java Client Mistakes

31.1 Creating Environment Too Often

Bad:

public void publish(OrderEvent event) {
    Environment env = Environment.builder().build();
    Producer p = env.producerBuilder().stream("order.events.v1.stream").build();
    p.send(toMessage(event), status -> {});
    env.close();
}

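This opens new connections for every message, and closing the environment right after send can drop the message before its confirm arrives. It destroys both resource efficiency and reliability.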

31.2 Ignoring Confirms

Bad:

producer.send(message, status -> {});

If you do not observe confirm status, you do not know whether publish succeeded.
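
One way to observe every confirm while also bounding in-flight messages is a semaphore around the send call. The sketch below is a minimal illustration using only the standard library. `ConfirmingSender` and `ConfirmCallback` are hypothetical stand-ins for `Producer.send(message, confirmationHandler)`, and the sketch assumes the callback fires exactly once for each accepted send.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical callback: true means the broker confirmed the message.
interface ConfirmCallback {
    void handle(boolean confirmed);
}

// Hypothetical stand-in for Producer.send(message, confirmationHandler).
interface ConfirmingSender<M> {
    void send(M message, ConfirmCallback callback);
}

final class BoundedPublisher<M> {
    private final ConfirmingSender<M> sender;
    private final Semaphore permits;
    private final int maxInFlight;
    private final AtomicLong confirmed = new AtomicLong();
    private final AtomicLong failed = new AtomicLong();

    BoundedPublisher(ConfirmingSender<M> sender, int maxInFlight) {
        this.sender = sender;
        this.maxInFlight = maxInFlight;
        this.permits = new Semaphore(maxInFlight);
    }

    // Returns false (backpressure) when no in-flight slot frees up in time.
    boolean publish(M message, long timeoutMillis) {
        if (!acquire(1, timeoutMillis)) {
            return false;
        }
        try {
            sender.send(message, ok -> {
                (ok ? confirmed : failed).incrementAndGet();
                permits.release(); // the slot is free once the outcome is known
            });
            return true;
        } catch (RuntimeException e) {
            permits.release(); // assumed: no callback follows a failed send call
            throw e;
        }
    }

    // Shutdown helper: true once every outstanding confirm has arrived.
    boolean drain(long timeoutMillis) {
        if (!acquire(maxInFlight, timeoutMillis)) {
            return false;
        }
        permits.release(maxInFlight);
        return true;
    }

    long confirmedCount() { return confirmed.get(); }

    long failedCount() { return failed.get(); }

    private boolean acquire(int n, long timeoutMillis) {
        try {
            return permits.tryAcquire(n, timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

The design choice is that a slot is released only in the confirm callback, so memory held by unconfirmed messages stays bounded by `maxInFlight`, and `drain` gives shutdown code a deadline-bound way to wait for outstanding confirms.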

31.3 Storing Offset Before Business Commit

Bad:

.messageHandler((context, message) -> {
    context.storeOffset();
    projectionRepository.apply(decode(message));
})

If the process crashes after the offset is stored but before the projection commits, the record is skipped on restart.

31.4 Consumer Name Collision

Bad:

.name("consumer")

Server-side offset tracking is keyed by consumer name and stream, so unrelated logical consumers that share a name overwrite each other's stored offset.

Use:

order-read-model
fraud-feature-builder
audit-exporter

31.5 Unsafe Replay Side Effects

Bad:

.messageHandler((context, message) -> {
    emailClient.sendWelcomeEmail(decode(message).customerEmail());
    context.storeOffset();
})

Replay will send the email again unless the call is protected by a side-effect ledger.
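
A side-effect ledger can be sketched as a set of keys that have already been acted on. The class below is a hypothetical in-memory illustration. A production ledger would need durable storage, for example a table with a unique key on the effect name and event ID, which is an assumption here and not something the Stream Java Client provides.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical in-memory ledger; real deployments need durable storage.
final class SideEffectLedger {
    private final Set<String> performed = ConcurrentHashMap.newKeySet();

    // Runs the action only the first time this (effect, eventId) pair is seen.
    // Returns true if the action ran, false if it was skipped as a repeat.
    boolean runOnce(String effect, String eventId, Runnable action) {
        String key = effect + ":" + eventId;
        if (!performed.add(key)) {
            return false; // already performed, for example during a replay
        }
        try {
            action.run();
            return true;
        } catch (RuntimeException e) {
            performed.remove(key); // release the claim so a retry can run it
            throw e;
        }
    }
}
```

In the handler above, the email call would be wrapped as `ledger.runOnce("welcome-email", eventId, () -> emailClient.sendWelcomeEmail(...))`, so a replay from the first offset finds the key and skips the send.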


32. Decision Framework: Offset Strategy

| Situation | Recommended strategy |
| --- | --- |
| simple logging/metrics consumer | server-side automatic |
| synchronous idempotent handler | server-side manual |
| database projection | external DB checkpoint in same transaction |
| asynchronous handler with ordering requirement | serial processing or gap-aware tracker |
| super stream consumer group | server-side tracking or partition-aware external tracking |
| replay/debug tool | no persistent tracking or separate tool name |
| unsafe side-effect consumer | avoid replay; use side-effect ledger |

Do not choose automatic tracking just because it is convenient. Choose the strategy that matches failure semantics.
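
For the database projection row, the key property is that the projection change and the checkpoint move together or not at all. The sketch below imitates that with an in-memory class whose single synchronized method stands in for a database transaction. `CheckpointedProjectionStore` and its methods are hypothetical names, and a real implementation would update both rows inside one transaction.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory stand-in for a database that commits the projection
// row and the checkpoint row in the same transaction.
final class CheckpointedProjectionStore {
    private final Map<String, String> statusByOrder = new HashMap<>();
    private long checkpoint = -1; // last offset whose effect is durable

    // Applies the change and advances the checkpoint as one atomic step.
    // Returns false when the offset was already applied (a replayed record).
    synchronized boolean apply(long offset, String orderId, String status) {
        if (offset <= checkpoint) {
            return false;
        }
        statusByOrder.put(orderId, status);
        checkpoint = offset;
        return true;
    }

    // Offset to subscribe from after a restart: checkpoint + 1.
    synchronized long resumeOffset() {
        return checkpoint + 1;
    }

    synchronized String status(String orderId) {
        return statusByOrder.get(orderId);
    }
}
```

After a restart, the consumer would read `resumeOffset()` and subscribe with an explicit offset specification. Because there is no window in which only one of the two writes is visible, a crash can cause a duplicate delivery but never a skip.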


33. Operational Runbook

Symptom: Publish confirm latency increases

Likely causes:

  • disk pressure;
  • replica synchronization pressure;
  • large messages;
  • broker under load;
  • network latency.

Actions:

  1. check broker disk and stream metrics;
  2. inspect message size distribution;
  3. check replica health;
  4. reduce producer rate or apply backpressure;
  5. scale/partition if sustained.

Symptom: Consumer lag grows

Likely causes:

  • handler slow;
  • downstream DB slow;
  • consumer stopped;
  • poison event loop;
  • insufficient parallelism/partitioning.

Actions:

  1. check consumer process health;
  2. check handler latency by event type;
  3. check downstream dependency;
  4. inspect error logs with offset/event ID;
  5. decide scale, stop, park, or replay.

Symptom: Consumer restarts from unexpected offset

Likely causes:

  • wrong consumer name;
  • stored server-side offset overrides configured offset;
  • external checkpoint stale;
  • multiple instances sharing name unexpectedly;
  • retention removed old offset.

Actions:

  1. identify consumer name;
  2. inspect stored offset/checkpoint;
  3. compare with oldest retained offset;
  4. confirm whether consumer uses server-side or external tracking;
  5. restart with explicit offset only after understanding override rules.
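
The comparison between a stored checkpoint and the oldest retained offset can be made explicit in code. The resolver below is a hypothetical sketch. It assumes the caller already knows the last durably processed offset and the first offset still retained in the stream, and the policy of starting from the first record when no checkpoint exists is an assumption, not a client default.

```java
import java.util.OptionalLong;

final class StartPositionResolver {
    enum Kind { FROM_FIRST, FROM_OFFSET, RETENTION_GAP }

    // kind says how to start; offset is the position to subscribe from.
    record Decision(Kind kind, long offset) {}

    private StartPositionResolver() {}

    // checkpoint: last durably processed offset, empty if none was ever stored.
    // firstRetained: oldest offset still present in the stream.
    static Decision resolve(OptionalLong checkpoint, long firstRetained) {
        if (checkpoint.isEmpty()) {
            return new Decision(Kind.FROM_FIRST, firstRetained);
        }
        long next = checkpoint.getAsLong() + 1;
        if (next < firstRetained) {
            // Records in [next, firstRetained) were removed by retention.
            return new Decision(Kind.RETENTION_GAP, firstRetained);
        }
        return new Decision(Kind.FROM_OFFSET, next);
    }
}
```

A `RETENTION_GAP` decision is worth an alert rather than a silent resume, because the consumer has permanently missed the records between its checkpoint and the oldest retained offset.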

34. Practice Plan

Practice 1 — Hello Stream With Explicit Offset

  • Create stream.
  • Publish 100 records.
  • Consume from first.
  • Consume from next and verify no old records are delivered.
  • Consume from offset(50).

Practice 2 — Confirm-Aware Producer

  • Implement bounded publisher.
  • Inject broker restart during publish.
  • Verify no unbounded memory growth.
  • Record failed/timeout confirms.

Practice 3 — Manual Offset Tracking

  • Consume records.
  • Commit projection.
  • Store offset manually.
  • Crash after DB commit but before offset store.
  • Verify idempotent replay.

Practice 4 — External Checkpoint

  • Store projection and checkpoint in one transaction.
  • Restart consumer.
  • Verify resume from checkpoint + 1.

Practice 5 — Replay Safety

  • Build projection from live consume.
  • Drop projection DB.
  • Rebuild from stream first.
  • Compare results.

35. Self-Correction Rubric

You can use the Stream Java Client well when you can explain and implement:

| Skill | Evidence |
| --- | --- |
| environment lifecycle | one long-lived env, clean shutdown |
| producer confirms | bounded in-flight, async confirms, metrics |
| dedup model | stable producer name and monotonic identity discipline |
| message mapping | properties/application properties/body mapping is explicit |
| offset specification | every consumer start position is intentional |
| server-side tracking | stable consumer names and checkpoint timing understood |
| external tracking | DB state and checkpoint can commit atomically |
| replay safety | handler is idempotent and side effects are guarded |
| flow control | no unbounded worker queue; processed() used correctly |
| failure testing | crash points produce duplicate-safe behavior, not skips |

36. Closing Model

The Stream Java Client gives you a powerful API, but the API is not the hard part.

The hard part is the correctness relationship:

append event safely
observe confirm
consume from explicit position
apply business effect idempotently
store offset only after durable effect
monitor lag and retention risk

If you preserve that relationship, RabbitMQ Streams become a strong building block for replayable Java systems.

If you break that relationship, streams can skip data, duplicate side effects, or silently drift from business truth.

