Deepen PracticeOrdered learning track

R2DBC Mental Model

Learn Java Data Access Pattern In Action - Part 049

R2DBC mental model untuk Java reactive data access: non-blocking relational access, Publisher/Flux/Mono, connection lifecycle, pooling, transaction operator, backpressure, mapping, timeout, retry, dan production caveats.

12 min read2376 words
PrevNext
Lesson 4960 lesson track34–50 Deepen Practice
#java#data-access#r2dbc#reactive+5 more

Part 049 — R2DBC Mental Model

R2DBC bukan JDBC yang dibungkus Mono/Flux.

R2DBC adalah SPI reactive untuk akses database relasional secara non-blocking.

Jika JDBC punya mental model:

Thread blocks while waiting for database I/O.

R2DBC punya mental model:

Event loop subscribes to asynchronous database I/O.
Result arrives later through Publisher signals.

Ini bisa meningkatkan scalability pada workload tertentu, tetapi juga membawa kompleksitas transaction, connection lifecycle, context propagation, backpressure, debugging, dan driver maturity.

Part ini membahas mental model R2DBC untuk production-grade Java data access.


1. Core Thesis

R2DBC adalah alat untuk non-blocking relational data access.

Ia cocok ketika:

  • stack aplikasi reactive end-to-end;
  • banyak concurrent I/O-bound request;
  • blocking thread cost signifikan;
  • driver/database support matang;
  • team menguasai reactive programming;
  • transaction dan connection lifecycle dikelola dengan benar.

Ia tidak otomatis membuat database lebih cepat.

Rule utama:

Reactive data access removes blocking threads.
It does not remove database bottleneck.

Database tetap punya:

  • connection limit;
  • lock;
  • transaction isolation;
  • index;
  • slow query;
  • row contention;
  • timeout;
  • backpressure mismatch;
  • resource lifecycle.

2. JDBC vs R2DBC Mental Model

JDBC:

try (Connection connection = dataSource.getConnection()) {
    PreparedStatement ps = connection.prepareStatement(sql);
    ResultSet rs = ps.executeQuery();

    while (rs.next()) {
        ...
    }
}

Thread menunggu selama:

  • connection acquisition;
  • query execution;
  • result fetching;
  • network I/O.

R2DBC:

Mono<CaseFileRow> row =
        databaseClient.sql("""
            select id, case_number, status
            from case_file
            where id = :id
            """)
        .bind("id", id)
        .map((r, meta) -> mapCaseFile(r))
        .one();

Execution terjadi ketika subscribed. Result dikirim melalui reactive signals.


3. Reactive Streams Basics

Reactive Streams punya konsep:

  • Publisher;
  • Subscriber;
  • Subscription;
  • demand/request;
  • signal onNext;
  • signal onError;
  • signal onComplete.

In Reactor:

  • Mono<T>: 0..1 item;
  • Flux<T>: 0..N item.

R2DBC API mengembalikan Publisher.

Key point:

Nothing happens until subscribed.

In WebFlux, framework subscribes to returned Mono/Flux.


4. R2DBC Is Not ORM

R2DBC is lower-level relational access.

It does not provide:

  • JPA persistence context;
  • dirty checking;
  • lazy loading entity graph;
  • automatic cascade;
  • @Version entity lifecycle semantics like ORM;
  • transparent Unit of Work.

Higher-level libraries like Spring Data R2DBC add repository abstractions, but mental model remains reactive SQL mapping, not JPA ORM.


5. R2DBC Components

Typical stack:

Application / WebFlux Handler
  -> Service returning Mono/Flux
    -> Repository / DAO
      -> DatabaseClient / R2dbcEntityTemplate / ConnectionFactory
        -> R2DBC Driver
          -> Database

Important components:

  • ConnectionFactory;
  • connection pool;
  • DatabaseClient;
  • R2dbcEntityTemplate;
  • transaction manager/operator;
  • codecs/converters;
  • reactive scheduler/event loop.

6. Connection Lifecycle

In JDBC, connection is held by thread/blocking call.

In R2DBC, connection is acquired asynchronously and must stay associated with reactive chain.

Example with Spring transaction:

@Transactional
public Mono<ApproveCaseResult> approve(ApproveCaseCommand command) {
    return caseRepository.loadForApproval(command.caseId())
            .switchIfEmpty(Mono.error(new CaseNotFound(command.caseId())))
            .flatMap(caseFile -> {
                caseFile.approve(command.actorId(), command.reason());
                return caseRepository.save(caseFile)
                        .then(auditRepository.append(...))
                        .then(outboxRepository.append(...))
                        .thenReturn(ApproveCaseResult.from(caseFile));
            });
}

The transaction connection is propagated through Reactor context, not ThreadLocal in the same way as imperative JDBC.

Breaking the chain can lose transaction context.


7. Transaction Propagation Is Context-Based

Reactive transaction state cannot rely on normal blocking ThreadLocal assumptions.

In Reactor/Spring reactive transactions, transaction context is propagated through reactive chain.

Bad:

@Transactional
public Mono<Void> approve(Command command) {
    caseRepository.update(command);

    return Mono.just(null);
}

If repository call is not part of returned chain, it may execute outside expected transaction or not execute as intended.

Correct:

@Transactional
public Mono<Void> approve(Command command) {
    return caseRepository.update(command)
            .then(auditRepository.append(...))
            .then(outboxRepository.append(...));
}

Always compose and return the chain.


8. TransactionalOperator

Explicit transaction operator:

public Mono<ApproveCaseResult> approve(ApproveCaseCommand command) {
    return transactionalOperator.transactional(
            commandDedup.start(command)
                    .then(caseRepository.loadForApproval(command.caseId()))
                    .switchIfEmpty(Mono.error(new CaseNotFound(command.caseId())))
                    .flatMap(caseFile -> {
                        caseFile.approve(command.actorId(), command.reason());

                        return caseRepository.save(caseFile)
                                .then(auditRepository.append(...))
                                .then(outboxRepository.append(...))
                                .then(commandDedup.complete(command.id(), ApproveCaseResult.from(caseFile)))
                                .thenReturn(ApproveCaseResult.from(caseFile));
                    })
    );
}

This makes transaction boundary explicit.


9. Reactive Transaction Rule

Inside a reactive transaction:

Do not subscribe manually.
Do not block.
Do not start unrelated chain outside returned Publisher.
Do not jump scheduler unnecessarily.
Do not call blocking JDBC.

Manual subscribe() inside service is a serious smell.

Bad:

repository.save(entity).subscribe();
return Mono.just(result);

This detaches work from transaction/error handling.


10. Connection Pool Still Exists

R2DBC still uses database connections.

Even if non-blocking, DB can process only limited concurrent queries.

Connection pool controls concurrency to DB.

If pool size = 50, only about 50 concurrent database interactions can hold connections.

Reactive does not mean infinite DB concurrency.

Backpressure must protect database.


11. Event Loop Must Not Block

In reactive stack, event-loop threads must stay non-blocking.

Bad:

.map(row -> blockingHttpClient.call(row.id()))

Bad:

.map(row -> jdbcTemplate.queryForObject(...))

Bad:

.map(row -> Files.readString(path))

If blocking unavoidable, isolate on bounded elastic scheduler and understand cost. But mixing blocking I/O defeats the main benefit.


12. Blocking Call Detection

Use tools/practices:

  • BlockHound in tests/dev;
  • code review;
  • thread naming;
  • reactive scheduler metrics;
  • latency diagnostics.

If reactive pipeline blocks event loop, p99 latency can collapse under load.


13. Backpressure

Reactive Streams allow downstream to signal demand.

But database backpressure is limited by:

  • query execution already started;
  • DB returns rows through driver;
  • connection occupied while streaming;
  • transaction open until completion;
  • database may not pause work perfectly.

Backpressure helps memory/consumer flow, not query cost itself.

Do not run unbounded queries just because Flux supports backpressure.


14. Flux Result Streaming

Flux<CaseExportRow> rows =
        databaseClient.sql("""
            select id, case_number, status
            from case_file
            where tenant_id = :tenantId
            order by id
            """)
        .bind("tenantId", tenantId)
        .map((r, m) -> mapExportRow(r))
        .all();

Cautions:

  • connection held until Flux completes/cancelled;
  • transaction/snapshot may be long;
  • slow subscriber holds DB resources;
  • export writing to remote file may back up;
  • error/cancel must release resources.

For large export, chunked keyset often still better.


15. Chunked Reactive Read

Mono<Void> exportAll(CaseFileIdCursor start) {
    return Flux.generate(
                () -> start,
                (cursor, sink) -> {
                    sink.next(cursor);
                    return cursor;
                }
            )
            .concatMap(cursor ->
                    query.readAfter(cursor, 1000)
                         .flatMapMany(slice -> {
                             if (slice.items().isEmpty()) {
                                 return Flux.empty();
                             }
                             return writer.write(slice.items())
                                     .thenMany(Flux.just(slice.nextCursor()));
                         })
            )
            .then();
}

In practice, recursive/expand pattern may be cleaner.

Key idea:

Bound each DB query.
Release connection between chunks if possible.
Persist progress for resumability.

16. R2DBC Mapping

With Spring DatabaseClient:

private CaseFileRow mapCaseFile(Row row) {
    return new CaseFileRow(
            new CaseFileId(row.get("id", UUID.class)),
            new TenantId(row.get("tenant_id", UUID.class)),
            new CaseNumber(row.get("case_number", String.class)),
            CaseStatus.fromDbCode(row.get("status", String.class)),
            row.get("version", Long.class),
            row.get("updated_at", Instant.class)
    );
}

Mapping discipline remains:

  • required fields;
  • nullable left joins;
  • enum codes;
  • value objects;
  • timestamp semantics.

Reactive does not remove mapping risks.


17. Spring Data R2DBC

Spring Data R2DBC provides repositories/templates.

It is not Spring Data JPA.

Differences:

  • no JPA persistence context;
  • no lazy loading;
  • no dirty checking;
  • limited relationship mapping compared to ORM;
  • explicit saves/queries;
  • transaction context reactive.

Use it as reactive SQL mapping tool, not JPA replacement.


18. R2DBC Entity Save Semantics

Spring Data R2DBC save may decide insert/update based on ID/newness strategy.

Be explicit for critical operations.

For domain command, prefer intent-specific SQL:

Mono<Integer> updateStatusWithVersion(...) {
    return databaseClient.sql("""
        update case_file
        set status = :nextStatus,
            version = version + 1,
            updated_at = :now
        where tenant_id = :tenantId
          and id = :caseId
          and version = :expectedVersion
        """)
        .bind(...)
        .fetch()
        .rowsUpdated();
}

Then check row count.


19. Rows Updated in Reactive Chain

return caseDao.approve(command)
        .flatMap(updated -> {
            if (updated == 0) {
                return Mono.error(new OptimisticConflict(command.caseId()));
            }
            if (updated != 1) {
                return Mono.error(new DataAccessInvariantViolation(...));
            }
            return Mono.empty();
        });

Affected rows remain correctness signal.


20. Conditional Update Pattern

public Mono<Void> approveWithExpectedVersion(ApproveCaseCommand command) {
    return databaseClient.sql("""
            update case_file
            set status = 'APPROVED',
                approved_by = :actorId,
                approved_at = :now,
                updated_at = :now,
                version = version + 1
            where tenant_id = :tenantId
              and id = :caseId
              and status = 'UNDER_REVIEW'
              and version = :expectedVersion
            """)
            .bind("tenantId", command.tenantId().value())
            .bind("caseId", command.caseId().value())
            .bind("actorId", command.actorId().value())
            .bind("now", command.requestedAt())
            .bind("expectedVersion", command.expectedVersion())
            .fetch()
            .rowsUpdated()
            .flatMap(count -> count == 1
                    ? Mono.empty()
                    : Mono.error(new CaseTransitionConflict(command.caseId())));
}

This is reactive SQL-first mutation.


21. Outbox in R2DBC

Outbox insert in same reactive transaction:

return caseDao.approve(command)
        .then(auditDao.append(audit))
        .then(outboxDao.append(event));

Do not publish to broker directly inside transaction.

Reactive outbox publisher can separately poll/claim/publish/mark.

Same outbox correctness rules apply.


22. Error Handling

Reactive errors propagate through onError.

return repository.save(...)
        .onErrorMap(ex -> sqlErrorClassifier.isDuplicate(ex, "uq_case_number"),
                ex -> new DuplicateCaseNumber(caseNumber, ex));

Be careful:

  • onErrorResume can swallow errors accidentally;
  • rollback depends on error signal leaving transactional boundary;
  • mapping to fallback success can commit partial transaction if not designed.

23. Error Swallowing Pitfall

Bad:

return auditDao.append(audit)
        .onErrorResume(ex -> Mono.empty());

If audit is required for command correctness, swallowing error allows transaction to commit without audit.

Use fallback only when optional by contract.


24. Rollback Semantics

Reactive transaction rolls back when returned Publisher terminates with error.

If you catch and convert error to success inside transaction, transaction may commit.

Example:

return transactionalOperator.transactional(
        caseDao.approve(command)
                .then(outboxDao.append(event))
                .onErrorResume(ex -> Mono.empty()) // commits if no error leaves
);

This is dangerous.


25. Timeout

Use timeouts:

return caseDao.search(query)
        .timeout(Duration.ofSeconds(1));

But also use:

  • driver/database statement timeout if available;
  • transaction timeout;
  • pool acquisition timeout.

Reactive timeout cancels subscription, but database query cancellation semantics depend driver/database.

Do not rely solely on Reactor timeout for DB resource control.


26. Retry

Reactive retry:

.retryWhen(Retry.backoff(3, Duration.ofMillis(50))
        .filter(this::isRetryableTransactionFailure))

Rules:

  • retry whole transaction;
  • idempotency required;
  • do not retry optimistic conflict blindly;
  • do not retry validation/duplicate business errors;
  • use jitter/backoff/budget.

Wrap transaction inside retry or retry transaction function carefully.


27. Correct Retry Placement

Retry entire transactional operation:

return retryPolicy.execute(() ->
        transactionalOperator.transactional(
                approveTransaction(command)
        )
);

Conceptually:

retry {
  begin tx
  all DB writes
  commit
}

Do not retry only the final insert after transaction failure.


28. Pool Exhaustion

Reactive app can accept many concurrent requests cheaply, but DB pool may be small.

If many requests wait for connection:

  • latency spikes;
  • timeouts;
  • memory queues grow;
  • backpressure may not protect HTTP ingress by default.

Use:

  • pool limits;
  • request rate limit;
  • bulkhead;
  • timeout;
  • bounded concurrency operators;
  • load shedding.

29. Bounded Concurrency

For processing items:

Flux.fromIterable(items)
        .flatMap(item -> repository.process(item), 16)

The second parameter bounds concurrency.

Without bound:

.flatMap(item -> repository.process(item))

can create too much DB pressure.

For DB operations, always think about concurrency limit.


30. concatMap vs flatMap

concatMap:

sequential, preserves order, lower concurrency

flatMap:

concurrent, order not guaranteed, can overwhelm DB if unbounded

flatMapSequential:

concurrent but emits ordered

Choose deliberately.


31. Transaction and flatMap

Inside one transaction, concurrent DB operations over same connection may not be supported/meaningful.

Bad:

@Transactional
public Mono<Void> saveAll(List<Row> rows) {
    return Flux.fromIterable(rows)
            .flatMap(row -> dao.insert(row)) // concurrent within tx?
            .then();
}

Use sequential or batch:

.concatMap(row -> dao.insert(row))

or proper batch statement/chunk.


32. Reactive Batch

R2DBC batch support depends driver/API.

For high-volume batch, evaluate:

  • driver batch performance;
  • multi-row insert;
  • COPY/bulk loader if DB supports;
  • JDBC batch in separate worker if reactive not needed;
  • backfill architecture.

Reactive does not automatically make batch faster.


33. Testing Reactive Data Access

Use real DB and StepVerifier.

@Test
void approveUpdatesOneRow() {
    StepVerifier.create(useCase.approve(command))
            .expectNextMatches(result -> result.status() == APPROVED)
            .verifyComplete();

    StepVerifier.create(caseQuery.find(command.caseId()))
            .assertNext(row -> assertThat(row.status()).isEqualTo(APPROVED))
            .verifyComplete();
}

Test transaction rollback, not just successful signal.


34. Rollback Test

@Test
void rollbackWhenOutboxFails() {
    StepVerifier.create(useCase.approveWithBrokenOutbox(command))
            .expectError()
            .verify();

    StepVerifier.create(caseQuery.find(command.caseId()))
            .assertNext(row -> assertThat(row.status()).isEqualTo(UNDER_REVIEW))
            .verifyComplete();
}

This proves reactive transaction is wired.


35. Test No Manual Subscribe

Code review/static analysis should flag .subscribe() inside service/data layer.

In tests, manual subscribe can hide failures.

Use StepVerifier or return Publisher to test framework.


36. BlockHound Test

In reactive stack, enable BlockHound in tests/dev to detect blocking calls.

It can catch:

  • Thread.sleep;
  • blocking JDBC;
  • blocking file I/O;
  • synchronous HTTP client.

Some libraries need allowlist. Use carefully but seriously.


37. Observability

Metrics:

r2dbc.pool.acquired
r2dbc.pool.pending
r2dbc.pool.max
data.query.duration{query}
data.query.timeout.count{query}
data.query.error.count{query,error_type}
reactor.scheduler.tasks.pending
http.server.requests.active
transaction.retry.count

Trace reactive chain with query names.

Reactive debugging without observability is painful.


38. Debugging Reactive Stack

Reactive stack traces can be hard.

Use:

  • checkpoint labels in critical flows;
  • operator naming where supported;
  • structured logging with correlation ID;
  • Reactor context propagation;
  • metrics per query/use case.

Example:

return caseDao.approve(command)
        .checkpoint("ApproveCaseUseCase.approve.caseDao.approve")
        .then(outboxDao.append(event).checkpoint("ApproveCaseUseCase.approve.outbox"));

Use checkpoints selectively to avoid overhead.


39. Context Propagation

ThreadLocal-based context such as tenant/security/MDC may not propagate automatically in reactive pipelines.

Use:

  • Reactor Context;
  • framework security context integration;
  • explicit tenant parameter in query object;
  • context propagation libraries carefully.

Best for data access:

Pass tenant/scope explicitly.

Do not rely on hidden ThreadLocal tenant in reactive data access unless framework guarantees propagation and tests cover it.


40. Tenant Scope

Reactive query should still include tenant explicitly:

databaseClient.sql("""
    select ...
    from case_file
    where tenant_id = :tenantId
      and id = :caseId
    """)

Tenant context propagation bugs can become cross-tenant data leak.

Explicit query object is safer.


41. Reactive Repository Contract

Method naming still matters:

Mono<CaseFileRow> findByTenantAndId(TenantId tenantId, CaseFileId id);
Flux<CaseDashboardRow> search(CaseDashboardQuery query);
Mono<Integer> approveWithExpectedVersion(ApproveCaseCommand command);
Mono<Void> insertOutbox(OutboxEvent event);

Return type semantics:

  • Mono.empty() for not found;
  • Mono.error for semantic failure;
  • Flux for bounded/streaming rows;
  • never return null.

42. Mono.empty vs Error

Find optional:

Mono<CaseFileRow> findById(...)

Empty means not found.

Required get:

Mono<CaseFileRow> getById(...)

can:

.switchIfEmpty(Mono.error(new CaseNotFound(id)))

Be consistent.


43. Backpressure and API Response

Returning Flux<Row> directly to HTTP streaming endpoint can keep DB connection open while client consumes.

If client slow, DB resource held.

For large responses, consider:

  • chunk and buffer to file/object storage;
  • async export job;
  • page/cursor API;
  • rate limit streaming;
  • disconnect/cancel handling.

44. Cancellation

Reactive streams can be cancelled.

If client disconnects:

  • Publisher cancellation should release DB resources;
  • transaction should rollback if in progress;
  • driver support matters.

Test cancellation for long streams if critical.


45. R2DBC and Stored Procedures/Advanced DB Features

Support varies by driver/database.

Before adopting R2DBC for SQL-heavy legacy DB, verify:

  • transaction support;
  • generated keys/returning;
  • batch;
  • cursors;
  • stored procedure;
  • JSON/array types;
  • lock syntax;
  • time types;
  • SSL/auth features;
  • observability.

Driver maturity is a production criterion.


46. R2DBC and Migrations

Schema migrations are typically still run by blocking tools at startup/deploy, or external migration process.

Do not block reactive event loop running migration.

Better:

  • run Flyway/Liquibase before app starts;
  • run migration job in deployment pipeline;
  • use blocking migration on startup before reactive server starts if acceptable.

Runtime request path should not run migrations.


47. R2DBC Adoption Path

Do not rewrite whole app because reactive is trendy.

Adoption path:

  1. Identify I/O-bound endpoints with high concurrency need.
  2. Verify full stack can be non-blocking.
  3. Verify driver/database feature support.
  4. Build one query service/use case.
  5. Add pool/concurrency limits.
  6. Add transaction/rollback tests.
  7. Add BlockHound/observability.
  8. Load test with realistic DB.
  9. Compare with JDBC + virtual threads/simple blocking stack.

48. When R2DBC Is a Bad Fit

Bad fit if:

  • app is mostly CPU-bound;
  • DB is bottleneck;
  • team unfamiliar with reactive;
  • many blocking libraries unavoidable;
  • transaction logic complex and easier imperative;
  • ORM/JPA features needed;
  • driver lacks needed DB support;
  • debugging/observability not ready;
  • workload low concurrency;
  • virtual threads solve thread-scaling sufficiently.

Reactive complexity must buy something measurable.


49. Review Checklist

  • Is stack reactive end-to-end?
  • Are there blocking calls in pipeline?
  • Is connection pool bounded?
  • Is DB concurrency limited?
  • Is transaction boundary composed in returned Publisher?
  • No manual subscribe() in service/data layer?
  • Tenant/scope explicit?
  • Rows updated checked?
  • Errors mapped without swallowing required failures?
  • Retry retries whole transaction and is idempotent?
  • Timeout at query/transaction/request level?
  • flatMap concurrency bounded?
  • Large streams do not hold connection indefinitely?
  • Rollback tests with real DB?
  • Driver supports required features?
  • Observability exists?

50. Anti-Pattern: Wrapping JDBC in Mono

Bad:

Mono.fromCallable(() -> jdbcTemplate.queryForObject(...))

This is blocking JDBC on a scheduler, not R2DBC.

It may be acceptable as bridge on bounded elastic for legacy code, but do not call it non-blocking data access.


51. Anti-Pattern: Manual Subscribe

dao.save(row).subscribe();

Breaks transaction/error flow.

Return/compose Publisher.


52. Anti-Pattern: Unbounded flatMap to DB

Can overload connection pool/database.

Bound concurrency.


53. Anti-Pattern: Swallow Error Inside Transaction

Can commit partial operation.

Only recover errors that are optional by contract.


54. Anti-Pattern: Reactive Stream as Export Without Resource Plan

Slow client holds DB connection.

Use async export/chunking when needed.


55. Mini Lab

Design R2DBC use case:

Approve case:
- command idempotency;
- load current case;
- expected version;
- update status;
- insert audit;
- insert outbox;
- transaction rollback;
- duplicate command replay;
- timeout;
- retry deadlock only.

Tasks:

  1. Define reactive repository methods.
  2. Compose transaction chain.
  3. Decide Mono.empty vs errors.
  4. Check update count.
  5. Ensure no manual subscribe.
  6. Map duplicate idempotency key.
  7. Add rollback test with StepVerifier.
  8. Add timeout.
  9. Add retry around whole transaction.
  10. Add metrics/checkpoints.

56. Summary

R2DBC is a powerful non-blocking relational data access model, but it is not a magic performance switch.

You must master:

  • Reactive Streams;
  • Mono/Flux;
  • connection lifecycle;
  • transaction context;
  • TransactionalOperator;
  • no manual subscribe;
  • no blocking event loop;
  • connection pool limits;
  • backpressure limits;
  • row mapping;
  • conditional update;
  • outbox in reactive transaction;
  • error propagation and rollback;
  • timeout/retry;
  • bounded concurrency;
  • testing with StepVerifier;
  • BlockHound;
  • observability;
  • tenant context;
  • cancellation;
  • driver maturity;
  • adoption decision.

Part berikutnya membahas Reactive Data Access Tradeoffs: kapan reactive masuk akal, kapan JDBC lebih benar, thread model, blocking trap, pool vs event loop, virtual threads comparison, and production decision framework.


57. References

Lesson Recap

You just completed lesson 49 in deepen practice. Use the series map if you want to review the broader track, or continue directly into the next lesson while the context is still warm.

Continue The Track

Keep the momentum while the lesson is still fresh. Move backward for review or continue forward into the next concept.