Async/Await in Python: Patterns Beyond the Basics
Exploring structured concurrency, task groups, and error propagation in Python asyncio — with testable code snippets.
I’ve been writing Python async code for years, but I keep hitting the same edge cases: silently swallowed exceptions, tasks that outlive their scope, and dangling coroutines. This post tests my understanding of three patterns that fix these issues.
The Problem: Fire-and-Forget Is a Leak
Starting a task without tracking it is the most common async antipattern:
import asyncio

async def leaky_example():
    # This task runs detached - if it errors, you never know
    asyncio.create_task(background_work())
    await asyncio.sleep(0.1)
    print("Done, but was background_work really done?")

async def background_work():
    await asyncio.sleep(0.5)
    raise ValueError("Something went wrong in background!")

asyncio.run(leaky_example())
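With this timing the ValueError never even fires: asyncio.run() cancels the still-pending background task once leaky_example() returns after 0.1 s, and the process exits cleanly. Had the task failed while the loop was still running, the only trace would be a "Task exception was never retrieved" log message when the task is garbage-collected. Either way, no caller ever sees the error. The event loop also keeps only weak references to tasks, so an unreferenced task can be garbage-collected before it finishes.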
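If a background task really has to outlive the statement that starts it, one option is to hold a strong reference to it and attach a done callback that records any exception. The sketch below follows the reference-keeping pattern from the asyncio documentation; the names spawn, background_tasks, and failures are illustrative and not part of asyncio.

```python
import asyncio

# Strong references: the event loop itself only keeps weak references to tasks.
background_tasks: set[asyncio.Task] = set()
failures: list[BaseException] = []


def _on_done(task: asyncio.Task) -> None:
    """Drop the reference and record the task's exception, if it has one."""
    background_tasks.discard(task)
    if task.cancelled():
        return
    exc = task.exception()  # calling this also marks the exception as retrieved
    if exc is not None:
        failures.append(exc)


def spawn(coro) -> asyncio.Task:
    """Hypothetical helper: start a background task that stays tracked."""
    task = asyncio.create_task(coro)
    background_tasks.add(task)
    task.add_done_callback(_on_done)
    return task


async def background_work():
    await asyncio.sleep(0.05)
    raise ValueError("Something went wrong in background!")


async def tracked_example() -> list[BaseException]:
    spawn(background_work())
    # Wait for everything still in flight before leaving this scope.
    await asyncio.gather(*background_tasks, return_exceptions=True)
    await asyncio.sleep(0)  # give any pending done callbacks a turn
    return list(failures)
```

Calling asyncio.run(tracked_example()) returns a list holding the ValueError instead of losing it. This is still manual bookkeeping, which is the gap structured concurrency closes.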
Pattern 1: Task Groups (Python 3.11+)
TaskGroup enforces structured concurrency — all child tasks complete (or error) before the group exits:
import asyncio

async def task_group_demo():
    async with asyncio.TaskGroup() as tg:
        tg.create_task(worker("A", 0.3))
        tg.create_task(worker("B", 0.5))
        tg.create_task(worker("C", 0.1))
    # Reached only after every child task has finished
    print("All tasks complete")

async def worker(name: str, delay: float):
    await asyncio.sleep(delay)
    print(f"Worker {name} done")
    return name

# asyncio.run(task_group_demo())
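If any child raises, the remaining siblings are cancelled, the group waits for them to finish unwinding, and the failures propagate out of the async with block wrapped in an ExceptionGroup. No silent failures.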
Pattern 2: Timeout with asyncio.timeout (3.11+)
Old-style asyncio.wait_for() wraps a single awaitable. asyncio.timeout() is a context manager, so one deadline can cover a whole block of awaits, which gives cleaner scoping:
import asyncio

async def timeout_demo():
    try:
        async with asyncio.timeout(1.0):
            result = await slow_operation()
            print(f"Got result: {result}")
    except TimeoutError:
        print("Operation timed out")

async def slow_operation():
    await asyncio.sleep(3)
    return 42

# asyncio.run(timeout_demo())
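One gotcha I missed: asyncio.timeout(0) times out at the first await that actually yields to the event loop, but asyncio.timeout(None) means no timeout at all. Passing None when you think you passed a float (an unset config value, say) creates subtle bugs, because the call simply waits for as long as the operation takes.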
Pattern 3: Async Queue With Backpressure
Producer-consumer with controlled concurrency:
import asyncio

async def queue_demo():
    # The queue carries strings plus the None sentinel, so the type includes None
    queue: asyncio.Queue[str | None] = asyncio.Queue(maxsize=5)

    async def producer():
        for i in range(10):
            item = f"item-{i}"
            await queue.put(item)  # blocks while the queue is full (backpressure)
            print(f"Produced {item}")
        await queue.put(None)  # sentinel

    async def consumer(n: int):
        while True:
            item = await queue.get()
            if item is None:
                await queue.put(None)  # pass sentinel for other consumers
                break
            await asyncio.sleep(0.1)  # simulate work
            print(f"Consumer {n} processed {item}")

    async with asyncio.TaskGroup() as tg:
        tg.create_task(producer())
        tg.create_task(consumer(1))
        tg.create_task(consumer(2))

# asyncio.run(queue_demo())
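The sentinel pattern (None as poison pill) works for N consumers in one of two ways: each consumer re-queues the sentinel before exiting, as in the code above, or the producer sends N sentinels. A common bug: sending one sentinel for three consumers without re-queueing it, so two hang forever. The re-queueing variant also leaves one None sitting in the queue after everything has stopped.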
Edge Cases I Missed
- CancelledError rescue: TaskGroup re-raises child failures as an ExceptionGroup. You need except* (3.11+) to handle individual exceptions in the group.
- Queue maxsize=0: asyncio.Queue(0) is unbounded. You must pass a positive maxsize for backpressure.
- Shield vs TaskGroup: asyncio.shield() protects the inner awaitable, not the task that awaits it. Inside a TaskGroup, if a sibling raises, the child awaiting the shield is still cancelled; the shielded work keeps running, but outside the group's scope, so nothing waits for it. Shield doesn't protect you from structured concurrency.
Verdict
My understanding of structured concurrency was superficial. I knew TaskGroup existed but didn’t understand ExceptionGroup propagation. Pattern 1 alone fixes 80% of the async bugs I’ve encountered in production.
Score: 7.5/10 — solid on patterns, weak on edge cases (ExceptionGroup handling).