Blog

Threading, Multiprocessing and Asyncio in Python (Part 5)

Contents at a Glance

  1. Introduction to Asynchrony
    1. What is "asynchronous code"?
    2. Comparison of three ways to get asynchronous code
  2. Threading
    1. Threading. Creation and handling
    2. Threading. Synchronization primitives: Lock
    3. Threading. Synchronization primitives: Lock (continuation), Event, Condition, Semaphore, Queue 🔒
    4. Web Scraping by Threading 🔒
  3. Multiprocessing 🔒
    1. Multiprocessing. Creation and handling 🔒
    2. Multiprocessing: Synchronization primitives 🔒
  4. Asyncio Package
    1. Generator as an asynchronous function
    2. Coroutines, Tasks and Event Loop
    3. From Generators to Coroutines and Tasks 🔒
    4. Web scraping by aiohttp Package 🔒
    5. Handling files by aiofiles Package 🔒
    6. Asyncio Synchronization Primitives 🔒
  5. Additional Packages and Methods for Creating Asynchronous Code 🔒
    1. Subprocess Package 🔒
    2. Concurrent.futures Package 🔒
    3. Sockets - timeout() Method and select Package 🔒
    4. curio and trio Packages 🔒

4. Asyncio Package
4.1 Generator as an asynchronous function

And finally, we come to the third way of creating asynchronous code, where all the program code is contained within not only the same process but also the same thread (see the diagram in the introduction).

In the previous two cases (threading and multiprocessing packages), there were no specific requirements for the source code. To make this code asynchronous, we simply took a blocking ("slow") function and placed it in a separate thread or process. The original function needed no changes, because threads and processes are managed by the operating system.

However, when we attempt to achieve asynchronicity within the same process and thread, we can no longer rely on the assistance of the operating system. We are left to rely on ourselves, which means we cannot avoid making significant changes to the original source code.

Armed with this idea, let's once again recall those two "slow" functions from our very first example at the beginning of this course:

import time
from my_deco import time_counter
N = 5
DELAY = 0.5


def func1(n):
    for i in range(n):
        time.sleep(DELAY)
        print(f'--- line #{i} from {n} is completed')


def func2(n):
    for i in range(n):
        time.sleep(DELAY)
        print(f'=== line #{i} from {n} is completed')


@time_counter
def main():
    func1(N)
    func2(N)
    print(f'All functions completed')


if __name__ == '__main__':
    main()

As we already know well, when we call the function func1(n), the further execution of the main program will be suspended until this function completes all its iterations. Only after that, control will move to the next line of code.

In other words, a regular function blocks the execution of the main code from the moment it is called until it finishes.

However, in Python, there is a wonderful object called a generator, which can also be considered as a kind of function. But it's a function without blocking. It's a function that can be executed "partially" or "step-by-step." Each time it is called, it doesn't complete its execution but only advances by "one step," one iteration, and no more. However, it remembers its state, the current step it stopped at, so that it doesn't repeat itself and can continue its work from the next step.

The generator is incredibly popular in Python, so there is no doubt that most readers are very familiar with what it is. Nevertheless, it is still worth saying a few introductory words on this topic.

Generators in Python

Below is an example of a generator function gen():

def gen(seq: iter):
    for item in seq:
        yield item


def main():
    data = [0, 1, 2, 3]

    for i in gen(data):
        print(i)


if __name__ == '__main__':
    main()

In this case, the yield statement serves as the exact stopping point where the generator temporarily suspends its execution and resumes it upon the next call.
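To make the suspension at yield visible, here is a minimal sketch that records the order of events. The log list and the message texts are illustrative helpers, not part of the original example:

```python
log = []  # hypothetical helper list that records the order of events


def gen(seq):
    for item in seq:
        log.append(f'gen: about to yield {item}')
        yield item  # execution pauses here until the next next() call
        log.append(f'gen: resumed after {item}')


g = gen([0, 1])
log.append(f'main: got {next(g)}')
log.append(f'main: got {next(g)}')

for line in log:
    print(line)
```

The "resumed after 0" entry appears only after the second next() call, which shows that the generator really stopped at yield in between.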

Therefore, you cannot simply run the generator like a regular function once and wait for the result. The generator needs to be continuously managed. This is precisely what the main() function does in our case.

In this example, the generator's data is extracted using a loop. This is perhaps the simplest way to work with a generator. However, for our case, this approach is not entirely suitable because the loop strictly retrieves all the elements of the generator in sequential order. As a result, this construction (generator + its management from the main() function) ends up behaving similar to a loop in a regular (blocking) function.

Hence, we will utilize the __next__() method (or the next() function), which lets us request the next value from the generator whenever we choose:

def gen(seq: iter):
    for item in seq:
        yield item


def main():
    data = [0, 1, 2, 3]

    while True:
        print(next(gen(data)))


if __name__ == '__main__':
    main()

However, in this case, we end up with an infinite loop where the generator returns the same initial value of 0 every time. To fix this, the generator needs to be initialized first.
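The repeated 0 comes from the fact that every gen(data) call builds a fresh generator object that starts from the beginning. A small sketch of this behavior (the names first and second are illustrative):

```python
def gen(seq):
    for item in seq:
        yield item


data = [0, 1, 2, 3]

# Each call to gen() builds a brand-new generator object.
first = gen(data)
second = gen(data)
print(first is second)            # False: two independent objects
print(next(first), next(second))  # 0 0

# Reusing one object keeps the position between calls.
print(next(first), next(first))   # 1 2
```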

Initialization of the generator is done by calling the function that contains the yield keyword. When the generator function is called in the code, it doesn't execute immediately but returns a generator object. This object can be used to iterate over the sequence of values generated by the generator function:

def gen(seq: iter):
    for item in seq:
        yield item


def main():
    data = [0, 1, 2, 3]

    # initialization
    g = gen(data)

    while True:
        print(next(g))


if __name__ == '__main__':
    main()

Well, we're almost there. However, after all the values from the generator are exhausted, a StopIteration exception is raised, which it makes sense to catch:

def gen(seq: iter):
    for item in seq:
        yield item


def main():
    data = [0, 1, 2, 3]

    # initialization
    g = gen(data)

    while True:
        try:
            print(next(g))
        except StopIteration:
            print('the generator is exhausted')
            break


if __name__ == '__main__':
    main()

Well, there you have it. Everything is in order now - we have complete control over the process of extracting values from the generator. And if needed, we can sequentially extract values from multiple generator functions, which externally appears as parallel execution of these functions.

To conclude this brief overview of generators, let's add two final touches:
  1. The loop in the generator function gen() can be written much more compactly: yield from seq.
  2. The iterable in the form of a list [0, 1, 2, 3] that is passed to the generator can be written more compactly as a range object: range(4).

Here's the updated code, taking into account these two changes:

def gen(seq: iter):
    yield from seq


def main():
    data = range(4)  # [0, 1, 2, 3] (not equal, but about the same in our case!)

    # initialization
    g = gen(data)

    while True:
        try:
            print(next(g))
        except StopIteration:
            print('the generator is exhausted')
            break


if __name__ == '__main__':
    main()

Replacing Blocking Functions with Generators

As we just learned from the previous section, it's not enough to replace functions with generators, we also need to manage these generators.

Thus, there arises a need for another dispatcher function, called main(), which controls the execution of generator functions. It can also be referred to as an Event Loop since each event of receiving a new value from a generator is born within the depths of the event loop.

If there are two or more generators, the task for the event loop becomes slightly more complex since each generator needs to be called in turn.

def gen(seq: iter):
    yield from seq


def main():
    data1 = range(5)
    data2 = data1

    g1 = gen(data1)
    g2 = gen(data2)

    while True:
        try:
            print(next(g1))
            print(next(g2))
        except StopIteration:
            print('the generators are exhausted')
            break


if __name__ == '__main__':
    main()

This code already bears a strong resemblance to our recent example with threads, as the generator objects g1 and g2 behave in a similar way: they no longer block the execution of the main program until they are completed. Instead, they take turns one step at a time, which from the outside looks like parallel execution.

However, in this example, the event loop appears to be somewhat simplified, as it does not take into account that the generators can yield sequences of different lengths. Below is an adjusted version that addresses this issue:

def gen(seq: iter):
    yield from seq


def main():
    data1 = range(5)
    data2 = range(15, 18)

    g1 = gen(data1)
    g2 = gen(data2)
    g1_not_exhausted = True
    g2_not_exhausted = True

    while g1_not_exhausted or g2_not_exhausted:
        if g1_not_exhausted:
            try:
                print(next(g1))
            except StopIteration:
                print('the generator 1 is exhausted')
                g1_not_exhausted = False

        if g2_not_exhausted:
            try:
                print(next(g2))
            except StopIteration:
                print('the generator 2 is exhausted')
                g2_not_exhausted = False

Now we can refactor our initial example where regular functions func1() and func2() will be transformed into generators gen1() and gen2():

import time
from my_deco import time_counter

N = 5
DELAY = 0.5


def gen1(n):
    for i in range(n):
        yield
        time.sleep(DELAY)
        print(f'--- line #{i} from {n} is completed')


def gen2(n):
    for i in range(n):
        yield
        time.sleep(DELAY)
        print(f'=== line #{i} from {n} is completed')


@time_counter
def main():
    g1 = gen1(N)
    g2 = gen2(N)
    g1_not_exhausted = True
    g2_not_exhausted = True

    while g1_not_exhausted or g2_not_exhausted:
        if g1_not_exhausted:
            try:
                next(g1)
            except StopIteration:
                print('the generator 1 is exhausted')
                g1_not_exhausted = False

        if g2_not_exhausted:
            try:
                next(g2)
            except StopIteration:
                print('the generator 2 is exhausted')
                g2_not_exhausted = False


if __name__ == '__main__':
   main()

Now this code resembles the previous example with threads even more closely, as the functions func1() and func2() (transformed into the generators gen1() and gen2()) take turns, and their output is interleaved as if they ran in parallel. However, there is one caveat: every iteration still contains a blocking time.sleep(DELAY) call of 0.5 seconds, so the total running time does not decrease. To solve this problem, we can utilize the asyncio package.
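The effect of the blocking delay can be measured with a small sketch. It assumes a scaled-down DELAY so that the demo finishes quickly, and the helper run() is a hypothetical compact version of the loop in main():

```python
import time

N = 5
DELAY = 0.01  # scaled down from the 0.5 s of the example to keep the demo fast


def gen(n):
    for _ in range(n):
        yield
        time.sleep(DELAY)  # still blocks the whole thread


def run(*generators):
    """Advance the generators in turn until all of them are exhausted."""
    active = list(generators)
    while active:
        for g in list(active):
            try:
                next(g)
            except StopIteration:
                active.remove(g)


start = time.perf_counter()
run(gen(N), gen(N))
elapsed = time.perf_counter() - start
print(f'elapsed: {elapsed:.3f} s, sequential estimate: {2 * N * DELAY:.3f} s')
```

The measured time should be close to the sequential estimate, because interleaving the steps does not remove any of the blocking sleeps.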

But before we dive into writing our first asynchronous script using this package, we need to familiarize ourselves with its fundamental components: Coroutines, Tasks, and the Event Loop.


Threading, Multiprocessing and Asyncio in Python (Part 4)

2. Threading
2.2 Threading. Synchronization primitives: Lock

The threads we have used so far did not interact with each other or the main thread. All they did was simply print their own results.

However, in practice, fully autonomous threads are more of an exception than the rule. More often, threads need to exchange data with each other or collectively use (modify) data that is in the main thread. In this case, there is an objective need to synchronize the actions of these threads.

It is particularly important to note the following: the use of synchronization primitives itself does not make asynchronous code synchronous (unless, of course, we are talking about programmer errors 😉). Synchronization primitives only synchronize individual threads (or individual processes in the case of the multiprocessing package, or individual coroutines in the case of the asyncio package) but by no means turn asynchronous code into synchronous!

Let's consider the simplest example of such interaction - simultaneous shared access of multiple threads to a single variable from the main thread.

As seen from the following example, multiple threads increment the value of the shared variable val within a loop:

from threading import Thread
from my_deco import time_counter

val = 0
COUNT = 100
NUM_THREADS = 100

def increase():
   global val
   for _ in range(COUNT):
       val += 1

@time_counter
def main():
   threads = [Thread(target=increase) for _ in range(NUM_THREADS)]

   for thread in threads:
       thread.start()
   for thread in threads:
       thread.join()


   diff = val - COUNT * NUM_THREADS
   print(f'diff = {diff}')

if __name__ == '__main__':
   main()

If it were not for threads, this construction could be considered as two nested loops:

  • the loop inside the thread as the inner loop,
  • and the threads themselves as the outer loop.

Based on this, the final value of the variable val should be equal to the product of the number of iterations of the two loops, i.e., the number of threads multiplied by the number of iterations of the inner loop (in our case, 100 * 100 = 10,000).

This would indeed be the case if the += operation were thread-safe. However, in reality, this is far from true.

First and foremost, it is important to note that the single line of code val += 1 actually represents 4 sequential actions:

  1. Retrieving the current value of the variable val.
  2. Retrieving the value of the increment (in our case, it is 1).
  3. Adding the two numbers together (val + 1).
  4. Writing the result as the new value of the variable val.

Therefore, there is a non-zero probability that between steps 1 and 4, another thread may interleave with a different value of val. As a result, when overwriting this variable in step 4, the value that was increased in one of these threads will be lost. This phenomenon is known as a 'race condition.'
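The separate steps behind val += 1 can be inspected with the standard dis module. This is only a sketch: the exact instruction names depend on the CPython version, so no particular output is shown here.

```python
import dis

val = 0


def increase():
    global val
    val += 1


# Collect the names of the bytecode instructions behind "val += 1".
ops = [ins.opname for ins in dis.get_instructions(increase)]
print(ops)
```

In the printed list, loading the global val and storing it back are distinct instructions, and a thread switch can happen between them.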

In our example, it is quite difficult to observe this effect since the values of the initial variables, namely the number of iterations COUNT, the number of threads NUM_THREADS, and the thread-switching interval, are not sufficient for the consistent manifestation of this effect.

By the way, the default thread-switching interval can be obtained using the getswitchinterval() function from the well-known sys module.

import sys


interval = sys.getswitchinterval()
print(f'switchinterval = {interval}')


# switchinterval = 0.005

We can modify the thread-switching interval using the sys.setswitchinterval(new_interval) function, but unfortunately, we cannot decrease it to the level where the race condition effect will reliably manifest. However, we can modify our code to slow down the increment of val. To achieve this, we will separate

  • the calculation of the new value of the variable val
  • and the replacement of the old value with the new one.

To make it more convincing, we will add a delay of 0.001 seconds between these two operations:

import time
from threading import Thread
from my_deco import time_counter

val = 0
COUNT = 100
NUM_THREADS = 100

def increase():
   global val
   for _ in range(COUNT):
       new_val = val + 1
       time.sleep(0.001)
       val = new_val


@time_counter
def main():
   threads = [Thread(target=increase) for _ in range(NUM_THREADS)]

   for thread in threads:
       thread.start()
   for thread in threads:
       thread.join()

   diff = val - COUNT * NUM_THREADS
   print(f'diff = {diff}')


if __name__ == '__main__':
   main()

In this case, the difference diff will be significantly different from zero.

Thus, the "thread-UNsafe" access to shared variables is proven. How can we fix this situation?

For this purpose, Python provides so-called synchronization primitives to address these issues.

Perhaps the simplest, basic, and most commonly used synchronization primitive is the Lock() object, which operates according to the following algorithm:

  1. Before allowing a thread to start modifying data, it checks if another thread has already started this modification.
  2. If modifications have already been initiated by another thread, the current thread is put into a queue.
  3. When the queue reaches the waiting thread, it gains access to modify the data, while simultaneously preventing any other thread from making changes - using the acquire() method.
  4. After completing the modifications, the current thread releases the lock, and the right to make changes is passed to the next thread in the queue - using the release() method.

import time
from threading import Thread, Lock
from my_deco import time_counter

COUNT = 100
NUM_THREADS = 100
val = 0
val_lock = Lock()


def increase():
   global val
   for _ in range(COUNT):
       val_lock.acquire()
       new_val = val + 1
       time.sleep(0.001)
       val = new_val
       val_lock.release()


@time_counter
def main():
   threads = [Thread(target=increase) for _ in range(NUM_THREADS)]


   for thread in threads:
       thread.start()
   for thread in threads:
       thread.join()

   diff = val - COUNT * NUM_THREADS
   print(f'diff = {diff}')


if __name__ == '__main__':
   main()

As we can see, the application of the Lock object has produced the expected results.

(By the way, sometimes this type of lock is referred to as a Mutex - a synchronization primitive used to protect shared resources from simultaneous access by multiple threads. It represents a lock that a thread can hold or release. Only one thread can hold the mutex at any given time, and all other threads attempting to acquire the mutex will be blocked until it is released.)
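The "only one holder at a time" rule can be observed even in a single thread. This is a minimal sketch using the non-blocking form of acquire(), which returns False instead of waiting when the lock is already held:

```python
from threading import Lock

lock = Lock()

print(lock.acquire())                # True: the lock was free, now we hold it
print(lock.locked())                 # True
print(lock.acquire(blocking=False))  # False: already held, and we chose not to wait
lock.release()
print(lock.locked())                 # False: free again
```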

There is also a more convenient and compact way of using the Lock object, which avoids the explicit use of the acquire() and release() methods mentioned earlier. You will be introduced to this method, along with other synchronization primitives and their usage examples, in the advanced version of this course.
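For reference, Lock objects in the standard library support the context-manager protocol, so one likely form of such a compact variant is a with block. A minimal sketch, assuming a smaller NUM_THREADS and no artificial delay to keep it fast:

```python
from threading import Thread, Lock

COUNT = 100
NUM_THREADS = 10
val = 0
val_lock = Lock()


def increase():
    global val
    for _ in range(COUNT):
        # "with" acquires the lock on entry and releases it on exit,
        # even if the body raises an exception.
        with val_lock:
            val += 1


threads = [Thread(target=increase) for _ in range(NUM_THREADS)]
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()

print(f'diff = {val - COUNT * NUM_THREADS}')  # diff = 0
```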


Threading, Multiprocessing and Asyncio in Python (Part 3)

2. Threading
2.1 Threading. Creation and handling

The threading package is part of the Python standard library starting from version 1.5 (November 1997), so it does not require any additional installation. The minimum requirements for starting a thread are as follows:

  1. Import the threading package.
  2. Write the code that should be executed in the thread, typically in the form of a function.
  3. Create (instantiate) a thread, specifying the target function's name as a parameter. If the function requires arguments, they can be passed using the args parameter in tuple format.
  4. Start the thread using the start() method.

import time
from threading import Thread

def clock(delay):
   time.sleep(delay)
   print(f"Current time: {time.strftime('%X')}, delay {delay} sec.")


thread1 = Thread(target=clock, args=(2,))
thread2 = Thread(target=clock, args=(3,))


if __name__ == '__main__':
   start = time.time()
   print(f"Time start: {time.strftime('%X')}")
   thread1.start()
   thread2.start()
   print(f"Time end: {time.strftime('%X')}")
   print(f'======== Total time: {time.time() - start:0.2f} ========')

Result:

Time start: 07:39:58
Time end: 07:39:58
======== Total time: 0.00 ========
Current time: 07:40:00, delay 2 sec.
Current time: 07:40:01, delay 3 sec.


Process finished with exit code

As seen in the example, the main thread finished instantly (0 sec), while the additional threads finished after 2 and 3 seconds from the script's start time, respectively.

In our case, the additional threads continue their work after the main thread has reached the end of the script. This behavior is logical because the processor operates faster than external devices. Therefore, before terminating the script, it is necessary to wait for a signal indicating the proper completion of the external devices' operations. Otherwise, there is a risk of losing data that remains in the buffer of these devices. By default, if no specific actions are taken, the process will wait for all its threads to complete before terminating.

In cases where it is necessary to forcefully terminate a thread simultaneously with the termination of the entire process, a special parameter called daemon is set to True. Please note that there are two ways to do this:

import time
from threading import Thread


def clock(delay):
   time.sleep(delay)
   print(f"Current time: {time.strftime('%X')}, delay {delay} sec.")


thread1 = Thread(target=clock, args=(2,))
thread2 = Thread(target=clock, args=(3,), daemon=True)


if __name__ == '__main__':
   start = time.time()
   print(f"Time start: {time.strftime('%X')}")
   thread1.start()
   thread2.daemon = True
   thread2.start()
   print(f"Time end: {time.strftime('%X')}")
   print(f'======== Total time: {time.time() - start:0.2f} ========')

As we can see, the daemon flag of the second thread was set to True twice. Of course, setting it once would have been enough; the second assignment only demonstrates an alternative way of setting the flag. As a result, the second thread, which needed 3 seconds, was one second short of completing its task: it was terminated when the process ended after 2 seconds.

Time start: 07:54:41
Time end: 07:54:41
======== Total time: 0.00 ========
Current time: 07:54:43, delay 2 sec.


Process finished with exit code 0

(By the way, you can check if a thread is a daemon by reading its daemon attribute, which is True for a daemon thread. The older isDaemon() method does the same, but it is deprecated since Python 3.10.)
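A short sketch of such a check through the daemon attribute. The target=print argument is just a placeholder, since the threads are never started:

```python
from threading import Thread

t1 = Thread(target=print)
t2 = Thread(target=print, daemon=True)

# Without an explicit value, the flag is inherited from the creating thread
# (False when created from the main thread).
print(t1.daemon)
print(t2.daemon)  # True
```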

To find out when exactly the second thread stopped its execution, let's modify the clock function slightly. We will turn its body into a loop with a 1-second delay and print the result each time. In this case, the overall delay will be determined by the number of iterations in the loop.

def clock(delay: int):
   for d in range(1, delay + 1):
       time.sleep(1)
       print(f"Current time: {time.strftime('%X')}, {d}; delay {delay} sec.")

The result shows that the output of the second thread stops after its 2nd iteration. It did not have time to print the result of its third and last second of work, because it was terminated together with the process at the end of the 2nd second.

Time start: 17:20:42
Time end: 17:20:42
======== Total time: 0.00 ========
Current time: 17:20:43, 1; delay 2 sec.
Current time: 17:20:43, 1; delay 3 sec.
Current time: 17:20:44, 2; delay 2 sec.
Current time: 17:20:44, 2; delay 3 sec.


Process finished with exit code 0

If we also set daemon=True for the first thread, the overall process will terminate instantly, and both threads will leave absolutely no trace of their activity.

Time start: 17:29:27
Time end: 17:29:27
======== Total time: 0.00 ========


Process finished with exit code 0

Thus, we can conclude that the overall duration of the process is determined by the duration of the "longest" non-daemon thread. If all additional threads are daemons, it is determined by the duration of the main thread.

Combining the main and auxiliary threads

Very often, the results of auxiliary threads need to be used in the main thread. In such cases, we can use the join() method, which suspends the execution of the main thread until the joined thread is finished.

if __name__ == '__main__':
   start = time.time()
   print(f"Time start: {time.strftime('%X')}")
   thread1.start()
   thread1.join()
   thread2.start()
   print(f"Time end: {time.strftime('%X')}")
   print(f'======== Total time: {time.time() - start:0.2f} ========')

That's all we changed in the previous script - we simply joined the first thread to the main thread immediately after its start. This led to the following result:

Time start: 17:53:40
Current time: 17:53:41, 1; delay 2 sec.
Current time: 17:53:42, 2; delay 2 sec.
Time end: 17:53:42
======== Total time: 2.00 ========


Process finished with exit code 0

What is noteworthy here is that the duration of the main thread increased to 2 seconds, exactly the time it took for the first thread to complete. After waiting for the first thread to finish, the main thread started the second thread and immediately terminated, halting the entire process. As a result, the second thread also terminated without being able to do any work.

But if we first start both threads and then join the first thread to the main thread,

thread1.start()
thread2.start()
thread1.join()

then the picture will change:

Time start: 18:05:02
Current time: 18:05:03, 1; delay 2 sec.
Current time: 18:05:03, 1; delay 3 sec.
Current time: 18:05:04, 2; delay 2 sec.
Current time: 18:05:04, 2; delay 3 sec.
Time end: 18:05:04
======== Total time: 2.00 ========


Process finished with exit code 0

Key takeaway: First starting all the threads and then joining one of the threads to the main thread allows for the execution of all the started threads. The threads whose execution time is less than or equal to the execution time of the joined thread will complete their execution. All other threads (thread2 in our case, which is a daemon) will only work while the joined thread is active.

Therefore, if we join the longest-running thread to the main thread, it will enable all the threads to complete their execution. The overall process time in our case will be determined by the execution time of this thread (in our case - 3 seconds).

Time start: 18:14:17
Current time: 18:14:18, 1; delay 2 sec.
Current time: 18:14:18, 1; delay 3 sec.
Current time: 18:14:19, 2; delay 2 sec.
Current time: 18:14:19, 2; delay 3 sec.
Current time: 18:14:20, 3; delay 3 sec.
Time end: 18:14:20
======== Total time: 3.00 ========


Process finished with exit code 0

Now, a small task: try to answer the question yourself. What will the overall execution time be in this case:

thread1.start()
thread2.start()
thread1.join()
thread2.join()

, in this case:

thread1.start()
thread2.start()
thread2.join()
thread1.join()

, and in this case?

thread1.start()
thread1.join()
thread2.start()
thread2.join()

And most importantly, try to explain why the overall execution time changed in this particular way and not otherwise.
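One way to check your answers empirically is to time the orderings yourself. The sketch below is an assumption-laden helper, not part of the original script: SCALE shrinks each "second" of the example, the simplified clock() only sleeps, and measure() is a hypothetical function that replays a list of start/join calls.

```python
import time
from threading import Thread

SCALE = 0.05  # hypothetical factor: 1 "second" of the example lasts 0.05 s here


def clock(delay):
    time.sleep(delay * SCALE)


def measure(order):
    """Run the start/join calls named in `order` and return the elapsed time."""
    threads = {'thread1': Thread(target=clock, args=(2,)),
               'thread2': Thread(target=clock, args=(3,))}
    start = time.perf_counter()
    for name, action in order:
        getattr(threads[name], action)()
    return time.perf_counter() - start


first_case = measure([('thread1', 'start'), ('thread2', 'start'),
                      ('thread1', 'join'), ('thread2', 'join')])
third_case = measure([('thread1', 'start'), ('thread1', 'join'),
                      ('thread2', 'start'), ('thread2', 'join')])
print(f'first case: {first_case / SCALE:.1f}, third case: {third_case / SCALE:.1f} '
      f'(in seconds of the example)')
```

The second ordering can be measured the same way by swapping the two join entries of the first one.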

Refactoring the example with two functions

The code example with two functions that we examined at the very beginning aimed to achieve maximum concurrency. We sent potentially blocking functions to their own threads, where they could no longer interfere with the main thread. This is certainly a huge advantage. However, there is a small drawback - the timing measurement decorator remained in the main thread (in the main() function), and now we don't know how much faster (or slower) the code became after adding threads. Let's join the additional threads to the main thread and see how the execution time has changed.

import time
from threading import Thread
from my_deco import time_counter

N = 5
DELAY = 0.5


def func1(n):
   for i in range(n):
       time.sleep(DELAY)
       print(f'--- line #{i} from {n} is completed')


def func2(n):
   for i in range(n):
       time.sleep(DELAY)
       print(f'=== line #{i} from {n} is completed')


@time_counter
def main():
   thread1 = Thread(target=func1, args=(N,))
   thread2 = Thread(target=func2, args=(N,))
   thread1.start()
   thread2.start()
   thread1.join()
   thread2.join()
   print(f'All functions completed')


if __name__ == '__main__':
   main()

Result:

======== Script started ========
--- line #0 from 5 is completed
=== line #0 from 5 is completed
--- line #1 from 5 is completed
=== line #1 from 5 is completed
. . . . . . . . .


All functions completed
======== Script execution time: 2.51 ========

In summary, 2.5 seconds is twice as fast as regular synchronous code. This can be easily explained: now both functions are executed simultaneously, so the overall script execution time is equal to the time taken by each function.

Is there a way to further reduce this time? Most likely, yes. But first, it would be good to understand the reason for the delay.

The reason is that due to the loop inside the function, each subsequent iteration cannot start before the previous one finishes. In other words, each time, we have to wait for the DELAY seconds of delay from the previous iteration to start the next one.

The solution is obvious: we need to keep only one iteration in the function and move the loop to the main function. Then we can create a number of threads equal to the number of iterations of the two functions. And from previous examples, we already know that if we start all these threads simultaneously, the overall execution time will be equal to the time taken by the longest thread.

Well, as they say, done and done:

import time
from threading import Thread
from my_deco import time_counter

N = 5
DELAY = 0.5


def func1(i, n):
   time.sleep(DELAY)
   print(f'--- line #{i} from {n} is completed')


def func2(i, n):
   time.sleep(DELAY)
   print(f'=== line #{i} from {n} is completed')


@time_counter
def main():
   threads = []
   threads1 = [Thread(target=func1, args=(i, N)) for i in range(N)]
   threads2 = [Thread(target=func2, args=(i, N)) for i in range(N)]
   threads.extend(threads1)
   threads.extend(threads2)

   for thread in threads:
       thread.start()
   for thread in threads:
       thread.join()

   print(f'All functions completed')


if __name__ == '__main__':
   main()

And the script running time has now been reduced to the running time of the longest thread:

======== Script started ========
--- line #0 from 5 is completed
=== line #0 from 5 is completed
--- line #1 from 5 is completed
=== line #1 from 5 is completed
. . . . . . . . .


All functions completed
======== Script execution time: 0.51 ========

Creating a Thread Using a Class

In the previous examples, functions were used to create and manage threads. However, the same can be achieved using classes.

In this case:

  1. The class that describes the thread should inherit from the Thread class.
  2. The behavior of the thread is described (or rather, overridden) using the run() method.
  3. All additional parameters, including the daemon flag, are set as instance attributes in the __init__() dunder method.

In the following example, two daemon threads are created. A daemon thread is terminated as soon as the main thread exits, but because the longest-running thread is joined to the main thread, both threads have time to finish:

import time
from threading import Thread


class ClockThread(Thread):
   def __init__(self, delay):
       super().__init__()
       self.delay = delay
       self.daemon = True

   def run(self):
       time.sleep(self.delay)
       print(f"Current time: {time.strftime('%X')}, delay {self.delay} sec.")


thread_1 = ClockThread(2)
thread_2 = ClockThread(3)


if __name__ == '__main__':
   start = time.time()
   print(f"Time start: {time.strftime('%X')}")
   thread_1.start()
   thread_2.start()
   thread_2.join()
   print(f"Time end: {time.strftime('%X')}")
   print(f'======== Total time: {time.time() - start:0.2f} ========')

The script, as expected, runs for 3 seconds:

Current time: 00:33:01, delay 3 sec.
Time end: 00:33:01
======== Total time: 3.00 ========

Process finished with exit code 0

As we can see, describing threads using classes didn't bring any significant changes. The management of thread objects is still done through the start() and join() methods, and the daemon parameter (attribute) can be specified directly in the __init__() method or later defined (overridden) in an instance of the user-defined class.
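The two ways of setting the daemon flag mentioned above can be sketched as follows. The SleepThread class and the demo() helper are hypothetical names introduced only for this illustration:

```python
import time
from threading import Thread


class SleepThread(Thread):
    """Hypothetical example class; the name is not from the course code."""

    def __init__(self, delay):
        # Thread.__init__() accepts daemon as a keyword argument.
        super().__init__(daemon=True)
        self.delay = delay
        self.done = False

    def run(self):
        time.sleep(self.delay)
        self.done = True


def demo():
    thread_a = SleepThread(0.1)   # stays a daemon, as set in __init__()
    thread_b = SleepThread(0.1)
    thread_b.daemon = False       # overridden on the instance, before start()
    flags = (thread_a.daemon, thread_b.daemon)
    for thread in (thread_a, thread_b):
        thread.start()
    for thread in (thread_a, thread_b):
        thread.join()
    return flags, thread_a.done, thread_b.done


if __name__ == '__main__':
    print(demo())
```

Note that the flag has to be changed before start() is called: assigning to daemon on a thread that is already running raises RuntimeError.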


Threading, Multiprocessing and Asyncio in Python (Part 2)

1. Introduction to Asynchrony
1.2 Comparison of three ways to get asynchronous code

The easiest way to understand the working principle of all three methods of achieving asynchronous code is through the following diagram:

All three methods listed (threading, multiprocessing, and the asyncio package) essentially accomplish the same thing: they let parts of the program run in parallel with its main flow (represented by the green and blue lines in the diagram). In other words, certain (often "problematic") sections of code start executing as if simultaneously and independently of each other. If one or even several of these parallel branches take a long time or stall altogether, the main program is not affected: it continues working in its regular mode.

Please note that the term "as if" is used here intentionally - parallel computations are not always truly parallel. Sometimes, the switch between different branches of the process happens so quickly that they appear parallel to an external observer. It's similar to how 24 static frames displayed within one second create the illusion of continuous motion on a movie screen.

Indeed, this is the fundamental difference between the multiprocessing package, where computations truly occur in parallel, and the other two packages (threading and asyncio), where the effect of parallelism is achieved through fast switching between several "independent" parts of the program within a single process.

The technology of real parallel computation is called parallelism, while the technology of simulating parallel computations through fast switching is referred to as concurrency.

Parallel computations, in terms of resource usage, are not cheap because they involve multiple (or even all!) processor cores, additional RAM, and so on. Therefore, such a solution is justified only in the case of complex computational tasks where continuous and uninterrupted CPU processing is necessary (CPU-bound tasks).

In practice, however, we often deal with relatively slow processes such as database queries, network interactions, and so on. These are known as IO-bound tasks, in which the processor, while sending a request to a relatively slow external resource, is forced to idle and wait for a response.

In this case, it makes perfect sense to utilize the idle time of the processor for something more useful. For this purpose, two other technologies are used (threading and asyncio), where the waiting time for a "sleeping task" is used to perform other tasks.

It is important to note that machine resources are used more efficiently in this case - we no longer create new processes but instead use resources within the context of a single process.

Here, it is worth highlighting the fundamental technological difference between the threading and asyncio packages.

In the case of threads (the threading package), the Python interpreter relies on the operating system (OS) for assistance. When creating a new thread, it essentially tells the OS, "Now I need the main thread's task to be executed simultaneously with the task of the new thread until this new thread finishes." The OS then switches back and forth between the two tasks at short intervals determined by its scheduler (not necessarily equal ones). Each switch takes only a fraction of a second, so to an external observer both tasks appear to be executed in parallel and simultaneously.

The advantage of this method is evident: the Python code itself for each thread remains completely unchanged (referring to the function passed as the target parameter to the thread). The thread itself is simply an instance of the Thread class, and its control is managed using the start() and join() methods (from the language's syntax perspective, there is nothing fundamentally new!).

There are indeed some drawbacks to using threads:

  • Thread-related data needs to be stored, which requires additional memory resources.
  • The context switching between threads during data reading/writing also takes time. The more threads are involved, the more noticeable this becomes.
  • Thread management is handled by the operating system, not the Python interpreter. Therefore, thread switching occurs based on the OS's scheduling algorithm, which may not always be optimal in terms of prioritizing the execution of specific threads.

These factors can impact the overall performance and efficiency of threaded code.

None of the aforementioned drawbacks apply to the asyncio package, since it uses only one thread within a single process. Everything would be fine if it weren't for one significant drawback: this method requires separate and fundamentally new code, which differs significantly from the familiar syntax of the Python language.

However, judge for yourself - here, for example, is what the solution to the previous task would look like using the asyncio package:

import time
import asyncio
from my_deco import async_time_counter

N = 5
DELAY = 0.5


async def func1(n):
   for i in range(n):
       await asyncio.sleep(DELAY)
       print(f'--- line #{i} from {n} is completed')


async def func2(n):
   for i in range(n):
       await asyncio.sleep(DELAY)
       print(f'=== line #{i} from {n} is completed')


@async_time_counter
async def main():
   print(f'All functions completed')


async def run():
   task0 = asyncio.create_task(main())
   task1 = asyncio.create_task(func1(N))
   task2 = asyncio.create_task(func2(N))
   await task0
   await task1
   await task2


if __name__ == '__main__':
   asyncio.run(run())

The result of executing this script will be exactly the same as in the previous example with threads: control is immediately passed back to the main program main(). And since the code in main() only contains one print statement, this function completes almost instantly, and the result of the two other functions' actions becomes visible after the main() program has finished:

======== Script started ========
All functions completed
======== Script execution time: 0.00 ========
--- line #0 from 5 is completed
=== line #0 from 5 is completed
--- line #1 from 5 is completed
=== line #1 from 5 is completed
. . . . . . . . . . . . . . . .
Process finished with exit code 0
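The async_time_counter decorator imported in the script above is not shown in this article. A minimal sketch of how it might look, assuming it simply mirrors the synchronous time_counter decorator from my_deco.py (the body below is a guess, not the course's actual code):

```python
import asyncio
import functools
import time


def async_time_counter(func):
    """Assumed asynchronous counterpart of time_counter (not the original code)."""
    @functools.wraps(func)
    async def wrap(*args, **kwargs):
        start = time.time()
        print('======== Script started ========')
        # Calling func() only creates a coroutine object; it has to be awaited to run.
        result = await func(*args, **kwargs)
        print(f'======== Script execution time: {time.time() - start:0.2f} ========')
        return result
    return wrap


@async_time_counter
async def main():
    await asyncio.sleep(0.1)
    return 'done'


if __name__ == '__main__':
    print(asyncio.run(main()))
```

The only real differences from the synchronous version are that the wrapper itself is declared with async def and that the wrapped coroutine is awaited rather than just called.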

It is reasonable for beginners who have just learned the basics of Python to ask, "In which language is this code written?" In fact, there is nothing surprising about this question because:

  1. Function definitions use the new async keyword.
  2. Inside these functions, the unfamiliar await keyword is used.
  3. Strictly speaking, these two keywords turn the functions into a completely different entity in the Python language: they are no longer regular functions but coroutines.
  4. Instead of the familiar delay time.sleep(DELAY), its "asynchronous" counterpart asyncio.sleep(DELAY) is used, which not only introduces a delay but also includes a control element that switches execution from the current function (or should we say, current coroutine) to another.
  5. The previous decorator @time_counter cannot work here either, because calling the coroutine main() with parentheses does not run it the way calling the function main() does. This peculiarity needs to be taken into account when defining a new decorator, @async_time_counter.
  6. Finally, running this code in the regular way is no longer possible: it requires a special construct such as asyncio.run().
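The difference between time.sleep() and asyncio.sleep() described in point 4 can be seen in a short experiment. The sketch below is illustrative only: the coroutine names are invented, the delay is shortened, and asyncio.gather() (not used in the script above) is just a convenient way to run two coroutines concurrently:

```python
import asyncio
import time

DELAY = 0.2  # shorter than in the article, to keep the sketch quick


async def cooperative():
    await asyncio.sleep(DELAY)   # suspends this coroutine; the loop can run others


async def blocking():
    time.sleep(DELAY)            # blocks the whole thread, event loop included


async def timed(coro_func):
    """Run two copies of coro_func concurrently and return elapsed seconds."""
    start = time.perf_counter()
    await asyncio.gather(coro_func(), coro_func())
    return time.perf_counter() - start


if __name__ == '__main__':
    print(f'asyncio.sleep: {asyncio.run(timed(cooperative)):.2f} s')
    print(f'time.sleep:    {asyncio.run(timed(blocking)):.2f} s')
```

With asyncio.sleep() the two delays overlap, so the total is roughly one DELAY; with time.sleep() they run back to back and the total is roughly two.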

As a result, it turns out that this approach also has its own downsides, and quite significant ones at that.

Thus, a brief overview of the three methods (technologies) for creating asynchronous code has shown that none of the discussed options has universal advantages over the others. Each has its own merits and drawbacks. Therefore, all three methods have prospects for further development, improvement, and practical usage.

Hence, the answer to the question "Which option to choose?" is surprisingly straightforward: the one that best suits the specific requirements of your current task. Of course, it is crucial to have an equally good command of each of the listed technologies to be able to make the optimal choice at the right moment, rather than simply relying on familiarity.

It's natural to wonder, "Are there any other ways to make code asynchronous besides the three mentioned in the course title?"

The answer is, undoubtedly, yes.

The subprocess package allows for the creation of additional processes in which various programs can be executed, including Python code.
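As a rough illustration, Python code can be run in a separate process like this. The run_python_snippet() helper is a hypothetical name used only for this sketch:

```python
import subprocess
import sys


def run_python_snippet(code):
    """Run `code` in a separate Python interpreter and return what it printed."""
    completed = subprocess.run(
        [sys.executable, '-c', code],  # the same interpreter, started as a new process
        capture_output=True,
        text=True,
        check=True,                    # raise CalledProcessError on a non-zero exit code
    )
    return completed.stdout.strip()


if __name__ == '__main__':
    print(run_python_snippet("print('hello from a child process')"))
```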

The concurrent.futures package provides a convenient interface for asynchronous tasks and parallel computing. It hides the details of creating and managing threads or processes, which makes it preferable in simple scenarios where ease of use matters and direct control over threads or processes is not required. For more complex scenarios or lower-level control, the threading and multiprocessing modules offer greater flexibility.
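A minimal sketch of this interface, assuming a made-up slow_square() function that stands in for any blocking call:

```python
import time
from concurrent.futures import ThreadPoolExecutor

DELAY = 0.2  # arbitrary delay for this sketch


def slow_square(x):
    """Stand-in for a slow, blocking operation."""
    time.sleep(DELAY)
    return x * x


def squares_in_threads(values):
    """Compute slow_square() for every value, one worker thread per value."""
    with ThreadPoolExecutor(max_workers=len(values)) as pool:
        # map() returns the results in the order of the input values.
        return list(pool.map(slow_square, values))


if __name__ == '__main__':
    start = time.perf_counter()
    print(squares_in_threads([1, 2, 3, 4]))
    print(f'elapsed: {time.perf_counter() - start:.2f} s')
```

No Thread objects, start() or join() calls appear in the code: the executor creates the threads, and leaving the with block waits for them to finish.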

In addition to the packages included in the Python standard library, there are other well-known packages that are not part of it. For example, the packages curio ( https://curio.readthedocs.io/ ) and trio ( https://trio.readthedocs.io/ ) are used for working with coroutines.

The examples mentioned above can be classified as universal packages capable of making almost any synchronous code asynchronous. In addition to these, there are also specialized packages that enable achieving asynchronicity for specific programs and applications. For instance, the select package is used to facilitate asynchronous socket operations (via the socket package).

Furthermore, the socket package itself, although a regular "synchronous" package, contains methods that support "asynchronous" operation, such as settimeout() and setblocking().
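Both ideas can be tried out on a pair of connected sockets. The following is only a sketch: the demo() function is an invented name, and socket.socketpair() is used simply to avoid setting up a real server:

```python
import select
import socket


def demo():
    left, right = socket.socketpair()
    try:
        # Nothing has been sent yet: with a zero timeout select() returns at once.
        readable, _, _ = select.select([left], [], [], 0)
        ready_before = bool(readable)

        right.sendall(b'ping')
        # Now select() reports that `left` can be read without blocking.
        readable, _, _ = select.select([left], [], [], 1.0)
        ready_after = bool(readable)
        data = left.recv(4)

        # settimeout() limits how long a blocking recv() may wait.
        left.settimeout(0.1)
        try:
            left.recv(4)
            timed_out = False
        except socket.timeout:
            timed_out = True
        return ready_before, ready_after, data, timed_out
    finally:
        left.close()
        right.close()


if __name__ == '__main__':
    print(demo())
```

In both cases the program never gets stuck on an empty socket: select() tells it in advance whether data is available, and settimeout() turns an endless wait into an exception that can be handled.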

Certainly, our course focuses on the three foundational "pillars" of asynchronicity mentioned in the title. They form the basis and foundation of this programming approach. However, the topic of asynchronicity in Python would not be fully explored without at least a brief overview of some additional packages and methods mentioned earlier. This will be the subject of the final, fifth lesson of this course.

So, let's proceed to a detailed study of the three main packages listed in the course title. We will begin this exploration with threads (the threading package).


Threading, Multiprocessing and Asyncio in Python

1. Introduction to Asynchrony
1.1 What is "asynchronous code"?

First of all, let's clarify the terminology and understand what lies behind the terms synchronous and asynchronous code. Let's also try to figure out what is so bad about synchronous code that everyone is so persistently trying to turn it into asynchronous code.

Synchronous code is code in which all instructions are executed strictly sequentially, line by line, and executing the next line of code is possible only if the previous one is completely executed.

The main problem of synchronous code is the requirement to not execute the next instruction until the previous one is completed. This poses a significant challenge for programs that interact with the outside world or other programs during execution since the execution of an instruction may unexpectedly require much more time than usual.

To prevent this from happening, the software code should be mindful of what is happening around it. If the next instruction has the potential to slow down the execution of the main program, it should be parallelized with the execution of other faster instructions or postponed altogether until a more opportune time.

In other words, the task of asynchronous programming is to replace the "mindless" sequential execution of instructions (synchronous) with a "meaningful" change in the order of execution based on the completion time of different instructions (asynchronous).

It is important to emphasize here that asynchronous code does not need complex algorithms for estimating the execution time of subsequent instructions. In the vast majority of cases, it is sufficient to parallelize the execution of problematic sections or move them into the background, so that they do not hinder the fast execution of the main program.

Now, (hopefully!) having gained some understanding of asynchrony, it's time to provide a strictly scientific definition to this phenomenon. Fortunately, there are numerous definitions available on the internet, each more obscure than the next 😉. Personally, I found the definition of asynchrony on the Mozilla.org developer website quite appealing:

“Asynchronous programming is a technique that enables your program to start a potentially long-running task and still be able to be responsive to other events while that task runs, rather than having to wait until that task has finished.”

https://developer.mozilla.org/en-US/docs/Learn/JavaScript/Asynchronous/Introducing

Thus, asynchrony is what prevents your program from getting "stuck" even when it reaches blocking (or "very long") sections of code, as these code sections will be executed concurrently (or almost concurrently) with the main program.

This is precisely why packages capable of transforming synchronous code into asynchronous code have gained incredible popularity.

Well, at this point, it's probably a good time to go through an example that allows for an even better understanding and consolidation of everything mentioned above.

Let's assume we have two functions (func1() and func2()) that need to be executed sequentially:

import time
N = 5
DELAY = 0.5


def func1(n):
   for i in range(n):
       time.sleep(DELAY)
       print(f'--- line #{i} from {n} is completed')


def func2(n):
   for i in range(n):
       time.sleep(DELAY)
       print(f'=== line #{i} from {n} is completed')


def main():
   func1(N)
   func2(N)


if __name__ == '__main__':
   start = time.time()
   main()
   print(f'Total time: {time.time() - start}')

The main() function will be the main control function here and onwards, while the functions func1() and func2() are called sequentially within it. Additionally, the total execution time of the main() function, which obviously equals the execution time of the entire script, will be calculated.

In this classic example of synchronous code, control is passed to the second function (func2()) only after the first function (func1()) has completed. Fortunately, the repetition count (N = 5) and the delay (DELAY = 0.5 seconds) in our example are small, so the program completes in just 5 seconds (2 functions × 5 iterations × 0.5 seconds). But what if these parameters had several more zeroes at the end? In that case, we might never live to see func2() start, let alone the final message that all functions have completed.

It seems that without an asynchronous solution to this problem, someday, in a not-so-pleasant moment, we might find ourselves in a very difficult situation. Therefore, let's try applying one of the three techniques mentioned in the course title, such as threads.

A more detailed explanation of how this and the following examples work will be provided a little later. For now, let's simply enjoy the beauty and ease of transforming synchronous code into asynchronous.

But first, let's add a useful detail to our code, namely a decorator that calculates the runtime of our program. Since all our future tasks will be evaluated in terms of code execution time in one way or another, it makes sense to optimize this calculation from the very beginning. Moreover, it doesn't require knowledge of asynchronous programming methodologies. Our usual "synchronous" knowledge will be sufficient for this purpose.

Many of you probably remember from the basics of the Python language that it is more logical to extract repetitive code within a function and place it separately as a decorator. For those who are not familiar with this concept or may have forgotten, I recommend watching these two videos that cover all four variations of creating decorators:

  • Simplest function decorator for a function (Russian voice): https://youtu.be/394ZfiPJQ38
  • More advanced decorator variations (Russian voice): https://youtu.be/2szgmbn3cYM

So, the timing code that followed the entry point moves into a decorator, placed in a new module named my_deco.py in the working directory:

import functools
import time


def time_counter(func):
   # Keep the name and docstring of the decorated function.
   @functools.wraps(func)
   def wrap(*args, **kwargs):
       start = time.time()
       print("======== Script started ========")
       result = func(*args, **kwargs)
       print(f'======== Script execution time: {time.time() - start:0.2f} ========')
       return result

   return wrap

And the previous script is supplemented by importing the new decorator and adding it to the main() function:

import time
from my_deco import time_counter

N = 5
DELAY = 0.5


def func1(n):
   for i in range(n):
       time.sleep(DELAY)
       print(f'--- line #{i} from {n} is completed')


def func2(n):
   for i in range(n):
       time.sleep(DELAY)
       print(f'=== line #{i} from {n} is completed')


@time_counter
def main():
   func1(N)
   func2(N)
   print(f'All functions completed')


if __name__ == '__main__':
   main()

Well, now we can add threads. To do that, we need to import them first:

from threading import Thread

And slightly modify the main() function:

@time_counter
def main():
   thread1 = Thread(target=func1, args=(N,))
   thread2 = Thread(target=func2, args=(N,))
   thread1.start()
   thread2.start()
   print(f'All functions completed')

Indeed, the code transformation is minimal, but the result is remarkable!

======== Script started ========
All functions completed
======== Script execution time: 0.01 ========
--- line #0 from 5 is completed
=== line #0 from 5 is completed
--- line #1 from 5 is completed
=== line #1 from 5 is completed
. . . . . . . . . . . . . . . .
Process finished with exit code 0

Please note that the line reporting the execution time (======== Script execution time: 0.01 ========), as well as the message about the completion of all functions (All functions completed), appear before the output of the functions themselves. This confirms that func1() and func2(), which could have blocked the code, no longer block it: threads let us "jump over" them and pass control to the code that follows. Our synchronous code has thus become asynchronous, and the time main() needs to return control has dropped from 5 seconds (or even infinity!) to 0.01 seconds, while the two functions keep running in their own threads.

In conclusion, let's summarize a few observations that will be useful as we further explore threads:

  • The objects thread1 and thread2 represent two threads in which our functions are executed, passed as the target parameter. The corresponding arguments are also passed to these functions using the args parameter. Note that the arguments themselves are passed in tuple format.
  • Creating a thread is similar to defining a function with the def statement. For example, the statement def func means that the function func is only declared but not executed. To execute it, a separate line is required where the function name is invoked with parentheses: func().
  • Starting a thread works in a similar way: instead of simple parentheses, we call the start() method on the thread object.

These observations will be helpful as we delve deeper into the study of threads.
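The difference between creating a thread and starting it can be observed with the is_alive() method of the Thread class. The task() and lifecycle() functions below are hypothetical helpers written only for this sketch:

```python
import time
from threading import Thread


def task():
    time.sleep(0.2)


def lifecycle():
    """Record whether the thread is alive at three points of its life."""
    states = []
    thread = Thread(target=task)
    states.append(thread.is_alive())   # created, but not started yet
    thread.start()
    states.append(thread.is_alive())   # started, task() is still sleeping
    thread.join()
    states.append(thread.is_alive())   # task() has returned
    return states


if __name__ == '__main__':
    print(lifecycle())  # [False, True, False]
```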

Indeed, threads provide one method to address the issue of code synchronicity. It is evident that such a powerful tool requires further and deeper exploration, which we will undertake in the subsequent lessons.

However, as suggested by the course title, there are at least two more mechanisms or methods to achieve asynchronicity. Does it make sense to divert our attention to learning something else when threads have already allowed us to achieve impressive results?

To find an answer to this question, let's explore the next topic (article).
