33  Asynchronous Programming

NoteCore idea

asyncio enables high-concurrency I/O in a single thread via native coroutines. The key: await suspends the current coroutine and gives control to the event loop, which runs other coroutines while I/O waits.

In this chapter you will learn to:

  1. Define a native coroutine with async def and await.
  2. Schedule and run coroutines with asyncio.run, asyncio.create_task, and asyncio.gather.
  3. Iterate over coroutines as they finish with asyncio.as_completed.
  4. Throttle concurrency with asyncio.Semaphore.
  5. Run blocking code from async with loop.run_in_executor.

33.1 A few definitions

  • Coroutine: a function defined with async def. Calling it does not run the body — it returns a coroutine object.
  • Task: a coroutine wrapped in a Future, scheduled by the event loop.
  • Awaitable: anything with __await__ — coroutines, Tasks, Futures.

A useful reading trick: when you see result = await some_coroutine(), read it as result = some_coroutine() — the value flow looks like a normal function call. The difference is that the current coroutine yields control while waiting, so the event loop can run other coroutines.

33.2 A first asyncio program

asyncio.run(coro) is the entry point in a script. It starts an event loop, runs the top-level coroutine, and shuts the loop down.

TipTop-level await in notebooks

The cells below use await main() directly because Jupyter (and Quarto’s Jupyter engine) already runs an event loop, so calling asyncio.run from inside it raises RuntimeError. In a .py script, replace each await main() with asyncio.run(main()).

import asyncio

async def hello(name, delay):
    await asyncio.sleep(delay)
    return f"hello, {name}"

async def main():
    return await hello("world", 0.05)

await main()
'hello, world'

Walking through the pieces:

  • async def hello(...) declares a coroutine function. Calling hello("world", 0.05) does not run the body — it returns a coroutine object that has to be await-ed (or scheduled as a task) before it does anything.
  • await asyncio.sleep(delay) is the canonical “yield control for a while” call. The current coroutine pauses; the event loop is free to run anything else; after the delay, the loop resumes this coroutine.
  • return f"hello, {name}" is just like a normal function return — the await on the caller’s side receives this value.
  • await main() runs main to completion. In a Jupyter cell we use await directly because Jupyter already has an event loop; in a script you’d write asyncio.run(main()).

The general pattern: async def defines a coroutine; await is the yield point. A coroutine is just code until something — asyncio.run, create_task, or another await — drives it.

flowchart TB
  L["EventLoop"]
  L -->|schedule| C1["coroutine A"]
  L -->|schedule| C2["coroutine B"]
  L -->|schedule| C3["coroutine C"]
  C1 -. "await asyncio.sleep" .-> L
  C2 -. "await asyncio.sleep" .-> L
  C3 -. "await asyncio.sleep" .-> L

await hands control back to the loop; the loop runs whatever else is ready. That’s the entire mechanism behind concurrent I/O on a single thread.

33.3 Running coroutines concurrently

To run several coroutines concurrently, schedule them with asyncio.create_task or collect them with asyncio.gather:

import time

async def main():
    t0 = time.perf_counter()
    results = await asyncio.gather(
        hello("a", 0.1),
        hello("b", 0.1),
        hello("c", 0.1),
    )
    return results, f"{time.perf_counter() - t0:.2f}s"

await main()
(['hello, a', 'hello, b', 'hello, c'], '0.10s')

Walking through gather:

  • hello("a", 0.1), hello("b", 0.1), hello("c", 0.1) are three coroutine objects — not yet running.
  • asyncio.gather(*coros) schedules them all and returns an awaitable that completes when every one finishes.
  • await on that awaitable waits for the whole batch. While each hello is paused inside asyncio.sleep, the event loop can run the others — that’s where the concurrency comes from.
  • Results come back in the order the coroutines were submitted, not the order they finished. Compare with as_completed later in this chapter.

Three sleeps of 0.1 seconds finished in ~0.1 seconds — they were concurrent, not sequential. asyncio.gather waits for all of them and returns their results in order.

asyncio.create_task(coro) schedules a coroutine and returns a Task immediately, so you can fire something off and continue:

async def main():
    task = asyncio.create_task(hello("background", 0.05))
    print("(doing something else)")
    return await task

await main()
(doing something else)
'hello, background'

Walking through this shape:

  • asyncio.create_task(coro) wraps the coroutine in a Task and schedules it on the loop now. The function returns immediately — the task starts running while we keep going.
  • print("(doing something else)") runs while the task waits in the background. The two interleave on the same thread.
  • await task joins the task and returns its result. If we never awaited, the task would still run, but its return value would be lost.

The general pattern: gather for “wait for all of these”; create_task for “fire and forget, await later”. Both schedule coroutines; the difference is who does the awaiting and when.

33.4 Structured concurrency with TaskGroup

Python 3.11 added asyncio.TaskGroup — the modern replacement for gather when you want structured concurrency. If any task raises, sibling tasks are cancelled and the group re-raises an ExceptionGroup. Prefer it for new code. Build it in two steps.

Step 1: the happy path. Three tasks scheduled in a group; the block exits when all of them finish:

async def main():
    async with asyncio.TaskGroup() as tg:
        t1 = tg.create_task(hello("a", 0.1))
        t2 = tg.create_task(hello("b", 0.1))
        t3 = tg.create_task(hello("c", 0.1))
    return [t1.result(), t2.result(), t3.result()]

await main()
['hello, a', 'hello, b', 'hello, c']
  • async with asyncio.TaskGroup() as tg: opens an async context manager — same shape as with, but it implements __aenter__ / __aexit__ (the awaitable cousins of __enter__ / __exit__).
  • tg.create_task(coro) schedules a task that’s owned by the group. Each call returns immediately, just like asyncio.create_task.
  • The async with block exits after every owned task completes — the group implicitly awaits its children. No explicit gather needed.
  • t1.result() reads the finished task’s return value. Because the group already awaited the tasks, .result() is non-blocking here.

Step 2: a sibling fails. When any task raises, the group cancels the other tasks and re-raises the collected failures as an ExceptionGroup. Catch them with except*:

async def boom():
    raise ValueError("kaboom")

async def main():
    errors = []
    try:
        async with asyncio.TaskGroup() as tg:
            tg.create_task(hello("a", 0.1))
            tg.create_task(boom())
            tg.create_task(hello("c", 0.1))
    except* ValueError as eg:
        errors.extend(str(e) for e in eg.exceptions)
    return errors

await main()
['kaboom']
  • When boom() raises, the group requests cancellation on every other task and re-raises an ExceptionGroup once they all settle.
  • except* ValueError is except for groups — it matches every ValueError inside the ExceptionGroup. Other types in the group would propagate.
  • eg.exceptions is the tuple of matched exceptions inside that subgroup.
  • No orphan tasks running silently after a failure — that’s the safety the structured form gives you.

The general pattern: prefer TaskGroup over gather for new code. Same concurrency, plus structured cancellation and clean error aggregation. See Section 30.6 for the ExceptionGroup / except* machinery in detail.

33.5 Deadlines with asyncio.timeout

asyncio.timeout (3.11) is a context manager — anything awaited inside the block is bounded by the deadline. Cleaner than wait_for and composes with TaskGroup.

async def main():
    try:
        async with asyncio.timeout(0.05):
            await asyncio.sleep(0.5)
    except TimeoutError:
        return "timed out"

await main()
'timed out'

Walking through the timeout dance:

  • asyncio.timeout(0.05) is an async context manager that arms a 50-millisecond deadline when we enter.
  • Inside the block, await asyncio.sleep(0.5) would normally pause for half a second — but the deadline fires first.
  • When the deadline expires, the timeout context manager cancels the awaiting coroutine. The await raises CancelledError internally, which the context manager translates to TimeoutError on exit.
  • The outer try catches TimeoutError. The cleanup is automatic — the timer was scoped to the async with block.

Inside the block, any await that exceeds the deadline raises TimeoutError. Outside, the timer is gone — no cleanup needed.

The general pattern: asyncio.timeout(secs) puts a deadline on a region of awaits — far cleaner than threading a timeout argument through every call.

33.6 Iterating as coroutines complete

asyncio.as_completed is the async cousin of concurrent.futures.as_completed. It yields awaitables in the order they finish, not the order you submitted them:

async def slow(label, delay):
    await asyncio.sleep(delay)
    return label

async def main():
    coros = [slow("a", 0.3), slow("b", 0.1), slow("c", 0.2)]
    finished = []
    for coro in asyncio.as_completed(coros):
        finished.append(await coro)
    return finished

await main()
['b', 'c', 'a']

Walking through the loop:

  • coros is a list of three coroutine objects with delays 0.3, 0.1, 0.2 — submission order, not finish order.
  • asyncio.as_completed(coros) returns an iterator. Each item is an awaitable that becomes ready when some coroutine in the input finishes — but you don’t know which one until you await.
  • await coro inside the loop unwraps the next-finished coroutine’s return value. The shortest sleep finishes first, so b arrives first, then c, then a.
  • We collect the results in finished in the order they arrived — ['b', 'c', 'a'].

b finished first because it had the shortest sleep. The loop awaits each future as it becomes ready.

The general pattern: gather returns results in submission order after everything finishes; as_completed lets you process each result as it arrives — useful for streaming output, progress reporting, or early-exit logic.

33.7 Throttling with Semaphore

You don’t always want unbounded concurrency. A semaphore limits how many coroutines can be inside a critical region at once:

async def fetch(label, delay, sem):
    async with sem:
        await asyncio.sleep(delay)
        return label

async def main():
    sem = asyncio.Semaphore(2)
    coros = [fetch(f"r{i}", 0.05, sem) for i in range(6)]
    t0 = time.perf_counter()
    results = await asyncio.gather(*coros)
    return results, f"{time.perf_counter() - t0:.3f}s"

await main()
(['r0', 'r1', 'r2', 'r3', 'r4', 'r5'], '0.151s')

Walking through the throttle:

  • asyncio.Semaphore(2) builds a counter starting at 2. Each async with sem: decrements it on entry and increments it on exit.
  • When the counter is at 0, a new async with sem: awaits until someone releases — that’s the cooperative throttle.
  • fetch does its sleep inside the async with sem: block, so only two fetch coroutines can be sleeping at once. The other four wait their turn.
  • asyncio.gather(*coros) schedules all six. The semaphore enforces the cap; gather waits for all of them.

Six tasks; two-at-a-time throttle; six 0.05-s tasks complete in ~0.15 s instead of ~0.30 s sequential or ~0.05 s unbounded. The async with sem: form is exactly like the synchronous with — it acquires on entry, releases on exit.

The general pattern: when you want concurrency up to a limit (rate limits, connection pools, “don’t pound this API”), share one Semaphore across the tasks and gate the work region with async with.

33.8 Async file I/O? No. Async network I/O? Yes.

The realistic shape of async code uses httpx.AsyncClient (or aiohttp) to fetch many URLs concurrently:

import asyncio, httpx
from collections import Counter

MAX_CONCURRENT = 5

async def get_one(client, url, sem):
    async with sem:
        resp = await client.get(url, timeout=5)
        return resp.status_code

async def main(urls):
    sem = asyncio.Semaphore(MAX_CONCURRENT)
    async with httpx.AsyncClient() as client:
        statuses = await asyncio.gather(
            *(get_one(client, u, sem) for u in urls)
        )
    return Counter(statuses)

async with httpx.AsyncClient() is itself an async context manager — the __aenter__/__aexit__ analogues of __enter__/__exit__.

33.9 Bridging async and blocking code

If you must call a blocking function from an async context, hand it to a thread pool with loop.run_in_executor:

import time

def blocking(n):
    time.sleep(0.05)
    return n * 2

async def main():
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, blocking, 21)

await main()
42

Walking through the bridge:

  • blocking is a plain (non-async) function — time.sleep would freeze the event loop if called directly from a coroutine.
  • asyncio.get_running_loop() returns the current event loop. We need it because run_in_executor is a method on the loop, not a top-level function.
  • loop.run_in_executor(None, blocking, 21) schedules blocking(21) on a thread pool and returns an awaitable. The event loop stays free to run other coroutines while the thread does its blocking work.
  • await waits for the thread pool result without blocking the loop.
  • None means “use the default thread pool.” Pass a ProcessPoolExecutor instead if the function is CPU-bound and benefits from a real second core.

The pattern keeps the event loop responsive — the blocking work happens off-loop.

The general pattern: never call a blocking function directly from a coroutine. Hand it to run_in_executor so the event loop keeps serving the rest of your coroutines.

TipWhy this matters

asyncio is not faster than threads for a single operation — it’s better for many concurrent operations in one process. Where threads need OS scheduling and memory per thread (MB), asyncio handles thousands of concurrent connections with one thread and bytes per coroutine. The await keyword is the cooperative yield point where the event loop takes back control.

33.10 Build: a throttled, deadline-bounded fetcher with TaskGroup

Every real async program has the same shape: fan many I/O calls out concurrently, throttle the count so we don’t overwhelm the upstream, and put a deadline on each call so a single slow target can’t stall the batch. We’ll layer those three pieces — TaskGroup, Semaphore, asyncio.timeout — onto a simulated fetcher.

Step 1: a TaskGroup fan-out with structured error handling. TaskGroup is the modern replacement for gather. If any task raises, the others are cancelled and the failures arrive in an ExceptionGroup:

import asyncio

async def fetch(url):
    await asyncio.sleep(0.05)
    if url == "boom":
        raise RuntimeError(f"upstream rejected {url!r}")
    return f"<{url}>"

async def main():
    urls = ["a", "b", "c"]
    results = []
    async with asyncio.TaskGroup() as tg:
        tasks = [tg.create_task(fetch(u)) for u in urls]
    return [t.result() for t in tasks]

await main()
['<a>', '<b>', '<c>']

async with asyncio.TaskGroup() as tg: opens a structured-concurrency block. Each tg.create_task(coro) schedules a coroutine; the block doesn’t exit until every owned task settles. Because no task raised here, all three return cleanly and t.result() is safe to call after the block.

Step 2: throttle with a Semaphore. The fetcher above runs every URL concurrently — fine for three URLs, dangerous for a thousand against a real API. asyncio.Semaphore(N) gates the work region so at most N coroutines run inside the critical section at once:

import time

async def fetch(url, sem):
    async with sem:                      # acquire on entry, release on exit
        await asyncio.sleep(0.05)
        return f"<{url}>"

async def main():
    sem = asyncio.Semaphore(2)           # at most 2 concurrent fetches
    urls = [f"u{i}" for i in range(6)]
    t0 = time.perf_counter()
    async with asyncio.TaskGroup() as tg:
        tasks = [tg.create_task(fetch(u, sem)) for u in urls]
    elapsed = time.perf_counter() - t0
    return [t.result() for t in tasks], round(elapsed, 2)

await main()
(['<u0>', '<u1>', '<u2>', '<u3>', '<u4>', '<u5>'], 0.15)

async with sem: is the cooperative gate — when the count is at zero, the next entry awaits until someone leaves. Six tasks at 0.05 s each, two at a time, take roughly 3 × 0.05 ≈ 0.15 s. Without the semaphore, all six would sleep concurrently and finish in 0.05 s; with Semaphore(1) they’d serialise to 0.30 s. The semaphore is the choke point.

Step 3: per-task deadline with asyncio.timeout. Even with a semaphore, one stuck upstream can pin a slot. asyncio.timeout(secs) puts a deadline around a region of awaits — anything that exceeds it raises TimeoutError and the slot is released:

async def fetch(url, sem, deadline):
    try:
        async with sem:
            async with asyncio.timeout(deadline):
                await asyncio.sleep(0.5 if url == "slow" else 0.05)
                return f"<{url}>"
    except TimeoutError:
        return f"<TIMEOUT {url}>"

async def main():
    sem = asyncio.Semaphore(3)
    urls = ["a", "b", "slow", "c", "d"]
    async with asyncio.TaskGroup() as tg:
        tasks = [tg.create_task(fetch(u, sem, deadline=0.1)) for u in urls]
    return [t.result() for t in tasks]

await main()
['<a>', '<b>', '<TIMEOUT slow>', '<c>', '<d>']

asyncio.timeout(0.1) arms a 100-ms deadline on the inner block. The “slow” URL would sleep 0.5 s — past the deadline — so the timeout cancels its await and raises TimeoutError, which the local try/except catches and converts to a sentinel value. Every other URL completes normally. Crucially: the semaphore is released when the inner block exits (whether via success or TimeoutError), so the slow task doesn’t permanently block a slot.

The build is the chapter at full strength: TaskGroup for structured fan-out (Step 1), Semaphore for cooperative throttling (Step 2), asyncio.timeout for per-task deadlines that release shared resources on failure (Step 3). All three combine cleanly because each is an async with block — the same composition style as plain with.

33.11 Exercises

  1. gather order. Submit three coroutines with sleep durations 0.3, 0.1, 0.2. Verify gather returns results in submission order. Verify as_completed does not.

  2. The forgotten await. Remove await from the gather call (so it returns the coroutine object instead of the result). What does Python warn about? Why?

  3. Semaphore as rate limit. Use asyncio.Semaphore(3) to limit concurrency to 3. Time 12 tasks of 0.1 s each. Does the wall time match your prediction?

  4. run_in_executor for CPU. Run the is_prime function (from Chapter 32) inside an async def main using a ProcessPoolExecutor. Verify the event loop stays responsive (e.g., a “tick” task continues to print).

  5. Why no async file I/O. The standard library has no asyncio.open(...) for files. Why? (Hint: think about how the OS handles disk vs. network reads.)

NoteFurther reading

Beazley, Python Distilled §5.23 (“Asynchronous Functions and await”) and §9.14.4 (the asyncio operational reference) together give a more low-level, manual-style treatment of the event loop, blocking-call escapes, and async I/O patterns — a useful counterpoint to Ramalho’s higher-level narrative.

33.12 Summary

asyncio is the model for I/O at scale. The vocabulary — async def, await, gather, as_completed, Semaphore, run_in_executor — is small and the surface area is consistent. For thousands of concurrent network calls, this is the right tool.

That closes Part IV. Part V — the deep end — turns to metaprogramming: dynamic attributes, descriptors, and metaclasses. We start in Chapter 34 with the simplest of the three: __getattr__ and @property.