
Threading, Multiprocessing and Asyncio in Python (Part 5)

Contents at a Glance

  1. Introduction to Asynchrony
    1. What is "asynchronous code"?
    2. Comparison of three ways to get asynchronous code
  2. Threading
    1. Threading. Creation and handling
    2. Threading. Synchronization primitives: Lock
    3. Threading. Synchronization primitives: Lock (continuation), Event, Condition, Semaphore, Queue 🔒
    4. Web Scraping by Threading 🔒
  3. Multiprocessing 🔒
    1. Multiprocessing. Creation and handling 🔒
    2. Multiprocessing: Synchronization primitives 🔒
  4. Asyncio Package
    1. Generator as an asynchronous function
    2. Coroutines, Tasks and Event Loop
    3. From Generators to Coroutines and Tasks 🔒
    4. Web scraping by aiohttp Package 🔒
    5. Handling files by aiofiles Package 🔒
    6. Asyncio Synchronization Primitives 🔒
  5. Additional Packages and Methods for Creating Asynchronous Code 🔒
    1. Subprocess Package 🔒
    2. Concurrent.futures Package 🔒
    3. Sockets - timeout() Method and select Package 🔒
    4. curio and trio Packages 🔒

4. Asyncio Package
4.1 Generator as an asynchronous function

And finally, we come to the third way of creating asynchronous code, where all the program code is contained within not only the same process but also the same thread (see the diagram in the introduction).

In the previous two cases (the threading and multiprocessing packages), there were no specific requirements for the source code. To make the code asynchronous, we simply took a blocking (or "slow") function and placed it in a separate thread or process. The function itself did not have to change, because the operating system managed that thread or process for us.

However, when we attempt to achieve asynchronicity within the same process and thread, we can no longer rely on the assistance of the operating system. We are left to rely on ourselves, which means we cannot avoid making significant changes to the original source code.

Armed with this idea, let's once again recall those two "slow" functions from our very first example at the beginning of this course:

import time
from my_deco import time_counter
N = 5
DELAY = 0.5


def func1(n):
    for i in range(n):
        time.sleep(DELAY)
        print(f'--- line #{i} from {n} is completed')


def func2(n):
    for i in range(n):
        time.sleep(DELAY)
        print(f'=== line #{i} from {n} is completed')


@time_counter
def main():
    func1(N)
    func2(N)
    print('All functions completed')


if __name__ == '__main__':
    main()

As we already know well, when we call the function func1(n), the further execution of the main program will be suspended until this function completes all its iterations. Only after that, control will move to the next line of code.

In other words, a regular function blocks the execution of the calling code from the moment it is invoked until it returns.

However, Python has a wonderful object called a generator, which can also be considered a kind of function, but one that does not block. It can be executed "partially," or "step by step": each time it is resumed, it does not run to completion but only advances by one step, one iteration, and no more. It also remembers its state, that is, the step at which it stopped, so that it does not repeat itself and can continue from the next step.

The generator is incredibly popular in Python, so there is no doubt that most readers are very familiar with what it is. Nevertheless, it is still worth saying a few introductory words on this topic.

Generators in Python

Below is an example of a generator function gen():

from collections.abc import Iterable


def gen(seq: Iterable):
    for item in seq:
        yield item


def main():
    data = [0, 1, 2, 3]

    for i in gen(data):
        print(i)


if __name__ == '__main__':
    main()

In this case, the yield statement serves as the exact stopping point where the generator temporarily suspends its execution and resumes it upon the next call.
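To make the pause-and-resume behaviour visible, here is a minimal sketch. The names countdown(), demo() and log are hypothetical and serve only this illustration: the generator writes a note before and after each yield, and the caller writes a note each time it receives a value.

```python
def countdown(n, log):
    # Hypothetical generator (not part of the course code) that records its progress.
    log.append('body started')
    while n > 0:
        log.append(f'about to yield {n}')
        yield n                      # execution is suspended here
        log.append(f'resumed after {n}')
        n -= 1                       # the local variable n survives the pause
    log.append('body finished')


def demo():
    log = []
    g = countdown(2, log)

    first = next(g)                  # runs the body up to the first yield
    log.append(f'caller got {first}')

    second = next(g)                 # continues right after the first yield
    log.append(f'caller got {second}')
    return log


if __name__ == '__main__':
    for line in demo():
        print(line)
```

The notes from the generator and from the caller alternate, and the second value is one less than the first, which shows that the local variable n was preserved while the generator was suspended.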

Therefore, you cannot simply run the generator like a regular function once and wait for the result. The generator needs to be continuously managed. This is precisely what the main() function does in our case.

In this example, the generator's data is extracted using a loop. This is perhaps the simplest way to work with a generator. However, for our case, this approach is not entirely suitable because the loop strictly retrieves all the elements of the generator in sequential order. As a result, this construction (the generator plus its management from the main() function) ends up behaving much like a loop in a regular (blocking) function.

Hence, we will use the built-in next() function (which calls the generator's __next__() method). It retrieves exactly one value per call, so we decide when each step of the generator is performed:

from collections.abc import Iterable


def gen(seq: Iterable):
    for item in seq:
        yield item


def main():
    data = [0, 1, 2, 3]

    while True:
        print(next(gen(data)))


if __name__ == '__main__':
    main()

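However, in this case, we end up with an infinite loop where the generator returns the same initial value of 0 every time. The reason is that every call to gen(data) creates a brand-new generator, which starts from the beginning. To fix this, the generator needs to be initialized once, before the loop.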

Initialization of the generator is done by calling the function that contains the yield keyword. When the generator function is called in the code, it doesn't execute immediately but returns a generator object. This object can be used to iterate over the sequence of values generated by the generator function:
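The difference between the generator function and the generator object it returns can be checked with the standard inspect module. The sketch below is only an illustration; demo() and the facts dictionary are hypothetical names. It also shows that two generator objects created from the same function keep independent positions, which is why creating a new object on every iteration always yields the first value.

```python
import inspect


def gen(seq):
    yield from seq


def demo():
    data = [0, 1, 2, 3]

    # Calling gen() does not run its body; it only builds a generator object.
    g1 = gen(data)
    g2 = gen(data)

    facts = {
        'gen is a generator function': inspect.isgeneratorfunction(gen),
        'gen(data) is a generator object': inspect.isgenerator(g1),
        'g1 and g2 are distinct objects': g1 is not g2,
    }

    # Each object remembers its own position in the sequence.
    values = [next(g1), next(g1), next(g2), next(g1)]
    return facts, values


if __name__ == '__main__':
    facts, values = demo()
    for description, answer in facts.items():
        print(f'{description}: {answer}')
    print(values)  # [0, 1, 0, 2]
```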

from collections.abc import Iterable


def gen(seq: Iterable):
    for item in seq:
        yield item


def main():
    data = [0, 1, 2, 3]

    # initialization
    g = gen(data)

    while True:
        print(next(g))


if __name__ == '__main__':
    main()

Well, we're almost there. However, once all the values have been extracted from the generator, the next call to next() raises a StopIteration exception, which it makes sense to catch:

from collections.abc import Iterable


def gen(seq: Iterable):
    for item in seq:
        yield item


def main():
    data = [0, 1, 2, 3]

    # initialization
    g = gen(data)

    while True:
        try:
            print(next(g))
        except StopIteration:
            print('the generator is exhausted')
            break


if __name__ == '__main__':
    main()

Well, there you have it. Everything is in order now - we have complete control over the process of extracting values from the generator. And if needed, we can sequentially extract values from multiple generator functions, which externally appears as parallel execution of these functions.

To conclude this brief overview of generators, let's add two final touches:
  1. The loop in the generator function gen() can be written much more compactly: yield from seq.
  2. The list [0, 1, 2, 3] that is passed to the generator (an iterable) can be replaced by a more compact range object: range(4).
Here's the updated code, taking both changes into account:

from collections.abc import Iterable


def gen(seq: Iterable):
    yield from seq


def main():
    data = range(4)  # not equal to [0, 1, 2, 3], but yields the same values in our case

    # initialization
    g = gen(data)

    while True:
        try:
            print(next(g))
        except StopIteration:
            print('the generator is exhausted')
            break


if __name__ == '__main__':
    main()
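Both simplifications can be verified directly. In the sketch below, gen_loop(), gen_delegate() and demo() are hypothetical names used only for this comparison: the explicit loop and yield from produce the same values, while a range object is not equal to a list even though iterating over it gives the same numbers.

```python
def gen_loop(seq):
    # Explicit loop, as in the earlier versions of gen().
    for item in seq:
        yield item


def gen_delegate(seq):
    # Compact form: delegate the iteration to seq.
    yield from seq


def demo():
    same_values = list(gen_loop(range(4))) == list(gen_delegate(range(4)))
    range_equals_list = range(4) == [0, 1, 2, 3]   # different types, so not equal
    range_as_list = list(range(4))                 # but the values are the same
    return same_values, range_equals_list, range_as_list


if __name__ == '__main__':
    print(demo())  # (True, False, [0, 1, 2, 3])
```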

Replacing Blocking Functions with Generators

As we just learned in the previous section, it is not enough to replace functions with generators; we also need to manage these generators.

Thus, there arises a need for another dispatcher function, called main(), which controls the execution of generator functions. It can also be referred to as an Event Loop since each event of receiving a new value from a generator is born within the depths of the event loop.

If there are two or more generators, the task for the event loop becomes slightly more complex since each generator needs to be called in turn.

from collections.abc import Iterable


def gen(seq: Iterable):
    yield from seq


def main():
    data1 = range(5)
    data2 = data1

    g1 = gen(data1)
    g2 = gen(data2)

    while True:
        try:
            print(next(g1))
            print(next(g2))
        except StopIteration:
            print('the generators are exhausted')
            break


if __name__ == '__main__':
    main()

This code already bears a strong resemblance to our recent example with threads: the generator objects g1 and g2 no longer block the main program until they are completed. Instead, main() advances each of them one step at a time, so from the outside the two generators appear to run in parallel.

However, in this example, the event loop appears to be somewhat simplified, as it does not take into account that the generators can yield sequences of different lengths. Below is an adjusted version that addresses this issue:

from collections.abc import Iterable


def gen(seq: Iterable):
    yield from seq


def main():
    data1 = range(5)
    data2 = range(15, 18)

    g1 = gen(data1)
    g2 = gen(data2)
    g1_not_exhausted = True
    g2_not_exhausted = True

    while g1_not_exhausted or g2_not_exhausted:
        if g1_not_exhausted:
            try:
                print(next(g1))
            except StopIteration:
                print('the generator 1 is exhausted')
                g1_not_exhausted = False

        if g2_not_exhausted:
            try:
                print(next(g2))
            except StopIteration:
                print('the generator 2 is exhausted')
                g2_not_exhausted = False


if __name__ == '__main__':
    main()

Now we can refactor our initial example where regular functions func1() and func2() will be transformed into generators gen1() and gen2():

import time
from my_deco import time_counter

N = 5
DELAY = 0.5


def gen1(n):
    for i in range(n):
        yield
        time.sleep(DELAY)
        print(f'--- line #{i} from {n} is completed')


def gen2(n):
    for i in range(n):
        yield
        time.sleep(DELAY)
        print(f'=== line #{i} from {n} is completed')


@time_counter
def main():
    g1 = gen1(N)
    g2 = gen2(N)
    g1_not_exhausted = True
    g2_not_exhausted = True

    while g1_not_exhausted or g2_not_exhausted:
        if g1_not_exhausted:
            try:
                next(g1)
            except StopIteration:
                print('the generator 1 is exhausted')
                g1_not_exhausted = False

        if g2_not_exhausted:
            try:
                next(g2)
            except StopIteration:
                print('the generator 2 is exhausted')
                g2_not_exhausted = False


if __name__ == '__main__':
    main()

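Now this code resembles the earlier example with threads even more closely: the functions func1() and func2(), transformed into the generators gen1() and gen2(), take turns, so their output is interleaved. However, there is one caveat: every step of each generator still contains a blocking delay of DELAY = 0.5 seconds (the time.sleep() call), and because everything runs in a single thread, these delays simply add up. The total running time is therefore no shorter than in the original sequential version. To solve this problem, we can utilize the asyncio package.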
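The caveat is easy to confirm with a measurement. The following is a minimal sketch under stated assumptions, not the course's own code: worker(), run_round_robin() and demo() are hypothetical names, the delay is shortened to 0.05 seconds to keep the run quick, and time.perf_counter() stands in for the time_counter decorator. The dispatcher also replaces the per-generator flags with a queue, so it works for any number of generators.

```python
import time
from collections import deque

N = 3
DELAY = 0.05  # assumption: shortened from the article's 0.5 s


def worker(name, n, log):
    for i in range(n):
        yield                 # hand control back to the dispatcher
        time.sleep(DELAY)     # still blocks the only thread we have
        log.append(f'{name}{i}')


def run_round_robin(generators):
    """Hypothetical dispatcher: advance each generator in turn until all finish."""
    queue = deque(generators)
    while queue:
        g = queue.popleft()
        try:
            next(g)
        except StopIteration:
            continue          # exhausted: do not put it back in the queue
        queue.append(g)


def demo():
    log = []
    start = time.perf_counter()
    run_round_robin([worker('a', N, log), worker('b', N, log)])
    elapsed = time.perf_counter() - start
    return log, elapsed


if __name__ == '__main__':
    log, elapsed = demo()
    print(log)
    print(f'elapsed: {elapsed:.2f} s, sum of all delays: {2 * N * DELAY:.2f} s')
```

The steps of the two workers alternate in the log, yet the elapsed time is about the sum of all delays, exactly as it would be for two sequential function calls. Removing that blocking call is the part that the asyncio package takes over.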

But before we dive into writing our first asynchronous script using this package, we need to familiarize ourselves with its fundamental components: Coroutines, Tasks, and the Event Loop.
