
Kafka Producer Implementation in Java and Spring Kafka

Learn Java Microservices Communication - Part 073

Production-grade Kafka producer implementation in Java and Spring Kafka: producer architecture, KafkaTemplate, serialization, keys, headers, idempotence, acks, retries, batching, outbox integration, transactions, observability, testing, and production policy.


Part 073 — Kafka Producer Implementation in Java and Spring Kafka

A Kafka producer is not just this:

kafkaTemplate.send("case-events", event);

That line hides many production decisions:

  • which topic?
  • which key?
  • which schema?
  • which headers?
  • which serializer?
  • what happens if send fails?
  • is the producer idempotent?
  • what acknowledgement level is required?
  • what retry behavior is enabled?
  • are records batched?
  • how is ordering preserved?
  • how does this integrate with the database transaction?
  • how is publish latency observed?
  • how do we prevent missing events?
  • how do consumers deduplicate duplicate publishes?
  • how do we test the produced record?

A top-tier engineer treats producer code as a reliability boundary.

The producer is the first half of the event contract.


1. Producer Responsibility

Producer responsibilities:

domain event -> event contract -> serialized broker record

A production producer owns:

  • event type,
  • topic,
  • message key,
  • payload schema,
  • headers,
  • event ID,
  • correlation/causation,
  • serialization,
  • publish durability,
  • error handling,
  • metrics,
  • logs,
  • contract tests,
  • outbox integration.

The producer should not be random code scattered across use cases.

Use a dedicated publisher boundary.

public interface CaseEventPublisher {
    void publish(CaseDomainEvent event);
}

Infrastructure implementation:

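public final class KafkaCaseEventPublisher implements CaseEventPublisher {
    private final KafkaTemplate<String, CaseEventEnvelope> kafkaTemplate;
    private final CaseEventMapper mapper;
    private final CaseEventTopicPolicy topicPolicy;

    public KafkaCaseEventPublisher(
        KafkaTemplate<String, CaseEventEnvelope> kafkaTemplate,
        CaseEventMapper mapper,
        CaseEventTopicPolicy topicPolicy
    ) {
        this.kafkaTemplate = kafkaTemplate;
        this.mapper = mapper;
        this.topicPolicy = topicPolicy;
    }

    @Override
    public void publish(CaseDomainEvent event) {
        // Domain event -> event contract -> broker record.
        CaseEventEnvelope envelope = mapper.toEnvelope(event);
        ProducerRecord<String, CaseEventEnvelope> record =
            topicPolicy.toRecord(envelope);

        // send() is asynchronous; handle the returned future for critical publishes.
        kafkaTemplate.send(record);
    }
}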

Business code should not manually build ProducerRecord everywhere.


2. Producer Boundary Architecture

Keep responsibilities separate:

Component        Responsibility
Domain event     fact from domain model
Publisher port   application boundary
Mapper           domain event → event contract
Topic policy     topic/key/header selection
Serializer       contract → bytes
Kafka producer   broker publishing
Outbox relay     reliable publication after DB commit

This separation makes testing and governance possible.


3. Do Not Publish Inside Domain Model

Bad:

public final class CaseAggregate {
    private final KafkaTemplate<String, Object> kafkaTemplate;

    public void escalate(...) {
        // mutate state
        kafkaTemplate.send("case-events", event);
    }
}

Problems:

  • domain depends on infrastructure,
  • hard to test,
  • dual-write risk,
  • transaction boundary unclear,
  • topic policy hidden,
  • event mapping scattered.

Better:

public final class CaseAggregate {
    private final List<DomainEvent> events = new ArrayList<>();

    public void escalate(...) {
        // mutate state
        events.add(new CaseEscalated(...));
    }

    public List<DomainEvent> pullEvents() {
        List<DomainEvent> copy = List.copyOf(events);
        events.clear();
        return copy;
    }
}

Application service persists state and records outbox/event messages.


4. Direct Publish vs Outbox Publish

Direct publish:

kafkaTemplate.send(record);

Outbox publish:

write outbox row in DB transaction
relay publishes later

Direct publish may be acceptable for:

  • telemetry,
  • non-critical analytics,
  • cache invalidation where loss is tolerable,
  • best-effort notifications,
  • test/demo systems.

Outbox publish is preferred when:

  • event reflects committed business state,
  • consumers rely on event for correctness,
  • missing event is unacceptable,
  • saga/workflow depends on event,
  • audit/projection/search must be reliable.

Production rule:

If an event represents durable business state, use an outbox or an equivalent mechanism.

5. KafkaTemplate Basics

Spring Kafka KafkaTemplate is the common producer abstraction.

Example:

@Service
public final class KafkaCaseEventPublisher {
    private final KafkaTemplate<String, CaseEventEnvelope> kafkaTemplate;

    public KafkaCaseEventPublisher(KafkaTemplate<String, CaseEventEnvelope> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    // Spring Kafka 3.x returns CompletableFuture; 2.x returned ListenableFuture.
    public CompletableFuture<SendResult<String, CaseEventEnvelope>> publish(
        CaseEventEnvelope envelope
    ) {
        ProducerRecord<String, CaseEventEnvelope> record =
            new ProducerRecord<>(
                "case-events",
                envelope.subjectId(), // key: all events of one case share a partition
                envelope
            );

        record.headers().add("event_type", envelope.type().getBytes(StandardCharsets.UTF_8));
        record.headers().add("event_id", envelope.id().getBytes(StandardCharsets.UTF_8));

        return kafkaTemplate.send(record);
    }
}

Do not ignore the returned future for critical direct publishes; a failed send surfaces only through it.

In an outbox relay, wait for the broker acknowledgement before marking the row as published.


6. Producer Record Anatomy

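A Kafka record has:

  • topic,
  • partition (optional),
  • timestamp (optional),
  • key,
  • value,
  • headers.

ProducerRecord<String, CaseEventEnvelope> record =
    new ProducerRecord<>(
        "case-events",
        null,                                 // partition: let the partitioner decide from the key
        envelope.occurredAt().toEpochMilli(), // timestamp
        envelope.subjectId(),                 // key (the caseId)
        envelope,                             // value
        headers                               // e.g. built by a header policy
    );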

Important:

Field       Contract implication
topic       event family / routing
key         partitioning and ordering
value       payload
headers     metadata
timestamp   event or producer timestamp, depending on policy
partition   rarely set manually; an explicit value bypasses the partitioner

Prefer explicit key.

Avoid manually setting partition unless you have a strong reason.


7. Key Strategy

Key should be produced by policy.

public interface KafkaKeyStrategy<T> {
    String keyFor(T event);
}

Example:

public final class CaseEventKeyStrategy implements KafkaKeyStrategy<CaseEventEnvelope> {
    @Override
    public String keyFor(CaseEventEnvelope event) {
        return event.subjectId(); // caseId
    }
}

Validate:

String key = keyStrategy.keyFor(envelope);

if (key == null || key.isBlank()) {
    throw new InvalidKafkaRecordException("case-events requires non-empty caseId key");
}

Producer key mistakes break ordering.

Make them fail fast.
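
To see why a wrong or missing key breaks ordering, it helps to look at how a key selects a partition. Kafka's default partitioner hashes the serialized key (with murmur2) and takes the result modulo the partition count, so equal keys land on the same partition. The sketch below models only that idea. KeyPartitioningSketch and partitionFor are hypothetical names, and Arrays.hashCode is a stand-in for the real hash, so the partition numbers will not match a real cluster.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

/** Toy model of key-based partition selection. NOT Kafka's real hash (murmur2). */
final class KeyPartitioningSketch {

    private KeyPartitioningSketch() {}

    /** Maps a non-empty key to a partition in the range [0, numPartitions). */
    static int partitionFor(String key, int numPartitions) {
        if (key == null || key.isBlank()) {
            throw new IllegalArgumentException("key must be non-empty for an ordered topic");
        }
        if (numPartitions <= 0) {
            throw new IllegalArgumentException("numPartitions must be positive");
        }
        // Stand-in hash over the serialized key bytes; Kafka uses murmur2 here.
        int hash = Arrays.hashCode(key.getBytes(StandardCharsets.UTF_8));
        // Clear the sign bit so the modulo result is never negative.
        return (hash & 0x7fffffff) % numPartitions;
    }
}
```

Because the partition depends on the partition count, adding partitions to a topic can move a key to a different partition. Per-key ordering then holds only within the records written before or after the change, not across it.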


8. Header Strategy

Headers should be consistent.

Example:

public final class EventHeaders {
    public static final String EVENT_ID = "event_id";
    public static final String EVENT_TYPE = "event_type";
    public static final String EVENT_VERSION = "event_version";
    public static final String CORRELATION_ID = "correlation_id";
    public static final String CAUSATION_ID = "causation_id";
    public static final String PRODUCER = "producer";
    public static final String SCHEMA_ID = "schema_id";
}

Add headers:

private Headers headersFor(CaseEventEnvelope envelope) {
    RecordHeaders headers = new RecordHeaders();

    put(headers, EventHeaders.EVENT_ID, envelope.id());
    put(headers, EventHeaders.EVENT_TYPE, envelope.type());
    put(headers, EventHeaders.EVENT_VERSION, Integer.toString(envelope.version()));
    put(headers, EventHeaders.CORRELATION_ID, envelope.correlationId());
    put(headers, EventHeaders.CAUSATION_ID, envelope.causationId());
    put(headers, EventHeaders.PRODUCER, "case-service");

    return headers;
}

private static void put(Headers headers, String key, String value) {
    if (value != null) {
        headers.add(key, value.getBytes(StandardCharsets.UTF_8));
    }
}

Do not put secrets in headers.

Headers are often visible to many tools.


9. Event Envelope

Example Java record:

public record CaseEventEnvelope(
    String id,
    String source,
    String type,
    int version,
    String subjectId,
    Instant occurredAt,
    String correlationId,
    String causationId,
    long aggregateVersion,
    Object data
) {}

This is similar in spirit to CloudEvents.

You can use CloudEvents libraries or your own standardized envelope.

The important thing is consistency.

Every event should have:

  • stable event ID,
  • source,
  • type,
  • version/schema,
  • subject/resource,
  • occurrence time,
  • correlation/causation,
  • payload.
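
One way to enforce that list is to reject incomplete envelopes at construction time. The following is a minimal sketch, assuming Java 16+ records. ValidatedCaseEventEnvelope is a hypothetical variant of the envelope above (without aggregateVersion), and the choice of which fields are mandatory is an assumption, not a rule from this lesson.

```java
import java.time.Instant;
import java.util.Objects;

/** Hypothetical envelope variant that fails fast when a required field is missing. */
record ValidatedCaseEventEnvelope(
    String id,
    String source,
    String type,
    int version,
    String subjectId,
    Instant occurredAt,
    String correlationId,
    String causationId,
    Object data
) {
    // Compact constructor: runs before the fields are assigned.
    ValidatedCaseEventEnvelope {
        requireText(id, "id");
        requireText(source, "source");
        requireText(type, "type");
        requireText(subjectId, "subjectId");
        requireText(correlationId, "correlationId");
        Objects.requireNonNull(occurredAt, "occurredAt");
        Objects.requireNonNull(data, "data");
        if (version < 1) {
            throw new IllegalArgumentException("version must be >= 1");
        }
        // Assumption: causationId may be null for an event that starts a causal chain.
    }

    private static void requireText(String value, String field) {
        if (value == null || value.isBlank()) {
            throw new IllegalArgumentException(field + " must be non-empty");
        }
    }
}
```

With this in place, a mapper bug that drops the event ID or subject fails in the producer's unit tests instead of reaching consumers.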

10. Serialization Choices

Common producer serialization choices:

Format                    Producer implication
JSON                      easy to inspect, larger, needs schema discipline
JSON Schema               validation and compatibility
Avro                      schema registry friendly, compact
Protobuf                  compact, generated code
CloudEvents JSON          standardized envelope
CloudEvents binary mode   metadata in headers, data as payload

Do not serialize Java objects with Java native serialization.

Kafka records are integration contracts.

Use explicit, language-neutral formats.


11. Producer Config: Reliability Baseline

Key Kafka producer configurations:

acks=all
enable.idempotence=true
retries=2147483647
delivery.timeout.ms=120000
request.timeout.ms=30000
max.in.flight.requests.per.connection=5

Meaning:

Config                                  Meaning
acks=all                                wait for all in-sync replicas according to broker settings
enable.idempotence=true                 producer retries do not create duplicates within idempotent producer scope
retries                                 allow retry of transient send failures
delivery.timeout.ms                     upper bound for record delivery, retries included
request.timeout.ms                      how long the producer waits for the broker's response to one request
max.in.flight.requests.per.connection   unacknowledged requests per connection; must be 5 or lower with idempotence, which then preserves ordering across retries

Exact values depend on Kafka version and platform defaults.

But the principle is stable:

critical events need strong broker acknowledgement and idempotent producer behavior

Do not use defaults blindly for business-critical publishing.
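
A baseline like this can be checked at startup rather than trusted. The sketch below is one possible pre-flight check using only java.util.Properties and the standard producer property names. ProducerBaselineCheck is a hypothetical helper, not part of Kafka or Spring Kafka. It encodes three constraints from the Kafka producer documentation: idempotence needs acks=all, retries greater than 0, and at most 5 in-flight requests, and delivery.timeout.ms should be at least request.timeout.ms plus linger.ms. Treating a missing setting as a violation is this sketch's own policy.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

/** Hypothetical pre-flight check for the reliability baseline in this section. */
final class ProducerBaselineCheck {

    private ProducerBaselineCheck() {}

    /** The baseline as plain producer properties. */
    static Properties baseline() {
        Properties p = new Properties();
        p.setProperty("acks", "all");
        p.setProperty("enable.idempotence", "true");
        p.setProperty("retries", Integer.toString(Integer.MAX_VALUE));
        p.setProperty("delivery.timeout.ms", "120000");
        p.setProperty("request.timeout.ms", "30000");
        p.setProperty("linger.ms", "5");
        p.setProperty("max.in.flight.requests.per.connection", "5");
        return p;
    }

    /** Lists violations; an empty list means the properties satisfy the baseline. */
    static List<String> violations(Properties p) {
        List<String> problems = new ArrayList<>();

        String acks = p.getProperty("acks", "");
        if (!acks.equals("all") && !acks.equals("-1")) {
            problems.add("acks must be all (or -1)");
        }
        if (!Boolean.parseBoolean(p.getProperty("enable.idempotence"))) {
            problems.add("enable.idempotence must be true");
        }
        if (number(p, "retries") <= 0) {
            problems.add("retries must be greater than 0");
        }
        long inFlight = number(p, "max.in.flight.requests.per.connection");
        if (inFlight < 1 || inFlight > 5) {
            problems.add("max.in.flight.requests.per.connection must be between 1 and 5");
        }
        long delivery = number(p, "delivery.timeout.ms");
        long floor = number(p, "request.timeout.ms") + number(p, "linger.ms");
        if (delivery <= 0 || delivery < floor) {
            problems.add("delivery.timeout.ms must be >= request.timeout.ms + linger.ms");
        }
        return problems;
    }

    /** Missing settings count as 0, so the check forces explicit values. */
    private static long number(Properties p, String key) {
        return Long.parseLong(p.getProperty(key, "0").trim());
    }
}
```

Running such a check against the effective configuration when the service starts turns a silent misconfiguration into a failed deployment.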


12. Idempotent Producer

Kafka idempotent producer prevents duplicate records caused by producer retries within its guarantee scope.

It uses producer IDs and sequence numbers.

Benefits:

  • safer retries,
  • reduced duplicate publish from transient broker errors,
  • better ordering behavior with configured in-flight requests.

Limits:

  • does not solve database + broker dual-write,
  • does not deduplicate application-generated duplicate events,
  • does not make consumers idempotent,
  • does not prevent duplicate outbox relay publish after crash before mark-published,
  • does not make external side effects exactly once.

Use idempotent producer.

Still use outbox and idempotent consumers where required.
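
The scope of the guarantee is easier to see in a toy model. The sketch below simulates one partition that remembers the last sequence number per producer ID and skips a write it has already seen. IdempotentLogSketch is a hypothetical class and a heavy simplification: the real broker tracks sequences per producer ID and partition, works on batches, and also rejects sequence gaps, none of which is modelled here.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy model of broker-side deduplication for one partition. Not Kafka's implementation. */
final class IdempotentLogSketch {
    private final Map<Long, Integer> lastSequenceByProducer = new HashMap<>();
    private final List<String> log = new ArrayList<>();

    /** Appends unless this producer already wrote this sequence; returns true if appended. */
    boolean append(long producerId, int sequence, String value) {
        int last = lastSequenceByProducer.getOrDefault(producerId, -1);
        if (sequence <= last) {
            // A retry of a write the log already has: acknowledged, not appended again.
            return false;
        }
        lastSequenceByProducer.put(producerId, sequence);
        log.add(value);
        return true;
    }

    /** Snapshot of the records currently in the log. */
    List<String> records() {
        return List.copyOf(log);
    }
}
```

A retry with the same producer ID and sequence is dropped. The same event sent again by the application gets a new sequence number, and a producer that restarts without a transactional.id gets a new producer ID, so in both cases the record is appended a second time. That is the gap that stable event IDs and idempotent consumers have to close.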


13. Acknowledgement Level

acks controls broker acknowledgement.

Common values:

acks       Meaning
0          producer does not wait
1          leader acknowledges
all / -1   all in-sync replicas acknowledge according to broker config

For critical events:

acks=all

But acks=all alone is not enough.

Broker topic replication and min.insync.replicas matter.

Producer reliability is producer + broker topic configuration.

A producer cannot compensate for a badly configured topic.
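
The interaction between acks=all and min.insync.replicas is simple arithmetic. With acks=all, the broker accepts a write only while the number of in-sync replicas is at least min.insync.replicas. The sketch below captures that rule; AckDurabilitySketch and its method names are hypothetical.

```java
/** Arithmetic behind acks=all and min.insync.replicas. Hypothetical helper names. */
final class AckDurabilitySketch {

    private AckDurabilitySketch() {}

    /** How many replicas may be out of sync while acks=all writes still succeed. */
    static int tolerableUnavailableReplicas(int replicationFactor, int minInSyncReplicas) {
        if (minInSyncReplicas < 1 || minInSyncReplicas > replicationFactor) {
            throw new IllegalArgumentException(
                "min.insync.replicas must be between 1 and the replication factor");
        }
        return replicationFactor - minInSyncReplicas;
    }

    /** Whether an acks=all write is accepted for the current in-sync replica count. */
    static boolean acksAllWriteAccepted(int inSyncReplicas, int minInSyncReplicas) {
        return inSyncReplicas >= minInSyncReplicas;
    }
}
```

For example, a replication factor of 3 with min.insync.replicas=2 keeps accepting writes with one replica down and rejects them with two down. With min.insync.replicas=1, an acks=all write can be acknowledged when only the leader holds the record.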


14. Delivery Timeout

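delivery.timeout.ms bounds the total time the producer spends on a record after send() returns: time waiting in a batch, time awaiting the broker acknowledgement, and time spent on retries.

If the bound is exceeded, the send fails.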

This matters for outbox relay:

relay should not mark row published if delivery timeout occurs

For direct publish:

application must decide whether failure means command fails or event is best-effort

If a business transaction already committed and direct publish fails, you have missing event risk.

That is why outbox exists.


15. Batching and Linger

Producer performance config:

batch.size=32768
linger.ms=5
compression.type=zstd

Trade-off:

Config             Effect
batch.size         max batch bytes per partition
linger.ms          wait briefly to batch more records
compression.type   reduce bytes, increase CPU
buffer.memory      producer memory buffer

Higher batching improves throughput.

It may add latency.

For user-facing command events, small linger may be fine.

For analytics high-throughput events, larger batching may be good.

Measure.

Do not copy performance configs from unrelated workloads.


16. Compression

Compression can reduce broker/network cost.

Options depend on platform:

compression.type=none
compression.type=gzip
compression.type=snappy
compression.type=lz4
compression.type=zstd

Choose based on:

  • payload size,
  • CPU budget,
  • broker support,
  • consumer support,
  • latency sensitivity,
  • network cost.

For small events, compression may not help.

For large JSON events, it often helps.

Benchmark with real payloads.


17. Producer Buffering and Backpressure

If the broker is slow or unavailable, the producer buffers records in memory.

Once the buffer is full, send() blocks for up to max.block.ms and then fails with a timeout.

Watch:

buffer.memory
max.block.ms
delivery.timeout.ms
record-error-rate
record-retry-rate
buffer-available-bytes

Application policy:

  • fail fast for direct critical publish?
  • rely on outbox backlog for durable events?
  • shed optional telemetry?
  • throttle producers?
  • alert on outbox pending age?

Backpressure must be explicit.


18. Outbox Relay Producer

Outbox relay publish must:

  1. read pending outbox rows,
  2. map row to ProducerRecord,
  3. send to Kafka,
  4. wait for broker ack,
  5. mark row published,
  6. retry failure without changing event ID.

Example:

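public final class OutboxKafkaRelay {
    private final OutboxRepository outboxRepository;
    private final KafkaTemplate<String, byte[]> kafkaTemplate;

    public OutboxKafkaRelay(
        OutboxRepository outboxRepository,
        KafkaTemplate<String, byte[]> kafkaTemplate
    ) {
        this.outboxRepository = outboxRepository;
        this.kafkaTemplate = kafkaTemplate;
    }

    public void publishBatch() {
        List<OutboxMessage> batch = outboxRepository.claimPending(100);

        for (OutboxMessage message : batch) {
            publishOne(message);
        }
    }

    private void publishOne(OutboxMessage message) {
        // toRecord (not shown) maps the row to topic, key, payload bytes, and headers,
        // reusing the stored event ID so a retry never mints a new one.
        ProducerRecord<String, byte[]> record = toRecord(message);

        try {
            // Block until the broker acknowledges; only then is the row safe to mark.
            kafkaTemplate.send(record).get(30, TimeUnit.SECONDS);
            outboxRepository.markPublished(message.id());
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt(); // keep the interrupt visible to the caller
            outboxRepository.markPublishFailed(message.id(), ex);
        } catch (Exception ex) {
            // A timeout here does not prove the record was lost: it may still be delivered
            // within delivery.timeout.ms, so the retry can produce a duplicate.
            outboxRepository.markPublishFailed(message.id(), ex);
        }
    }
}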

This is simple but blocks per record.

You can optimize with async sends and controlled concurrency.

Correctness first, optimization second.


19. Async Relay With Controlled Concurrency

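public void publishBatchAsync() {
    List<OutboxMessage> batch = outboxRepository.claimPending(100);

    // One in-flight send per claimed row; the claim size is the only concurrency bound here.
    List<CompletableFuture<Void>> futures = batch.stream()
        .map(message -> kafkaTemplate.send(toRecord(message))
            // Runs only after the broker ack. The callback may execute on the
            // producer's I/O thread, so keep the repository call fast.
            .thenAccept(result -> outboxRepository.markPublished(message.id()))
            // Also reached if markPublished itself throws after a successful send.
            .exceptionally(error -> {
                outboxRepository.markPublishFailed(message.id(), error);
                return null;
            }))
        .toList();

    // Wait for every send to settle before claiming the next batch.
    CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
}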

Risks:

  • too many concurrent sends,
  • ordering per aggregate,
  • DB update storm,
  • partial failures,
  • shutdown in middle.

Use bounded concurrency.

Preserve ordering where required.
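
One way to combine both rules is to publish different keys in parallel but the rows of a single key strictly in sequence, stopping at the first failure for that key. The sketch below shows the idea with plain JDK types. OrderedRelaySketch, Row, and Sender are hypothetical names, and Sender stands in for "send to Kafka and wait for the broker ack". It assumes a single relay instance and Java 16+.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Sketch: bounded parallelism across keys, strict order within each key. */
final class OrderedRelaySketch {

    /** Hypothetical outbox row: event ID plus the Kafka message key. */
    record Row(String eventId, String key) {}

    /** Stand-in for a blocking send that returns false when the broker ack fails. */
    interface Sender {
        boolean sendAndAwaitAck(Row row);
    }

    private OrderedRelaySketch() {}

    /** Publishes the batch and returns the event IDs that were acknowledged. */
    static Set<String> publish(List<Row> batch, Sender sender, int maxConcurrentKeys) {
        // Group rows by key, keeping the outbox order inside each key.
        Map<String, List<Row>> byKey = new LinkedHashMap<>();
        for (Row row : batch) {
            byKey.computeIfAbsent(row.key(), k -> new ArrayList<>()).add(row);
        }

        Set<String> published = ConcurrentHashMap.newKeySet();
        List<Callable<Void>> tasks = new ArrayList<>();
        for (List<Row> rows : byKey.values()) {
            tasks.add(() -> {
                for (Row row : rows) {
                    if (!sender.sendAndAwaitAck(row)) {
                        break; // later rows of this key wait for the next relay run
                    }
                    published.add(row.eventId());
                }
                return null;
            });
        }

        // The pool size is the concurrency bound: at most that many keys are in flight.
        ExecutorService pool = Executors.newFixedThreadPool(maxConcurrentKeys);
        try {
            pool.invokeAll(tasks); // blocks until every key's task has finished
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
        } finally {
            pool.shutdown();
        }
        return Set.copyOf(published);
    }
}
```

If the second row of a key fails, its third row is not sent, so consumers never see a later event of that aggregate before an earlier one. Rows of other keys are unaffected. A real relay would also have to keep the skipped rows pending and coordinate between relay instances, which this sketch leaves out.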


20. Producer Transactions

Kafka supports producer transactions for atomic writes to multiple Kafka partitions/topics and offset commits in stream-processing patterns.

This can be useful for Kafka-to-Kafka pipelines.

But producer transactions do not solve:

database write + Kafka publish

unless your architecture includes a carefully designed transaction integration.

For a typical Java microservice with PostgreSQL and Kafka:

use database transaction + outbox

rather than trying to make DB and Kafka one distributed transaction.

Kafka transactions are powerful, but scope matters.


21. Topic Policy Object

Create explicit topic policy.

public record TopicPolicy(
    String topic,
    boolean keyRequired,
    String keyField,
    String compatibilityMode,
    int eventMajorVersion
) {}

Mapper:

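// Assumes a TopicPolicy field named policy and the headersFor(...) helper from section 8.
public ProducerRecord<String, CaseEventEnvelope> toRecord(CaseEventEnvelope envelope) {
    String key = envelope.subjectId();

    if (policy.keyRequired() && (key == null || key.isBlank())) {
        throw new InvalidMessageKeyException(policy.topic() + " key is required");
    }

    return new ProducerRecord<>(
        policy.topic(),
        null, // partition: derived from the key
        key,
        envelope,
        headersFor(envelope)
    );
}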

Topic policy should be testable.


22. Producer Interceptors

Kafka producer interceptors can observe or mutate records before send and after acknowledgement.

Use carefully.

Good uses:

  • add standard headers,
  • metrics,
  • tracing,
  • policy validation,
  • redaction checks.

Avoid:

  • hidden business logic,
  • changing event type,
  • changing key,
  • adding required fields invisibly,
  • expensive blocking operations.

Interceptors are cross-cutting.

Contracts should still be visible in code and tests.


23. Tracing

Producer should propagate trace/correlation context.

Headers:

traceparent
tracestate
correlation_id
causation_id

OpenTelemetry instrumentation may handle trace headers.

Application should ensure business correlation/causation IDs exist.

Do not rely on trace ID alone for business process correlation because traces may be sampled.


24. Observability

Producer metrics:

producer.records.sent.total{topic,event_type,status}
producer.send.duration{topic,event_type}
producer.send.failures.total{topic,event_type,reason}
producer.record.size.bytes{topic,event_type}
producer.serialization.failures.total{topic,event_type}
producer.null_key.total{topic,event_type}
producer.retry.total{topic,event_type}
producer.buffer.exhausted.total{producer}
outbox.rows.created.total{event_type}
outbox.pending.count
outbox.oldest_pending_age.seconds
outbox.publish.failures.total{reason}

Logs:

  • publish failure,
  • invalid key,
  • serialization failure,
  • outbox relay failure,
  • schema registry failure,
  • topic missing/authorization failure,
  • event contract validation failure.

Do not log full payload by default.


25. Alerts

Useful producer alerts:

Alert                        Meaning
send failure spike           broker/network/auth/schema issue
serialization failure > 0    producer contract bug
null key on keyed topic      ordering bug
outbox pending age high      event publication delayed
outbox pending count high    broker/relay issue
producer buffer exhaustion   broker slow/down or producer overload
unauthorized topic error     ACL/config issue
record too large             schema/payload bug
schema registry failures     producer cannot serialize/register

Producer alerts should route to event owner.


26. Testing Producer Contract

Test produced record.

@Test
void publishesCaseEscalatedWithCorrectTopicKeyAndHeaders() {
    CaseEscalated event = new CaseEscalated(
        new EventId("evt-123"),
        new CaseId("CASE-100"),
        new EscalationId("ESC-900"),
        42L
    );

    ProducerRecord<String, CaseEventEnvelope> record =
        topicPolicy.toRecord(mapper.toEnvelope(event));

    assertThat(record.topic()).isEqualTo("case-events");
    assertThat(record.key()).isEqualTo("CASE-100");
    assertThat(header(record, "event_id")).isEqualTo("evt-123");
    assertThat(header(record, "event_type")).isEqualTo("com.example.case.CaseEscalated.v1");
}

This catches many integration bugs before broker tests.


27. Testing Serialization

Use real serializer.

@Test
void serializedEventCanBeDeserializedByConsumerSchema() {
    CaseEventEnvelope envelope = fixtureEnvelope();

    byte[] bytes = serializer.serialize("case-events", envelope);

    CaseEventEnvelope decoded = deserializer.deserialize("case-events", bytes);

    assertThat(decoded.id()).isEqualTo(envelope.id());
}

For schema registry, use test registry/mock registry where appropriate.

Serialization is part of contract.


28. Embedded Kafka / Testcontainers

Use integration tests for:

  • actual Kafka producer config,
  • topic creation,
  • serializer/deserializer,
  • headers,
  • key,
  • send failures,
  • retry behavior,
  • record too large,
  • transaction/idempotence where needed.

Testcontainers with Kafka is common for realistic integration tests.

Embedded broker can be useful for fast Spring tests.

Do not test all logic only with mocks.


29. Outbox Transaction Test

@Test
void businessCommitCreatesOutboxRow() {
    useCase.createEscalation(command);

    List<OutboxMessage> rows = outboxRepository.findPending();

    assertThat(rows).hasSize(1);
    assertThat(rows.get(0).aggregateId()).isEqualTo("CASE-100");
    assertThat(rows.get(0).messageKey()).isEqualTo("CASE-100");
}

Rollback test:

@Test
void rollbackDoesNotCreateOutboxRow() {
    assertThatThrownBy(() -> useCase.createEscalation(invalidCommand))
        .isInstanceOf(DomainException.class);

    assertThat(outboxRepository.findAll()).isEmpty();
}

This proves the main outbox invariant.


30. Producer Failure Test

@Test
void relayDoesNotMarkPublishedWhenKafkaSendFails() {
    OutboxMessage message = outboxRepository.insert(pendingMessage("evt-123"));

    kafkaBroker.failSends();

    relay.publishBatch();

    OutboxMessage updated = outboxRepository.get(message.id());

    assertThat(updated.status()).isEqualTo(OutboxStatus.FAILED_RETRYABLE);
    assertThat(updated.publishedAt()).isNull();
}

Never mark published before broker ack.


31. Production Producer Configuration Template

kafkaProducer:
  service: case-service

  defaults:
    acks: all
    enableIdempotence: true
    retries: 2147483647
    deliveryTimeoutMs: 120000
    requestTimeoutMs: 30000
    maxInFlightRequestsPerConnection: 5
    lingerMs: 5
    batchSizeBytes: 32768
    compressionType: zstd

  topics:
    case-events:
      owner: case-platform
      key:
        field: caseId
        required: true
      schema:
        format: json-schema
        compatibility: full-transitive
      publishing:
        mode: outbox
        directPublishAllowed: false
      observability:
        nullKeyAlert: true
        serializationFailureAlert: true
        outboxPendingAgeSecondsAlert: 60

Configuration should be reviewed like code.


32. Common Anti-Patterns

32.1 kafkaTemplate.send() everywhere

No governance, no key consistency, no testing.

32.2 Direct publish after DB commit

Dual-write missing-event risk.

32.3 Null key for ordered event

Ordering breaks.

32.4 New event ID on relay retry

Consumer dedup breaks.

32.5 Ignoring send future for critical direct publish

Failures disappear.

32.6 Marking outbox row published before broker ack

Message loss.

32.7 Logging full event payload

Privacy/security risk.

32.8 Producer config copied blindly

Wrong latency/reliability trade-offs.

32.9 Schema registry error treated as transient forever

Poison producer loop.

32.10 No producer contract tests

Topic/key/header drift reaches production.
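
Anti-pattern 32.9 usually comes from a relay that treats every failure the same way. A small, explicit policy avoids the loop: retry what a retry can fix, park and alert on everything else. The sketch below is one possible shape. PublishFailurePolicySketch, FailureKind, and Decision are hypothetical names, and the split between transient and non-transient kinds is an assumption to adapt per platform.

```java
import java.util.EnumSet;
import java.util.Set;

/** Sketch of a retry policy that keeps non-transient failures out of the retry loop. */
final class PublishFailurePolicySketch {

    enum Decision { RETRY_WITH_BACKOFF, PARK_AND_ALERT }

    /** Hypothetical failure categories; the relay maps real exceptions onto these. */
    enum FailureKind {
        BROKER_UNAVAILABLE,
        TIMEOUT,
        SERIALIZATION,
        RECORD_TOO_LARGE,
        AUTHORIZATION,
        SCHEMA_INCOMPATIBLE
    }

    // Assumption: only these kinds can succeed on a plain retry of the same record.
    private static final Set<FailureKind> TRANSIENT =
        EnumSet.of(FailureKind.BROKER_UNAVAILABLE, FailureKind.TIMEOUT);

    private PublishFailurePolicySketch() {}

    /** Decides what to do with a row after a failed attempt number `attempts`. */
    static Decision decide(FailureKind kind, int attempts, int maxAttempts) {
        if (!TRANSIENT.contains(kind)) {
            // Resending the same bytes cannot fix a contract or permission problem.
            return Decision.PARK_AND_ALERT;
        }
        return attempts < maxAttempts ? Decision.RETRY_WITH_BACKOFF : Decision.PARK_AND_ALERT;
    }
}
```

A parked row keeps its event ID and stays in the outbox, so it can be republished once the schema, payload, or ACL problem has been fixed.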


33. Decision Model

Producer design starts with event criticality.


34. Design Checklist

Before shipping a Kafka producer:

  • Is publish direct or outbox?
  • If business event, is outbox used?
  • Is topic policy explicit?
  • Is key required and tested?
  • Is event ID stable?
  • Are headers standardized?
  • Is schema registered/validated?
  • Is serializer tested?
  • Is producer idempotence enabled?
  • Is acks appropriate?
  • Are retries bounded by delivery timeout?
  • Is batching/linger measured?
  • Is compression measured?
  • Are send failures observed?
  • Does relay mark published only after ack?
  • Are duplicate publishes safe?
  • Are producer metrics/alerts configured?
  • Are contract tests written?
  • Is payload logging safe?

35. The Real Lesson

Kafka producer code is not plumbing.

It is where your domain facts become durable integration contracts.

A production-grade producer guarantees:

correct event
+ correct topic
+ correct key
+ correct schema
+ correct headers
+ durable publish path
+ stable identity
+ observable failure

If the producer is sloppy, every consumer pays.

Get producer design right first.

