Asyncio Queue: Timeout Behavior and Error Handling

Exploring the nuances of asyncio.Queue, specifically timeout behavior in get/put operations, and how to write robust queue-based async code.

I’ve been using asyncio.Queue in several of my projects, but the behavior around timeouts and error handling was never clear — especially around get() and put() operations. This post tests my understanding with practical code samples.

The Problem: No Direct Timeout in get()/put()

Unlike Python’s queue.Queue, asyncio.Queue doesn’t support timeouts on get() or put() directly.

import asyncio

async def no_timeout_demo():
    queue = asyncio.Queue(maxsize=1)
    await queue.put("first")
    
    # This blocks forever if no one consumes the item
    # await queue.get() # No timeout supported directly
    
    # This will also block until space is available
    # await queue.put("second") # No timeout either

asyncio.run(no_timeout_demo())

Pattern 1: Using asyncio.wait_for() for Timeout

You have to wrap get()/put() in asyncio.wait_for() to apply timeouts:

import asyncio

async def timeout_demo():
    queue = asyncio.Queue(maxsize=1)
    await queue.put("first")
    
    try:
        # This times out after 0.1s if nothing is available
        item = await asyncio.wait_for(queue.get(), timeout=0.1)
        print(f"Got item: {item}")
    except asyncio.TimeoutError:
        print("Get timed out")

    try:
        # This times out if queue is full
        await asyncio.wait_for(queue.put("second"), timeout=0.1)
        print("Put succeeded")
    except asyncio.TimeoutError:
        print("Put timed out")

asyncio.run(timeout_demo())

Pattern 2: Handling QueueFull and QueueEmpty Errors

Even without timeouts, these exceptions can be raised:

import asyncio

async def error_handling_demo():
    queue = asyncio.Queue(maxsize=1)
    await queue.put("item")
    
    try:
        # This raises QueueFull
        await queue.put_nowait("another")
    except asyncio.QueueFull:
        print("Queue is full")
    
    # Remove the item to make room
    item = await queue.get()
    print(f"Got: {item}")
    
    try:
        # This raises QueueEmpty
        await queue.get_nowait()
    except asyncio.QueueEmpty:
        print("Queue is empty")

asyncio.run(error_handling_demo())

Pattern 3: Graceful Shutdown with Queue

Proper shutdown requires careful use of None sentinels and timeout handling:

import asyncio

async def graceful_shutdown_demo():
    queue = asyncio.Queue(maxsize=5)
    
    async def producer():
        for i in range(5):
            await queue.put(f"item-{i}")
            await asyncio.sleep(0.1)
        # Send sentinels for graceful shutdown
        for _ in range(3):  # number of consumers
            await queue.put(None)
    
    async def consumer(name):
        while True:
            try:
                # Use timeout to avoid hanging forever
                item = await asyncio.wait_for(queue.get(), timeout=1.0)
                if item is None:
                    print(f"Consumer {name} shutting down")
                    break
                print(f"Consumer {name} got {item}")
                await asyncio.sleep(0.2)
            except asyncio.TimeoutError:
                print(f"Consumer {name} timed out, shutting down")
                break
    
    tasks = [
        asyncio.create_task(producer()),
        asyncio.create_task(consumer("A")),
        asyncio.create_task(consumer("B"))
    ]
    
    await asyncio.gather(*tasks)

# asyncio.run(graceful_shutdown_demo())

Edge Cases I Missed

  1. put_nowait and get_nowait: These will error immediately on queue full/empty, unlike their await counterparts which block.
  2. asyncio.wait_for vs asyncio.timeout: wait_for is a function; asyncio.timeout is a context manager. Both work but wait_for is more common in older code.
  3. Timeout on task_done(): join() waits for task_done() — a TimeoutError here can be subtle if you don’t catch it.

What I Got Wrong

  • I initially thought asyncio.Queue had a timeout parameter for get() and put() — it doesn’t. All timeout logic must be handled via wait_for or timeouts around the context.
  • I also assumed get_nowait() would block and raise an error, but it just raises immediately if the queue is empty, without any blocking.

Verdict

The asyncio.Queue API is consistent with queue.Queue in terms of what it does but more constrained — no direct timeout support. This requires either wrapping in timeouts or relying on task_done() semantics to manage lifecycles. My understanding of blocking behavior is correct, but I underestimated the need for explicit timeout handling.

Score: 7.0/10 — solid foundation, improved knowledge on error management.