
Threading, Multiprocessing and Asyncio in Python (Part 4)

Contents at a Glance

  1. Introduction to Asynchrony
    1. What is "asynchronous code"?
    2. Comparison of three ways to get asynchronous code
  2. Threading
    1. Threading. Creation and handling
    2. Threading. Synchronization primitives: Lock
    3. Threading. Synchronization primitives: Lock (continuation), Event, Condition, Semaphore, Queue 🔒
    4. Web Scraping by Threading 🔒
  3. Multiprocessing 🔒
    1. Multiprocessing. Creation and handling 🔒
    2. Multiprocessing: Synchronization primitives 🔒
  4. Asyncio Package
    1. Generator as an asynchronous function
    2. Coroutines, Tasks and Event Loop
    3. From Generators to Coroutines and Tasks 🔒
    4. Web scraping by aiohttp Package 🔒
    5. Handling files by aiofiles Package 🔒
    6. Asyncio Synchronization Primitives 🔒
  5. Additional Packages and Methods for Creating Asynchronous Code 🔒
    1. Subprocess Package 🔒
    2. Concurrent.futures Package 🔒
    3. Sockets - timeout() Method and select Package 🔒
    4. curio and trio Packages 🔒

2. Threading
2.2 Threading. Synchronization primitives: Lock

The threads we have used so far did not interact with each other or with the main thread; all they did was print their own results.

However, in practice, fully autonomous threads are the exception rather than the rule. More often, threads need to exchange data with one another, or to share and modify data owned by the main thread. In such cases, their actions genuinely need to be synchronized.

It is particularly important to note that using synchronization primitives does not by itself make asynchronous code synchronous (barring programmer errors, of course 😉). Synchronization primitives merely coordinate individual threads (or individual processes in the case of the multiprocessing package, or individual coroutines in the case of the asyncio package); they by no means turn asynchronous code into synchronous code!

Let's consider the simplest example of such interaction: several threads simultaneously accessing a single shared variable owned by the main thread.

As seen from the following example, multiple threads increment the value of the shared variable val within a loop:

from threading import Thread
from my_deco import time_counter

val = 0
COUNT = 100
NUM_THREADS = 100


def increase():
    # Each thread increments the shared global variable COUNT times
    global val
    for _ in range(COUNT):
        val += 1


@time_counter
def main():
    threads = [Thread(target=increase) for _ in range(NUM_THREADS)]

    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()

    # If += were atomic, diff would always be 0
    diff = val - COUNT * NUM_THREADS
    print(f'diff = {diff}')


if __name__ == '__main__':
    main()
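
A quick note on the import: time_counter is the home-made timing decorator used throughout this series. If you don't have my_deco.py from the earlier parts at hand, a minimal sketch might look like this (an assumption, not necessarily the original implementation):

# my_deco.py - a minimal sketch, assuming a simple wall-clock timer
import time
from functools import wraps


def time_counter(func):
    @wraps(func)
    def wrapper(*args, **kwargs):
        start = time.perf_counter()
        result = func(*args, **kwargs)
        print(f'====== {func.__name__} took {time.perf_counter() - start:.4f} s ======')
        return result
    return wrapper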

If it were not for the threads, this construction could be viewed as two nested loops:

  • the loop inside each thread as the inner loop,
  • and the threads themselves as the outer loop.

Based on this, the final value of the variable val should equal the product of the iteration counts of the two loops, i.e., the number of threads multiplied by the number of inner-loop iterations (in our case, 100 * 100 = 10,000).

This would indeed be the case if the += operation were thread-safe. However, in reality, this is far from true.

First and foremost, it is important to note that this single line of code, val += 1, actually consists of four sequential actions (the bytecode sketch after the list below makes them visible):

  1. Retrieving the current value of the variable val.
  2. Retrieving the value of the increment (in our case, it is 1).
  3. Adding the two numbers together (val + 1).
  4. Writing the result as the new value of the variable val.
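
These four steps can be seen directly in the bytecode. Here is a quick sketch using the standard dis module (the exact opcode names vary between Python versions: on 3.11+ the addition appears as BINARY_OP, on older versions as INPLACE_ADD):

import dis

val = 0


def increment():
    global val
    val += 1


# Expect four separate opcodes for the += line, roughly:
#   LOAD_GLOBAL val, LOAD_CONST 1, BINARY_OP/INPLACE_ADD, STORE_GLOBAL val
# A thread switch between any two of them can lose an update.
dis.dis(increment)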

Therefore, there is a non-zero probability that, between steps 1 and 4, control will switch to another thread that reads the same, not-yet-updated value of val. When the variable is then overwritten in step 4, the increment made by one of these threads is simply lost. This phenomenon is known as a 'race condition.'

In our example, this effect is quite difficult to observe: the chosen parameters, namely the number of iterations COUNT, the number of threads NUM_THREADS, and the thread-switching interval, do not allow it to manifest consistently.

By the way, the default thread-switching interval can be obtained with the getswitchinterval() function from the standard sys module.

import sys

interval = sys.getswitchinterval()
print(f'switchinterval = {interval}')

# switchinterval = 0.005

We can change the thread-switching interval using sys.setswitchinterval(new_interval), but unfortunately we cannot lower it to the point where the race condition reliably shows up.
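
Purely for illustration, a minimal sketch of changing the interval (the value 0.001 below is an arbitrary demo choice):

import sys

print(sys.getswitchinterval())  # 0.005 by default
sys.setswitchinterval(0.001)    # an arbitrary demo value
print(sys.getswitchinterval())  # 0.001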

However, we can programmatically modify our code to slow down the increment of val. To achieve this, we will separate:

  • the calculation of the new value of the variable val,
  • and the replacement of the old value with the new one.

To make it more convincing, we will add a delay of 0.001 seconds between these two calculations:

import time
from threading import Thread
from my_deco import time_counter

val = 0
COUNT = 100
NUM_THREADS = 100


def increase():
    global val
    for _ in range(COUNT):
        new_val = val + 1   # step 1: read the current value and compute
        time.sleep(0.001)   # widen the window for a thread switch
        val = new_val       # step 4: write back a possibly stale result


@time_counter
def main():
    threads = [Thread(target=increase) for _ in range(NUM_THREADS)]

    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()

    diff = val - COUNT * NUM_THREADS
    print(f'diff = {diff}')


if __name__ == '__main__':
    main()

In this case, the difference diff will be significantly different from zero.

Thus, the thread-UNsafe nature of access to shared variables has been demonstrated. How can we fix this situation?

To address exactly these issues, Python provides so-called synchronization primitives.

Perhaps the simplest, most basic, and most commonly used synchronization primitive is the Lock object, which operates according to the following algorithm:

  1. Before a thread starts modifying the data, it calls the acquire() method, which checks whether another thread has already begun this modification.
  2. If the modification has already been started by another thread, the current thread is put into a queue.
  3. When its turn comes, the waiting thread gains access to modify the data, while the lock it now holds prevents any other thread from making changes.
  4. After completing the modifications, the current thread releases the lock with the release() method, and the right to make changes passes to the next thread in the queue.

import time
from threading import Thread, Lock
from my_deco import time_counter

COUNT = 100
NUM_THREADS = 100
val = 0
val_lock = Lock()


def increase():
    global val
    for _ in range(COUNT):
        val_lock.acquire()  # block until we own the lock
        new_val = val + 1
        time.sleep(0.001)
        val = new_val
        val_lock.release()  # hand the lock to the next waiting thread


@time_counter
def main():
    threads = [Thread(target=increase) for _ in range(NUM_THREADS)]

    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()

    diff = val - COUNT * NUM_THREADS
    print(f'diff = {diff}')


if __name__ == '__main__':
    main()

As we can see, applying the Lock object produces the expected result: diff is now exactly zero.

(By the way, this type of lock is sometimes called a mutex, short for 'mutual exclusion': a synchronization primitive that protects shared resources from simultaneous access by multiple threads. A thread can hold or release the lock, only one thread can hold it at any given time, and all other threads attempting to acquire it are blocked until it is released.)

There is also a more convenient and compact way of using the Lock object that avoids explicit calls to the acquire() and release() methods. You will be introduced to it, along with other synchronization primitives and their usage examples, in the advanced version of this course.

You can learn more about all the details of this topic from this video (Russian Voice):



To the next topic