Блог

Threading, Multiprocessing и AsyncIO в Python (часть 4)

Содержание

  1. Введение в Асинхронность
    1. Что такое “асинхронный код”?
    2. Сравнение трёх способов получения асинхронного кода
  2. Потоки (threading)
    1. Потоки: создание и управление
    2. Потоки: примитивы синхронизации: Lock
    3. Потоки: примитивы синхронизации: Lock(продолжение), Event, Condition, Semaphore, Queue 🔒
    4. Использования потоков для скрейпинга вебсайтов 🔒
  3. Мультипроцессы (multiprocessing) 🔒
    1. Мультипроцессы : создание и управление 🔒
    2. Мультипроцессы: примитивы синхронизации 🔒
  4. Пакет asyncio
    1. Генератор, как асинхронная функция
    2. Корутины (Coroutines), Задачи (Tasks) и Цикл событий (Event Loop)
    3. Переход от генераторов к корутинам и задачам 🔒
    4. Скрейпинг сайтов с помощью пакета aiohttp 🔒
    5. Работа с файлами с помощью пакета aiofiles 🔒
    6. Примитивы синхронизации для asyncio 🔒
  5. Дополнительные пакеты и методы создания асинхронности 🔒
    1. Пакет subprocess 🔒
    2. Пакет concurrent.futures 🔒
    3. Сокеты - метод timeout() и пакет select 🔒
    4. Пакеты curio и trio 🔒

2. Потоки (threading)
2.2 Потоки. Примитивы синхронизации: Lock

Потоки, которые мы использовали до этого, не взаимодействовали ни друг с другом, ни с основным потоком. Всё, они делали - просто выводили на печать свои собственные результаты.

Однако на практике полностью автономные потоки скорее исключение, чем правило. Чаще потокам приходится обмениваться друг с другом данными, либо совместно использовать (изменять) данные, находящиеся в основном потоке. И в этом случае возникает объективная необходимость синхронизировать между собой действия этих потоков.

Здесь особенно важно обратить внимание на следующее: применение примитивов синхронизации само по себе не делает асинхронный код синхронным (если, конечно, речь не идёт об ошибках программистов))).

Примитивы синхронизации лишь синхронизируют между собой отдельные потоки (либо отдельные процессы - для пакета multiprocessing, либо отдельные корутины в случае пакета asyncio), но отнюдь не превращают асинхронный код в синхронный!

Давайте рассмотрим простейший пример такого взаимодействия - одновременный совместный доступ нескольких потоков к одной единственной переменной из основного потока.

Как видно из следующего примера, несколько потоков в цикле увеличивают значение общей переменной val:

Если бы речь не шла о потоках, то эту конструкцию можно было бы рассматривать как два вложенных цикла: цикл внутри потока - как внутренний цикл и сами потоки - как внешний цикл. Исходя из этого, конечное значение переменной val должно равняться произведению числа итераций двух циклов, то есть числа потоков и числа внутренних циклов (в нашем случае это 100 * 100 = 10 000).

Это было бы действительно так, если бы операция += являлась бы потокобезопасной. Что на самом деле это далеко не так.

from threading import Thread
from my_deco import time_counter

val = 0
COUNT = 100
NUM_THREADS = 100


def increase():
    global val
    for _ in range(COUNT):
        val += 1


@time_counter
def main():
    threads = [Thread(target=increase) for _ in range(NUM_THREADS)]

    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()

    diff = val - COUNT * NUM_THREADS
    print(f'diff = {diff}')


if __name__ == '__main__':
    main()

Во-первых, запись в одну строку на самом деле представляет собой 4 последовательных действия:

  1. извлечение текущего значения переменной val;
  2. извлечение значения инкремента (в нашем случае - это 1);
  3. сложение двух чисел (val + 1);
  4. запись результата как новое значение переменной val

Поэтому существует далеко не нулевая вероятность, что между пунктами 1 и 4 может включиться другой поток, с другим значение val. И тогда, при перезаписи этой переменной, в 4-м пункте потеряется значение, которое увеличилось в одном из этих потоков. Подобное действие получило название “эффект гонки” или “состояние гонки” (race condition).

На нашем примере получить этот эффект достаточно сложно, поскольку величины исходных значений, а именно: число потоков COUNT, число итераций NUM_THREADS и интервал переключения между потоками, не достаточны для устойчивого проявления этого эффекта.

Кстати, дефолтный интервал переключения между потоками можно узнать с помощью метода getswitchinterval() из хорошо всем известного пакета sys:

import sys

interval = sys.getswitchinterval()
print(f'switchinterval = {interval}')

# switchinterval = 0.005

Значение интервала переключения мы можем изменить с помощью метода sys.setswitchinterval(new_interval), но, к сожалению, уменьшать его до значений, на котором проявится эффект гонки, мы не можем. Но зато мы можем программно изменить наш код так, чтобы инкремент значение val, выполнялся бы “не так быстро”. Для этого мы разделим:

  • вычисление нового значения переменной val и
  • замену старого значения новым.

И для пущей убедительности добавим между этими двумя вычислениям задержку в 0,001 секунды:

import time
from threading import Thread
from my_deco import time_counter

val = 0
COUNT = 100
NUM_THREADS = 100


def increase():
    global val
    for _ in range(COUNT):
        new_val = val + 1
        time.sleep(0.001)
        val = new_val


@time_counter
def main():
    threads = [Thread(target=increase) for _ in range(NUM_THREADS)]

    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()

    diff = val - COUNT * NUM_THREADS
    print(f'diff = {diff}')


if __name__ == '__main__':
    main()

В данном случае разница diff будет уже очень сильно отличаться от нуля.

Таким образом, “потокоНЕбезопасность” обращения к общим переменным доказана. Как можно исправить эту ситуацию?

Для этой цели в языке Python существуют так называемые примитивы синхронизации (synchronization primitives).

Пожалуй, самой простой, базовой и наиболее часто используемым примитивом синхронизации является блокировка Lock(), которая работает по следующему алгоритму:

  1. Перед тем, как потоку будет позволено начать изменение данных, проверяется, не начал ли это изменение другой поток.
  2. Если изменения уже начаты другим потоком, то текущий поток ставится в очередь.
  3. Когда это очередь доходит до ожидающего потока, то происходит открытие данные для изменений текущего потока с одновременной блокировки этих изменений для всех других - метод acquire().
  4. После завершения изменений, текущий поток разблокирует доступ и право изменений передаются следующему по очереди потоку - метод release().

import time
from threading import Thread, Lock
from my_deco import time_counter

COUNT = 100
NUM_THREADS = 100
val = 0
val_lock = Lock()


def increase():
    global val
    for _ in range(COUNT):
        val_lock.acquire()
        new_val = val + 1
        time.sleep(0.001)
        val = new_val
        val_lock.release()


@time_counter
def main():
    threads = [Thread(target=increase) for _ in range(NUM_THREADS)]

    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()

    diff = val - COUNT * NUM_THREADS
    print(f'diff = {diff}')


if __name__ == '__main__':
    main()

Как видим, применение блокировки Lock дало ожидаемый результаты.

(Кстати, иногда этот тип блокировки называют Mutex - примитив синхронизации, который используется для защиты общих ресурсов от одновременного доступа нескольких потоков. Он представляет собой блокировку, которую поток может удерживать или освобождать. Только один поток может удерживать мьютекс в любой момент времени, и все остальные потоки, которые попытаются захватить мьютекс, будут блокироваться до тех пор, пока он не будет освобожден.)

Есть ещё более удобный и компактный вариант использования блокировки Lock, позволяющий избегать применения указанных выше методов acquire() и release(). С этом способом, а также с другими примитивами синхронизации (ровно как и с примерами их использования) вы сможете познакомиться в расширенной версии этого курса.

Более подробно со всеми деталями этой темы вы сможете познакомиться из этого видео:



Переход на следующую тему