Threading, Multiprocessing and Asyncio in Python (Part 3)

Contents at a Glance

  1. Introduction to Asynchrony
    1. What is "asynchronous code"?
    2. Comparison of three ways to get asynchronous code
  2. Threading
    1. Threading. Creation and handling
    2. Threading. Synchronization primitives: Lock
    3. Threading. Synchronization primitives: Lock (continuation), Event, Condition, Semaphore, Queue 🔒
    4. Web Scraping by Threading 🔒
  3. Multiprocessing 🔒
    1. Multiprocessing. Creation and handling 🔒
    2. Multiprocessing: Synchronization primitives 🔒
  4. Asyncio Package
    1. Generator as an asynchronous function
    2. Coroutines, Tasks and Event Loop
    3. From Generators to Coroutines and Tasks 🔒
    4. Web scraping by aiohttp Package 🔒
    5. Handling files by aiofiles Package 🔒
    6. Asyncio Synchronization Primitives 🔒
  5. Additional Packages and Methods for Creating Asynchronous Code 🔒
    1. Subprocess Package 🔒
    2. Concurrent.futures Package 🔒
    3. Sockets - timeout() Method and select Package 🔒
    4. curio and trio Packages 🔒

2. Threading
2.1 Threading. Creation and handling

The threading package is part of the Python standard library starting from version 1.5 (November 1997), so it does not require any additional installation. The minimum requirements for starting a thread are as follows:

  1. Import the threading package.
  2. Write the code that should be executed in the thread, typically in the form of a function.
  3. Create (instantiate) a thread, specifying the target function's name as a parameter. If the function requires arguments, they can be passed using the args parameter in tuple format.
  4. Start the thread using the start() method.

import time
from threading import Thread

def clock(delay):
    time.sleep(delay)
    print(f"Current time: {time.strftime('%X')}, delay {delay} sec.")


thread1 = Thread(target=clock, args=(2,))
thread2 = Thread(target=clock, args=(3,))


if __name__ == '__main__':
    start = time.time()
    print(f"Time start: {time.strftime('%X')}")
    thread1.start()
    thread2.start()
    print(f"Time end: {time.strftime('%X')}")
    print(f'======== Total time: {time.time() - start:0.2f} ========')

Result:

Time start: 07:39:58
Time end: 07:39:58
======== Total time: 0.00 ========
Current time: 07:40:00, delay 2 sec.
Current time: 07:40:01, delay 3 sec.


Process finished with exit code 0

As seen in the example, the main thread finished instantly (0 sec), while the additional threads finished after 2 and 3 seconds from the script's start time, respectively.

In our case, the additional threads keep working even though the main thread has already finished. This behavior makes sense: the processor is much faster than external devices, so before the script terminates it should wait for a signal that those devices have properly finished their work; otherwise data still sitting in their buffers could be lost. That is why, by default, if no special actions are taken, the process waits for all of its threads to complete before terminating.

In cases where a thread should instead be terminated forcefully together with the entire process, a special parameter called daemon is set to True. Note that there are two ways to do this:

import time
from threading import Thread


def clock(delay):
    time.sleep(delay)
    print(f"Current time: {time.strftime('%X')}, delay {delay} sec.")


thread1 = Thread(target=clock, args=(2,))
thread2 = Thread(target=clock, args=(3,), daemon=True)


if __name__ == '__main__':
    start = time.time()
    print(f"Time start: {time.strftime('%X')}")
    thread1.start()
    thread2.daemon = True
    thread2.start()
    print(f"Time end: {time.strftime('%X')}")
    print(f'======== Total time: {time.time() - start:0.2f} ========')

As we can see, the daemon flag of the second thread was set to True twice. Of course, setting it once would have been enough; the second assignment is only there to demonstrate the alternative way of setting the flag. The result: the second thread needed 3 seconds, but the process ended after 2 seconds, as soon as the non-daemon first thread had finished, so the daemon thread was terminated prematurely.

Time start: 07:54:41
Time end: 07:54:41
======== Total time: 0.00 ========
Current time: 07:54:43, delay 2 sec.


Process finished with exit code 0

(By the way, you can check whether a thread is a daemon by reading its daemon attribute; the older isDaemon() method does the same and returns True for a daemon thread, but it is deprecated in recent Python versions.)
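A tiny sketch of such a check (the thread below is created purely for illustration and is never started):

from threading import Thread

thread = Thread(target=print, daemon=True)
print(thread.daemon)      # True
print(thread.isDaemon())  # also True, but emits a DeprecationWarning on newer Python versions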

To find out when exactly the second thread stopped its execution, let's modify the clock function slightly. We will turn its body into a loop with a 1-second delay and print the result each time. In this case, the overall delay will be determined by the number of iterations in the loop.

def clock(delay: int):
    for d in range(1, delay + 1):
        time.sleep(1)
        print(f"Current time: {time.strftime('%X')}, {d}; delay {delay} sec.")

The output shows that the second thread's last message appears at the 2-second mark. It never got to print the message for its third and final second, because it was terminated together with the process as soon as the first (non-daemon) thread finished after 2 seconds.

Time start: 17:20:42
Time end: 17:20:42
======== Total time: 0.00 ========
Current time: 17:20:43, 1; delay 2 sec.
Current time: 17:20:43, 1; delay 3 sec.
Current time: 17:20:44, 2; delay 2 sec.
Current time: 17:20:44, 2; delay 3 sec.


Process finished with exit code 0

If we also set daemon=True for the first thread (see the snippet below), the overall process will terminate instantly, and both threads will leave absolutely no trace of their activity.
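The change, using the constructor form for both threads:

thread1 = Thread(target=clock, args=(2,), daemon=True)
thread2 = Thread(target=clock, args=(3,), daemon=True)

The result: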

Time start: 17:29:27
Time end: 17:29:27
======== Total time: 0.00 ========


Process finished with exit code 0

Thus, we can conclude that the overall duration of the process is determined by the duration of the longest non-daemon thread. If all the additional threads are daemons, it is determined by the duration of the main thread.

Combining the main and auxiliary threads

Very often, the results of auxiliary threads need to be used in the main thread. In such cases, we can use the join() method, which suspends the execution of the main thread until the joined thread is finished.

if __name__ == '__main__':
    start = time.time()
    print(f"Time start: {time.strftime('%X')}")
    thread1.start()
    thread1.join()
    thread2.start()
    print(f"Time end: {time.strftime('%X')}")
    print(f'======== Total time: {time.time() - start:0.2f} ========')

That's all we changed in the previous script - we simply joined the first thread to the main thread immediately after its start. This led to the following result:

Time start: 17:53:40
Current time: 17:53:41, 1; delay 2 sec.
Current time: 17:53:42, 2; delay 2 sec.
Time end: 17:53:42
======== Total time: 2.00 ========


Process finished with exit code 0

What is noteworthy here is that the duration of the main thread grew to 2 seconds, exactly the time it took the first thread to complete. Having waited for the first thread, the main thread started the second one and terminated immediately, which ended the entire process. As a result, the second (daemon) thread was also terminated before it could do any work.

But if we first start both threads and then join the first thread to the main thread,

thread1.start()
thread2.start()
thread1.join()

then the picture will change:

Time start: 18:05:02
Current time: 18:05:03, 1; delay 2 sec.
Current time: 18:05:03, 1; delay 3 sec.
Current time: 18:05:04, 2; delay 2 sec.
Current time: 18:05:04, 2; delay 3 sec.
Time end: 18:05:04
======== Total time: 2.00 ========


Process finished with exit code 0

Key takeaway: starting all the threads first and only then joining one of them to the main thread lets every started thread run. Threads whose execution time is less than or equal to that of the joined thread will finish completely; the rest (daemon threads such as thread2 in our case) run only while the joined thread is active.

Therefore, if we join the longest-running thread to the main thread, all the threads will be able to finish. The overall process time is then determined by the execution time of that thread (in our case, 3 seconds).
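The only change compared to the previous snippet is which thread we join:

thread1.start()
thread2.start()
thread2.join()

Result: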

Time start: 18:14:17
Current time: 18:14:18, 1; delay 2 sec.
Current time: 18:14:18, 1; delay 3 sec.
Current time: 18:14:19, 2; delay 2 sec.
Current time: 18:14:19, 2; delay 3 sec.
Current time: 18:14:20, 3; delay 3 sec.
Time end: 18:14:20
======== Total time: 3.00 ========


Process finished with exit code 0

Now, a small task. Try to answer the question yourself: what will happen to the overall execution time in this case:

thread1.start()
thread2.start()
thread1.join()
thread2.join()

, in this case:

thread1.start()
thread2.start()
thread2.join()
thread1.join()

, and in this case?

thread1.start()
thread1.join()
thread2.start()
thread2.join()

And most importantly, try to explain why the overall execution time changed in this particular way and not otherwise.

Refactoring the example with two functions

The code example with two functions that we examined at the very beginning aimed to achieve maximum concurrency. We sent potentially blocking functions to their own threads, where they could no longer interfere with the main thread. This is certainly a huge advantage. However, there is a small drawback: the timing decorator stayed in the main thread (on the main() function), so without joining the threads it measures practically nothing, and we don't know how much faster (or slower) the code became after adding threads. Let's join the additional threads to the main thread and see how the execution time has changed.
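The time_counter decorator from the my_deco module was introduced in an earlier part of this series. If you don't have it at hand, a minimal sketch that matches the output shown below might look like this (the original implementation may differ in details):

import time
from functools import wraps


def time_counter(func):
    """Print a start banner, run the function, then print its execution time."""
    @wraps(func)
    def wrapper(*args, **kwargs):
        print('======== Script started ========')
        start = time.time()
        result = func(*args, **kwargs)
        print(f'======== Script execution time: {time.time() - start:0.2f} ========')
        return result
    return wrapper

With that in place, here is the threaded version of the example: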

import time
from threading import Thread
from my_deco import time_counter

N = 5
DELAY = 0.5


def func1(n):
    for i in range(n):
        time.sleep(DELAY)
        print(f'--- line #{i} from {n} is completed')


def func2(n):
    for i in range(n):
        time.sleep(DELAY)
        print(f'=== line #{i} from {n} is completed')


@time_counter
def main():
    thread1 = Thread(target=func1, args=(N,))
    thread2 = Thread(target=func2, args=(N,))
    thread1.start()
    thread2.start()
    thread1.join()
    thread2.join()
    print(f'All functions completed')


if __name__ == '__main__':
    main()

Result:

======== Script started ========
--- line #0 from 5 is completed
=== line #0 from 5 is completed
--- line #1 from 5 is completed
=== line #1 from 5 is completed
. . . . . . . . .


All functions completed
======== Script execution time: 2.51 ========

In summary, 2.5 seconds is twice as fast as the regular synchronous code, which would take N * DELAY * 2 = 5 * 0.5 * 2 = 5 seconds. This is easy to explain: both functions now run concurrently, so the overall script execution time equals the running time of a single function (5 iterations * 0.5 seconds = 2.5 seconds).

Is there a way to further reduce this time? Most likely, yes. But first, it would be good to understand the reason for the delay.

The reason is the loop inside each function: a new iteration cannot start before the previous one finishes, so every time we have to sit through the DELAY seconds of the previous iteration before the next one can begin.

The solution is obvious: keep only one iteration inside the function and move the loop into the main function. Then we can create a number of threads equal to the total number of iterations of the two functions (ten in our case). And from the previous examples we already know that if we start all these threads at the same time, the overall execution time will equal the time taken by the longest thread.

Well, as they say, done and done:

import time
from threading import Thread
from my_deco import time_counter

N = 5
DELAY = 0.5


def func1(i, n):
    time.sleep(DELAY)
    print(f'--- line #{i} from {n} is completed')


def func2(i, n):
    time.sleep(DELAY)
    print(f'=== line #{i} from {n} is completed')


@time_counter
def main():
    threads = []
    threads1 = [Thread(target=func1, args=(i, N)) for i in range(N)]
    threads2 = [Thread(target=func2, args=(i, N)) for i in range(N)]
    threads.extend(threads1)
    threads.extend(threads2)

    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()

    print(f'All functions completed')


if __name__ == '__main__':
    main()

And the script running time has now been reduced to the running time of the longest thread:

======== Script started ========
--- line #0 from 5 is completed
=== line #0 from 5 is completed
--- line #1 from 5 is completed
=== line #1 from 5 is completed
. . . . . . . . .


All functions completed
======== Script execution time: 0.51 ========

Creating a Thread Using a Class

In the previous examples, functions were used to create and manage threads. However, the same can be achieved using classes.

In this case:

  1. The class that describes the thread should inherit from the Thread class.
  2. The behavior of the thread is described (or rather, overridden) using the run() method.
  3. All additional parameters, including the daemon flag, are set as instance attributes in the __init__() dunder method.

In the following example, two daemon threads are created. Thanks to the fact that the longest-running thread is joined to the main thread, both threads are able to finish:

import time
from threading import Thread


class ClockThread(Thread):
    def __init__(self, delay):
        super().__init__()
        self.delay = delay
        self.daemon = True

    def run(self):
        time.sleep(self.delay)
        print(f"Current time: {time.strftime('%X')}, delay {self.delay} sec.")


thread_1 = ClockThread(2)
thread_2 = ClockThread(3)


if __name__ == '__main__':
    start = time.time()
    print(f"Time start: {time.strftime('%X')}")
    thread_1.start()
    thread_2.start()
    thread_2.join()
    print(f"Time end: {time.strftime('%X')}")
    print(f'======== Total time: {time.time() - start:0.2f} ========')

The script, as expected, runs for 3 seconds:

Current time: 00:33:01, delay 3 sec.
Time end: 00:33:01
======== Total time: 3.00 ========

Process finished with exit code 0

As we can see, describing threads using classes didn't bring any significant changes. The management of thread objects is still done through the start() and join() methods, and the daemon parameter (attribute) can be specified directly in the __init__() method or later defined (overridden) in an instance of the user-defined class.
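For instance, a purely illustrative sketch (the thread_3 instance and its 4-second delay are hypothetical and do not appear in the example above):

thread_3 = ClockThread(4)
thread_3.daemon = False  # overrides the True set in __init__() for this instance; must be done before start()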

You can learn more about all the details of this topic from this video (Russian Voice):




