Threading in Python
Multithreading vs Multiprocessing
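- Multithreading is scaling up: more threads inside a single process, all sharing the same memory
- MT is good for I/O-bound problems
- Multiprocessing is scaling out: more processes, each with its own interpreter, memory, and GIL
- MP is good for CPU-bound problems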
Threads
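Threads allow for a separate flow of execution within a program. In CPython, the Global Interpreter Lock (GIL) lets only one thread execute Python bytecode at a time, so threads take turns rather than running Python code simultaneously; they merely appear to run at the same time.
- Threads may be scheduled on separate processor cores, but only the thread holding the GIL executes Python code at any given moment. Blocking I/O calls release the GIL while they wait, which is why threads still help with I/O-bound work.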
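There are some ways to make Python code actually execute in parallel:
- Multiprocessing
- Subprocess
- C extensions that release the GIL
- C++ extensions written with pybind11
- Other implementations of Python that don’t have a GIL (e.g. Jython, IronPython).[1]
Each of these sidesteps the GIL: separate processes each have their own interpreter and GIL, and native extension code can release the GIL while it runs.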
The order in which threads run is determined by the operating system.
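A minimal sketch of the I/O-bound case, assuming time.sleep is an acceptable stand-in for a blocking network or disk call (blocking calls like these release the GIL while they wait). The helper name fake_io is hypothetical. Five threads each wait 0.2 s; because the waits overlap, the total should be close to 0.2 s rather than 1 s, although the order in which the threads finish is up to the OS scheduler.

```python
import threading
import time

def fake_io(results, index):
    # time.sleep stands in for blocking I/O; it releases the GIL while waiting
    time.sleep(0.2)
    results[index] = threading.current_thread().name

results = [None] * 5
threads = [threading.Thread(target=fake_io, args=(results, i)) for i in range(5)]

start = time.perf_counter()
for t in threads:
    t.start()
for t in threads:
    t.join()  # block until each thread has finished
elapsed = time.perf_counter() - start

# the five 0.2 s waits overlap, so elapsed is close to 0.2 s, not 1.0 s
print(f"elapsed: {elapsed:.2f}s")
```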
Daemons
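- Daemon threads are stopped abruptly when the program exits (once only daemon threads remain alive)
- The interpreter doesn’t wait for daemon threads to finish, so any cleanup they were doing may never run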
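A short sketch of the difference, using a hypothetical background_heartbeat task. The daemon flag is set through the daemon= keyword of threading.Thread, and the main thread only needs to join the non-daemon thread. When this script ends, the still-running daemon thread is simply stopped.

```python
import threading
import time

def background_heartbeat(ticks):
    # hypothetical background task; with 1000 ticks it would run for ~50 s
    for _ in range(ticks):
        time.sleep(0.05)

# daemon=True: the interpreter will not wait for this thread at exit
daemon_thread = threading.Thread(target=background_heartbeat, args=(1000,), daemon=True)
daemon_thread.start()

# non-daemon (the default for threads started from the main thread)
regular_thread = threading.Thread(target=background_heartbeat, args=(2,))
regular_thread.start()
regular_thread.join()

print(daemon_thread.daemon, daemon_thread.is_alive())    # True True
print(regular_thread.daemon, regular_thread.is_alive())  # False False
```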
Primitives
Locks
- Locks are a basic primitive in Python, exposed through threading.Lock
- Often also called a mutex
- Locks are like a hall pass. Only one student can be in the hallway at once.
- Can be used as a context manager (with lock:), which releases the lock automatically even if an exception is raised inside the block
import threading

lock = threading.Lock()

with lock:
    # perform some thread-safe operation
    pass

# without a context manager, use try/except/finally so an exception cannot skip the release
lock.acquire()
try:
    # run some code
    pass
except Exception:
    # catch and handle some exception
    pass
finally:
    # release the lock
    lock.release()
- If two pieces of code protect the same resource, they must share the same lock (the same Lock object, not two separate Lock instances)
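A small sketch of why the same Lock object matters, using non-blocking acquire(blocking=False) so nothing hangs. While the main thread holds shared_lock, a worker thread cannot acquire it, but the unrelated separate_lock is free, so code that guarded the resource with it would not be excluded at all. The names shared_lock, separate_lock, and try_lock are illustrative.

```python
import threading

shared_lock = threading.Lock()    # the single lock every thread must use for the resource
separate_lock = threading.Lock()  # a different Lock object: no coordination with shared_lock

def try_lock(lock, outcomes):
    # non-blocking attempt: returns True only if the lock was free
    acquired = lock.acquire(blocking=False)
    outcomes.append(acquired)
    if acquired:
        lock.release()

outcomes = []
with shared_lock:  # the main thread holds shared_lock for the whole block
    for lock in (shared_lock, separate_lock):
        t = threading.Thread(target=try_lock, args=(lock, outcomes))
        t.start()
        t.join()

print(outcomes)  # [False, True]
```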
RLocks
Like Locks, except that the thread already holding an RLock can acquire it again without blocking (the lock is re-entrant).
- An RLock must be released the same number of times it was acquired before another thread can acquire it.
- Good for recursive functions that need to acquire a lock multiple times.
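A minimal sketch of the recursive case, with a hypothetical count_down function: every recursive call enters with rlock: again while the outer calls still hold it. With a plain threading.Lock, the second acquire would block forever.

```python
import threading

rlock = threading.RLock()

def count_down(n, visited):
    # each recursive call re-acquires the RLock this thread already holds
    with rlock:
        visited.append(n)
        if n > 0:
            count_down(n - 1, visited)

visited = []
count_down(3, visited)
print(visited)  # [3, 2, 1, 0]
```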
Semaphores
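Semaphores are counters with the special property that they are atomic: acquire() decrements the counter, release() increments it, and no other thread can interleave with either update. If the counter is zero, acquire() blocks until another thread calls release().[2]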
- Semaphores are frequently used to protect a resource that has a limited capacity. An example would be if you have a pool of connections and want to limit the size of that pool to a specific number.
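A sketch of the connection-pool idea, assuming a hypothetical pool of two connections and using time.sleep in place of real work. threading.BoundedSemaphore is used instead of Semaphore because it raises ValueError if it is released more times than it was acquired, which catches a common bug. The separate stats lock only exists to measure how many threads were inside at once.

```python
import threading
import time

MAX_CONNECTIONS = 2  # hypothetical pool size
pool_slots = threading.BoundedSemaphore(MAX_CONNECTIONS)

stats = {"active": 0, "peak": 0}
stats_lock = threading.Lock()  # protects stats, which the semaphore does not

def use_connection():
    with pool_slots:  # blocks while MAX_CONNECTIONS threads are already inside
        with stats_lock:
            stats["active"] += 1
            stats["peak"] = max(stats["peak"], stats["active"])
        time.sleep(0.05)  # stand-in for work done with a pooled connection
        with stats_lock:
            stats["active"] -= 1

threads = [threading.Thread(target=use_connection) for _ in range(6)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(stats["peak"])  # never more than MAX_CONNECTIONS
```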
Barrier
Like an MPI Barrier: each thread calls wait() at the barrier and blocks until a fixed number of threads (the barrier’s parties) have all called wait()
- This ensures that no thread proceeds past the barrier until all of them have reached it
- Often used to initialize a bunch of threads before letting them all operate.
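A minimal sketch with three hypothetical workers: each one records that it is ready, waits at the barrier, and only then records that it is going. Because no thread returns from wait() until all three have called it, every "ready" entry is logged before any "go" entry.

```python
import threading

N_WORKERS = 3
barrier = threading.Barrier(N_WORKERS)
events = []
events_lock = threading.Lock()

def worker(i):
    # hypothetical initialization step
    with events_lock:
        events.append(("ready", i))
    barrier.wait()  # blocks until all N_WORKERS threads have called wait()
    with events_lock:
        events.append(("go", i))

threads = [threading.Thread(target=worker, args=(i,)) for i in range(N_WORKERS)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print([kind for kind, _ in events])  # ['ready', 'ready', 'ready', 'go', 'go', 'go']
```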
Deadlock
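The state in which two or more threads are each waiting for a lock that another of them holds (or a single thread waits for a lock it already holds), so none of them can ever proceed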
- This can happen when:
- An implementation bug where a Lock is not released properly
- A design issue where a utility function needs to be called by functions that might or might not already have the Lock
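A sketch of the second cause, with hypothetical helper and caller functions: caller already holds a plain Lock and then calls helper, which tries to take the same lock. To keep the demo from hanging, helper uses acquire(timeout=0.1), which returns False instead of blocking forever. Typical fixes are switching to an RLock, or splitting helper into a variant that assumes its caller already holds the lock.

```python
import threading

lock = threading.Lock()

def helper():
    # hypothetical utility that also needs the lock;
    # the timeout turns a silent hang into a visible failure for this demo
    if not lock.acquire(timeout=0.1):
        return "would deadlock"
    try:
        return "ok"
    finally:
        lock.release()

def caller():
    with lock:           # already holds the non-reentrant lock...
        return helper()  # ...so helper can never acquire it

print(helper())  # ok
print(caller())  # would deadlock
```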
Implementation
The key takeaway is that threading is for I/O-bound problems.
CPython implementation detail: In CPython, due to the Global Interpreter Lock, only one thread can execute Python code at once (even though certain performance-oriented libraries might overcome this limitation). If you want your application to make better use of the computational resources of multi-core machines, you are advised to use multiprocessing or concurrent.futures.ProcessPoolExecutor. However, threading is still an appropriate model if you want to run multiple I/O-bound tasks simultaneously.
Threading Module
Module level functions
import threading
threading.active_count() # number of active threads
threading.current_thread() # return current thread object
threading.get_ident() # get current thread identifier
threading.enumerate() # enumerate currently alive threads
threading.main_thread() # return main thread object
# thread local data
mydata = threading.local()
mydata.some_attr = 1
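A short sketch of how threading.local behaves: each thread that sets mydata.some_attr gets its own copy, so the workers never see or overwrite the value set by the main thread. The worker function and thread names are illustrative.

```python
import threading

mydata = threading.local()
seen = {}
seen_lock = threading.Lock()

def worker(value):
    mydata.some_attr = value  # visible only to the current thread
    with seen_lock:
        seen[threading.current_thread().name] = mydata.some_attr

mydata.some_attr = "main"
threads = [threading.Thread(target=worker, args=(i,), name=f"t{i}") for i in range(3)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(seen)              # {'t0': 0, 't1': 1, 't2': 2} (key order may vary)
print(mydata.some_attr)  # main: the workers never touched the main thread's copy
```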
Creating Threads
There are many different ways to create a thread
# create a Thread object
import threading

def some_function(n, some=None):
    # placeholder for the work the thread should do
    print(n, some)

arg_1 = 1
t = threading.Thread(target=some_function, args=(arg_1,), kwargs={"some": "values"})
t.start()
t.join()  # wait for the thread to finish

# subclass Thread and override run()
class Worker(threading.Thread):
    def __init__(self):
        super().__init__(daemon=False)

    def run(self):
        # do some work
        pass

worker = Worker()
worker.start()  # start() calls run() in the new thread
worker.join()

# create a thread pool
from concurrent.futures import ThreadPoolExecutor
with ThreadPoolExecutor(max_workers=3) as executor:
    executor.map(some_function, range(3))
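The Thread-based approaches above discard the target’s return value. A sketch of collecting results from a pool instead, with a hypothetical square function: executor.map returns results in input order, while submit returns Future objects that as_completed yields as they finish. Exceptions raised inside a task are re-raised when its result is retrieved, so a map iterator that is never consumed also hides any errors.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed

def square(n):
    # hypothetical task standing in for some_function
    return n * n

with ThreadPoolExecutor(max_workers=3) as executor:
    # map yields results in input order, regardless of which thread finishes first
    ordered = list(executor.map(square, range(5)))

    # submit returns Futures; as_completed yields them in completion order
    futures = [executor.submit(square, n) for n in range(5)]
    unordered = sorted(f.result() for f in as_completed(futures))

print(ordered)    # [0, 1, 4, 9, 16]
print(unordered)  # [0, 1, 4, 9, 16]
```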
Debugging
The logging module is very helpful for debugging threading and race conditions.
import logging
import threading

logger = logging.getLogger('spam_application')
logger.setLevel(logging.DEBUG)
# create the handler
# handlers serialize output with an internal lock, so one handler can be shared by many threads
ch = logging.StreamHandler()
# set the format
# %(threadName)s labels each message with the name of the thread that logged it
formatter = logging.Formatter('%(asctime)s [%(threadName)s] %(name)s[%(process)d] %(levelname)s %(message)s')
ch.setFormatter(formatter)
# add the handler to the logger
logger.addHandler(ch)

# subclass Thread and override run()
class Worker(threading.Thread):
    def __init__(self):
        super().__init__(daemon=False)

    def run(self):
        # do some work
        logger.debug("doing some work")

w = Worker()
w.start()  # use start(): calling run() directly would execute it in the current thread
w.join()
# prints a line shaped like:
# <timestamp> [Thread-N] spam_application[<pid>] DEBUG doing some work
With the code snippet above, you can log within your functions and the name of the thread will be labeled on each log message. This is extremely useful for debugging race conditions.
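Thread labels are easier to read when threads are given names with the name= argument of threading.Thread. A sketch that captures records in memory instead of printing them, so the labels can be checked; ListHandler is a hypothetical helper, while record.threadName is the same LogRecord attribute that %(threadName)s formats.

```python
import logging
import threading

captured = []

class ListHandler(logging.Handler):
    """Hypothetical handler that stores (threadName, message) pairs in memory."""
    def emit(self, record):
        captured.append((record.threadName, record.getMessage()))

logger = logging.getLogger("thread_name_demo")
logger.setLevel(logging.DEBUG)
logger.addHandler(ListHandler())
logger.propagate = False  # keep this demo out of the root logger's output

def task():
    logger.debug("doing some work")

# name= gives each thread a readable label for %(threadName)s / record.threadName
threads = [threading.Thread(target=task, name=f"worker-{i}") for i in range(3)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(sorted(captured))
```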
Examples
Downloading links concurrently
This example takes a set of five links and downloads their content concurrently using 5 threads
- Adapted from the ThreadPoolExecutor example in the Python concurrent.futures documentation
import concurrent.futures
import urllib.request

URLS = ['http://www.foxnews.com/',
        'http://www.cnn.com/',
        'http://europe.wsj.com/',
        'http://www.bbc.co.uk/',
        'http://some-made-up-domain.com/']

# Retrieve a single page and report the URL and contents
def load_url(url, timeout):
    with urllib.request.urlopen(url, timeout=timeout) as conn:
        return conn.read()

# We can use a with statement to ensure threads are cleaned up promptly
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    # Start the load operations and mark each future with its URL
    future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
    for future in concurrent.futures.as_completed(future_to_url):
        url = future_to_url[future]
        try:
            data = future.result()
        except Exception as exc:
            print('%r generated an exception: %s' % (url, exc))
        else:
            print('%r page is %d bytes' % (url, len(data)))
Producer Consumer
Uses a thread-safe queue, an Event, and a ThreadPoolExecutor to implement a producer-consumer pattern, where messages are sent from a producer to a consumer to be stored.
import concurrent.futures
import logging
import queue
import random
import threading
import time

def producer(q, event):
    """Pretend we're getting a number from the network."""
    while not event.is_set():
        message = random.randint(1, 101)
        logging.info("Producer got message: %s", message)
        q.put(message)
    logging.info("Producer received event. Exiting")

def consumer(q, event):
    """Pretend we're saving a number in the database."""
    while not event.is_set() or not q.empty():
        try:
            # time out instead of blocking forever if the producer has already exited
            message = q.get(timeout=0.1)
        except queue.Empty:
            continue
        logging.info(
            "Consumer storing message: %s (size=%d)", message, q.qsize()
        )
    logging.info("Consumer received event. Exiting")

if __name__ == "__main__":
    log_format = "%(asctime)s: %(message)s"
    logging.basicConfig(format=log_format, level=logging.INFO,
                        datefmt="%H:%M:%S")

    pipeline = queue.Queue(maxsize=10)
    event = threading.Event()
    with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
        executor.submit(producer, pipeline, event)
        executor.submit(consumer, pipeline, event)

        time.sleep(1)
        logging.info("Main: about to set event")
        event.set()
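One alternative shutdown strategy, sketched under the assumption of a single producer and a single consumer: instead of an Event, the producer puts a sentinel value (None here, assumed never to be a real message) after its last message. The consumer can then block on get() safely, because it is guaranteed to receive the sentinel eventually. With several consumers, the producer would need to put one sentinel per consumer.

```python
import queue
import threading

SENTINEL = None  # assumption: None is never a real message

def producer(q, count):
    for n in range(count):
        q.put(n)  # blocks while the queue is full
    q.put(SENTINEL)  # tell the consumer nothing more is coming

def consumer(q, stored):
    while True:
        message = q.get()  # safe to block: the sentinel always arrives
        if message is SENTINEL:
            break
        stored.append(message)  # stand-in for saving to a database

pipeline = queue.Queue(maxsize=10)
stored = []
threads = [
    threading.Thread(target=producer, args=(pipeline, 25)),
    threading.Thread(target=consumer, args=(pipeline, stored)),
]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(len(stored))  # 25
```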
Counter with threading Locks
import logging
import random
import threading
import time

logging.basicConfig(level=logging.DEBUG,
                    format='(%(threadName)-10s) %(message)s',
                    )

class Counter(object):
    def __init__(self, start=0):
        self.lock = threading.Lock()
        self.value = start

    def increment(self):
        logging.debug('Waiting for lock')
        self.lock.acquire()
        try:
            logging.debug('Acquired lock')
            self.value = self.value + 1
        finally:
            self.lock.release()

def worker(c):
    for i in range(2):
        pause = random.random()
        logging.debug('Sleeping %0.02f', pause)
        time.sleep(pause)
        c.increment()
    logging.debug('Done')

counter = Counter()
for i in range(2):
    t = threading.Thread(target=worker, args=(counter,))
    t.start()

logging.debug('Waiting for worker threads')
main_thread = threading.current_thread()  # currentThread() is a deprecated alias
for t in threading.enumerate():
    if t is not main_thread:
        t.join()
logging.debug('Counter: %d', counter.value)