The threads we have used so far did not interact with each other or with the main thread; each simply printed its own result.
In practice, however, fully autonomous threads are the exception rather than the rule. More often, threads need to exchange data with each other or to share (and modify) data that lives in the main thread. In such cases, the actions of these threads have to be synchronized.
It is particularly important to note that using synchronization primitives does not by itself make asynchronous code synchronous (unless, of course, we are talking about programmer errors 😉). Synchronization primitives merely coordinate individual threads (or individual processes in the case of the multiprocessing package, or individual coroutines in the case of the asyncio package); they by no means turn asynchronous code into synchronous code!
Let's consider the simplest example of such interaction: several threads concurrently accessing a single variable defined in the main thread.
In the following example, multiple threads each increment the shared variable val in a loop:
from threading import Thread
from my_deco import time_counter

val = 0
COUNT = 100
NUM_THREADS = 100


def increase():
    global val
    for _ in range(COUNT):
        val += 1


@time_counter
def main():
    threads = [Thread(target=increase) for _ in range(NUM_THREADS)]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    diff = val - COUNT * NUM_THREADS
    print(f'diff = {diff}')


if __name__ == '__main__':
    main()
If it were not for the threads, this construction could be viewed as two nested loops: an outer loop over the threads and an inner loop over the iterations, so at the end val should equal COUNT * NUM_THREADS and diff should be 0.
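As a purely sequential sketch (no threads involved, just the same arithmetic), the equivalent would look roughly like this and would always give a difference of zero:

val = 0
COUNT = 100
NUM_THREADS = 100

# the same total number of increments, performed one after another
for _ in range(NUM_THREADS):
    for _ in range(COUNT):
        val += 1

print(val - COUNT * NUM_THREADS)  # always 0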
This would indeed be the case if the += operation were thread-safe. However, in reality, this is far from true.
First and foremost, it is important to note that this single line of code actually performs four sequential actions:
1. the current value of val is read;
2. the constant 1 is loaded;
3. the two values are added;
4. the result is written back to val.
Therefore, there is a non-zero probability that between steps 1 and 4 another thread will perform the same sequence on the then-current value of val. When the first thread writes its result back in step 4, the increment made by the other thread is simply overwritten and lost. This phenomenon is known as a race condition.
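This decomposition can be seen directly in the bytecode. Here is a minimal sketch using the standard dis module (the exact opcode names vary between Python versions, but the separate load, add, and store steps are always visible):

import dis

val = 0


def increase():
    global val
    val += 1


# prints the bytecode of increase(); note the separate instructions that
# load val, load the constant 1, add them, and store the result back into val
dis.dis(increase)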
In our example, it is quite difficult to observe this effect: the chosen values of the initial parameters, namely the number of iterations COUNT, the number of threads NUM_THREADS, and the thread-switching interval, are such that the effect rarely manifests itself consistently.
By the way, the default thread-switching interval can be obtained with the getswitchinterval() function from the standard sys module:
import sys
interval = sys.getswitchinterval()
print(f'switchinterval = {interval}')
# switchinterval = 0.005
We can change the thread-switching interval with sys.setswitchinterval(new_interval), but unfortunately we cannot decrease it enough for the race condition to show up reliably. However, we can modify our code so that the increment of val is slowed down artificially. To achieve this, we split the increment into a separate read and write and insert a short pause between them:
import time
from threading import Thread
from my_deco import time_counter

val = 0
COUNT = 100
NUM_THREADS = 100


def increase():
    global val
    for _ in range(COUNT):
        new_val = val + 1
        # the pause between reading and writing val gives other threads
        # a chance to run in between, so their increments get lost
        time.sleep(0.001)
        val = new_val


@time_counter
def main():
    threads = [Thread(target=increase) for _ in range(NUM_THREADS)]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    diff = val - COUNT * NUM_THREADS
    print(f'diff = {diff}')


if __name__ == '__main__':
    main()
In this case, the difference diff will be significantly different from zero.
Thus, the "thread-UNsafe" nature of access to shared variables has been demonstrated. How can we fix this situation?
To address exactly such problems, Python provides so-called synchronization primitives.
Perhaps the simplest, most basic, and most commonly used synchronization primitive is the Lock object, which works according to the following algorithm:
1. A Lock is created in the unlocked state.
2. The acquire() method locks it; if the lock is already held by another thread, the calling thread blocks at acquire() until the lock is released.
3. The release() method unlocks it, after which exactly one of the waiting threads can acquire it and continue.
import time
from threading import Thread, Lock
from my_deco import time_counter

COUNT = 100
NUM_THREADS = 100
val = 0
val_lock = Lock()


def increase():
    global val
    for _ in range(COUNT):
        # only one thread at a time can execute the code between
        # acquire() and release(); all the others wait at acquire()
        val_lock.acquire()
        new_val = val + 1
        time.sleep(0.001)
        val = new_val
        val_lock.release()


@time_counter
def main():
    threads = [Thread(target=increase) for _ in range(NUM_THREADS)]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    diff = val - COUNT * NUM_THREADS
    print(f'diff = {diff}')


if __name__ == '__main__':
    main()
As we can see, the application of the Lock object has produced the expected results.
(By the way, this type of lock is sometimes referred to as a mutex, short for "mutual exclusion": a synchronization primitive used to protect a shared resource from simultaneous access by multiple threads. Only one thread can hold the mutex at any given time; all other threads attempting to acquire it are blocked until it is released.)
There is also a more convenient and compact way of using the Lock object that avoids calling the acquire() and release() methods explicitly. You will get acquainted with it, along with other synchronization primitives and examples of their use, in the advanced version of this course.
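For reference, this compact form relies on the fact that a Lock is a context manager. A minimal sketch of how the body of increase() might look with the with statement, assuming the same val, val_lock, and COUNT as in the example above:

import time
from threading import Lock

val = 0
COUNT = 100
val_lock = Lock()


def increase():
    global val
    for _ in range(COUNT):
        # 'with' acquires the lock on entry and releases it on exit,
        # even if an exception is raised inside the block
        with val_lock:
            new_val = val + 1
            time.sleep(0.001)
            val = new_val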
You can learn more about all the details of this topic from this video (Russian Voice):