Brief Primer on Parallel Programming
The following notes serve as a small introduction to parallel programming. The objective is to provide enough information to convey current parallel programming methods and implement some simple ones on your own.
I will be putting terms in bold that are good terms to look up on Google.
Understanding Parallelism
To understand parallelism, it’s important to understand the difference between parallelism and concurrency.
At a high level, the way I think about this is the following:
- Concurrency: Many tasks, same compute resources.
- Parallelism: single or many tasks, multiple compute resources.
There are two software-level primitives, one dedicated to each of these methodologies:
- Threads (concurrency)
- Processes (parallelism)
Concurrency and Threads
Concurrency is the ability to have multiple actors sharing the same resources but operating on a different schedule. These actors are typically threads.
In Python, you can use the threading library to spawn, query, and run threads.
import threading
threading.active_count() # number of active threads
threading.current_thread() # return current thread object
threading.get_ident() # get current thread identifier
threading.enumerate() # enumerate currently alive threads
threading.main_thread() # return main thread object
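To actually spawn and run a thread, here is a minimal sketch (the function and thread arguments are arbitrary):

import threading

def work(name):
    print(f"hello from {name}")

# Spawn a thread, run it alongside the main thread, then wait for it.
t = threading.Thread(target=work, args=("worker-1",))
t.start()
t.join()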
The big thing to know about threads is that they share state. This means that two different threads can access the same data structures. This can be valuable for speeding things up, but it can also introduce subtle bugs very quickly.
For example, what happens if two threads try to write to the same index in a list? The answer: chaos.
Just kidding; these are called race conditions, and they are incredibly hard to debug. To mitigate these problems, engineers use what are called threading primitives, like locks and semaphores. There are a few of these tools in the Python threading library.
The term thread safe refers to a data structure or API that can be used simultaneously by multiple threads sharing state without encountering such problems.
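As a minimal sketch of using a lock to guard shared state (the counter, thread count, and iteration count are arbitrary):

import threading

counter = 0
lock = threading.Lock()

def increment(n):
    global counter
    for _ in range(n):
        # counter += 1 is a read-modify-write; without the lock,
        # threads can interleave here and lose updates.
        with lock:
            counter += 1

threads = [threading.Thread(target=increment, args=(100_000,)) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(counter)  # 400000 with the lock; often less if you remove it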
Python actually cannot have multiple threads running simultaneously because of something called the Global Interpreter Lock. This is just like locking with threading primitives, but it is built into CPython itself. While this may seem like it defeats the purpose of threads, it does not.
From the Python standard library docs:
CPython implementation detail: In CPython, due to the Global Interpreter Lock, only one thread can execute Python code at once (even though certain performance-oriented libraries might overcome this limitation). If you want your application to make better use of the computational resources of multi-core machines, you are advised to use multiprocessing or concurrent.futures.ProcessPoolExecutor. However, threading is still an appropriate model if you want to run multiple I/O-bound tasks simultaneously.
Because of this, threads in Python are primarily useful for I/O bound problems. I/O bound means waiting on the OS/kernel to fetch or write something in another location. While a thread is waiting, the GIL is released, allowing other threads to execute.
When to use Concurrency?
Let’s say you’re trying to retrieve a bunch of website data. Instead of iteratively going through each website, we can instead request the data from each website at the same time and collectively wait for the results.
An implementation of this looks as follows:
import concurrent.futures
import urllib.request

URLS = ['http://www.foxnews.com/',
        'http://www.cnn.com/',
        'http://europe.wsj.com/',
        'http://www.bbc.co.uk/',
        'http://some-made-up-domain.com/']

# Retrieve a single page and report the URL and contents
def load_url(url, timeout):
    with urllib.request.urlopen(url, timeout=timeout) as conn:
        return conn.read()

# We can use a with statement to ensure threads are cleaned up promptly
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    # Start the load operations and mark each future with its URL
    future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
    for future in concurrent.futures.as_completed(future_to_url):
        url = future_to_url[future]
        try:
            data = future.result()
        except Exception as exc:
            print('%r generated an exception: %s' % (url, exc))
        else:
            print('%r page is %d bytes' % (url, len(data)))
It’s important to note that other languages can support more intensive threading models that aren’t restricted by the GIL. The most prevalent example of this in HPC is OpenMP.
“True Parallelism” (Processes)
When your problem is compute bound, meaning the computation itself is the bottleneck and you need to spread out the work, you need to use multiple Processes.
Processes do not share memory space with other processes. There are multiple ways to spawn a new process, but it usually boils down to either a process fork or a process spawn.
The advantage of processes is that you can utilize multiple cores of a CPU explicitly. Instead of the operating system (kernel) deciding when and where to execute threads, you can explicitly determine which processes to map to which cores.
In Python, this can be done through the multiprocessing library.
Multiprocessing is good for writing programs that need to operate on a set of compute-bound problems collectively, for example, preprocessing a batch of images.
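A minimal sketch of this pattern using multiprocessing.Pool (the preprocess function below is a hypothetical stand-in for something like an image transform):

import multiprocessing as mp

def preprocess(n):
    # Stand-in for a compute-bound task, e.g. transforming one image.
    return sum(i * i for i in range(n))

if __name__ == "__main__":
    jobs = [1_000_000] * 8
    # Each job runs in a separate process, so the work spreads
    # across multiple CPU cores despite the GIL.
    with mp.Pool(processes=4) as pool:
        results = pool.map(preprocess, jobs)
    print(len(results), "jobs finished")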
There are also ways to explicitly start a new process in Python with modules like subprocess and psutil. With these libraries, you can start, monitor, and stop individual processes with more control than you can in multiprocessing. This is better for tasks that you want to run in the background but that don’t need shared state with the rest of the program the way threads do.1
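For instance, here is a sketch of starting and stopping a background process with subprocess (the sleep command is just a placeholder workload on a POSIX system):

import subprocess

# Start a background process that shares no state with this program.
proc = subprocess.Popen(["sleep", "10"])
print("started pid", proc.pid)

# poll() returns None while the process is still running.
print("still running:", proc.poll() is None)

proc.terminate()  # ask it to stop
proc.wait()       # reap it so it doesn't linger as a zombie
print("exit code:", proc.returncode)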
Communicating between processes
The question probably came up in your mind:
What if I need to communicate between processes?
There are many approaches, but the two most dominant are:
- Shared memory
- Message passing
Shared memory programming is the model in which some segment of memory is dedicated to storing data from multiple processes. There are even ways of doing this in Python.
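A minimal sketch of one such way, assuming Python 3.8+ for the multiprocessing.shared_memory module:

from multiprocessing import Process, shared_memory

def worker(name):
    # Attach to the existing block by name and read what the parent wrote.
    shm = shared_memory.SharedMemory(name=name)
    print(bytes(shm.buf[:5]))
    shm.close()

if __name__ == "__main__":
    shm = shared_memory.SharedMemory(create=True, size=5)
    shm.buf[:5] = b"hello"
    p = Process(target=worker, args=(shm.name,))
    p.start()
    p.join()
    shm.close()
    shm.unlink()  # free the block once every process is done with it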
Message passing is the major paradigm in High Performance Computing, largely because it has been around the longest and its implementations have been optimized the most.
Not surprisingly, this is what the Message Passing Interface, or MPI, does.
MPI
MPI is so important it deserves its own section. Essentially, MPI allows a programmer to write a program once and have it work across multiple CPUs or even multiple machines.
The reason this works is because of what are called collective operations in MPI. For example, MPI_Gather is a collective that goes to each CPU core (also known as a rank in MPI), retrieves some specified data field, and returns it to rank 0. MPI_Allgather is the same, but instead of just returning the result to rank 0, it returns the data to all ranks in the program.
There is usually some reduction involved in the collective operation; for example, the average or sum is taken of all involved fields. This is how data-parallel distributed AI training works: the gradient is collected from each copy of the model being trained and averaged into a single gradient, which is then broadcast back out to each rank in the training program. Libraries like Horovod define their own collectives optimized for deep learning training.
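A minimal sketch of that gradient-averaging pattern with mpi4py and NumPy (the "gradient" values here are made up; launch it with mpirun as described below):

from mpi4py import MPI
import numpy as np

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()

# Each rank pretends to compute its own local gradient.
local_grad = np.full(4, float(rank))
avg_grad = np.empty_like(local_grad)

# Allreduce sums the arrays across all ranks and gives every rank
# the result; dividing by the number of ranks yields the average.
comm.Allreduce(local_grad, avg_grad, op=MPI.SUM)
avg_grad /= size
print("rank", rank, "averaged gradient:", avg_grad)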
The MPI programming model
What can be hard to understand about parallel programming and MPI in particular is how to actually write an MPI program.
For example, if you execute:
# mpirun is the command to "launch" a parallel MPI program
mpirun -n 2 some_python_script.py
two instances of that Python script will be started. If the script contains no operations to pass messages or communicate between those programs (like MPI), then the two programs will do the exact same thing… twice.
MPI allows you to define where in the program certain collective communication should happen. This way, you write the entire program once, but it can be executed in parallel across many computers and effectively work on a single compute problem. From algorithms class you probably remember the divide and conquer strategy. HPC, in general, is an industry dedicated to algorithms that follow that methodology across many computers (also known as compute nodes).
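To make the divide and conquer idea concrete, here is a sketch in mpi4py where each rank sums its own slice of a range and a collective combines the partial results (the range and slicing scheme are arbitrary):

from mpi4py import MPI

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()

# Each rank sums every size-th number starting at its own offset,
# then allreduce combines the partial sums on every rank.
partial = sum(range(rank + 1, 1001, size))
total = comm.allreduce(partial, op=MPI.SUM)
if rank == 0:
    print("sum of 1..1000 =", total)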
Let’s take the simplest possible example: hello world.
#include <mpi.h>
#include <stdio.h>

int main(int argc, char** argv) {
    // Initialize the MPI environment
    MPI_Init(NULL, NULL);

    // Get the number of processes
    int world_size;
    MPI_Comm_size(MPI_COMM_WORLD, &world_size);

    // Get the rank of the process
    int world_rank;
    MPI_Comm_rank(MPI_COMM_WORLD, &world_rank);

    // Get the name of the processor
    char processor_name[MPI_MAX_PROCESSOR_NAME];
    int name_len;
    MPI_Get_processor_name(processor_name, &name_len);

    // Print off a hello world message
    printf("Hello world from processor %s, rank %d out of %d processors\n",
           processor_name, world_rank, world_size);

    // Finalize the MPI environment.
    MPI_Finalize();
}
Before I break this down, some terms:
- Compute node: a single “computer” containing usually 1-2 CPUs and optionally 1-10 GPUs.
- core or CPU core: scientists sometimes call these nodes, but they are mistaken; this is the total number of CPU cores available to the parallel program.
- thread: with hyperthreading, the number of ranks in a program can double, as each CPU core can house two ranks. Hyperthreads are usually turned off for best performance, but they can be helpful.
- So the total number of actors (ranks) in an MPI program is nodes × cores × threads. For example, 2 nodes, each with 64 cores and hyperthreading disabled, gives 2 × 64 × 1 = 128 ranks.
Now, breaking down this program a bit:
- #include <mpi.h>: the header file for the MPI library. This library is often included with compilers so that it is automatically found when you are compiling. This example can be compiled on horizon with cc ./hello_world.c -o hello
- MPI_Init: tells the MPI launcher (srun or mpirun) that this is an MPI program and should be started as such.
- MPI_Comm_size: gets the number of processes globally (meaning across all compute nodes) that this program was given by the MPI launcher. So if you ran this with mpirun -n 3 ./hello, then world_size would be 3.
- MPI_Comm_rank: tells us which rank (cpu/gpu) this particular instance or process is running on. So if run with three processes as in the previous step, this could return 0, 1, or 2 (0-based indexing).
- MPI_Get_processor_name: gets the host name of the compute node we are currently running on.
- MPI_Finalize: lets the MPI launcher know to wait for all the processes in this program to shut down and then clean up their memory space in the kernel.
Here is the same program in Python, which uses mpi4py.
#!/usr/bin/env python
"""
Parallel Hello World
"""
from mpi4py import MPI
import sys
size = MPI.COMM_WORLD.Get_size()
rank = MPI.COMM_WORLD.Get_rank()
name = MPI.Get_processor_name()
sys.stdout.write(
    "Hello, World! I am process %d of %d on %s.\n"
    % (rank, size, name))
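Assuming the script above is saved as hello_mpi.py (a filename I am making up), it can be launched just like the C version:

mpirun -n 4 python hello_mpi.py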
Fun Ways to get Started
- Try to run a multi-threaded program that causes a race condition.
- Try to use psutil to start a couple of processes that just sleep and then print the processor ID they are on (can be done on a laptop or horizon).
- Try to run a multiprocessing hello world and confirm it’s using all the CPUs available.
I highly suggest running through this documentation here and trying some of the examples. ↩