Async Python: Event Loop, Coroutine, Task, and Backpressure
Part 022 - Async Python: Event Loop, Coroutine, Task, and Backpressure
A practical look at async Python: event loop, coroutines, await, Task, TaskGroup, cancellation, timeouts, semaphores, queues, backpressure, async context managers, and design pitfalls.
1. Goals of This Part
Async Python is very powerful, but it is also easy to misuse.
Common mistakes:
- using async for CPU-bound work;
- writing async def but still calling blocking I/O;
- forgetting await;
- creating tasks without waiting for their results;
- swallowing cancellation;
- not setting timeouts;
- not limiting concurrency;
- creating nested event loops;
- mixing sync and async code without a clear boundary;
- using async because the framework does, not because the workload needs it.
This part builds a mental model of asyncio:
- event loop;
- coroutine;
- await;
- task;
- task group;
- cancellation;
- timeout;
- semaphore;
- queue;
- async context manager;
- backpressure;
- async/sync boundary;
- observability.
Goals for the end of this part:
- Understand when async is a good fit.
- Understand the event loop.
- Write coroutines.
- Use await.
- Use asyncio.run.
- Create tasks correctly.
- Use TaskGroup.
- Understand cancellation.
- Use timeouts.
- Limit concurrency with semaphores and queues.
- Avoid blocking calls in async code.
- Connect async to case-tracker and web/API systems.
2. When Is Async a Good Fit?
Async fits I/O concurrency, where many operations spend most of their time waiting:
- HTTP calls;
- database calls dengan async driver;
- websocket;
- long polling;
- network server;
- message broker async;
- subprocess async;
- many idle connections;
- event-driven service.
Async is not automatically a good fit for:
- pure-Python CPU-bound work;
- small scripts with no bottleneck;
- blocking libraries that have no async API;
- operations that need CPU parallelism;
- codebases whose team is not yet ready for the mental model.
Rule:
Async helps when many operations can yield while they wait for I/O.
3. Event Loop Mental Model
The event loop is a cooperative scheduler.
It runs a task until the task reaches an await on an operation that has not finished yet, then switches to another task that is ready to run.
Async concurrency is cooperative:
- a task must await to let others run;
- CPU-heavy code blocks the loop;
- blocking I/O blocks the loop;
- fairness depends on tasks yielding.
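The following minimal sketch shows this cooperative scheduling using only asyncio. Two workers (hypothetical names) append to a shared log and call await asyncio.sleep(0) to hand control back to the loop, so their steps interleave.

```python
import asyncio


async def worker(name: str, steps: int, log: list[str]) -> None:
    for step in range(steps):
        log.append(f"{name}-{step}")
        await asyncio.sleep(0)  # yield so the loop can run the other task


async def main() -> list[str]:
    log: list[str] = []
    await asyncio.gather(worker("A", 3, log), worker("B", 3, log))
    return log


if __name__ == "__main__":
    print(asyncio.run(main()))  # ['A-0', 'B-0', 'A-1', 'B-1', 'A-2', 'B-2']
```

Without the sleep(0), worker A would run all of its steps before B got a turn, because nothing else would give control back to the loop.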
4. Coroutine Function vs Coroutine Object
Coroutine function:
```python
async def fetch_case(case_id: str) -> str:
    return f"case:{case_id}"
```
Calling it does not run its body immediately. Instead, it returns a coroutine object:
```python
coroutine = fetch_case("CASE-001")
```
To run it from inside another coroutine:
```python
result = await coroutine
```
Or from synchronous top-level code:
```python
import asyncio

result = asyncio.run(fetch_case("CASE-001"))
```
4.1 Common Bug: Forgot Await
Bad:
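```python
result = fetch_case("CASE-001")  # missing await
print(result)  # prints <coroutine object fetch_case at 0x...>
```
This prints a coroutine object, not the result. Python also emits a "coroutine ... was never awaited" RuntimeWarning when that object is garbage-collected.
Type checkers and linters can help catch un-awaited coroutines.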
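To see that calling a coroutine function does not run its body, a small sketch can record when the body actually executes. The calls list is a hypothetical helper used only for this demonstration.

```python
import asyncio
import inspect

calls: list[str] = []


async def fetch_case(case_id: str) -> str:
    calls.append(case_id)  # records the moment the body really runs
    return f"case:{case_id}"


coroutine = fetch_case("CASE-001")
print(inspect.iscoroutine(coroutine), calls)  # True []  (body has not run yet)
result = asyncio.run(coroutine)
print(result, calls)  # case:CASE-001 ['CASE-001']
```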
5. asyncio.run
Entry point from sync code:
```python
import asyncio


async def main() -> None:
    result = await fetch_case("CASE-001")
    print(result)


if __name__ == "__main__":
    asyncio.run(main())
```
asyncio.run creates a new event loop, runs the coroutine to completion, and then closes the loop.
Do not call asyncio.run from inside an already running event loop. If you do, it raises RuntimeError.
In frameworks like FastAPI, the framework owns the event loop. Your route handler should be async def and use await, not asyncio.run.
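The sketch below shows what happens when code that already runs inside the loop calls asyncio.run. Here, handler is a hypothetical stand-in for a framework route handler. The call fails fast with RuntimeError, and the fix is simply to await.

```python
import asyncio


async def fetch_case(case_id: str) -> str:
    return f"case:{case_id}"


async def handler() -> str:
    coroutine = fetch_case("CASE-001")
    try:
        return asyncio.run(coroutine)  # wrong: this code already runs inside a loop
    except RuntimeError as error:
        coroutine.close()  # the rejected coroutine never started; close it to avoid a warning
        print(f"RuntimeError: {error}")
    return await fetch_case("CASE-001")  # right: simply await it


if __name__ == "__main__":
    print(asyncio.run(handler()))
```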
6. await
await waits for an awaitable. While the result is not ready, it yields control to the event loop.
```python
async def fetch_and_parse(case_id: str) -> Case:
    raw = await fetch_case_payload(case_id)
    return parse_case(raw)
```
At the await, other tasks can run.
await does not create concurrency by itself. It only yields to the loop while waiting.
Sequential async (the second fetch starts only after the first finishes):
```python
a = await fetch("A")
b = await fetch("B")
```
Concurrent async (both fetches are in flight at the same time):
```python
task_a = asyncio.create_task(fetch("A"))
task_b = asyncio.create_task(fetch("B"))
a = await task_a
b = await task_b
```
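Here is a runnable sketch of the difference, assuming a hypothetical fetch that only sleeps for 0.1 s to simulate network latency. The sequential version should take roughly twice as long as the concurrent one.

```python
import asyncio
import time
from collections.abc import Callable, Coroutine
from typing import Any


async def fetch(name: str) -> str:
    await asyncio.sleep(0.1)  # stand-in for a network call
    return f"fetched:{name}"


async def sequential() -> list[str]:
    a = await fetch("A")  # B does not start until A is done
    b = await fetch("B")
    return [a, b]


async def concurrent() -> list[str]:
    task_a = asyncio.create_task(fetch("A"))  # both tasks are scheduled right away
    task_b = asyncio.create_task(fetch("B"))
    return [await task_a, await task_b]


def timed(make_coroutine: Callable[[], Coroutine[Any, Any, list[str]]]) -> tuple[list[str], float]:
    start = time.perf_counter()
    result = asyncio.run(make_coroutine())
    return result, time.perf_counter() - start


if __name__ == "__main__":
    for variant in (sequential, concurrent):
        result, elapsed = timed(variant)
        print(f"{variant.__name__}: {result} in {elapsed:.2f}s")
```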
7. Task
A Task schedules a coroutine to run on the event loop.
```python
task = asyncio.create_task(fetch_case("CASE-001"))
result = await task
```
Creating a task lets it run concurrently with the current coroutine as soon as control returns to the loop.
7.1 Fire-and-Forget Is Dangerous
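Bad:
```python
asyncio.create_task(send_notification(case))
```
If you never await or track the task:
- exceptions can be lost or logged late;
- the program may exit before the task completes;
- cancellation is not controlled;
- the task's lifecycle is unclear;
- the task can even be garbage-collected mid-execution, because the event loop keeps only weak references to tasks.
Better:
- await the task;
- use TaskGroup;
- store the task and manage its lifecycle;
- use the framework's background task manager.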
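If a task really must run in the background, one common pattern, which the asyncio documentation also recommends, is to keep a strong reference in a set and remove it in a done callback. The sketch below also logs failures. The names spawn_background and send_notification are hypothetical.

```python
import asyncio
import logging
from collections.abc import Coroutine
from typing import Any

logger = logging.getLogger("case_tracker")
background_tasks: set[asyncio.Task[None]] = set()


def _on_done(task: asyncio.Task[None]) -> None:
    background_tasks.discard(task)  # drop the strong reference once finished
    if not task.cancelled() and task.exception() is not None:
        logger.error("background task %s failed", task.get_name(), exc_info=task.exception())


def spawn_background(coroutine: Coroutine[Any, Any, None], name: str) -> asyncio.Task[None]:
    task = asyncio.create_task(coroutine, name=name)
    background_tasks.add(task)  # strong reference keeps the task alive
    task.add_done_callback(_on_done)
    return task


async def send_notification(case_id: str, sent: list[str]) -> None:
    await asyncio.sleep(0.01)  # hypothetical notification call
    sent.append(case_id)


async def main() -> list[str]:
    sent: list[str] = []
    for case_id in ("CASE-001", "CASE-002"):
        spawn_background(send_notification(case_id, sent), name=f"notify-{case_id}")
    await asyncio.gather(*background_tasks)  # at shutdown, wait for what is still running
    return sent
```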
8. asyncio.gather
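Runs multiple awaitables concurrently and collects their results.
```python
results = await asyncio.gather(
    fetch_case("CASE-001"),
    fetch_case("CASE-002"),
    fetch_case("CASE-003"),
)
```
Results come back in input order, not completion order.
If one awaitable raises, the behavior depends on return_exceptions. By default, the first exception propagates to the caller while the other awaitables keep running; they are not cancelled. With return_exceptions=True, exceptions are returned as items in the results list. For structured concurrency, prefer TaskGroup (Python 3.11+) when appropriate.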
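A small sketch of return_exceptions=True follows. The fetch_case here is a hypothetical stand-in that fails for one id, so the failure shows up as an item in the results list instead of propagating.

```python
import asyncio


async def fetch_case(case_id: str) -> str:
    await asyncio.sleep(0.01)
    if case_id == "CASE-002":
        raise ValueError(f"cannot fetch {case_id}")  # simulated failure
    return f"case:{case_id}"


async def fetch_three() -> list[object]:
    return await asyncio.gather(
        fetch_case("CASE-001"),
        fetch_case("CASE-002"),
        fetch_case("CASE-003"),
        return_exceptions=True,  # failures become items instead of propagating
    )


if __name__ == "__main__":
    for item in asyncio.run(fetch_three()):
        print(repr(item))
```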
9. TaskGroup
TaskGroup provides structured concurrency.
```python
import asyncio


async def fetch_all(case_ids: list[str]) -> list[str]:
    results: dict[str, str] = {}
    async with asyncio.TaskGroup() as task_group:  # Python 3.11+
        tasks = {
            case_id: task_group.create_task(fetch_case(case_id))
            for case_id in case_ids
        }
    # Leaving the async with block waited for every task, so result() is safe here.
    for case_id, task in tasks.items():
        results[case_id] = task.result()
    return [results[case_id] for case_id in case_ids]
```
The task group waits for all of its tasks when the async with block exits. If a task fails, its siblings are cancelled and the errors are raised together as an ExceptionGroup.
Structured concurrency helps avoid orphan tasks.
10. Async Sleep vs Blocking Sleep
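Use:
```python
await asyncio.sleep(1)
```
Do not use:
```python
time.sleep(1)
```
inside an async function.
time.sleep blocks the entire event loop: no other task can run until it returns.
Demo:
```python
import asyncio
import time


async def bad() -> None:
    time.sleep(1)  # blocks every task on the loop for one second


async def good() -> None:
    await asyncio.sleep(1)  # suspends only this coroutine
```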
In async code, every blocking call is suspicious.
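To measure the difference, the sketch below runs three copies of each coroutine concurrently, with sleeps shortened to 0.1 s. With time.sleep, the calls run one after another; with asyncio.sleep, they overlap.

```python
import asyncio
import time
from collections.abc import Callable, Coroutine
from typing import Any


async def bad() -> None:
    time.sleep(0.1)  # blocks the loop: the three calls run one after another


async def good() -> None:
    await asyncio.sleep(0.1)  # suspends only this coroutine: the calls overlap


async def run_three(job: Callable[[], Coroutine[Any, Any, None]]) -> float:
    start = time.perf_counter()
    await asyncio.gather(job(), job(), job())
    return time.perf_counter() - start


if __name__ == "__main__":
    print(f"bad:  {asyncio.run(run_three(bad)):.2f}s")   # roughly 0.3 s
    print(f"good: {asyncio.run(run_three(good)):.2f}s")  # roughly 0.1 s
```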
11. Blocking I/O in Async Code
Bad:
```python
async def load_cases(path: Path) -> list[Case]:
    raw = path.read_text(encoding="utf-8")  # blocking
    ...
```
For a small CLI that is not really an async app, this is fine. But in an async server, a blocking file, database, or network call blocks all tasks.
Options:
- use an async library or driver;
- move the blocking call to a thread with asyncio.to_thread;
- keep the boundary sync and do not pretend to be async;
- use a process pool for CPU-bound work;
- redesign.
Example, assuming load_cases is the plain synchronous version of the function:
```python
async def load_cases_async(path: Path) -> list[Case]:
    return await asyncio.to_thread(load_cases, path)
```
to_thread is useful for integrating blocking I/O; it is not a magic CPU speedup.
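The following sketch shows that to_thread keeps the loop responsive. A hypothetical blocking_load sleeps in a worker thread while a heartbeat coroutine keeps ticking on the loop.

```python
import asyncio
import time


def blocking_load(delay: float) -> str:
    time.sleep(delay)  # stand-in for a blocking file read or legacy library call
    return "loaded"


async def heartbeat(ticks: list[float], stop: asyncio.Event) -> None:
    while not stop.is_set():
        ticks.append(time.perf_counter())
        await asyncio.sleep(0.02)


async def load_with_heartbeat() -> tuple[str, int]:
    ticks: list[float] = []
    stop = asyncio.Event()
    beat = asyncio.create_task(heartbeat(ticks, stop))
    result = await asyncio.to_thread(blocking_load, 0.2)  # loop stays free meanwhile
    stop.set()
    await beat
    return result, len(ticks)
```

If main called blocking_load(0.2) directly instead of going through to_thread, the heartbeat would get no chance to tick during the load.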
12. Async HTTP Example Concept
With an async HTTP client library, code often looks like:
```python
async def fetch_case(client: AsyncClient, case_id: str) -> CasePayload:
    response = await client.get(f"/cases/{case_id}")
    response.raise_for_status()
    return parse_payload(response.json())
```
The important points:
- the network call is awaitable;
- while it waits, the loop can run other tasks;
- the client/session lifecycle matters;
- timeouts are mandatory;
- concurrency must be bounded.
This part focuses on standard asyncio; which HTTP client to use is a dependency decision.
13. Timeout
Use asyncio.timeout (Python 3.11+):
```python
async def fetch_with_timeout(case_id: str) -> str:
    async with asyncio.timeout(5):
        return await fetch_case(case_id)
```
When the deadline passes, the enclosed operation is cancelled and asyncio.timeout raises TimeoutError.
Older style, still supported:
```python
result = await asyncio.wait_for(fetch_case(case_id), timeout=5)
```
Timeout policy is part of failure semantics.
14. Cancellation
Cancellation is cooperative.
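A task can be cancelled:
```python
task.cancel()
```
The task receives CancelledError at the await where it is currently suspended.
```python
async def worker() -> None:
    try:
        while True:
            await do_work()
    except asyncio.CancelledError:
        cleanup()
        raise  # re-raise so the task ends up cancelled
```
Important:
Usually re-raise CancelledError after cleanup.
Do not swallow cancellation unless you have a very specific reason.
Bad:
```python
except asyncio.CancelledError:
    pass
```
This can break structured concurrency and shutdown. Whoever cancelled the task (a TaskGroup, asyncio.timeout, or shutdown code) sees it finish normally instead of being cancelled.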
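A small sketch of that difference follows. Both workers are hypothetical: one swallows CancelledError and one re-raises it. Only the second is reported as cancelled by task.cancelled().

```python
import asyncio
from collections.abc import Callable, Coroutine
from typing import Any


async def swallowing_worker() -> None:
    try:
        await asyncio.sleep(10)
    except asyncio.CancelledError:
        pass  # BAD: the task now finishes "normally"


async def well_behaved_worker() -> None:
    try:
        await asyncio.sleep(10)
    except asyncio.CancelledError:
        # cleanup would go here
        raise


async def was_cancelled(worker: Callable[[], Coroutine[Any, Any, None]]) -> bool:
    task = asyncio.create_task(worker())
    await asyncio.sleep(0)  # let the worker start and reach its await
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass  # expected for the well-behaved worker; this coroutine itself was not cancelled
    return task.cancelled()
```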
15. try/finally for Async Cleanup
```python
async def worker() -> None:
    resource = await acquire_resource()
    try:
        await use_resource(resource)
    finally:
        await resource.close()  # runs on success, error, and cancellation
```
Async context manager:
```python
async with resource:
    ...
```
Cleanup must handle cancellation too.
16. Async Context Manager
Define:
```python
class AsyncResource:
    async def __aenter__(self) -> "AsyncResource":
        await self.open()
        return self

    async def __aexit__(self, exc_type, exc, tb) -> None:
        await self.close()
```
Use:
```python
async with AsyncResource() as resource:
    await resource.do_work()
```
Common for:
- HTTP clients;
- DB sessions;
- transactions;
- message consumers;
- locks;
- lifespan resources.
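Here is a runnable sketch of the AsyncResource above. open, close, and do_work are hypothetical implementations that record a lifecycle log. It cancels the task mid-work to show that __aexit__ still runs its cleanup.

```python
import asyncio


class AsyncResource:
    """Hypothetical resource that records its lifecycle in a log."""

    def __init__(self, log: list[str]) -> None:
        self.log = log

    async def open(self) -> None:
        await asyncio.sleep(0)  # stand-in for connecting
        self.log.append("open")

    async def close(self) -> None:
        await asyncio.sleep(0)  # stand-in for disconnecting
        self.log.append("close")

    async def do_work(self) -> None:
        self.log.append("work started")
        await asyncio.sleep(10)  # long operation that will be cancelled

    async def __aenter__(self) -> "AsyncResource":
        await self.open()
        return self

    async def __aexit__(self, exc_type, exc, tb) -> None:
        await self.close()  # runs on success, error, and cancellation


async def use_and_cancel() -> list[str]:
    log: list[str] = []

    async def use() -> None:
        async with AsyncResource(log) as resource:
            await resource.do_work()

    task = asyncio.create_task(use())
    await asyncio.sleep(0.01)
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    return log
```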
17. Async Iterator
Async iterator yields items over time.
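```python
import asyncio
from collections.abc import AsyncIterator


async def iter_case_events(events: list[CaseEvent]) -> AsyncIterator[CaseEvent]:
    for event in events:
        await asyncio.sleep(1)  # stand-in for waiting on a real source
        yield event
```
Consume:
```python
async for event in iter_case_events(events):
    ...
```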
Use for:
- streaming API;
- websocket messages;
- async file/network stream;
- message queue consumer.
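Here is a self-contained version of the async generator above. Plain strings serve as hypothetical stand-ins for CaseEvent, and the delay is shortened so the example runs quickly.

```python
import asyncio
from collections.abc import AsyncIterator


async def iter_case_events(events: list[str], delay: float = 0.01) -> AsyncIterator[str]:
    for event in events:
        await asyncio.sleep(delay)  # stand-in for waiting on a stream or broker
        yield event


async def collect_events() -> list[str]:
    collected: list[str] = []
    async for event in iter_case_events(["created", "assigned", "closed"]):
        collected.append(event)
    return collected
```

An async comprehension, [event async for event in iter_case_events(...)], is an equivalent shorter form inside a coroutine.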
18. Semaphore for Concurrency Limit
Bad, unbounded concurrency:
```python
tasks = [asyncio.create_task(fetch_case(case_id)) for case_id in many_case_ids]
results = await asyncio.gather(*tasks)
```
If many_case_ids is huge, this starts every fetch at once and can exhaust memory or overload the API.
Use a semaphore:
```python
async def fetch_with_limit(
    semaphore: asyncio.Semaphore,
    case_id: str,
) -> str:
    async with semaphore:
        return await fetch_case(case_id)


async def fetch_many(case_ids: list[str], limit: int = 10) -> list[str]:
    semaphore = asyncio.Semaphore(limit)
    return await asyncio.gather(
        *(fetch_with_limit(semaphore, case_id) for case_id in case_ids)
    )
```
This limits how many fetches are active at once. gather still creates one task per case id up front, though. For very large inputs, a bounded queue with a fixed number of workers also bounds the number of tasks.
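To check the limit, the sketch below tracks the peak number of in-flight fetches. InFlight and the sleeping fetch_case are hypothetical helpers for the demonstration. With 50 ids and limit=5, the peak should never exceed 5.

```python
import asyncio
from dataclasses import dataclass


@dataclass
class InFlight:
    current: int = 0
    peak: int = 0


async def fetch_case(case_id: str, in_flight: InFlight) -> str:
    in_flight.current += 1
    in_flight.peak = max(in_flight.peak, in_flight.current)
    try:
        await asyncio.sleep(0.01)  # stand-in for a network call
        return f"case:{case_id}"
    finally:
        in_flight.current -= 1


async def fetch_many(case_ids: list[str], limit: int = 10) -> tuple[list[str], int]:
    semaphore = asyncio.Semaphore(limit)
    in_flight = InFlight()

    async def fetch_with_limit(case_id: str) -> str:
        async with semaphore:  # at most `limit` coroutines get past this line at once
            return await fetch_case(case_id, in_flight)

    results = await asyncio.gather(*(fetch_with_limit(case_id) for case_id in case_ids))
    return results, in_flight.peak
```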
19. Async Queue for Backpressure
asyncio.Queue:
```python
async def producer(queue: asyncio.Queue[str], case_ids: list[str]) -> None:
    for case_id in case_ids:
        await queue.put(case_id)
    await queue.put("STOP")  # sentinel telling the consumer to exit


async def consumer(queue: asyncio.Queue[str]) -> None:
    while True:
        case_id = await queue.get()
        try:
            if case_id == "STOP":
                return
            await process_case(case_id)
        finally:
            queue.task_done()
```
With multiple consumers, send one sentinel per consumer.
Queue maxsize:
```python
queue = asyncio.Queue(maxsize=100)
```
This gives backpressure: when the queue is full, await queue.put(...) waits until a consumer removes an item.
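The sketch below makes backpressure observable with a queue of maxsize=2, a fast producer, and a deliberately slow consumer. Everything here is a hypothetical demonstration. After the first two puts, each new put only succeeds once the consumer has taken an item.

```python
import asyncio


async def show_backpressure() -> list[str]:
    log: list[str] = []
    queue: asyncio.Queue[int] = asyncio.Queue(maxsize=2)

    async def producer() -> None:
        for item in range(4):
            await queue.put(item)  # waits while the queue already holds 2 items
            log.append(f"put {item}")

    async def slow_consumer() -> None:
        for _ in range(4):
            await asyncio.sleep(0.01)  # the consumer is slower than the producer
            item = await queue.get()
            log.append(f"get {item}")
            queue.task_done()

    await asyncio.gather(producer(), slow_consumer())
    return log


if __name__ == "__main__":
    print(asyncio.run(show_backpressure()))
    # ['put 0', 'put 1', 'get 0', 'put 2', 'get 1', 'put 3', 'get 2', 'get 3']
```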
20. Producer/Consumer Pattern
```python
import asyncio

STOP = object()  # unique sentinel; cannot collide with a real case id


async def producer(queue: asyncio.Queue[object], case_ids: list[str]) -> None:
    for case_id in case_ids:
        await queue.put(case_id)


async def consumer(name: str, queue: asyncio.Queue[object]) -> None:
    while True:
        item = await queue.get()
        try:
            if item is STOP:
                return
            await process_case(str(item))
        finally:
            queue.task_done()


async def run_pipeline(case_ids: list[str], worker_count: int = 5) -> None:
    queue: asyncio.Queue[object] = asyncio.Queue(maxsize=100)
    consumers = [
        asyncio.create_task(consumer(f"worker-{index}", queue))
        for index in range(worker_count)
    ]
    await producer(queue, case_ids)
    for _ in consumers:
        await queue.put(STOP)  # one sentinel per consumer
    await queue.join()  # wait until every item, sentinels included, is marked done
    for task in consumers:
        await task
```
In production, handle exceptions carefully. A crashed consumer can leave the pipeline hanging: its sentinel is never consumed, so queue.join() never returns. If every consumer dies, the producer blocks forever on a full queue.
A TaskGroup can improve this, because a failing task cancels its siblings and the code waiting inside the group.
21. Async Lock
asyncio.Lock protects async critical sections.
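```python
lock = asyncio.Lock()
counter = 0


async def increment() -> None:
    global counter
    async with lock:
        # With no await inside, this update cannot interleave anyway;
        # the lock starts to matter once the critical section contains an await.
        counter += 1
```
Do not hold an async lock across a slow external await unless you intend to.
Bad:
```python
async with lock:
    await slow_network_call()
```
This blocks other tasks from entering the critical section for the entire network wait.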
22. Async Does Not Remove Race Conditions
Even single-threaded async code can race logically.
```python
if case_id not in case_by_id:
    await asyncio.sleep(0)
    case_by_id[case_id] = case
```
Another task can run during the await, so two tasks can both pass the check before either one writes.
Critical read-modify-write sequences that include an await need careful design.
Rule:
In async code, every await is a potential interleaving point.
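A minimal sketch of a lost update follows. Each hypothetical increment reads a shared counter, awaits, then writes. Without a lock, every task reads the same stale value. With an asyncio.Lock around the read-await-write sequence, no updates are lost.

```python
import asyncio


async def increment_unsafe(state: dict[str, int]) -> None:
    current = state["counter"]      # read
    await asyncio.sleep(0)          # interleaving point: other tasks run here
    state["counter"] = current + 1  # write based on a possibly stale read


async def increment_locked(state: dict[str, int], lock: asyncio.Lock) -> None:
    async with lock:  # the read-await-write sequence is now exclusive
        current = state["counter"]
        await asyncio.sleep(0)
        state["counter"] = current + 1


async def compare(n: int) -> tuple[int, int]:
    unsafe = {"counter": 0}
    await asyncio.gather(*(increment_unsafe(unsafe) for _ in range(n)))
    safe = {"counter": 0}
    lock = asyncio.Lock()
    await asyncio.gather(*(increment_locked(safe, lock) for _ in range(n)))
    return unsafe["counter"], safe["counter"]


if __name__ == "__main__":
    print(asyncio.run(compare(100)))  # (1, 100): 99 updates lost without the lock
```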
23. asyncio.to_thread
Runs a blocking function in a worker thread without blocking the event loop.
```python
async def load_cases_async(path: Path) -> list[Case]:
    return await asyncio.to_thread(load_cases, path)
```
Useful for:
- blocking file I/O;
- legacy sync libraries;
- CPU-light blocking work.
It is not a cure for CPU-bound pure Python. On the traditional (GIL) build, threads cannot execute Python bytecode in parallel.
For CPU-bound work, use a process pool:
```python
loop = asyncio.get_running_loop()
result = await loop.run_in_executor(process_pool, cpu_work, input_data)
```
24. Async and Process Pool
```python
import asyncio
from concurrent.futures import ProcessPoolExecutor


async def run_cpu_work(input_data: str) -> Result:
    loop = asyncio.get_running_loop()
    with ProcessPoolExecutor() as pool:
        return await loop.run_in_executor(pool, cpu_bound_function, input_data)
```
Be careful not to create a process pool per small task, as this short example does. Create the pool once and manage its lifecycle, including shutting it down on exit.
In web apps, process pools need operational planning.
25. Async Error Handling
Sequential:
```python
try:
    result = await fetch_case(case_id)
except TimeoutError:
    ...
```
A failing TaskGroup raises an ExceptionGroup, which you handle with except* (Python 3.11+).
Simple per-task handling:
```python
async def safe_fetch(case_id: str) -> CasePayload | None:
    try:
        return await fetch_case(case_id)
    except Exception as error:  # CancelledError is a BaseException, so it still propagates
        logger.warning("event=fetch_failed case_id=%s error=%s", case_id, error)
        return None
```
Then gather:
```python
results = await asyncio.gather(*(safe_fetch(case_id) for case_id in case_ids))
```
This keeps one failure from propagating out of gather and discarding the other results, if that is what you want.
Failure policy must be explicit:
- fail fast?
- collect partial results?
- retry?
- skip and log?
- mark case failed?
- dead-letter queue?
26. Retry in Async Code
Naive retry:
```python
async def fetch_with_retry(case_id: str, attempts: int = 3) -> str:
    for attempt in range(1, attempts + 1):
        try:
            return await fetch_case(case_id)
        except TransientError:
            if attempt == attempts:
                raise  # out of attempts: surface the last error
            await asyncio.sleep(0.2 * attempt)  # linear backoff, skipped after the final attempt
    raise ValueError("attempts must be at least 1")
```
Consider:
- which errors are retryable?
- backoff;
- jitter;
- total timeout;
- idempotency;
- rate limits;
- observability.
Do not retry all exceptions blindly.
27. Async Testing
With pytest, async tests need a plugin such as pytest-asyncio or the AnyIO pytest plugin.
Example with pytest-asyncio:
```python
import pytest


@pytest.mark.asyncio
async def test_fetch_all_cases() -> None:
    result = await fetch_all(["CASE-001", "CASE-002"])
    assert result == [...]
```
To avoid a plugin, you can test a simple coroutine by calling asyncio.run from a sync test:
```python
import asyncio


def test_fetch_case() -> None:
    result = asyncio.run(fetch_case("CASE-001"))
    assert result == "case:CASE-001"
```
For a larger async test suite, use a proper plugin.
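Another plugin-free option is the standard library's unittest.IsolatedAsyncioTestCase, which runs each async test method on its own event loop. The sketch below uses a hypothetical fetch_case. You would normally run it with python -m unittest.

```python
import asyncio
import unittest


async def fetch_case(case_id: str) -> str:
    await asyncio.sleep(0)
    return f"case:{case_id}"


class FetchCaseTest(unittest.IsolatedAsyncioTestCase):
    async def test_fetch_case(self) -> None:
        self.assertEqual(await fetch_case("CASE-001"), "case:CASE-001")
```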
28. Testing Cancellation
Cancellation tests can be tricky.
Example:
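```python
async def cancellable_worker(event: asyncio.Event) -> None:
    try:
        await event.wait()
    except asyncio.CancelledError:
        # cleanup would go here
        raise
```
Test:
```python
import asyncio

import pytest


@pytest.mark.asyncio
async def test_worker_can_be_cancelled() -> None:
    event = asyncio.Event()
    task = asyncio.create_task(cancellable_worker(event))
    await asyncio.sleep(0)  # let the worker start and block on event.wait()
    task.cancel()
    with pytest.raises(asyncio.CancelledError):
        await task
```
Without the sleep(0), the task would be cancelled before it ever starts, so the worker's except branch would never run.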
Keep cancellation tests small.
29. Async Debug Mode
Enable debug mode:
```shell
PYTHONASYNCIODEBUG=1 python app.py
```
Or:
```python
asyncio.run(main(), debug=True)
```
Debug mode can help detect:
- slow callbacks;
- forgotten awaits;
- wrong thread usage;
- resource warnings.
Use during development/troubleshooting.
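As an illustration of slow-callback detection, the sketch below blocks the loop with time.sleep and captures what debug mode logs on the "asyncio" logger. ListHandler is a hypothetical helper. loop.slow_callback_duration (0.1 s by default) is the threshold above which a step is reported.

```python
import asyncio
import logging
import time


class ListHandler(logging.Handler):
    """Collects log messages so they can be inspected."""

    def __init__(self) -> None:
        super().__init__()
        self.messages: list[str] = []

    def emit(self, record: logging.LogRecord) -> None:
        self.messages.append(record.getMessage())


async def blocking_step() -> None:
    asyncio.get_running_loop().slow_callback_duration = 0.05  # lower the threshold
    time.sleep(0.1)  # blocks the loop; debug mode reports this slow step


handler = ListHandler()
logging.getLogger("asyncio").addHandler(handler)
asyncio.run(blocking_step(), debug=True)
print(handler.messages)  # expect a message like "Executing <Task ...> took 0.1.. seconds"
```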
30. Async Logging
Logging works in async code, but because many tasks interleave, every log line needs enough context to tell them apart.
Include:
- task name;
- case id;
- request id;
- operation;
- duration;
- retry attempt.
Task name:
```python
task = asyncio.create_task(fetch_case(case_id), name=f"fetch-{case_id}")
```
Current task:
```python
task = asyncio.current_task()
task_name = task.get_name() if task else "unknown"
```
Context variables (contextvars) can carry request IDs in async apps, but that is advanced and will appear later in observability discussions.
31. Async API Design
Do not casually hide async behind a sync API.
If a function must await, it should be async:
```python
async def fetch_case(...):
    ...
```
Do not call async code from sync code by using asyncio.run deep inside a library:
```python
def get_case(...):
    return asyncio.run(fetch_case(...))
```
This raises RuntimeError when the caller is already running an event loop.
Better:
- provide an async API;
- provide a separate sync wrapper at the application boundary if needed;
- keep the sync and async layers explicit.
32. Sync/Async Boundary
Common patterns:
32.1 Async core, sync CLI wrapper
```python
import asyncio


async def async_main(argv: list[str] | None = None) -> int:
    ...


def main(argv: list[str] | None = None) -> int:
    return asyncio.run(async_main(argv))
```
Good for CLIs.
32.2 Sync core, async adapter
```python
async def load_cases_async(path: Path) -> list[Case]:
    return await asyncio.to_thread(load_cases, path)
```
Use this for small integrations with blocking code.
32.3 Separate implementations
For libraries, it sometimes makes sense to provide both:
```python
class CaseClient:
    def get_case(...): ...


class AsyncCaseClient:
    async def get_case(...): ...
```
Do not mix them randomly.
33. Case Tracker: Async Use Cases
The current CLI does not need async.
Potential async evolution:
- Fetch enrichment from external API for many cases.
- Send notifications concurrently.
- Consume case events from message broker.
- Web API with async framework.
- Websocket status updates.
- Async database driver.
For local JSON file operations, async is not necessary.
Decision:
Do not convert case-tracker to async until there is a real async I/O boundary.
34. Case Tracker: Async Enrichment Sketch
Protocol:
```python
from typing import Protocol


class AsyncCaseEnrichmentClient(Protocol):
    async def enrich(self, case: Case) -> EnrichedCase:
        ...
```
An implementation would use an async HTTP client.
Service:
```python
import asyncio


async def enrich_cases(
    cases: list[Case],
    client: AsyncCaseEnrichmentClient,
    *,
    concurrency_limit: int = 10,
) -> list[EnrichedCase]:
    semaphore = asyncio.Semaphore(concurrency_limit)

    async def enrich_one(case: Case) -> EnrichedCase:
        async with semaphore:
            async with asyncio.timeout(5):  # per-case deadline; time waiting for the semaphore is not counted
                return await client.enrich(case)

    return await asyncio.gather(*(enrich_one(case) for case in cases))
```
This includes:
- async dependency;
- concurrency limit;
- timeout;
- result order preserved by gather.
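An error policy is still needed. As written, the first failing enrichment propagates out of gather while the other enrichments keep running.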
35. Case Tracker: Partial Failure Policy
Instead of failing fast:
```python
from dataclasses import dataclass


@dataclass(frozen=True)
class EnrichmentResult:
    case_id: CaseId
    enriched_case: EnrichedCase | None
    error: str | None
```
Safe function:
```python
async def safe_enrich_one(case: Case, client: AsyncCaseEnrichmentClient) -> EnrichmentResult:
    try:
        async with asyncio.timeout(5):
            enriched = await client.enrich(case)
        return EnrichmentResult(case.id, enriched, None)
    except Exception as error:  # includes TimeoutError; CancelledError still propagates
        # str(TimeoutError()) is empty, so keep the exception type in the message
        return EnrichmentResult(case.id, None, f"{type(error).__name__}: {error}")
```
Then gather the results.
This is often a better fit for batch operations.
36. Async Backpressure for Case Events
```python
async def consume_case_events(queue: asyncio.Queue[CaseEvent]) -> None:
    while True:
        event = await queue.get()
        try:
            await handle_event(event)
        finally:
            queue.task_done()
```
A queue maxsize prevents memory blowup:
```python
queue: asyncio.Queue[CaseEvent] = asyncio.Queue(maxsize=1000)
```
If producers are faster than consumers, await queue.put(event) makes producers wait, slowing them down to the consumers' pace.
37. Common Async Mistakes
- async def with blocking time.sleep.
- Blocking HTTP client in an async function.
- Blocking DB driver in an async route.
- Creating tasks and never awaiting them.
- Unbounded gather over huge input.
- No timeout.
- Swallowing CancelledError.
- Calling asyncio.run inside an event loop.
- CPU-bound work in the event loop.
- Shared mutable state with awaits between read/write.
- No concurrency limit.
- No clear failure policy.
- Tests rely on sleeps.
- Mixing sync/async APIs without design.
- Assuming async means faster for everything.
38. Async Decision Framework
Ask:
- Is the bottleneck I/O wait?
- Are dependencies async-compatible?
- Will many operations be in-flight concurrently?
- Is team comfortable with async mental model?
- Do we need cancellation/timeouts?
- Is backpressure designed?
- Are errors fail-fast or partial?
- Is CPU work offloaded?
- Are blocking calls isolated?
- Is observability ready?
- Are tests deterministic?
- Is async boundary clear?
If the answer to either of the first two questions is no, async may not be the right tool.
39. Practice: Basic Coroutine
```python
import asyncio


async def normalize_case_id_later(case_id: str) -> str:
    await asyncio.sleep(0)
    return case_id.strip().upper()
```
Run:
```python
def test_normalize_case_id_later() -> None:
    result = asyncio.run(normalize_case_id_later(" case-001 "))
    assert result == "CASE-001"
```
This is artificial but teaches coroutine execution.
40. Practice: Concurrent Fetch Simulation
```python
import asyncio


async def fetch(case_id: str) -> str:
    await asyncio.sleep(0.1)  # simulated network latency
    return f"fetched:{case_id}"


async def fetch_all(case_ids: list[str]) -> list[str]:
    return await asyncio.gather(*(fetch(case_id) for case_id in case_ids))
```
Measure sequential vs concurrent execution with time.perf_counter.
41. Practice: Semaphore Limit
Implement:
```python
async def fetch_all_limited(case_ids: list[str], limit: int) -> list[str]:
    semaphore = asyncio.Semaphore(limit)

    async def fetch_one(case_id: str) -> str:
        async with semaphore:
            return await fetch(case_id)

    return await asyncio.gather(*(fetch_one(case_id) for case_id in case_ids))
```
Add logging to see max in-flight count.
42. Practice: Timeout
```python
async def slow_fetch() -> str:
    await asyncio.sleep(10)
    return "done"


async def fetch_with_timeout() -> str:
    async with asyncio.timeout(0.1):
        return await slow_fetch()
```
Write a test that expects TimeoutError.
43. Practice: Cancellation
```python
async def worker() -> None:
    try:
        while True:
            await asyncio.sleep(1)
    except asyncio.CancelledError:
        # cleanup here
        raise
```
Create the task, cancel it, await it, and assert that CancelledError is raised.
44. Practice: Async Queue
Build producer/consumer:
- queue maxsize 2;
- two consumers;
- process five case ids;
- sentinel shutdown;
- collect processed ids.
Test:
- all ids processed;
- no duplicates;
- consumers exit.
45. Self-Check
Answer without looking at the material:
- When is async a good fit?
- When is async not a good fit?
- What is an event loop?
- What is a coroutine function?
- What is a coroutine object?
- Why is forgetting await dangerous?
- What does asyncio.run do?
- What is the difference between sequential await and concurrent create_task?
- What are the risks of a fire-and-forget task?
- What does TaskGroup do?
- Why is time.sleep bad inside an async function?
- What is asyncio.to_thread for?
- What does cooperative cancellation mean?
- Why should CancelledError usually be re-raised?
- What is a semaphore for?
- What is an async queue for?
- Why can async code still race?
- What is backpressure?
- How do you test a coroutine?
- What is the most dangerous async mistake?
46. Definition of Done for Part 022
You are done with this part if you can:
- Explain the event loop.
- Write coroutines.
- Run a coroutine with asyncio.run.
- Explain await.
- Run tasks concurrently.
- Use asyncio.gather.
- Use TaskGroup.
- Use timeouts.
- Handle cancellation correctly.
- Use a semaphore.
- Use an async queue.
- Avoid blocking calls in async code.
- Use asyncio.to_thread to integrate blocking sync code.
- Explain the sync/async boundary.
- Decide whether case-tracker needs async.
47. Summary
Async Python is a tool for I/O concurrency, not a universal solution.
Key points of this part:
- the event loop runs tasks cooperatively;
- a coroutine must be awaited or scheduled as a task;
- await is a yield/interleaving point;
- async is good for many operations that wait on I/O;
- async is bad for CPU-bound work unless that work is offloaded;
- blocking calls in the event loop destroy concurrency;
- task lifecycles must be managed;
- TaskGroup helps with structured concurrency;
- timeouts and cancellation must be designed in;
- semaphores and queues provide backpressure;
- async code can still race if shared state changes between awaits;
- the sync/async boundary must be explicit;
- case-tracker does not need async until there is a real async I/O boundary.
The next part covers performance engineering: measurement before optimization, profiling, benchmarking, algorithmic complexity, memory, and practical optimization strategy.