Threading, Multiprocessing and Asyncio in Python (Part 3)

Contents at a Glance

  1. Introduction to Asynchrony
    1. What is "asynchronous code"?
    2. Comparison of three ways to get asynchronous code
  2. Threading
    1. Threading. Creation and handling
    2. Threading. Synchronization primitives: Lock
    3. Threading. Synchronization primitives: Lock (continuation), Event, Condition, Semaphore, Queue 🔒
    4. Web Scraping by Threading 🔒
  3. Multiprocessing 🔒
    1. Multiprocessing. Creation and handling 🔒
    2. Multiprocessing: Synchronization primitives 🔒
  4. Asyncio Package
    1. Generator as an asynchronous function
    2. Coroutines, Tasks and Event Loop
    3. From Generators to Coroutines and Tasks 🔒
    4. Web scraping by aiohttp Package 🔒
    5. Handling files by aiofiles Package 🔒
    6. Asyncio Synchronization Primitives 🔒
  5. Additional Packages and Methods for Creating Asynchronous Code 🔒
    1. Subprocess Package 🔒
    2. Concurrent.futures Package 🔒
    3. Sockets - timeout() Method and select Package 🔒
    4. curio and trio Packages 🔒

2. Threading
2.1 Threading. Creation and handling

The threading package is part of the Python standard library starting from version 1.5 (November 1997), so it does not require any additional installation. The minimum requirements for starting a thread are as follows:

  1. Import the threading package.
  2. Write the code that should be executed in the thread, typically in the form of a function.
  3. Create (instantiate) a thread, passing the function object (not a call to it) as the target parameter. If the function requires arguments, they can be passed using the args parameter in tuple format.
  4. Start the thread using the start() method.

import time
from threading import Thread

def clock(delay):
   time.sleep(delay)
   print(f"Current time: {time.strftime('%X')}, delay {delay} sec.")


thread1 = Thread(target=clock, args=(2,))
thread2 = Thread(target=clock, args=(3,))


if __name__ == '__main__':
   start = time.time()
   print(f"Time start: {time.strftime('%X')}")
   thread1.start()
   thread2.start()
   print(f"Time end: {time.strftime('%X')}")
   print(f'======== Total time: {time.time() - start:0.2f} ========')

Result:

Time start: 07:39:58
Time end: 07:39:58
======== Total time: 0.00 ========
Current time: 07:40:00, delay 2 sec.
Current time: 07:40:01, delay 3 sec.


Process finished with exit code 0

As seen in the example, the main thread finished instantly (0 sec), while the additional threads finished after 2 and 3 seconds from the script's start time, respectively.

In our case, the main thread ran through its code without waiting for the additional threads, yet the process itself stayed alive until they were done. This behavior is logical: threads are typically used for input/output, and the processor operates much faster than external devices. Before the script terminates, it has to wait for a signal that the external devices' operations have completed properly. Otherwise, there is a risk of losing data that remains in the buffers of these devices. So by default, if no specific actions are taken, the process waits for all its threads to complete before terminating.

In cases where it is necessary to forcefully terminate a thread simultaneously with the termination of the entire process, a special parameter called daemon is set to True. Please note that there are two ways to do this:

import time
from threading import Thread


def clock(delay):
   time.sleep(delay)
   print(f"Current time: {time.strftime('%X')}, delay {delay} sec.")


thread1 = Thread(target=clock, args=(2,))
thread2 = Thread(target=clock, args=(3,), daemon=True)


if __name__ == '__main__':
   start = time.time()
   print(f"Time start: {time.strftime('%X')}")
   thread1.start()
   thread2.daemon = True
   thread2.start()
   print(f"Time end: {time.strftime('%X')}")
   print(f'======== Total time: {time.time() - start:0.2f} ========')

As we can see, the daemon flag of the second thread was set to True twice. Of course, setting it once would have been enough - the second assignment only demonstrates an alternative way of setting the flag. As a result, the second thread, which needed 3 seconds, was one second short of completing its task: it was terminated when the process exited after the first thread finished at the 2-second mark.

Time start: 07:54:41
Time end: 07:54:41
======== Total time: 0.00 ========
Current time: 07:54:43, delay 2 sec.


Process finished with exit code 0

(By the way, you can check whether a thread is a daemon by reading its daemon attribute, which is True for daemon threads. The older isDaemon() method returns the same value but has been deprecated since Python 3.10.)
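A minimal sketch of inspecting the flag is shown below. It also illustrates a detail worth keeping in mind: the flag has to be set before start(), because changing it on a started thread raises RuntimeError. The shortened delay and the flag_change_rejected variable are assumptions made for this illustration only.

```python
import time
from threading import Thread


def clock(delay):
    time.sleep(delay)


# The flag can be read back at any time through the daemon attribute.
worker = Thread(target=clock, args=(0.1,), daemon=True)
print(worker.daemon)  # True

# Once the thread has been started, the flag can no longer be changed.
worker.start()
try:
    worker.daemon = False
    flag_change_rejected = False
except RuntimeError:
    flag_change_rejected = True

worker.join()
print(flag_change_rejected)  # True
```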

To find out when exactly the second thread stopped its execution, let's modify the clock function slightly. We will turn its body into a loop with a 1-second delay and print the result each time. In this case, the overall delay will be determined by the number of iterations in the loop.

def clock(delay: int):
   for d in range(1, delay + 1):
       time.sleep(1)
       print(f"Current time: {time.strftime('%X')}, {d}; delay {delay} sec.")

The result shows that the output of the second thread stops at its 2nd iteration. It didn't get to print the result of its third second of work because it was terminated together with the process, which exited at the 2-second mark when the first thread finished.

Time start: 17:20:42
Time end: 17:20:42
======== Total time: 0.00 ========
Current time: 17:20:43, 1; delay 2 sec.
Current time: 17:20:43, 1; delay 3 sec.
Current time: 17:20:44, 2; delay 2 sec.
Current time: 17:20:44, 2; delay 3 sec.


Process finished with exit code 0

If we also set daemon=True for the first thread, the overall process will terminate instantly, and neither thread will leave any trace of its activity.

Time start: 17:29:27
Time end: 17:29:27
======== Total time: 0.00 ========


Process finished with exit code 0

Thus, we can conclude that the overall duration of the process is determined by the duration of the "longest" non-daemon thread. If all the additional threads are daemons, it is determined by the duration of the main thread.
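One way to observe this rule from the outside is to run a small script in a fresh interpreter and look at what it manages to print. The following is a sketch under assumptions: the child script, the run_child helper and the 0.5-second delay are made up for illustration, and the subprocess module serves only as a convenient launcher.

```python
import subprocess
import sys
import textwrap

# Child script: starts one worker thread, then lets the main thread end.
CHILD = textwrap.dedent("""
    import sys
    import time
    from threading import Thread

    def work():
        time.sleep(0.5)
        print("worker finished")

    Thread(target=work, daemon=(sys.argv[1] == "daemon")).start()
    print("main finished")
""")


def run_child(mode):
    """Run the child script in a fresh interpreter and return its output lines."""
    done = subprocess.run(
        [sys.executable, "-c", CHILD, mode],
        capture_output=True, text=True, check=True,
    )
    return done.stdout.splitlines()


print(run_child("regular"))  # ['main finished', 'worker finished']
print(run_child("daemon"))   # ['main finished']
```

With a regular thread the process stays alive until the worker prints its line; with a daemon thread the process ends together with the main thread and the worker's line never appears.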

Combining the main and auxiliary threads

Very often, the results of auxiliary threads need to be used in the main thread. In such cases, we can use the join() method, which suspends the execution of the main thread until the joined thread is finished.

if __name__ == '__main__':
   start = time.time()
   print(f"Time start: {time.strftime('%X')}")
   thread1.start()
   thread1.join()
   thread2.start()
   print(f"Time end: {time.strftime('%X')}")
   print(f'======== Total time: {time.time() - start:0.2f} ========')

That's all we changed in the previous script - we simply joined the first thread to the main thread immediately after its start. This led to the following result:

Time start: 17:53:40
Current time: 17:53:41, 1; delay 2 sec.
Current time: 17:53:42, 2; delay 2 sec.
Time end: 17:53:42
======== Total time: 2.00 ========


Process finished with exit code 0

What is noteworthy here is that the duration of the main thread increased to 2 seconds, exactly the time it took for the first thread to complete. Only after the first thread had finished did the main thread start the second thread, and then it immediately terminated, halting the entire process. As a result, the second thread (a daemon) was terminated as well, without being able to do any work.

But if we first start both threads and then join the first thread to the main thread,

thread1.start()
thread2.start()
thread1.join()

then the picture will change:

Time start: 18:05:02
Current time: 18:05:03, 1; delay 2 sec.
Current time: 18:05:03, 1; delay 3 sec.
Current time: 18:05:04, 2; delay 2 sec.
Current time: 18:05:04, 2; delay 3 sec.
Time end: 18:05:04
======== Total time: 2.00 ========


Process finished with exit code 0

Key takeaway: Starting all the threads first and only then joining one of them to the main thread lets all the started threads run. Threads whose execution time is less than or equal to that of the joined thread will complete their execution. All other daemon threads (thread2 in our case) will work only while the joined thread is active.

Therefore, if we join the longest-running thread to the main thread, it will enable all the threads to complete their execution. The overall process time in our case will be determined by the execution time of this thread (in our case - 3 seconds).

Time start: 18:14:17
Current time: 18:14:18, 1; delay 2 sec.
Current time: 18:14:18, 1; delay 3 sec.
Current time: 18:14:19, 2; delay 2 sec.
Current time: 18:14:19, 2; delay 3 sec.
Current time: 18:14:20, 3; delay 3 sec.
Time end: 18:14:20
======== Total time: 3.00 ========


Process finished with exit code 0
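The opening remark of this subsection, that the main thread often needs the results of auxiliary threads, can be sketched as follows. This is a minimal illustration rather than part of the original script: the results dictionary and the shortened delays are assumptions made for the example.

```python
import time
from threading import Thread

results = {}  # shared between the main thread and the workers


def clock(delay):
    time.sleep(delay)
    results[delay] = time.strftime('%X')  # store the result instead of printing it


threads = [Thread(target=clock, args=(d,), daemon=True) for d in (0.2, 0.3)]

for thread in threads:
    thread.start()
for thread in threads:
    thread.join()  # after this loop every worker has finished

print(sorted(results))  # [0.2, 0.3]
```

Because every thread is joined, the main thread can safely read results afterwards, even though both workers are daemons.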

Now, a small task. Try to answer the question yourself: what will the overall execution time be in this case:

thread1.start()
thread2.start()
thread1.join()
thread2.join()

, in this case:

thread1.start()
thread2.start()
thread2.join()
thread1.join()

, and in this case?

thread1.start()
thread1.join()
thread2.start()
thread2.join()

And most importantly, try to explain why the overall execution time changed in this particular way and not otherwise.
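Once you have your own answers, they can be checked with a small timing harness. The sketch below is only one possible way to do it: the timed helper, the scenario function names and the scaled-down delays (0.2 and 0.3 seconds instead of 2 and 3) are assumptions made for this illustration.

```python
import time
from threading import Thread


def clock(delay):
    time.sleep(delay)


def timed(scenario):
    """Return how long the given start/join ordering keeps the main thread busy."""
    thread1 = Thread(target=clock, args=(0.2,), daemon=True)
    thread2 = Thread(target=clock, args=(0.3,), daemon=True)
    start = time.perf_counter()
    scenario(thread1, thread2)
    return time.perf_counter() - start


def join_first_then_second(thread1, thread2):
    thread1.start()
    thread2.start()
    thread1.join()
    thread2.join()


def join_second_then_first(thread1, thread2):
    thread1.start()
    thread2.start()
    thread2.join()
    thread1.join()


def start_and_join_in_turn(thread1, thread2):
    thread1.start()
    thread1.join()
    thread2.start()
    thread2.join()


for scenario in (join_first_then_second, join_second_then_first, start_and_join_in_turn):
    print(f'{scenario.__name__}: {timed(scenario):0.2f} sec.')
```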

Refactoring the example with two functions

The code example with two functions that we examined at the very beginning aimed to achieve maximum concurrency. We sent potentially blocking functions to their own threads, where they could no longer interfere with the main thread. This is certainly a huge advantage. However, there is a small drawback - the timing measurement decorator remained in the main thread (in the main() function), and now we don't know how much faster (or slower) the code became after adding threads. Let's join the additional threads to the main thread and see how the execution time has changed.

import time
from threading import Thread
from my_deco import time_counter

N = 5
DELAY = 0.5


def func1(n):
   for i in range(n):
       time.sleep(DELAY)
       print(f'--- line #{i} from {n} is completed')


def func2(n):
   for i in range(n):
       time.sleep(DELAY)
       print(f'=== line #{i} from {n} is completed')


@time_counter
def main():
   thread1 = Thread(target=func1, args=(N,))
   thread2 = Thread(target=func2, args=(N,))
   thread1.start()
   thread2.start()
   thread1.join()
   thread2.join()
   print('All functions completed')


if __name__ == '__main__':
   main()

Result:

======== Script started ========
--- line #0 from 5 is completed
=== line #0 from 5 is completed
--- line #1 from 5 is completed
=== line #1 from 5 is completed
. . . . . . . . .


All functions completed
======== Script execution time: 2.51 ========
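The time_counter decorator imported from my_deco is not shown in this part. A hypothetical stand-in, assuming it does nothing more than print the two banner lines seen in the output above and measure one call, might look like this (the demo function is made up for the example):

```python
import time
from functools import wraps


def time_counter(func):
    """Hypothetical stand-in for my_deco.time_counter: time one call of func."""
    @wraps(func)
    def wrapper(*args, **kwargs):
        print('======== Script started ========')
        start = time.perf_counter()
        result = func(*args, **kwargs)
        elapsed = time.perf_counter() - start
        print(f'======== Script execution time: {elapsed:0.2f} ========')
        return result
    return wrapper


@time_counter
def demo():
    time.sleep(0.1)
    return 'done'


print(demo())
```

The timer stops only when the decorated function returns, which is why main() has to join its threads for the measurement to include their work.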

In summary, 2.5 seconds is twice as fast as regular synchronous code, which would need about 5 seconds (2 functions × 5 iterations × 0.5 sec). This is easily explained: both functions now run simultaneously, so the overall script execution time equals the time taken by one function (5 × 0.5 = 2.5 sec).

Is there a way to further reduce this time? Most likely, yes. But first, it would be good to understand the reason for the delay.

The reason is the loop inside each function: an iteration cannot start before the previous one finishes. In other words, every iteration has to wait out the DELAY seconds of the previous one before it can begin.

The solution is obvious: we need to keep only one iteration in the function and move the loop to the main function. Then we can create a number of threads equal to the number of iterations of the two functions. And from previous examples, we already know that if we start all these threads simultaneously, the overall execution time will be equal to the time taken by the longest thread.

Well, as they say, no sooner said than done:

import time
from threading import Thread
from my_deco import time_counter

N = 5
DELAY = 0.5


def func1(i, n):
   time.sleep(DELAY)
   print(f'--- line #{i} from {n} is completed')


def func2(i, n):
   time.sleep(DELAY)
   print(f'=== line #{i} from {n} is completed')


@time_counter
def main():
   threads = []
   threads1 = [Thread(target=func1, args=(i, N)) for i in range(N)]
   threads2 = [Thread(target=func2, args=(i, N)) for i in range(N)]
   threads.extend(threads1)
   threads.extend(threads2)

   for thread in threads:
       thread.start()
   for thread in threads:
       thread.join()

   print('All functions completed')


if __name__ == '__main__':
   main()

And the script running time has now been reduced to the running time of the longest thread:

======== Script started ========
--- line #0 from 5 is completed
=== line #0 from 5 is completed
--- line #1 from 5 is completed
=== line #1 from 5 is completed
. . . . . . . . .


All functions completed
======== Script execution time: 0.51 ========

Creating a Thread Using a Class

In the previous examples, functions were used to create and manage threads. However, the same can be achieved using classes.

In this case:

  1. The class that describes the thread should inherit from the Thread class.
  2. The behavior of the thread is described (or rather, overridden) using the run() method.
  3. All additional parameters, including the daemon flag, are set as instance attributes in the __init__() dunder method.

In the following example, two daemon threads are created. Thanks to the fact that the longest-running thread is joined to the main thread, both threads are able to finish:

import time
from threading import Thread


class ClockThread(Thread):
   def __init__(self, delay):
       super().__init__()
       self.delay = delay
       self.daemon = True


   def run(self):
       time.sleep(self.delay)
       print(f"Current time: {time.strftime('%X')}, delay {self.delay} sec.")


thread_1 = ClockThread(2)
thread_2 = ClockThread(3)


if __name__ == '__main__':
   start = time.time()
   print(f"Time start: {time.strftime('%X')}")
   thread_1.start()
   thread_2.start()
   thread_2.join()
   print(f"Time end: {time.strftime('%X')}")
   print(f'======== Total time: {time.time() - start:0.2f} ========')

The script, as expected, runs for 3 seconds:

Current time: 00:33:01, delay 3 sec.
Time end: 00:33:01
======== Total time: 3.00 ========

Process finished with exit code 0

As we can see, describing threads using classes didn't bring any significant changes. The management of thread objects is still done through the start() and join() methods, and the daemon parameter (attribute) can be specified directly in the __init__() method or later defined (overridden) in an instance of the user-defined class.
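Both options mentioned above can be sketched in a few lines. This is an illustrative variation of the ClockThread class, not the original listing: the daemon keyword with its default, the finished_at attribute and the shortened delays are assumptions made for the example.

```python
import time
from threading import Thread


class ClockThread(Thread):
    def __init__(self, delay, daemon=True):
        # Thread.__init__ accepts daemon as a keyword argument.
        super().__init__(daemon=daemon)
        self.delay = delay
        self.finished_at = None

    def run(self):
        time.sleep(self.delay)
        self.finished_at = time.strftime('%X')


thread_1 = ClockThread(0.1)   # daemon by default in this sketch
thread_2 = ClockThread(0.2)
thread_2.daemon = False       # overridden on the instance, before start()

for thread in (thread_1, thread_2):
    thread.start()
for thread in (thread_1, thread_2):
    thread.join()

print(thread_1.daemon, thread_2.daemon)  # True False
```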
