Блог

Threading, Multiprocessing и AsyncIO в Python (часть 3)

Содержание

  1. Введение в Асинхронность
    1. Что такое “асинхронный код”?
    2. Сравнение трёх способов получения асинхронного кода
  2. Потоки (threading)
    1. Потоки: создание и управление
    2. Потоки: примитивы синхронизации: Lock
    3. Потоки: примитивы синхронизации: Lock(продолжение), Event, Condition, Semaphore, Queue 🔒
    4. Использования потоков для скрейпинга вебсайтов 🔒
  3. Мультипроцессы (multiprocessing) 🔒
    1. Мультипроцессы : создание и управление 🔒
    2. Мультипроцессы: примитивы синхронизации 🔒
  4. Пакет asyncio 🔒
    1. Генератор, как асинхронная функция 🔒
    2. Корутины (Coroutines), Задачи (Tasks) и Цикл событий (Event Loop) 🔒
    3. Переход от генераторов к корутинам и задачам 🔒
    4. Скрейпинг сайтов с помощью пакета aiohttp 🔒
    5. Работа с файлами с помощью пакета aiofiles 🔒
    6. Примитивы синхронизации для asyncio 🔒
  5. Дополнительные пакеты и методы создания асинхронности 🔒
    1. Пакет subprocess 🔒
    2. Пакет concurrent.futures 🔒
    3. Сокеты - метод timeout() и пакет select 🔒
    4. Пакеты curio и trio 🔒

2. Потоки (threadings)
2.1 Потоки. Создание и управление

Пакет threading является частью стандартной библиотеки Python начиная с версии 1.5 (ноябрь 1997-го), поэтому предварительной установки не требует. Минимум, что нужно для запуска потока, это:

  • Импортировать пакет threading.
  • Написать код, который должен исполняться в потоке, в виде функции.
  • Создать (декларировать) поток, указав в качестве параметра target имя целевой функции. Если в функцию необходимо передать аргументы, то они указываются в параметре agrs в формате данных tuple.
  • Запустить поток с помощью метода start().

import time
from threading import Thread

def clock(delay):
   time.sleep(delay)
   print(f"Current time: {time.strftime('%X')}, delay {delay} sec.")

thread1 = Thread(target=clock, args=(2,))
thread2 = Thread(target=clock, args=(3,))

if __name__ == '__main__':
   start = time.time()
   print(f"Time start: {time.strftime('%X')}")
   thread1.start()
   thread2.start()
   print(f"Time end: {time.strftime('%X')}")
   print(f'======== Total time: {time.time() - start:0.2f} ========')

Результат:

Time start: 07:39:58
Time end: 07:39:58
======== Total time: 0.00 ========
Current time: 07:40:00, delay 2 sec.
Current time: 07:40:01, delay 3 sec.


Process finished with exit code

Как видно из примера, основной поток завершился мгновенно (0 sec), а дополнительные потоки - через 2 и через 3 секунды от времени запуска скрипта соответственно.

Как видим, в нашем случае дополнительные потоки работают без оглядки на время завершения основного потока и процесса в целом, что совершенно логично: процессор работает быстрее внешних устройств, следовательно перед завершение скрипта необходимо дождаться сигнала о корректном завершении работы внешних устройств, иначе возможна потеря данных, оставшихся в буфере этих устройств. Поэтому, если не приняты особые действия, то основной процесс будет ждать полного завершения всех своих потоков.

В тех же случаях, когда надо завершить работу дополнительного потока принудительно, одновременно с завершением всего процесса, то специальный параметр daemon устанавливается в значение True. Обратите внимание, что сделать это можно двумя способами:

import time
from threading import Thread




def clock(delay):
   time.sleep(delay)
   print(f"Current time: {time.strftime('%X')}, delay {delay} sec.")


thread1 = Thread(target=clock, args=(2,))
thread2 = Thread(target=clock, args=(3,), daemon=True)


if __name__ == '__main__':
   start = time.time()
   print(f"Time start: {time.strftime('%X')}")
   thread1.start()
   thread2.daemon = True
   thread2.start()
   print(f"Time end: {time.strftime('%X')}")
   print(f'======== Total time: {time.time() - start:0.2f} ========')

Как видим, daemon второго потока был установлен в True дважды. Разумеется, хватило бы одного раза - второй был использован лишь для демонстрации ещё одного способа установки. Результат: второму потоку не хватило одной секунды для своего совершения, поэтому он прервался досрочно.

Time start: 07:54:41
Time end: 07:54:41
======== Total time: 0.00 ========
Current time: 07:54:43, delay 2 sec.

Process finished with exit code 0

(Кстати, проверить, является ли поток демоном, можно с помощью метода isDaemon(). Если является, то метод вернёт значение True.)

Чтобы узнать, когда именно второй поток прекратил свою работу, мы слегка изменим функцию clock и будем печатать информацию о её работе каждую секунду. Для этого тело функции превращаем в тело цикла с задержкой в 1 секунду и печатью результата. Тогда общее значение задержки будет определяться количеством повторений цикла.

def clock(delay: int):
   for d in range(1, delay + 1):
       time.sleep(1)
       print(f"Current time: {time.strftime('%X')}, {d}; delay {delay} sec.")

Результат показывает, что последние данные о работе второго потока заканчиваются задержкой в 2 секунды. Результат работы в последнюю третью секунду он распечатать не успел, поскольку прервался вместе с основным процессом на 2-й секунде:

Time start: 17:20:42
Time end: 17:20:42
======== Total time: 0.00 ========
Current time: 17:20:43, 1; delay 2 sec.
Current time: 17:20:43, 1; delay 3 sec.
Current time: 17:20:44, 2; delay 2 sec.
Current time: 17:20:44, 2; delay 3 sec.

Process finished with exit code 0

Если и у второго потока сделать daemon=True, то общий процесс завершится мгновенно и оба потока не оставят абсолютно никаких следов своей активности:

Time start: 17:29:27
Time end: 17:29:27
======== Total time: 0.00 ========

Process finished with exit code 0

Таким образом, можно сделать вывод: общая длительность процесса определяется длительностью “самого долгого” потока, при условии, что он не daemon. В противном случае - длительностью основного потока.

Объединение основного и вспомогательных потоков.

Очень часто результаты вспомогательных потоков необходимо использовать в основном потоке. И тогда на помощь приходим метод join(), который приостанавливает работу основного потока до тех пор, пока не завершится поток присоединённый:

if __name__ == '__main__':
   start = time.time()
   print(f"Time start: {time.strftime('%X')}")
   thread1.start()
   thread1.join()
   thread2.start()
   print(f"Time end: {time.strftime('%X')}")
   print(f'======== Total time: {time.time() - start:0.2f} ========')

Всё, что мы изменили в предыдущем скрипте - это присоединили первый поток к основному сразу же после его запуска. Что привело к следующему результату:

Time start: 17:53:40
Current time: 17:53:41, 1; delay 2 sec.
Current time: 17:53:42, 2; delay 2 sec.
Time end: 17:53:42
======== Total time: 2.00 ========

Process finished with exit code 0

Показательным здесь является то, что длительность основного потока увеличилась до 2-х секунд - ровно на время выполнения первого потока. Только дождавшись завершения первого поток, основной поток запустил второй, и тут же завершился. Остановив при этом весь процесс. Что привело к завершению и второго потока, которому так и не удалось поработать.

Но если мы сначала запустим оба потока, а только потом присоединим первый поток к основному потоку,

thread1.start()
thread2.start()
thread1.join()

то картина изменится:

Time start: 18:05:02
Current time: 18:05:03, 1; delay 2 sec.
Current time: 18:05:03, 1; delay 3 sec.
Current time: 18:05:04, 2; delay 2 sec.
Current time: 18:05:04, 2; delay 3 sec.
Time end: 18:05:04
======== Total time: 2.00 ========

Process finished with exit code 0

Важный вывод: сначала запуск всех потоков, а затем присоединение одного из потоков к основному потоку позволяет завершить все запущенные потоки. Потоки, время выполнения которых меньше или равно времени выполнения объединенного потока, будут иметь достаточно времени для завершения. Все остальные потоки будут работать только во время работы объединенного потока (в нашем случае это thread2).

Следовательно, если мы присоединим к основному потоку самый продолжительный поток, это позволит завершиться всем потокам. Общее время процесса в нашем случае будет определяться временем выполнения этого потока (в нашем случае 3 секунды):

Time start: 18:14:17
Current time: 18:14:18, 1; delay 2 sec.
Current time: 18:14:18, 1; delay 3 sec.
Current time: 18:14:19, 2; delay 2 sec.
Current time: 18:14:19, 2; delay 3 sec.
Current time: 18:14:20, 3; delay 3 sec.
Time end: 18:14:20
======== Total time: 3.00 ========

Process finished with exit code 0

А теперь небольшое задание - попробуйте самостоятельно ответить на вопрос: Что снанет с общем временем выполнения в этом

thread1.start()
thread2.start()
thread1.join()
thread2.join()

, в этом:

thread1.start()
thread2.start()
thread2.join()
thread1.join()

, и в этом случае?

thread1.start()
thread1.join()
thread2.start()
thread2.join()

И главное, попытайтесь объяснить, почему общее время работы изменилось именно так, а не иначе.

Рефакторинг примера с двумя функциями

Пример кода с двумя функциями, который мы рассмотрели в самом начале, преследовал цель добиться максимальной асинхронности. Для чего мы отправили потенциально блокирующие функции в собственные потоки, откуда они уже больше никак не смогут помешать основному потоку. В этом, конечно, огромный плюс. Но есть и маленький минус - декоратор подсчёта времени работы остался в основном потоке (в функции main(), и сейчас мы не знаем, на сколько быстрее (или медленнее) стал работать код после добавления потоков.

Давайте, присоединим дополнительные потоки к основному и посмотрим, как изменилось время работы:

import time
from threading import Thread
from my_deco import time_counter

N = 5
DELAY = 0.5


def func1(n):
   for i in range(n):
       time.sleep(DELAY)
       print(f'--- line #{i} from {n} is completed')


def func2(n):
   for i in range(n):
       time.sleep(DELAY)
       print(f'=== line #{i} from {n} is completed')


@time_counter
def main():
   thread1 = Thread(target=func1, args=(N,))
   thread2 = Thread(target=func2, args=(N,))
   thread1.start()
   thread2.start()
   thread1.join()
   thread2.join()
   print(f'All functions completed')

if __name__ == '__main__':
   main()


======== Script started ========
--- line #0 from 5 is completed
=== line #0 from 5 is completed
--- line #1 from 5 is completed
=== line #1 from 5 is completed
. . . . . . . . .

All functions completed
======== Script execution time: 2.51 ========

Итого: 2,5 секунды - в 2 раза быстрее, чем обычный синхронный код. Что вполне объяснимо: теперь обе функции запускаются одновременно, поэтому время работы всего скрипта теперь равно времени каждой из них.

Есть ли способ ещё уменьшить это время? Вероятно - да, только для начала, хорошо было бы понять, в чём причина задержки.

А причина в том, что благодаря циклу внутри функции, каждая последующая итерация ника не может запуститься раньше предыдущей. То есть каждый раз, для запуска следующей итерации приходится ждать DELAY секунд задержки предыдущей.

Решение очевидно: в функции надо оставить только одну итерацию, а цикл перенести в функцию main(). Тогда можно создать число потоков, равное числу итераций двух функций. А из предыдущих примеров мы уже знаем, что если все эти потоки запустить одновременно, то из общее время выполнения будет равно времени самого долгого потока.

Ну, хорошо, сказано - сделано:

import time
from threading import Thread
from my_deco import time_counter

N = 5
DELAY = 0.5


def func1(i, n):
   time.sleep(DELAY)
   print(f'--- line #{i} from {n} is completed')


def func2(i, n):
   time.sleep(DELAY)
   print(f'=== line #{i} from {n} is completed')

@time_counter
def main():
   threads = []
   threads1 = [Thread(target=func1, args=(i, N)) for i in range(N)]
   threads2 = [Thread(target=func2, args=(i, N)) for i in range(N)]
   threads.extend(threads1)
   threads.extend(threads2)

   for thread in threads:
       thread.start()
   for thread in threads:
       thread.join()
   print(f'All functions completed')


if __name__ == '__main__':
   main()

И время работы скрипта теперь сократилось до времени работы самого долгого потока:

======== Script started ========
--- line #0 from 5 is completed
=== line #0 from 5 is completed
--- line #1 from 5 is completed
=== line #1 from 5 is completed
. . . . . . . . .

All functions completed
======== Script execution time: 0.51 ========

Создание потока с помощью класса

В предыдущих примерах для создания и управления потоками были использованы функции. Разумеется, тоже самое можно сделать и с помощью классов.

В этом случае:

  1. Класс, описывающий поток должен наследовать класс Thread
  2. Для описания поведения потока используется (точнее, переопределяется) метод run()
  3. Все дополнительные параметры, включая daemon, передаются с помощью атрибутов класса в дандер-методе __init__()

В следующем примере создаётся два daemon-потока. Благодаря тому, что самый долгий из них присоединяется к основному потоку, успевают закончится оба потока:

import time
from threading import Thread

class ClockThread(Thread):
   def __init__(self, delay):
       super().__init__()
       self.delay = delay
       self.daemon = True

   def run(self):
       time.sleep(self.delay)
       print(f"Current time: {time.strftime('%X')}, delay {self.delay} sec.")


thread_1 = ClockThread(2)
thread_2 = ClockThread(3)

if __name__ == '__main__':
   start = time.time()
   print(f"Time start: {time.strftime('%X')}")
   thread_1.start()
   thread_2.start()
   thread_2.join()
   print(f"Time end: {time.strftime('%X')}")
   print(f'======== Total time: {time.time() - start:0.2f} ========')

Скрипт, как и положено, работает 3 секунды:

Time start: 00:32:58
Current time: 00:33:00, delay 2 sec.
Current time: 00:33:01, delay 3 sec.
Time end: 00:33:01
======== Total time: 3.00 ========

Process finished with exit code 0

Как видим, никаких принципиальных изменений описание потоков с помощью классов не вызвало. По-прежнему, управление объекта потока осуществляется с помощью методов start() и join(), а параметр (атрибут) daemon можно сразу указать в __init__(), либо затем определить (переопределить) в экземпляре пользовательского класса.

Более подробно со всеми деталями этой темы вы сможете познакомиться из этого видео:



Переход на следующую тему