Блог

Threading, Multiprocessing и AsyncIO в Python (часть 5)

Содержание

  1. Введение в Асинхронность
    1. Что такое “асинхронный код”?
    2. Сравнение трёх способов получения асинхронного кода
  2. Потоки (threading)
    1. Потоки: создание и управление
    2. Потоки: примитивы синхронизации: Lock
    3. Потоки: примитивы синхронизации: Lock(продолжение), Event, Condition, Semaphore, Queue 🔒
    4. Использования потоков для скрейпинга вебсайтов 🔒
  3. Мультипроцессы (multiprocessing) 🔒
    1. Мультипроцессы : создание и управление 🔒
    2. Мультипроцессы: примитивы синхронизации 🔒
  4. Пакет asyncio
    1. Генератор, как асинхронная функция
    2. Корутины (Coroutines), Задачи (Tasks) и Цикл событий (Event Loop)
    3. Переход от генераторов к корутинам и задачам 🔒
    4. Скрейпинг сайтов с помощью пакета aiohttp 🔒
    5. Работа с файлами с помощью пакета aiofiles 🔒
    6. Примитивы синхронизации для asyncio 🔒
  5. Дополнительные пакеты и методы создания асинхронности 🔒
    1. Пакет subprocess 🔒
    2. Пакет concurrent.futures 🔒
    3. Сокеты - метод timeout() и пакет select 🔒
    4. Пакеты curio и trio 🔒

4. Asyncio
4.1 Генератор, как асинхронная функция

И, наконец, мы подошли к третьему способу создания асинхронного кода, когда весь программный код находится в пределах не только одного процесса, но и одного же потока (см. Схему сравнения трёх способов асинхронности).

В двух предыдущих двух случаях (пакеты threading и multiprocessing) к исходному коду не предъявлялось никаких специальных требований. Для того, что превратить этот код в асинхронный, мы просто брали блокирующую или “медленную” функцию и помещали её в отдельный поток или процесс. И делали это без каких-либо изменений исходной функции, поскольку помещали эти функции в отдельный процесс или поток, работой которого управляла операционная система.

Однако в случае, когда мы пытаемся получить асинхронность в пределах одного процесса и одного же потока, рассчитывать на помощь операционной системы больше не приходится. Остаётся рассчитывать только на самих себя, поэтому без существенного изменения исходного кода нам никак не обойтись.

Вооружившись этой идеей, давайте ещё раз вспомним те две “долгих” функции из нашего самого первого примера в начале этого курса:

import time
from my_deco import time_counter
N = 5
DELAY = 0.5


def func1(n):
    for i in range(n):
        time.sleep(DELAY)
        print(f'--- line #{i} from {n} is completed')


def func2(n):
    for i in range(n):
        time.sleep(DELAY)
        print(f'=== line #{i} from {n} is completed')


@time_counter
def main():
    func1(N)
    func2(N)
    print(f'All functions completed')


if __name__ == '__main__':
    main()

Как мы уже хорошо знаем, при вызове функция func1(n), дальнейшее выполнение основной программы будет приостановлено ровно до тех пор, пока эта функция не выполнит все свои итерации до конца. И только после этого управление перейдёт к следующей строке кода.

Иными словами, обычная функция обладает свойством блокировать выполнение основного кода от момента её вызова и до момента её полного завершения.

Однако в Python есть замечательный объект “генератор”, который тоже можно рассматривать как, своего рода, функцию. Но функцию без блокировки. Функцию, которая способна выполняться “по частям” или “по шагам”. Которая при каждом вызове выполняется не до её полного завершения, а продвигается только на “один шаг”, на одну итерацию и не более, но при этом запоминает своё состояние, свой текущий шаг на котором она остановилась. Поэтому, при следующем к ней обращении, эта функция-генератор не начинает свою работу с самого начала, а продолжает её именно с того места, где в последний раз остановилась.

Генератор невероятно популярен в Python, поэтому нет никаких сомнений, что большинство читателей очень хорошо знают что это такое. Тем не менее, несколько вступительных слов на эту тему сказать, всё-таки, стоит.

Генераторы в Python

Ниже приведён пример функции-генератора gen():

def gen(seq: iter):
    for item in seq:
        yield item


def main():
    data = [0, 1, 2, 3]

    for i in gen(data):
        print(i)


if __name__ == '__main__':
    main()

В данном случае оператор yield является той самой точной останова, на которой генератор временно прерывает свою работу, и с которой он её возобновляет при следующем вызове.

Следовательно, генератор нельзя как обычную функцию запустить один раз и ждать результата. Генератором необходимо управлять непрерывно. Чем, в нашем случае, и занимается функция main():

В этом примере, данные из генератора извлекаются с помощью цикла. Это, пожалуй, самый простой способ организовать работу с генератором. Впрочем, для нашего случая этот вариант не вполне подходящий, поскольку благодаря циклу, все элементы генератора извлекаются строго последовательно. Что в итоге делает работу этой конструкции (генератор + его управление из функции main) мало отличимой от работы цикла в обычной (блокирующей) функции.

Поэтому, воспользуемся методом __next__() (или функцией next()), который (-ая) позволяет организовывать произвольный доступ к генератору:
def gen(seq: iter):
    for item in seq:
        yield item


def main():
    data = [0, 1, 2, 3]

    while True:
        print(next(gen(data)))


if __name__ == '__main__':
    main()

Однако в этом случае мы получаем бесконечный цикл, в котором всякий раз возвращается одно и тоже начальное значение генератора - 0. Чтобы это исправить, генератор необходимо сначала инициализировать.

Инициализация генератора осуществляется путем вызова функции, содержащей ключевое слово yield. Когда функция-генератор вызывается в коде, она не выполняется непосредственно, а возвращает объект-генератор. Этот объект может быть использован для итерации по последовательности значений, генерируемых функцией-генератором:

def gen(seq: iter):
    for item in seq:
        yield item


def main():
    data = [0, 1, 2, 3]

    # initialization
    g = gen(data)

    while True:
        print(next(g))


if __name__ == '__main__':
    main()

Ну вот, почти всё получилось. Правда после исчерпания значений генератора выбрасывается исключение StopIteration, которое логично было бы перехватить:

def gen(seq: iter):
    for item in seq:
        yield item


def main():
    data = [0, 1, 2, 3]

    # initialization
    g = gen(data)

    while True:
        try:
            print(next(g))
        except StopIteration:
            print('the generator is exhausted')
            break


if __name__ == '__main__':
    main()

Ну вот, всё в порядке - теперь мы полностью контролируем процесс извлечения значений из генератора. И, при необходимости, можем последовательно извлекать значения сразу из нескольких функций-генераторов, что внешне будет выглядеть как параллельная работа этих самых функции. Ну чем не конкаренси?

В завершении краткого обзора темы генераторов стоит добавить два последних штриха:
  1. Цикл в генераторе gen() можно записать значительно компактнее: yield from seq
  2. Итератор в виде списка [0, 1, 2, 3], который передаётся в генератор в данном случае мы запишем более компактно, как объект range: range(4)
Изменённый код с учётом двух последних дополнений:

def gen(seq: iter):
    yield from seq


def main():
    data = range(4)  # [0, 1, 2, 3] (not equal, but about the same in your case!)

    # initialization
    g = gen(data)

    while True:
        try:
            print(next(g))
        except StopIteration:
            print('the generator is exhausted')
            break


if __name__ == '__main__':
    main()

Замена блокирующих функций на генераторы

Как мы только что узнали из предыдущего блока, мало заменить функции на генераторы, этими генераторами надо ещё и управлять.

Таким образом, возникает необходимость в ещё одной функции-диспетчере main(), которая управляет работой функций-генераторов. Можно также назвать его и Циклом событий (Event Loop), поскольку каждое событие получения от генератора нового значения рождается именно в недрах цикла событий.

Если генераторов будет два и более, то задача для цикла событий несколько усложняется, поскольку вызывать теперь приходится каждый генератор поочерёдно:

def gen(seq: iter):
    yield from seq


def main():
    data1 = range(5)
    data2 = data1

    g1 = gen(data1)
    g2 = gen(data2)

    while True:
        try:
            print(next(g1))
            print(next(g2))
        except StopIteration:
            print('the generators are exhausted')
            break


if __name__ == '__main__':
    main()

Этот код уже очень напоминает наш недавний пример с потоками , поскольку функции-генераторы g1 и g2 ведут себя в нашем примере очень схожим образом, а именно: они не блокируют выполнения основной программы, а выполняются параллельно.

Правда, в этом примере цикл событий здесь выглядит несколько упрощённо, поскольку не учитывает, что генераторы могут принимать последовательность разной длины. Ниже предлагается скорректированный вариант, который устраняет этот недостаток:

def gen(seq: iter):
    yield from seq


def main():
    data1 = range(5)
    data2 = range(15, 18)

    g1 = gen(data1)
    g2 = gen(data2)
    g1_not_exhausted = True
    g2_not_exhausted = True

    while g1_not_exhausted or g2_not_exhausted:
        if g1_not_exhausted:
            try:
                print(next(g1))
            except StopIteration:
                print('the generator 1 is exhausted')
                g1_not_exhausted = False

        if g2_not_exhausted:
            try:
                print(next(g2))
            except StopIteration:
                print('the generator 2 is exhausted')
                g2_not_exhausted = False

Теперь мы можем провести рефакторинг нашего первоначального примера, в котором обычные функции func1() и func2() будут трансформированы в генераторы gen1() и gen2():

import time
from my_deco import time_counter

N = 5
DELAY = 0.5


def gen1(n):
    for i in range(n):
        yield
        time.sleep(DELAY)
        print(f'--- line #{i} from {n} is completed')


def gen2(n):
    for i in range(n):
        yield
        time.sleep(DELAY)
        print(f'=== line #{i} from {n} is completed')


@time_counter
def main():
    g1 = gen1(N)
    g2 = gen2(N)
    g1_not_exhausted = True
    g2_not_exhausted = True

    while g1_not_exhausted or g2_not_exhausted:
        if g1_not_exhausted:
            try:
                next(g1)
            except StopIteration:
                print('the generator 1 is exhausted')
                g1_not_exhausted = False

        if g2_not_exhausted:
            try:
                next(g2)
            except StopIteration:
                print('the generator 2 is exhausted')
                g2_not_exhausted = False


if __name__ == '__main__':
   main()

Этот код ещё больше напоминает предыдущий пример с потоками , поскольку модифицированные функции func1() и func2() (превратившиеся в генераторы gen1() и gen2()) выполняются фактически параллельно. Правда есть одно “но”: внутри каждой из функций осталась блокирующая задержка на 2 секунды. Решить и эту проблему нам позволит использование пакета asyncio. Но прежде, чем приступить к написанию первого (осмысленного!) асинхронного скрипта с помощью этого пакета, необходимо ознакомиться с его базовыми элементами, а именно: Корутиной (Coroutine), Задачами (task) и Циклом Событий (Event Loop).

Более подробно со всеми деталями этой темы вы сможете познакомиться из этого видео:



Переход на следующую тему