Easy parallelisation in Python

Frequently in Python, I find myself making repeated calls to the same API with different parameters. For example, stock ticker data is usually served one ticker at a time, so getting data for multiple tickers means making multiple requests. Sometimes that's fine to do sequentially, but sometimes speed matters and we need to parallelise the workflow.

I always find myself having to look up the syntax for Python's ThreadPoolExecutor or ProcessPoolExecutor, and there is always some faff involved in processing the results. One day in 2025 I hit upon an idea, and I have not used the Executors manually since. This approach has worked in every parallelisation scenario I have encountered.

The code

from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, as_completed
from multiprocessing import cpu_count
from typing import Any, Callable

ALLOWABLE_EXECUTOR_CLASSES = {
    'thread': ThreadPoolExecutor,
    'process': ProcessPoolExecutor,
}


def simple_parallel(
    tasks: dict[str, Callable],
    max_workers: int | None = None,
    executor_class: type[ThreadPoolExecutor] | type[ProcessPoolExecutor] | str | None = None,
) -> dict[str, Any]:
    '''Utility to make parallelising workloads simpler.

    Input arguments:
    * tasks: dict mapping a name (the key under which that task's result is returned) to a
      zero-argument callable (hint: just wrap code inside a lambda). Note that with the
      process executor, tasks must be picklable - lambdas will not work there, so use
      functools.partial or a plain top-level function instead.
    * max_workers: the maximum number of workers for the executor_class.
    * executor_class: the type of parallelism to run - 'thread', 'process', an executor
      class, or None (defaults to multi-threading).

    As an example:

    def paginated_function(offset: int, limit: int) -> QuerySet:
        return Model.objects.all()[offset:offset + limit]

    If we wanted to run this multiple times concurrently, we can now just do:

    results = simple_parallel({
        'page1': lambda: paginated_function(0, 50),
        'page2': lambda: paginated_function(50, 50),
        'page3': lambda: paginated_function(100, 50),
        'page4': lambda: paginated_function(150, 50),
    })

    And simple_parallel does the hard part concurrently.
    '''
    if isinstance(executor_class, str):
        executor_class = ALLOWABLE_EXECUTOR_CLASSES[executor_class]

    if executor_class is None:
        executor_class = ThreadPoolExecutor

    if max_workers is None:
        if executor_class is ProcessPoolExecutor:
            max_workers = min(cpu_count(), len(tasks) or 1)
        else:
            max_workers = len(tasks) or 1

    # validate tasks dict
    for name, task in tasks.items():
        if not callable(task):
            raise TypeError(f'Task spec for {name} incorrect - must be callable')

    results = {}
    with executor_class(max_workers=max_workers) as executor:

        # use the future itself as the key in this dict
        future_to_name = {executor.submit(task): name for name, task in tasks.items()}

        for future in as_completed(future_to_name):
            # this is a generator that returns results in whatever order they execute most quickly
            # no ordering is implied or expected - this is why the inputs must have a key i.e. be a dict
            name = future_to_name[future]
            try:
                results[name] = future.result()
            except Exception as exc:
                results[name] = exc

    return results
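
One design choice worth calling out: failed tasks come back as exception values rather than being raised, so callers should check result types before using them. A self-contained sketch of the same pattern (run_all here is a cut-down stand-in for simple_parallel, named for illustration only):

```python
from concurrent.futures import ThreadPoolExecutor, as_completed


def run_all(tasks):
    # cut-down stand-in for simple_parallel: failed tasks come back as
    # exception objects rather than being raised
    results = {}
    with ThreadPoolExecutor(max_workers=len(tasks)) as pool:
        future_to_name = {pool.submit(fn): name for name, fn in tasks.items()}
        for future in as_completed(future_to_name):
            name = future_to_name[future]
            try:
                results[name] = future.result()
            except Exception as exc:
                results[name] = exc
    return results


results = run_all({
    'ok': lambda: 1 + 1,
    'boom': lambda: 1 / 0,
})
# partition results into successes and failures before using them
successes = {k: v for k, v in results.items() if not isinstance(v, Exception)}
failures = {k: v for k, v in results.items() if isinstance(v, Exception)}
print(successes)  # {'ok': 2}
print(failures)   # {'boom': ZeroDivisionError('division by zero')}
```

This keeps one slow or broken task from killing the whole batch, at the cost of making the caller responsible for inspecting the results.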

Example 1

Let's query some stock data from Yahoo Finance. The good folks there have a public API that can easily handle multiple simultaneous requests.

from concurrent.futures import ThreadPoolExecutor
from datetime import datetime

import requests


def get_ticker_data(ticker: str) -> dict[str, list[float | int]]:
    url = f"https://query2.finance.yahoo.com/v8/finance/chart/{ticker}?interval=1d&range=3mo&includePrePost=true&events=div%7Csplit%7Cearn&lang=en-GB&region=GB&source=cosaic"
    response = requests.get(url, headers={"User-Agent": "Not blank"})
    if response.status_code != 200:
        raise Exception(f'Request for {ticker} failed with status code {response.status_code}')

    data = response.json()
    result = data["chart"]["result"][0]
    quote = result["indicators"]["quote"][0]
    return {
        "timestamp": result["timestamp"],
        "open": quote["open"],
        "high": quote["high"],
        "low": quote["low"],
        "close": quote["close"],
        "volume": quote["volume"],
        "adjclose": result["indicators"]["adjclose"][0]["adjclose"],
    }


TICKERS = ["AAPL", "GOOG", "NFLX", "TSLA", "META", "AMZN", "NVDA"]


# process sequentially
dt0 = datetime.now()

data = {}
for ticker in TICKERS:
    data[ticker] = get_ticker_data(ticker)

dt1 = datetime.now()
print(f'Sequential took {(dt1 - dt0).total_seconds()} seconds')
# Sequential took 0.918822 seconds

# process in parallel without util
dt0 = datetime.now()

tasks = {}
with ThreadPoolExecutor(max_workers=7) as pool:
    for ticker in TICKERS:
        tasks[ticker] = pool.submit(get_ticker_data, ticker)

for ticker, future in tasks.items():
    try:
        tasks[ticker] = future.result()
    except Exception as exc:
        tasks[ticker] = exc

dt1 = datetime.now()
print(f'Parallel took {(dt1 - dt0).total_seconds()} seconds')
# Parallel took 0.244826 seconds

# process in parallel with util
dt0 = datetime.now()

tasks = {}
for ticker in TICKERS:
    tasks[ticker] = lambda ticker=ticker: get_ticker_data(ticker)  # NOTE: late-binding issue with lambda in loop
data = simple_parallel(tasks)

dt1 = datetime.now()
print(f'Parallel with util took {(dt1 - dt0).total_seconds()} seconds')
# Parallel with util took 0.287205 seconds

The parallel versions are consistently three to four times faster than the sequential version. The utility also makes the code simpler to write (fewer imports, easier-to-remember syntax).

Late-binding

Note the comment about late-binding. Lambdas defined inside a for loop exhibit this behaviour: if we do not bind the loop variable as a default argument (the ticker=ticker part), every lambda runs with the final value of the loop variable, i.e. we would query "NVDA" seven times. This is a fun little gotcha in Python.
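
The gotcha is easy to demonstrate in isolation:

```python
# The late-binding gotcha: a lambda captures the *variable*, not its value
# at the time the lambda was defined.
late = [lambda: i for i in range(3)]
print([f() for f in late])   # [2, 2, 2] - every lambda sees the final value

# Binding the loop variable as a default argument freezes the current value.
bound = [lambda i=i: i for i in range(3)]
print([f() for f in bound])  # [0, 1, 2]
```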

Example 2

Say we have 100 log files, each with 10k rows, that we need to read in order to count how many times a particular keyword appears. This task is mainly CPU-bound, so I would not expect multi-threading to help much.

Let's start by setting up our 100 files:

import random
import os
from datetime import datetime


# Create all our log files
random.seed(22)

ENDPOINTS = ['/api/v1/login', '/api/v1/users', '/api/v1/products', '/api/v1/orders', '/api/v1/cart']
STATUS_CODES = [200, 400, 500]
FILENAMES = [f'./logs/log{i}.log' for i in range(100)]

os.makedirs('./logs', exist_ok=True)
for filename in FILENAMES:
    with open(filename, 'w') as f:
        for j in range(10_000):
            endpoint = random.choice(ENDPOINTS)
            status = random.choice(STATUS_CODES)
            f.write(f'{datetime.now().isoformat()} {endpoint} {status}\n')


def count_endpoints(filename: str, endpoint: str) -> int:
    count = 0
    with open(filename, 'r') as f:
        for line in f:
            if endpoint in line:
                count += 1
    return count

Running sequentially:

dt0 = datetime.now()

endpoint_counts = {}
for filename in FILENAMES:
    endpoint_counts[filename] = count_endpoints(filename, '/api/v1/login')

dt1 = datetime.now()
print(f'Sequential took {(dt1 - dt0).total_seconds()} seconds')
# Sequential took 0.33276 seconds

Running in parallel with the utility:

dt0 = datetime.now()

from functools import partial

# ProcessPoolExecutor pickles each task to send it to a worker process, and
# lambdas cannot be pickled - so build the tasks from the top-level function
# with functools.partial instead
tasks = {
    filename: partial(count_endpoints, filename, '/api/v1/login')
    for filename in FILENAMES
}
endpoint_counts = simple_parallel(tasks, executor_class='process')

dt1 = datetime.now()
print(f'Parallel (process) with util took {(dt1 - dt0).total_seconds()} seconds')
# Parallel (process) with util took 0.102157 seconds

Again, we see a huge speedup for a minimal increase in complexity. Switching to multi-processing is a one-argument change, and it pays off here because the task is mainly CPU-bound.
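
One caveat with executor_class='process': ProcessPoolExecutor has to pickle each task to ship it to a worker process, and lambdas cannot be pickled, so process-pool tasks should be built from top-level functions, e.g. via functools.partial. A quick check (count_hits is a trivial stand-in, not the real log counter):

```python
import pickle
from functools import partial


def count_hits(filename, endpoint):
    # stand-in for count_endpoints; what matters for picklability is that
    # the function is defined at module top level
    return 0


# A lambda cannot be serialised for transport to a worker process...
try:
    pickle.dumps(lambda: count_hits('log0.log', '/api/v1/login'))
    lambda_picklable = True
except Exception:
    lambda_picklable = False
print(lambda_picklable)  # False

# ...but a partial of a top-level function can be.
blob = pickle.dumps(partial(count_hits, 'log0.log', '/api/v1/login'))
print(pickle.loads(blob)())  # 0
```

Lambdas remain fine for the thread executor, which shares memory with the main process and never pickles anything.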

Performance tuning

Running the utility with executor_class='thread' actually results in a marked slow-down of the code execution. Thinking about it, this makes sense: we pay to create all of these threads and to switch between them, yet only one thread can execute Python bytecode at any one time (due to the GIL), so for CPU-bound work the overhead buys us nothing.

As a test, let's play with the number of workers for the multi-threading and see if we actually get a speedup if we reduce the workers:

# workers | Time taken (seconds)
1       | 0.466398
2       | 0.453574
4       | 0.446490
8       | 0.436512
16      | 0.473652
32      | 0.594783
64      | 0.539817
100     | 0.563930

It should be noted that this has been run on a MacBook with an SSD, so the read operations are generally incredibly fast.

We can see that the overhead is minimised at around 8 workers, but even at its best, multi-threading is consistently slower than the sequential approach for this CPU-bound task. After 8 workers, the context-switching overhead grows further and makes things worse.
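
A worker sweep like the one above can be scripted in a few lines. This sketch uses a cheap CPU-bound stand-in workload (busy, a made-up function) rather than the log files, so the absolute numbers will differ from the tables here:

```python
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime


def busy(n: int) -> int:
    # cheap CPU-bound stand-in for the log-counting task
    return sum(i * i for i in range(n))


for workers in (1, 2, 4, 8, 16):
    dt0 = datetime.now()
    with ThreadPoolExecutor(max_workers=workers) as pool:
        futures = [pool.submit(busy, 100_000) for _ in range(20)]
        results = [f.result() for f in futures]
    elapsed = (datetime.now() - dt0).total_seconds()
    print(f'{workers:>2} workers | {elapsed:.6f}')
```

Swapping ThreadPoolExecutor for ProcessPoolExecutor in the same loop gives the corresponding multi-processing sweep.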

Let's now tune the performance of the multi-processing version by again playing with the number of workers:

# workers | Time taken (seconds)
1       | 0.096115
2       | 0.103184
4       | 0.085363
8       | 0.090914
16      | 0.100748
32      | 0.135785
64      | 0.208119
100     | 0.349425

We see the same performance degradation above a certain number of workers as in the multi-threading scenario: at some point, the cost of coordinating the processes starts to outweigh the benefits of parallelism. As a rule of thumb for multi-processing, limit the number of workers to the number of available CPU cores on your machine.
