Frequently in Python, I find myself making repeated calls to the same API with different parameters. For example, stock ticker data is usually served one ticker at a time, so to get the data for multiple tickers I need to make multiple requests. Sometimes that's fine to do sequentially, but sometimes speed matters and we need to parallelise the workflow.
I always find myself having to look up the syntax for Python's ThreadPoolExecutor or ProcessPoolExecutor, and there is always some faff involved in collecting the results. One day in 2025 I hit upon an idea, and I have never manually used the executors since. This approach has worked in every parallelisation scenario I have encountered.
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, as_completed
from multiprocessing import cpu_count
from typing import Any, Callable
ALLOWABLE_EXECUTOR_CLASSES = {
'thread': ThreadPoolExecutor,
'process': ProcessPoolExecutor,
}
def simple_parallel(
tasks: dict[str, Callable],
max_workers: int|None = None,
executor_class: type[ThreadPoolExecutor] | type[ProcessPoolExecutor] | str | None = None,
) -> dict[str, Any]:
'''Utility to make parallelising workloads simpler.
Input arguments:
    * tasks: dict, where each key labels the corresponding result in the returned dict and each
      value must be a callable function/method (hint: just wrap code inside a lambda).
* max_workers - the max number of workers for the executor_class
* executor_class - the type of parallelism to run (multi-threading or multi-processing)
As an example:
def paginated_function(offset: int, limit: int) -> QuerySet:
return Model.objects.all()[offset:offset + limit]
If we wanted to run this multiple times concurrently, we can now just do:
results = simple_parallel({
'page1': lambda: paginated_function(0, 50),
        'page2': lambda: paginated_function(50, 50),
        'page3': lambda: paginated_function(100, 50),
        'page4': lambda: paginated_function(150, 50),
})
And simple_parallel does the hard part concurrently.
'''
if isinstance(executor_class, str):
executor_class = ALLOWABLE_EXECUTOR_CLASSES[executor_class]
if executor_class is None:
executor_class = ThreadPoolExecutor
if max_workers is None:
if executor_class is ProcessPoolExecutor:
max_workers = min(cpu_count(), len(tasks) or 1)
else:
max_workers = len(tasks) or 1
# validate tasks dict
for name, task in tasks.items():
if not callable(task):
raise TypeError(f'Task spec for {name} incorrect - must be callable')
results = {}
with executor_class(max_workers=max_workers) as executor:
# use the future itself as the key in this dict
future_to_name = {executor.submit(task): name for name, task in tasks.items()}
for future in as_completed(future_to_name):
            # as_completed yields futures in completion order, not submission order
            # no ordering is implied or expected - this is why the inputs must be keyed, i.e. be a dict
name = future_to_name[future]
try:
results[name] = future.result()
except Exception as exc:
results[name] = exc
return results
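One nice property of storing exceptions as values is that a single failed task does not destroy the rest of the batch. A minimal sketch of the calling pattern, using a trimmed, thread-only copy of the utility above and a hypothetical flaky helper:

```python
from concurrent.futures import ThreadPoolExecutor, as_completed

def simple_parallel(tasks, max_workers=None):
    # trimmed, thread-only version of the utility above
    results = {}
    with ThreadPoolExecutor(max_workers=max_workers or len(tasks) or 1) as executor:
        future_to_name = {executor.submit(task): name for name, task in tasks.items()}
        for future in as_completed(future_to_name):
            try:
                results[future_to_name[future]] = future.result()
            except Exception as exc:
                results[future_to_name[future]] = exc
    return results

def flaky(n: int) -> int:
    # hypothetical task: fails for n == 0
    if n == 0:
        raise ValueError('n must be non-zero')
    return 100 // n

results = simple_parallel({'ok': lambda: flaky(4), 'bad': lambda: flaky(0)})
successes = {k: v for k, v in results.items() if not isinstance(v, Exception)}
failures = {k: v for k, v in results.items() if isinstance(v, Exception)}
print(successes)        # {'ok': 25}
print(list(failures))   # ['bad']
```

The caller decides per-task whether to retry, log, or re-raise the stored exception.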
Let's query some stock data from Yahoo Finance. The good folks there have a public API that can easily handle multiple simultaneous requests.
from datetime import datetime
import requests
def get_ticker_data(ticker: str) -> dict[str, list[float | int]]:
    url = f"https://query2.finance.yahoo.com/v8/finance/chart/{ticker}?interval=1d&range=3mo&includePrePost=true&events=div%7Csplit%7Cearn&lang=en-GB&region=GB&source=cosaic"
response = requests.get(url, headers={"User-Agent": "Not blank"})
    if response.status_code != 200:
        raise Exception(f'Request for {ticker} failed with status {response.status_code}')
    data = response.json()
    result = data["chart"]["result"][0]
    quote = result["indicators"]["quote"][0]
    return {
        "timestamp": result["timestamp"],
        "open": quote["open"],
        "high": quote["high"],
        "low": quote["low"],
        "close": quote["close"],
        "volume": quote["volume"],
        "adjclose": result["indicators"]["adjclose"][0]["adjclose"],
    }
TICKERS = ["AAPL", "GOOG", "NFLX", "TSLA", "META", "AMZN", "NVDA"]
# process sequentially
dt0 = datetime.now()
data = {}
for ticker in TICKERS:
data[ticker] = get_ticker_data(ticker)
dt1 = datetime.now()
print(f'Sequential took {(dt1 - dt0).total_seconds()} seconds')
# Sequential took 0.918822 seconds
# process in parallel without util
dt0 = datetime.now()
tasks = {}
with ThreadPoolExecutor(max_workers=7) as pool:
for ticker in TICKERS:
tasks[ticker] = pool.submit(get_ticker_data, ticker)
for ticker, future in tasks.items():
try:
tasks[ticker] = future.result()
except Exception as exc:
tasks[ticker] = exc
dt1 = datetime.now()
print(f'Parallel took {(dt1 - dt0).total_seconds()} seconds')
# Parallel took 0.244826 seconds
# process in parallel with util
dt0 = datetime.now()
tasks = {}
for ticker in TICKERS:
tasks[ticker] = lambda ticker=ticker: get_ticker_data(ticker) # NOTE: late-binding issue with lambda in loop
data = simple_parallel(tasks)
dt1 = datetime.now()
print(f'Parallel with util took {(dt1 - dt0).total_seconds()} seconds')
# Parallel with util took 0.287205 seconds
The parallel versions are consistently around 3 times faster than the sequential version. Using the utility also makes the code much simpler to write (fewer imports, easier-to-remember syntax).
Note the comment about late binding. Lambdas defined within a for loop close over the loop variable itself, not its value at the time the lambda is created. If we do not bind the current value as a default argument (the ticker=ticker), every lambda will run with the final value of the loop variable, i.e. we will query "NVDA" 7 times. This is a fun little gotcha in Python.
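The gotcha is easy to reproduce in isolation:

```python
# Each lambda closes over the variable `ticker`, not its value at creation
# time, so by the time they run they all see the loop's final value:
late = [lambda: ticker for ticker in ['AAPL', 'GOOG', 'NFLX']]
print([f() for f in late])   # ['NFLX', 'NFLX', 'NFLX']

# A default argument is evaluated when the lambda is defined, which
# captures the current value instead:
early = [lambda ticker=ticker: ticker for ticker in ['AAPL', 'GOOG', 'NFLX']]
print([f() for f in early])  # ['AAPL', 'GOOG', 'NFLX']
```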
Say we have 100 log files, each with 10k rows, that we need to read and count up how many times a particular keyword appears. This task is mainly CPU-bound, so I would not expect multi-threading to help much.
Let's start by setting up our 100 files:
import random
import os
from datetime import datetime
# Create all our log files
random.seed(22)
ENDPOINTS = ['/api/v1/login', '/api/v1/users', '/api/v1/products', '/api/v1/orders', '/api/v1/cart']
STATUS_CODES = [200, 400, 500]
FILENAMES = [f'./logs/log{i}.log' for i in range(100)]
os.makedirs('./logs', exist_ok=True)
for filename in FILENAMES:
with open(filename, 'w') as f:
for j in range(10_000):
endpoint = random.choice(ENDPOINTS)
status = random.choice(STATUS_CODES)
f.write(f'{datetime.now().isoformat()} {endpoint} {status}\n')
def count_endpoints(filename: str, endpoint: str) -> int:
count = 0
with open(filename, 'r') as f:
for line in f:
if endpoint in line:
count += 1
return count
Running sequentially:
dt0 = datetime.now()
endpoint_counts = {}
for filename in FILENAMES:
endpoint_counts[filename] = count_endpoints(filename, '/api/v1/login')
dt1 = datetime.now()
print(f'Sequential took {(dt1 - dt0).total_seconds()} seconds')
# Sequential took 0.33276 seconds
Running in parallel with the utility:
dt0 = datetime.now()
tasks = {}
for filename in FILENAMES:
tasks[filename] = lambda filename=filename: count_endpoints(filename, '/api/v1/login')
endpoint_counts = simple_parallel(tasks, executor_class='process')
dt1 = datetime.now()
print(f'Parallel (process) with util took {(dt1 - dt0).total_seconds()} seconds')
# Parallel (process) with util took 0.102157 seconds
Again, we see a huge speedup for minimal increase in complexity. We are very easily able to use multi-processing as this task is mainly CPU-bound.
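One caveat with the multi-processing route: ProcessPoolExecutor has to pickle each callable to ship it to a worker process, and the standard pickle module cannot pickle lambdas. Depending on your platform and start method this can surface as every result coming back as a pickling error, so for process pools it is safer to build the tasks with functools.partial of a module-level function. A quick check (count_endpoints here is a stand-in for the function defined earlier):

```python
import pickle
from functools import partial

def count_endpoints(filename: str, endpoint: str) -> int:
    # stand-in for the log-counting function defined earlier
    return 0

def is_picklable(obj) -> bool:
    try:
        pickle.dumps(obj)
        return True
    except Exception:
        return False

lambda_task = lambda: count_endpoints('log0.log', '/api/v1/login')
partial_task = partial(count_endpoints, 'log0.log', '/api/v1/login')

print(is_picklable(lambda_task))   # False: a lambda has no importable name
print(is_picklable(partial_task))
```

With the thread executor (the default) none of this matters, since threads share memory and nothing is pickled.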
Running the utility code with executor_class='thread' actually results in a marked slow-down. This makes some sense: we pay the cost of creating all of these threads and switching between them, yet only one thread can execute Python bytecode at any one time (due to the GIL), so for a CPU-bound task there is nothing to gain.
As a test, let's play with the number of workers for the multi-threading and see if we actually get a speedup if we reduce the workers:
# workers | Time taken (seconds)
1 | 0.466398
2 | 0.453574
4 | 0.446490
8 | 0.436512
16 | 0.473652
32 | 0.594783
64 | 0.539817
100 | 0.563930
It should be noted that this has been run on a MacBook with an SSD, so the read operations are generally incredibly fast.
We can see that the overhead is minimised at around 8 workers, but even at its best, multi-threading is consistently slower than the sequential approach for this CPU-bound task. After 8 workers, the context-switching overhead grows further and makes things worse.
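For reference, a sweep like the ones above can be produced with a loop along these lines (shrunk here to a handful of small stand-in log files rather than the 100 files of 10k rows used for the timings):

```python
import os
import tempfile
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime

# a few small stand-in log files
tmpdir = tempfile.mkdtemp()
filenames = [os.path.join(tmpdir, f'log{i}.log') for i in range(4)]
for filename in filenames:
    with open(filename, 'w') as f:
        f.write('2025-01-01T00:00:00 /api/v1/login 200\n' * 100)

def count_endpoints(filename: str, endpoint: str) -> int:
    with open(filename) as f:
        return sum(endpoint in line for line in f)

timings = {}
for n_workers in [1, 2, 4, 8]:
    dt0 = datetime.now()
    with ThreadPoolExecutor(max_workers=n_workers) as pool:
        futures = {pool.submit(count_endpoints, fn, '/api/v1/login'): fn
                   for fn in filenames}
        counts = {futures[future]: future.result() for future in as_completed(futures)}
    timings[n_workers] = (datetime.now() - dt0).total_seconds()

print(counts)
print(timings)
```

Swap in ProcessPoolExecutor (with the pickling caveat noted earlier) to produce the multi-processing table.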
Let's now tune the performance of the multi-processing version by again playing with the number of workers:
# workers | Time taken (seconds)
1 | 0.096115
2 | 0.103184
4 | 0.085363
8 | 0.090914
16 | 0.100748
32 | 0.135785
64 | 0.208119
100 | 0.349425
We see the same performance degradation above a certain number of workers as we see in the multi-threading scenario. At some point, the cost of coordinating the multiple processes starts to outweigh the benefits of parallelism. As a rule of thumb, for multi-processing, you should limit the number of workers to the number of available CPU cores on your machine.
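That rule of thumb is easy to encode; a one-line sketch (the helper name is mine, and it mirrors the min(cpu_count(), len(tasks) or 1) default already baked into simple_parallel):

```python
import os

def pick_workers(n_tasks: int) -> int:
    # more processes than CPU cores just adds scheduling and
    # serialisation overhead, so clamp to whichever is smaller
    return max(1, min(os.cpu_count() or 1, n_tasks))
```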