32  Concurrent Executors

Note: Core idea

concurrent.futures provides a high-level, uniform interface for both thread-based and process-based concurrency. ThreadPoolExecutor for I/O, ProcessPoolExecutor for CPU — same API for both.

In this chapter you will learn to:

  1. Use executor.map to parallelize a function across an iterable.
  2. Switch between threads and processes by changing one identifier.
  3. Submit work with executor.submit and consume results with as_completed.
  4. Handle errors per task instead of letting one failure abort the batch.
  5. Use ProcessPoolExecutor for CPU-bound work to bypass the GIL.

32.1 A sequential baseline

To see what concurrent.futures buys you, start with a sequential version. The pattern below is what every “fetch many” script looks like:

def download_one(item):
    # do the work for `item`
    ...
    return result

def download_many(items):
    return [download_one(x) for x in items]

For 20 items each taking a second, that’s 20 seconds. We want to overlap them.
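A runnable, scaled-down version of the baseline (time.sleep stands in for the network call; the 0.1 s delay, five items, and the upper-casing "work" are illustrative stand-ins):

```python
import time

def download_one(item):
    time.sleep(0.1)          # stand-in for a 100 ms network call
    return item.upper()      # the "downloaded" result

def download_many(items):
    return [download_one(x) for x in items]

t0 = time.perf_counter()
results = download_many(["a", "b", "c", "d", "e"])
elapsed = time.perf_counter() - t0
print(results, f"{elapsed:.2f}s")   # five sequential 0.1 s waits: at least 0.5 s total
```

Each call waits for the previous one to finish, so the delays add up; the executor versions below overlap those waits.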

32.2 executor.map

The simplest parallel form is executor.map(func, iterable). It looks just like the built-in map, but the calls run in parallel:

from concurrent.futures import ThreadPoolExecutor
import time

def slow_double(x):
    time.sleep(0.1)
    return x * 2

t0 = time.perf_counter()
with ThreadPoolExecutor(max_workers=5) as executor:
    results = list(executor.map(slow_double, range(10)))
elapsed = time.perf_counter() - t0
results, f"{elapsed:.2f}s"
([0, 2, 4, 6, 8, 10, 12, 14, 16, 18], '0.20s')

Walking through the moving parts:

  • slow_double(x) simulates a slow I/O call: sleep for 100 ms, then do the work. time.sleep releases the GIL, so threads really do overlap.
  • ThreadPoolExecutor(max_workers=5) builds a pool of five worker threads. The with block guarantees executor.shutdown() is called on exit — even if the body raises.
  • executor.map(slow_double, range(10)) schedules 10 calls. The pool grabs five at a time; as each finishes, the next is picked up. Results stream back in submission order.
  • We wrap it in list(...) to drain the iterator, since executor.map yields results lazily, like the built-in map. Exiting the block without draining would not lose any work (shutdown waits for pending tasks by default), but results would be left an unconsumed iterator rather than a list.

Ten 100-ms tasks finished in ~200 ms with five workers — five-way overlap. The with block ensures the pool is shut down cleanly when the block exits.

The general pattern: when you have N independent calls and want results in submission order, executor.map is the one-liner. Reach for submit/as_completed (next) only when you need per-task control.

Switching from threads to processes is one identifier:

from concurrent.futures import ProcessPoolExecutor

with ProcessPoolExecutor() as executor:
    results = list(executor.map(slow_double, range(10)))

Same code, different machinery. Threads share memory and are good for I/O. Processes have their own memory and bypass the GIL — good for CPU work.
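One consequence of "threads share memory": workers can update a structure in the parent process directly. A small sketch (the shared progress dict and the lock are illustrative; with processes, each worker would see only its own copy):

```python
from concurrent.futures import ThreadPoolExecutor
import threading

progress = {"done": 0}      # shared state, visible to every worker thread
lock = threading.Lock()     # guards the read-modify-write on the counter

def work(x):
    result = x * 2
    with lock:              # without the lock, concurrent increments could race
        progress["done"] += 1
    return result

with ThreadPoolExecutor(max_workers=4) as executor:
    results = list(executor.map(work, range(8)))

print(results, progress)    # every thread updated the same dict: done == 8
```

With ProcessPoolExecutor the same code would print done == 0 in the parent, because each worker process increments its own copy of progress.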

32.3 submit and as_completed

executor.map returns results in submission order. When you want results as they finish — and want to handle each one individually — use submit plus as_completed:

from concurrent.futures import ThreadPoolExecutor, as_completed

def task(name, duration):
    time.sleep(duration)
    if name == "B":
        raise RuntimeError("B failed")
    return f"{name} done"

with ThreadPoolExecutor(max_workers=4) as executor:
    futures = {
        executor.submit(task, name, dur): name
        for name, dur in [("A", 0.2), ("B", 0.1), ("C", 0.3)]
    }
    for fut in as_completed(futures):
        name = futures[fut]
        try:
            print(name, "→", fut.result())
        except Exception as exc:
            print(name, "raised", exc)
B raised B failed
A → A done
C → C done

Walking through the pattern:

  • task(name, duration) sleeps for duration and then either returns or raises. Task B always raises — that’s the failure case we want to inspect.
  • executor.submit(task, name, dur) schedules one call and returns a Future immediately, before the work starts. We don’t wait here.
  • The dict comprehension builds {future: name} so we can recover which task each future belonged to when it finishes. Futures are anonymous; we tag them via the dict.
  • as_completed(futures) yields futures in the order they finish, not the order they were submitted. B finishes first (0.1s), then A (0.2s), then C (0.3s).
  • fut.result() returns the value if the task succeeded; re-raises the exception if it failed. We wrap it in try/except so one bad task doesn’t take down the loop.
  • With executor.map, the first exception aborts the iteration — you’d lose the results that came after it.

executor.submit(func, *args) returns immediately with a Future object. The future gives you .result() (blocks until done; re-raises exceptions), .done(), .cancel(), and .add_done_callback().
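A small sketch exercising those Future methods directly (the 0.2 s sleep and the notified list are illustrative):

```python
from concurrent.futures import ThreadPoolExecutor
import time

def slow_answer():
    time.sleep(0.2)
    return 42

notified = []

with ThreadPoolExecutor(max_workers=1) as executor:
    fut = executor.submit(slow_answer)   # returns a Future immediately
    fut.add_done_callback(lambda f: notified.append(f.result()))
    print(fut.done())                    # False: the task is still sleeping
    value = fut.result()                 # blocks until done, then returns 42

print(fut.done(), value, notified)       # by shutdown, the callback has fired
```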

The general pattern: submit + as_completed is the “process results as they arrive” shape — necessary when tasks have different durations, when you want per-task error handling, or when you need to report progress as items complete.

32.4 A real-world pattern: concurrent downloads with per-task error handling

For network code, you typically want per-task error handling, status reporting, and an explicit worker count (the ThreadPoolExecutor default, min(32, os.cpu_count() + 4), is tuned for general use, not for any particular server's limits):

from concurrent.futures import ThreadPoolExecutor, as_completed
import httpx

def get_flag(base_url, cc):
    url = f"{base_url}/{cc}/{cc}.gif".lower()
    resp = httpx.get(url, timeout=3.1, follow_redirects=True)
    resp.raise_for_status()
    return resp.content

def download_many(cc_list, base_url):
    results = {}
    with ThreadPoolExecutor(max_workers=10) as executor:
        futures = {executor.submit(get_flag, base_url, cc): cc for cc in cc_list}
        for fut in as_completed(futures):
            cc = futures[fut]
            try:
                results[cc] = fut.result()
            except Exception as exc:
                print(f"{cc}: {exc}")
    return results

The same pattern with ProcessPoolExecutor would be wrong — processes pay startup cost and can’t share an HTTP client cleanly. For network I/O, ThreadPoolExecutor (or asyncio — see Chapter 33) is the right tool.
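Retries fit naturally inside the worker function, so each task retries independently while the pool code stays unchanged. A generic sketch (with_retries, flaky_fetch, the attempt count, and the backoff are illustrative assumptions, not part of the download code above):

```python
from concurrent.futures import ThreadPoolExecutor, as_completed
import time

def with_retries(func, *args, tries=3, delay=0.01):
    """Call func(*args), retrying up to `tries` times with a fixed delay."""
    for attempt in range(1, tries + 1):
        try:
            return func(*args)
        except Exception:
            if attempt == tries:
                raise            # out of attempts: surface the error
            time.sleep(delay)    # fixed backoff between attempts

failures = {"flaky": 2}          # 'flaky' fails twice, then succeeds

def flaky_fetch(name):
    if failures.get(name, 0) > 0:
        failures[name] -= 1
        raise ConnectionError(f"transient error fetching {name!r}")
    return f"<{name}>"

with ThreadPoolExecutor(max_workers=4) as ex:
    futures = {ex.submit(with_retries, flaky_fetch, n): n
               for n in ["steady", "flaky"]}
    results = {futures[f]: f.result() for f in as_completed(futures)}

print(results)                   # both succeed; 'flaky' needed three attempts
```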

32.5 Processes for CPU-bound work

For CPU-bound work, the swap from threads to processes is exactly where the speedup comes from:

from concurrent.futures import ProcessPoolExecutor
from math import isqrt
import time

def is_prime(n):
    if n < 2:
        return False
    if n == 2:
        return True
    if n % 2 == 0:
        return False
    for i in range(3, isqrt(n) + 1, 2):
        if n % i == 0:
            return False
    return True

candidates = [11_111_111_111_111_111, 11_111_111_111_111_113,
              999_999_999_999_999_989, 1_000_000_000_000_000_001]

t0 = time.perf_counter()
with ProcessPoolExecutor() as executor:
    results = list(executor.map(is_prime, candidates))
elapsed = time.perf_counter() - t0
print(list(zip(candidates, results)), f"{elapsed:.2f}s")

Each is_prime call runs in its own process; they truly run in parallel on multiple cores. The with block joins them all and tears down the pool.

Note: Why this isn’t executed in the rendered book

ProcessPoolExecutor pickles the target function and ships it to each worker. Functions defined inside a notebook cell don’t have a stable module path, so the workers can’t import them — they raise BrokenProcessPool. To run this example, save the code as a .py file and execute it with python script.py.
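A sketch of the script layout the note asks for (save as, say, primes.py; the candidate values are illustrative). The function lives at module level so workers can import it, and the __main__ guard keeps worker processes, which may re-import the module under the spawn start method, from starting pools of their own:

```python
from concurrent.futures import ProcessPoolExecutor
from math import isqrt

def is_prime(n):                       # module level: picklable by reference
    if n < 2:
        return False
    if n == 2:
        return True
    if n % 2 == 0:
        return False
    for i in range(3, isqrt(n) + 1, 2):
        if n % i == 0:
            return False
    return True

if __name__ == "__main__":             # only the parent process runs the pool
    candidates = [7, 97, 91, 1_000_003]
    with ProcessPoolExecutor() as executor:
        flags = list(executor.map(is_prime, candidates))
    print(list(zip(candidates, flags)))
```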

Tip: Why this matters

executor.map() = familiar map() API but concurrent. executor.submit() + as_completed() = more control — handles errors per item, processes results as they complete. The switch from ThreadPoolExecutor to ProcessPoolExecutor is often one word in the source code — same API, totally different machinery.

32.6 Build: a thread-pool fanout with per-task progress and error handling

The chapter showed the two concurrent.futures shapes — executor.map for ordered results, submit + as_completed for as-completed results. We’ll combine them into one running build that downloads a list of URLs, reports progress as each finishes, and isolates per-task failures so one bad URL doesn’t take down the batch.

Step 1: a simulated fetch and the executor.map baseline. time.sleep simulates I/O; one URL deliberately raises so we can see the failure mode of map:

from concurrent.futures import ThreadPoolExecutor, as_completed
import time

def fetch(url):
    time.sleep(0.1)
    if url == "bad":
        raise ValueError(f"failed to fetch {url!r}")
    return f"<contents of {url}>"

urls = ["alpha", "beta", "gamma", "delta"]

with ThreadPoolExecutor(max_workers=4) as ex:
    results = list(ex.map(fetch, urls))
results
['<contents of alpha>',
 '<contents of beta>',
 '<contents of gamma>',
 '<contents of delta>']

ex.map(fetch, urls) schedules the four calls on the pool and returns an iterator that yields results in submission order. list(...) drains it inside the with block: executor.map yields lazily, and although shutdown waits for pending tasks at block exit, draining up front is what turns results into a plain list rather than an unconsumed iterator.

Step 2: submit + as_completed for live progress. When tasks have different durations and you want to report progress as each finishes (not in submission order), submit is the tool:

durations = {"alpha": 0.3, "beta": 0.1, "gamma": 0.2, "delta": 0.4}

def fetch_timed(url):
    time.sleep(durations.get(url, 0.1))
    return f"<contents of {url}>"

events = []
with ThreadPoolExecutor(max_workers=4) as ex:
    futures = {ex.submit(fetch_timed, url): url for url in durations}
    for fut in as_completed(futures):
        url = futures[fut]
        events.append((url, fut.result()))
events
[('beta', '<contents of beta>'),
 ('gamma', '<contents of gamma>'),
 ('alpha', '<contents of alpha>'),
 ('delta', '<contents of delta>')]

futures = {ex.submit(...): url for url in ...} maps each scheduled Future back to the URL that produced it — futures are anonymous, so we tag them in a dict. as_completed(futures) yields each future the moment it finishes; beta (0.1s) finishes first, alpha (0.3s) before delta (0.4s). The events list captures the actual completion order.
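Turning that completion order into a live progress report takes one extra counter. A sketch (the [i/total] message format is an illustrative choice):

```python
from concurrent.futures import ThreadPoolExecutor, as_completed
import time

durations = {"alpha": 0.3, "beta": 0.1, "gamma": 0.2}

def fetch_timed(url):
    time.sleep(durations[url])
    return f"<contents of {url}>"

lines = []
with ThreadPoolExecutor(max_workers=3) as ex:
    futures = {ex.submit(fetch_timed, url): url for url in durations}
    total = len(futures)
    for i, fut in enumerate(as_completed(futures), start=1):
        url = futures[fut]
        lines.append(f"[{i}/{total}] {url} done")   # report as each finishes

print("\n".join(lines))   # beta first (0.1 s), then gamma, then alpha
```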

Step 3: per-task error isolation — bad URLs don’t kill the batch. With executor.map from Step 1, the first exception aborts iteration. With submit + as_completed plus a per-task try/except, every other task still runs:

def fetch_or_fail(url):
    time.sleep(0.05)
    if url.startswith("!"):
        raise RuntimeError(f"bad url: {url!r}")
    return f"<{url}>"

mixed = ["one", "!boom", "two", "!nope", "three"]

ok, errors = [], []
with ThreadPoolExecutor(max_workers=4) as ex:
    futures = {ex.submit(fetch_or_fail, url): url for url in mixed}
    for fut in as_completed(futures):
        url = futures[fut]
        try:
            ok.append((url, fut.result()))
        except Exception as exc:
            errors.append((url, str(exc)))

[ok, errors]
[[('one', '<one>'), ('two', '<two>'), ('three', '<three>')],
 [('!boom', "bad url: '!boom'"), ('!nope', "bad url: '!nope'")]]

fut.result() re-raises the task’s exception when called — that’s what makes the per-task try/except work. ok collects successes; errors collects failures with their messages. The bad URLs are isolated; the good ones complete. With executor.map instead, the first !boom would abort the iteration and two/three/!nope would never be reported.
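If you want executor.map's ordered results together with Step 3's error isolation, a wrapper that catches per call keeps map from re-raising. A sketch (the (ok, value) tuple convention and the safe helper are illustrative assumptions):

```python
from concurrent.futures import ThreadPoolExecutor
import time

def fetch_or_fail(url):
    time.sleep(0.05)
    if url.startswith("!"):
        raise RuntimeError(f"bad url: {url!r}")
    return f"<{url}>"

def safe(func):
    """Wrap func so a failure becomes (False, exc) instead of raising."""
    def wrapper(arg):
        try:
            return True, func(arg)
        except Exception as exc:
            return False, exc
    return wrapper

mixed = ["one", "!boom", "two"]

with ThreadPoolExecutor(max_workers=4) as ex:
    outcomes = list(ex.map(safe(fetch_or_fail), mixed))

print(outcomes)   # ordered like the input; '!boom' carries its exception
```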

The build is the chapter’s two shapes side by side: executor.map for “I want results in order, fail loudly” (Step 1), submit + as_completed for “I want progress as it happens” (Step 2), and the same pattern with per-task try/except fut.result() for “isolate failures, complete the rest” (Step 3). Switching the pool to ProcessPoolExecutor would parallelize CPU work the same way, with the pickling caveat from earlier in the chapter.

32.7 Exercises

  1. Threads vs. processes. Time is_prime over a list with ThreadPoolExecutor and with ProcessPoolExecutor. Predict and verify the difference.

  2. as_completed order. Submit five tasks with random sleep durations. Print results as they arrive — verify they don’t come back in submission order.

  3. max_workers. Why is the default max_workers for ThreadPoolExecutor min(32, os.cpu_count() + 4)? When would you use 100? When 1?

  4. Future.add_done_callback. Replace the as_completed loop with add_done_callback on each future. Compare readability.

  5. One word change. Take a working ThreadPoolExecutor script. Change one identifier to use ProcessPoolExecutor. What broke? (Hint: think about pickling and shared state.)

Note: Further reading

Beazley, Python Distilled §9.14 covers the lower-level threading and process primitives that concurrent.futures is built on top of — useful when you need to step outside the executor abstraction.

32.8 Summary

concurrent.futures is the right level of abstraction for most “do these N things in parallel” tasks. Threads for I/O, processes for CPU; map for ordered, submit + as_completed for control. The same code shape works for both.

Next, Chapter 33 covers asyncio — the model for I/O at scale, where one thread can manage thousands of concurrent operations.