Available Software

As always, the examples in this post are available to play along with.

As A Python Package

Please install this if you want to play along, or skip straight to implementation examples.

As Available Code On Git

Browse the code here.

…but first, some background on threading and multiprocessing. You need to read and understand this section; there is no way around it, because depending on what you want to do you must choose either a threading or a multiprocessing approach.

Why Use Multi-threading & Multi-processing

If you are designing some code to run a task, you want it to complete as fast as possible. What this really means is that you want to max out the processing resources on your machine, mainly the CPU and its sweet, sweet processing cores.

If you write a standard bit of code, you are probably running it sequentially, blocking the processing pipeline and making other methods and functions wait in line for a heavy function to complete even when they are independent of it. You can clear this clog by designing your application to take advantage of threading and multiprocessing so that your code executes concurrently.

You might be thinking: wow, my code executing in parallel… it may look that way, but with threads in Python it is actually executing concurrently. Because of the Global Interpreter Lock (GIL), only one thread runs Python bytecode at a time; your threads share slices of CPU time, giving the impression that everything is running in parallel, and that’s good enough for me. (Separate processes, on the other hand, really can run in parallel on different cores.)

Processes & Multi-processing

You already know what a process is. I guarantee that at some point you have opened the Windows Task Manager to terminate a stuck program/process… the OS (Windows/Linux) manages all of these processes and splits CPU time amongst them. This is called multi-processing.

Ever noticed how Microsoft Office doesn’t have access to your Spotify playlists? That is because processes have separate memory and processing space. In the same way, when you run Python code in a multiprocessing fashion, the individual Python processes cannot share information unless you point them at some external resource such as a database or another communication tool.

Threads & Multi-threading

A process may be composed of one or more threads. Remember that the process has some allocated CPU time? Well, you need to now share that block of CPU time with all of the threads running inside the process. This is what multi-threading manages.

Taking Microsoft Office as an example, the threaded code running the spell checker has access to the same data as the threaded code displaying your document. In a multi-threading environment, each of your threads shares the same process and memory space.

Which Do I Use?

Threading For Input/Output (I/O) Bound Tasks: An example of an I/O task would be communicating with a third party, perhaps waiting on external resources such as network requests, file I/O, or user input. Sometimes your external data source will respond quickly, sometimes it won’t. Multi-threading allows you to execute other tasks while one task is waiting for its I/O to complete, which can improve the responsiveness and efficiency of your application.

Multiprocessing For CPU Bound Tasks: Use multiprocessing for CPU-bound tasks that involve intensive computation, such as numerical simulations, image processing, or machine learning. Multiprocessing allows you to distribute the workload across multiple CPU cores, which can significantly improve the performance of these types of tasks.

So you can see, you must take into account the task you want to complete before designing your concurrent code.

Available Python Libraries

Processing & Threading Modules

There are two native Python modules that implement threading and multiprocessing for you.

threading module

You can import the threading module using,

import threading
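A minimal sketch of using it (the greet function and thread names here are just illustrative, not from the example package):

```python
import threading

def greet(name):
    # Each thread runs this function independently.
    print(f"Hello from {name}")

# Create two threads, start them, then wait for both to finish.
threads = [threading.Thread(target=greet, args=(f"thread-{i}",)) for i in range(2)]
for t in threads:
    t.start()
for t in threads:
    t.join()
```

start() kicks a thread off; join() blocks until that thread has finished.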

multiprocessing module

You can import the multiprocessing module using,

import multiprocessing
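And a minimal sketch of the multiprocessing equivalent (the work function is illustrative; note the `if __name__ == "__main__":` guard, which multiprocessing needs when a script spawns processes):

```python
import multiprocessing

def work(n):
    # Runs in a separate process with its own memory space.
    print(f"{n} squared is {n * n}")

if __name__ == "__main__":
    procs = [multiprocessing.Process(target=work, args=(i,)) for i in range(2)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
    print(f"CPU cores available: {multiprocessing.cpu_count()}")
```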

Concurrent.Futures Module

Concurrent.Futures is another Python module; it is a high-level interface over the threading and multiprocessing modules above and uses them under the hood to implement the functionality of its executors.

You can import this module using,

import concurrent.futures

The module provides two executors which help you implement multi-threading and multi-processing code.

ThreadPoolExecutor

It’s recommended to use the ThreadPoolExecutor as a context manager, as follows,

with concurrent.futures.ThreadPoolExecutor() as executor:
    ...  # submit your code to the thread executor

ProcessPoolExecutor

Here is the equivalent code for using the ProcessPoolExecutor as a context manager,

with concurrent.futures.ProcessPoolExecutor() as executor:
    ...  # submit your code to the process executor

Interchangeability Of Methods

You may have noticed that the usage statements are identical except for the words Process and Thread. This is because both executors implement the same interface. As we work through the examples you will see that the code for processes and threads is identical, which makes turning a bit of code from multi-processing to multi-threading as easy as switching ProcessPoolExecutor() to ThreadPoolExecutor() and vice versa.

EXCEPT: There is always an exception. Due to the fundamental differences between how a multi-process and a multi-threaded application are laid out in memory, as discussed before, there are a couple of occasions where you need to take care, mainly when you want to share information or state between processes. I will point these out as we go over the examples.

Asyncio

asyncio is another library to write concurrent code using the async/await syntax.

I won’t be covering it here as it would muddy the waters; I only mention it because it is an available concurrency module in Python. It’s a little different conceptually from threading, multiprocessing, and concurrent.futures, so I will explore it in a separate post.

EXAMPLES

I’ve tried to mix and match as many methods as possible here.

For the impatient, you can skip straight to the examples in the Python packages using the following commands.
Example code is available on GitHub, but it is also output to the screen when you run these examples.

Stick around for in depth explanations.

Python Threading Help

(concurrency) ubuntu@goodboy:~$ rexthreading --help
usage: rexthreading [-h]
                    [--thread-count | --nd-concurrent | --nd-nj-concurrent | --nd-serial | --d-concurrent | --d-nj-concurrent | --i-printalpha | --qexample | --qlexample | --qjexample | --qdqexample]

Run default threading example

options:
  -h, --help          show this help message and exit
  --thread-count      Max number of threads on your physical system
  --nd-concurrent     'Non daemon' concurrent thread example.
  --nd-nj-concurrent  'Non daemon' 'non-join' concurrent thread example.
  --nd-serial         'Non daemon' serial thread example.
  --d-concurrent      'Daemon' concurrent thread example.
  --d-nj-concurrent   'Daemon' 'non-join' concurrent thread example.
  --i-printalpha      Thread by inheritence example. Define own class that prints alphabet.
  --qexample          Threading queue example
  --qlexample         Threading queue with lock example
  --qjexample         Threading 'queue join' example
  --qdqexample        Threading with deque example

Python Multiprocessing Help

(concurrency) ubuntu@goodboy:~$ rexmultiprocessing --help
usage: rexmultiprocessing [-h]
                          [--cpu-count | --nd-concurrent | --i-printalpha | --poolexample | --poolcontextexample | --poolcontextexampleadd | --qexample | --qlexample | --qjexample | --qdqexample | --pipeexample]

Run default multiprocessing example

options:
  -h, --help               show this help message and exit
  --cpu-count              What is your machine CPU count?
  --nd-concurrent          'Non Daemon' concurrent process example
  --i-printalpha           Multiprocess by inheritence example. Define own class that prints alphabet.
  --poolexample            Multiprocess by pool square example.
  --poolcontextexample     Multiprocess by context pool square example.
  --poolcontextexampleadd  Multiprocess by context pool add example.
  --qexample               Multiprocess with queue example.
  --qlexample              Multiprocess with queue and a lock example.
  --qjexample              Multiprocess with joinable queue.
  --qdqexample             Multiprocess with a dequeue.
  --pipeexample            Multiprocess with a pipe example.

Python Concurrent.Futures Help

(concurrency) ubuntu@goodboy:~$ rexconcurrentfutures --help
usage: rexconcurrentfutures [-h]
                            [--tmap-ce | --tmap-tu | --tsubmit | --tresult | --tascomplete | --tdone | --tcallback | --tshutdown-true | --tshutdown-false | --tcancel | --tcancel-ec | --tqueueeg | --tlqueueeg | --tdequeueeg | --tldequeueeg | --pmap-ce | --pmap-tu | --psubmit | --presult | --pascomplete | --pdone | --pcallback | --pshutdown-true | --pshutdown-false | --pcancel | --pcancel-ec | --pqueue | --ppipeexample]

Run default concurrent futures example

options:
  -h, --help         show this help message and exit
  --tmap-ce          ThreadPoolExecutor map with corresponding elements
  --tmap-tu          ThreadPoolExecutor map with tuples
  --tsubmit          ThreadPoolExecutor submit example
  --tresult          ThreadPoolExecutor result usage example
  --tascomplete      ThreadPoolExecutor ascomplete example
  --tdone            ThreadPoolExecutor done example
  --tcallback        ThreadPoolExecutor callback example
  --tshutdown-true   ThreadPoolExecutor shutdown example wait true
  --tshutdown-false  ThreadPoolExecutor shutdown example wait false
  --tcancel          ThreadPoolExecutor cancel example
  --tcancel-ec       ThreadPoolExecutor cancel with thread event_check example
  --tqueueeg         ThreadPoolExecutor queue.queue() queue example
  --tlqueueeg        ThreadPoolExecutor queue.queue() queue example with lock
  --tdequeueeg       ThreadPoolExecutor collections.dequeue() example
  --tldequeueeg      ThreadPoolExecutor collections.dequeue() example with lock
  --pmap-ce          ProcessPoolExecutor map with corresponding elements
  --pmap-tu          ProcessPoolExecutor map with tuples
  --psubmit          ProcessPoolExecutor submit example
  --presult          ProcessPoolExecutor result usage example
  --pascomplete      ProcessPoolExecutor ascomplete example
  --pdone            ProcessPoolExecutor done example
  --pcallback        ProcessPoolExecutor callback example
  --pshutdown-true   ProcessPoolExecutor shutdown example wait true
  --pshutdown-false  ProcessPoolExecutor shutdown example wait false
  --pcancel          ProcessPoolExecutor cancel example
  --pcancel-ec       ProcessPoolExecutor cancel with thread event_check example
  --pqueue           ProcessPoolExecutor multiprocessing.Queue() queue example
  --ppipeexample     ProcessPoolExecutor pipe example

Thread vs Multiprocessing Implementation Gotchas

Interchangeability of methods and ‘boilerplate’ code is quite good, except for the exceptions mentioned earlier.

Queues and Events: if you have these in your code, watch out. The type of queue and event technology you use will change depending on whether you are using threading (I/O-bound design) or multiprocessing (CPU-bound design) based concurrency.

Examples of how to code to these requirements can be found in the above package.

A Note On Queues

Queues are one of those things that you can’t interchange between process and thread implementations. You must pick the right queue technology for the concurrency implementation you have chosen to pursue, and changing your concurrency choice will unfortunately mean rewriting any code that handles queues.

Examples of implementation of each queue are available in the above package.

Thread Safe Queue

The module queue.Queue is designed for inter-thread communication. It provides a thread-safe way to pass messages between threads, which is useful when you have multiple threads running concurrently and need to share data between them.

The class collections.deque is also generally safe for inter-thread communication: its individual append and pop operations are thread-safe.
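As a minimal producer/consumer sketch between two threads (the function and variable names are my own, not from the package):

```python
import queue
import threading

q = queue.Queue()
consumed = []

def producer():
    # Put a few items on the thread-safe queue, then a sentinel.
    for i in range(5):
        q.put(i)
    q.put(None)

def consumer():
    # Pull items until the sentinel arrives.
    while True:
        item = q.get()
        if item is None:
            break
        consumed.append(item)

t1 = threading.Thread(target=producer)
t2 = threading.Thread(target=consumer)
t1.start(); t2.start()
t1.join(); t2.join()
print(consumed)  # [0, 1, 2, 3, 4]
```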

Process Safe Queue

multiprocessing.Queue is designed for inter-process communication. It provides a process-safe way to pass messages between processes which is useful in situations where you have multiple processes running concurrently and need to share data between them. The multiprocessing.Queue class is designed to be process-safe, which means that it can be used safely in multi-process environments.

Concurrent Futures

Both queue.Queue and multiprocessing.Queue can be used in a concurrent.futures implementation to ensure thread safety or process safety.

The module queue.Queue should be used with ThreadPoolExecutor.

collections.deque is also generally safe to use in a ThreadPoolExecutor, since its individual append and pop operations are thread-safe.

The module multiprocessing.Queue should be used with ProcessPoolExecutor.

Locks

Of these, queue.Queue and multiprocessing.Queue use internal locks, while individual collections.deque operations are atomic. Even so, you are encouraged to use your own locks when a task performs multi-step operations on a shared queue, to prevent race conditions and other issues that can arise when multiple tasks access the same queue at the same time.
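Here is a small sketch of why a lock matters when tasks do a multi-step read-modify-write on shared state (the names are my own):

```python
import threading

counter = 0
lock = threading.Lock()

def increment(n):
    global counter
    for _ in range(n):
        # Hold the lock across the read-modify-write so two threads
        # can't interleave and lose updates.
        with lock:
            counter += 1

threads = [threading.Thread(target=increment, args=(10_000,)) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(counter)  # 40000
```

Without the lock the final count could come up short, because `counter += 1` is not an atomic operation.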

A Note On Events

Events are the other item you must pay attention to when choosing your concurrency implementation.

You might want to pass an event to a thread or a process to signal something, perhaps to terminate the task at hand.

Difficult to explain without code, so here we go.

Thread Safe Events

Threads have access to the same memory area so you can simply create a threading event by,

cancel_event = threading.Event()

You can call the event anything you like; I’ve chosen to call it cancel_event. Here it is in use, created by the method cancel_example_with_event, which uses the ThreadPoolExecutor and passes the event to the task method count_down_ccheck.

When set() is called on the event, the task sees that the event has been set and returns, terminating the thread accordingly.

Here’s a snippet from one of the examples,

import concurrent.futures
import threading
import time


def count_down_ccheck(n, cancel_event):
    # Count down, checking on every tick whether we have been cancelled.
    for i in range(n, 0, -1):
        if cancel_event.is_set():
            return
        print(f"{i} seconds left...")
        time.sleep(1)
    print("Time's up!")


def cancel_example_with_event():
    with concurrent.futures.ThreadPoolExecutor() as executor:
        cancel_event = threading.Event()
        future = executor.submit(count_down_ccheck, 10, cancel_event)
        time.sleep(5)
        if not future.done():
            cancel_event.set()
            print("Task cancelled!")

Process Safe Events

Unfortunately, processes do not share the same memory space, so you have to use what is called a multiprocessing Manager. A Manager runs a small server process that hosts shared objects, and Event() is one of the shared types it supports. (A plain multiprocessing.Event() works when you spawn processes yourself, but it cannot be passed as an argument to a ProcessPoolExecutor task, which is why the Manager version is used here.)

The code is almost the same, except you need to use the manager context, and create a multiprocessing manager event.

cancel_event = manager.Event()

The event can then be used in the expected way. Notice that cancel_example_with_event in this case uses the ProcessPoolExecutor. The task count_down_ccheck behaves in exactly the same way.

Here’s a snippet from one of the examples,

import concurrent.futures
import time
from multiprocessing import Manager


def count_down_ccheck(n, cancel_event):
    # Identical task to the threaded version; only the event type differs.
    for i in range(n, 0, -1):
        if cancel_event.is_set():
            return
        print(f"{i} seconds left...")
        time.sleep(1)
    print("Time's up!")


def cancel_example_with_event():
    # The Manager outlives the executor so the event proxy stays valid
    # while tasks are still running; the with block handles shutdown.
    with Manager() as manager:
        cancel_event = manager.Event()
        with concurrent.futures.ProcessPoolExecutor() as executor:
            future = executor.submit(count_down_ccheck, 10, cancel_event)
            time.sleep(5)
            if not future.done():
                cancel_event.set()
                print("Task cancelled!")

As you can see, in general, and apart from these two cases, code using concurrent.futures is interchangeable.

I’ve given concurrent.futures examples as this is where you could be tripped up, but you should follow exactly the same rules when using the Python modules threading (use thread-based events) and multiprocessing (use manager-based events).

Waiting Event

You can also use events to pause a task, waiting until the event is set. For example, this task method,

def worker(an_event):
    print('Waiting for event to be set...')
    an_event.wait()
    print('Event has been set!')
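Wiring that up, a self-contained sketch (the driver code below is my own addition):

```python
import threading
import time

def worker(an_event):
    print('Waiting for event to be set...')
    an_event.wait()  # blocks until another thread calls set()
    print('Event has been set!')

event = threading.Event()
t = threading.Thread(target=worker, args=(event,))
t.start()
time.sleep(0.1)  # give the worker a moment to reach wait()
event.set()      # release the worker
t.join()
```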

Should I Use Concurrent.Futures, Or Individual Modules Threading and Multiprocessing?

The vast majority of the time you should head straight to Concurrent.Futures and use the ThreadPoolExecutor for I/O-bound (threaded) tasks and the ProcessPoolExecutor for CPU-bound (process) tasks.

Concurrent.Futures does a grand job of doing the hard work for you.

ThreadPoolExecutor and ProcessPoolExecutor Common Methods

As I’ve already covered the queue and event gotchas above, I will finish up with a description of the common methods shared by ThreadPoolExecutor and ProcessPoolExecutor. You can use these methods to confidently build boilerplate code that will run in I/O-bound or CPU-bound mode with either executor.

The submit(), map(), result(), as_completed(), done(), add_done_callback(), and cancel() methods that we discussed in this tutorial can all be used with either type of executor.

I recommend installing the rexconcurrentfutures Python module and running each example; the code will be output to the screen along with an explanation, e.g.

(concurrency) ubuntu@goodboy:~$ rexconcurrentfutures --tsubmit
25
 
    def some_task(self, n):
        return n * n

    def submit_example(self):
        with concurrent.futures.ThreadPoolExecutor() as executor:
            future = executor.submit(self.some_task, 5)
            print(future.result())

    # ThreadPoolExecutor Example

    # def submit_example(self): creates a thread pool and submit a single
    # task to it using the submit() method. The submit() method returns
    # a Future object that represents the result of the task when it completes.

The examples mentioned will be snippets, but full code is available for review in the git repository.

You will find the following common methods in my examples for both ThreadPoolExecutor and ProcessPoolExecutor examples.

Submitting Tasks with submit()

The submit() method is used to submit a single task to a thread or process pool and returns a Future object that represents the result of the task. The submit() method takes a callable object and any arguments to pass to the callable as input.

Mapping Tasks with map()

The map() method is used to submit multiple tasks to a thread or process pool and returns an iterator that yields the results of the tasks in the order that they were submitted. The map() method takes a callable object and an iterable of arguments to pass to the callable as input.
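A minimal sketch (square is an illustrative function, not from the package):

```python
import concurrent.futures

def square(n):
    return n * n

with concurrent.futures.ThreadPoolExecutor() as executor:
    # map() yields results in submission order, not completion order.
    results = list(executor.map(square, [1, 2, 3, 4]))
print(results)  # [1, 4, 9, 16]
```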

Retrieving Results with result()

The result() method is used to retrieve the result of a completed task represented by a Future object. The result() method blocks until the result is available, so it will wait for the task to complete before returning the result.

Setting a Timeout with result(timeout=None)

It is also possible to set a timeout for how long result() should block before raising a concurrent.futures.TimeoutError exception.
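For example (slow_task is illustrative):

```python
import concurrent.futures
import time

def slow_task():
    time.sleep(2)
    return "done"

with concurrent.futures.ThreadPoolExecutor() as executor:
    future = executor.submit(slow_task)
    try:
        # Only wait half a second before giving up.
        value = future.result(timeout=0.5)
    except concurrent.futures.TimeoutError:
        value = "timed out"
    final = future.result()  # a plain result() blocks until completion
print(value, final)  # timed out done
```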

Retrieving Results as they Complete with as_completed()

The as_completed() function is used to retrieve the results of completed tasks represented by a list of Future objects in the order that they complete. The as_completed() function returns an iterator that yields the results of the tasks as they complete.
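A quick sketch showing completion order (the delays are illustrative):

```python
import concurrent.futures
import time

def task(delay):
    time.sleep(delay)
    return delay

with concurrent.futures.ThreadPoolExecutor() as executor:
    futures = [executor.submit(task, d) for d in (0.3, 0.1, 0.2)]
    # as_completed() yields futures as they finish, not as submitted.
    completed = [f.result() for f in concurrent.futures.as_completed(futures)]
print(completed)  # typically [0.1, 0.2, 0.3]
```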

Checking Task Completion with done()

The done() method is used to check whether a Future object representing a task has completed. The done() method returns a boolean value indicating whether the task has completed.
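For example (illustrative task):

```python
import concurrent.futures
import time

def task():
    time.sleep(0.2)
    return 42

executor = concurrent.futures.ThreadPoolExecutor()
future = executor.submit(task)
print(future.done())     # almost certainly False -- the task is still sleeping
value = future.result()  # block until it finishes
print(future.done())     # True -- the task has completed
executor.shutdown()
```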

Setting a Callback Function with add_done_callback()

The add_done_callback() method is used to set a callback function that is called when a Future object representing a task is completed. The callback function takes a single argument, which is the Future object representing the completed task.
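A minimal sketch (on_done is my own callback name):

```python
import concurrent.futures

def task(n):
    return n + 1

results = []

def on_done(future):
    # Called with the completed Future; collect its result.
    results.append(future.result())

with concurrent.futures.ThreadPoolExecutor() as executor:
    future = executor.submit(task, 41)
    future.add_done_callback(on_done)
print(results)  # [42]
```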

Shutting Down the Thread or Process Pool with shutdown()


The shutdown() method is used to shut down the thread or process pool associated with a ThreadPoolExecutor or ProcessPoolExecutor. By default (wait=True) it blocks until all pending tasks have completed; with wait=False it returns immediately, although the pool is only freed once the tasks actually finish.
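For example:

```python
import concurrent.futures

def task(n):
    return n * 2

executor = concurrent.futures.ThreadPoolExecutor()
futures = [executor.submit(task, n) for n in range(3)]
# wait=True (the default) blocks here until every submitted task finishes.
executor.shutdown(wait=True)
print([f.result() for f in futures])  # [0, 2, 4]
```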

Cancelling a Task with cancel()

The cancel() method is used to cancel a Future object representing a task that has not yet started running. If the task has already started running or has already completed, then calling cancel() has no effect.
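A sketch that forces a queued task so the cancel can succeed (a single worker keeps the second task pending; names are my own):

```python
import concurrent.futures
import time

def task():
    time.sleep(0.5)

executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)
running = executor.submit(task)
time.sleep(0.1)  # give the worker time to start the first task
queued = executor.submit(task)
cancelled_queued = queued.cancel()    # True: not started yet
cancelled_running = running.cancel()  # False: already running
print(cancelled_queued, cancelled_running)
executor.shutdown()
```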
