Status: In Progress

Multithreading vs Multiprocessing

  • Multithreading is scaling up
    • MT is good for I/O bound problems
  • Multiprocessing is scaling out
    • MP is good for cpu bound problems

Threads

Threads allow for a separate flow of execution within a program. In Python, because of the Global Interpreter lock (GIL) things don’t actually happen simultaneously, but merely appear to.

  • Even though they aren’t running at the same time, they may be located on separate processors. But only one will be running at once.

There are some ways to make Python code actually execute in parallel

  • Multiprocessing
  • Subprocess
  • C extensions
  • C++ Pybind
  • Other implementations of Python that don’t have a GIL.1 None of these methods above are bound by the GIL.

The order in which threads run is determined by the operating system.

Daemons

  • Killed when the main thread dies
  • Doesn’t wait for daemon to finish

Primitives

Locks

  • Locks are a basic primitive in python. Exposed through threading.lock
  • Often also called a mutex
  • Locks are like a hall pass. Only one student can be in the hallway at once.
  • Can be used with context managers which greatly reduces the chance that an exception will skip over the release of the lock
          import threading
          lock = threading.Lock()
          with lock:
              # perform some thread safe operation
    
          # to be safe with exceptions use try/except/finally
          lock.acquire()
          try: 
              # run some code
          except Exception:
              # catch and handle some exception
          finally:
         	    # release the lock
              lock.release()
    
  • If two pieces of code are trying to acquire the same resource, they must share the same lock. (Same reference to the lock)

RLocks

Like locks except for they support being able to acquire the lock multiple times before operation.

  • RLocks still needed to be acquired the same number of times that they are released.
  • Good for recursive functions that need to acquire a lock multiple times.

Semaphores

Semaphores are counters with the special property that they are atomic. OS will never swap out threads when incrementing or decrementing the counter. 2

  • Semaphores are frequently used to protect a resource that has a limited capacity. An example would be if you have a pool of connections and want to limit the size of that pool to a specific number.

Barrier

Like an MPI Barrier, all threads call wait at the barrier until some condition is reached

  • this ensures that all threads operate at the same time
  • Often used to initialize a bunch of threads before letting them all operate.

Deadlock

The state in which two threads are trying to acquire a lock at the same time

  • This can happen when:
    1. An implementation bug where a Lock is not released properly
    2. A design issue where a utility function needs to be called by functions that might or might not already have the Lock

Implementation

Key takeaway is that threading is for I/O bound problems

CPython implementation detail: In CPython, due to the Global Interpreter Lock, only one thread can execute Python code at once (even though certain performance-oriented libraries might overcome this limitation). If you want your application to make better use of the computational resources of multi-core machines, you are advised to use multiprocessing or concurrent.futures.ProcessPoolExecutor. However, threading is still an appropriate model if you want to run multiple I/O-bound tasks simultaneously.

Threading Module

Module level functions

import threading

threading.active_count()    # number of active threads
threading.current_thread()  # return current thread object
threading.get_ident()       # get current thead identifier
threading.enumerate()       # enumerate currently alive threads
threading.main_thread()     # return main thread object

# thread local data
mydata = threading.local()
mydata.some_attr = 1

Creating Threads

There are many different ways to create a thread

# create a Thread object
import threading

t = threading.Thread(target=some_function, args=(arg_1,), kwargs={"some": "values"})
t.start()


# override a class
class Worker(threading.Thread):
	def __init__(self):
		Thread.__init__(self, daemon=False)

	def run(self):
		# do some work

worker = Worker()
worker.start()

# create a thread pool
from concurrent.futures import ThreadPoolExecutor
with ThreadPoolExecutor(max_workers=3) as executor:
    executor.map(some_function, range(3))


Debugging

The logging module is very helpful for debugging threading and race conditions.

import logging

logger = logging.getLogger('spam_application')
logger.setLevel(logging.DEBUG)

# create the handler
# has the ability to create locks as well
ch = logging.StreamHandler()

# set the format
# include the thread name
formatter = logging.Formatter('%(asctime)s [%(threadName)s] %(hostname)s %(name)s[%(process)d] %(levelname)s %(message)s')
ch.setFormatter(formatter)

# add the handler to the logger
logger.addHandler(ch)


# override a class
class Worker(threading.Thread):
	def __init__(self):
		Thread.__init__(self, daemon=False)

	def run(self):
		# do some work
		logger.debug("doing some work")

w = Worker()
w.run()

>>> "doing some work" 

With the code snippet above, you can log within your functions and the name of the thread will be labeled on each log message. This is extremely useful for debugging race conditions.

Examples

Downloading links concurrently

This example takes a set of five links and downloads their content concurrently using 5 threads

import concurrent.futures
import urllib.request

URLS = ['http://www.foxnews.com/',
        'http://www.cnn.com/',
        'http://europe.wsj.com/',
        'http://www.bbc.co.uk/',
        'http://some-made-up-domain.com/']

# Retrieve a single page and report the URL and contents
def load_url(url, timeout):
    with urllib.request.urlopen(url, timeout=timeout) as conn:
        return conn.read()

# We can use a with statement to ensure threads are cleaned up promptly
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    # Start the load operations and mark each future with its URL
    future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
    for future in concurrent.futures.as_completed(future_to_url):
        url = future_to_url[future]
        try:
            data = future.result()
        except Exception as exc:
            print('%r generated an exception: %s' % (url, exc))
        else:
            print('%r page is %d bytes' % (url, len(data)))

Producer Consumer

Uses a thread-safe queue, event, and a ThreadPoolExecutor to implement a producer consumer paradigm were messages are sent from a producer to a consumer to be stored.

import concurrent.futures
import logging
import queue
import random
import threading
import time

def producer(queue, event):
    """Pretend we're getting a number from the network."""
    while not event.is_set():
        message = random.randint(1, 101)
        logging.info("Producer got message: %s", message)
        queue.put(message)

    logging.info("Producer received event. Exiting")

def consumer(queue, event):
    """Pretend we're saving a number in the database."""
    while not event.is_set() or not queue.empty():
        message = queue.get()
        logging.info(
            "Consumer storing message: %s (size=%d)", message, queue.qsize()
        )

    logging.info("Consumer received event. Exiting")

if __name__ == "__main__":
    format = "%(asctime)s: %(message)s"
    logging.basicConfig(format=format, level=logging.INFO,
                        datefmt="%H:%M:%S")

    pipeline = queue.Queue(maxsize=10)
    event = threading.Event()
    with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
        executor.submit(producer, pipeline, event)
        executor.submit(consumer, pipeline, event)

        time.sleep(1)
        logging.info("Main: about to set event")
        event.set()

Counter with threading Locks

import logging
import random
import threading
import time

logging.basicConfig(level=logging.DEBUG,
                    format='(%(threadName)-10s) %(message)s',
                    )
                    
class Counter(object):
    def __init__(self, start=0):
        self.lock = threading.Lock()
        self.value = start
    def increment(self):
        logging.debug('Waiting for lock')
        self.lock.acquire()
        try:
            logging.debug('Acquired lock')
            self.value = self.value + 1
        finally:
            self.lock.release()

def worker(c):
    for i in range(2):
        pause = random.random()
        logging.debug('Sleeping %0.02f', pause)
        time.sleep(pause)
        c.increment()
    logging.debug('Done')

counter = Counter()
for i in range(2):
    t = threading.Thread(target=worker, args=(counter,))
    t.start()

logging.debug('Waiting for worker threads')
main_thread = threading.currentThread()
for t in threading.enumerate():
    if t is not main_thread:
        t.join()
logging.debug('Counter: %d', counter.value)

Links

  1. IronPython and Jython are two Python implementations without a GIL. 

  2. Multiprocessing has its own synchronization primitives that largely mirror that of the Threading API but work for multiple processes instead of multiple threads.