И, наконец, мы подошли к третьему способу создания асинхронного кода, когда весь программный код находится в пределах не только одного процесса, но и одного же потока (см. Схему сравнения трёх способов асинхронности).
В двух предыдущих двух случаях (пакеты threading и multiprocessing) к исходному коду не предъявлялось никаких специальных требований. Для того, что превратить этот код в асинхронный, мы просто брали блокирующую или “медленную” функцию и помещали её в отдельный поток или процесс. И делали это без каких-либо изменений исходной функции, поскольку помещали эти функции в отдельный процесс или поток, работой которого управляла операционная система.
Однако в случае, когда мы пытаемся получить асинхронность в пределах одного процесса и одного же потока, рассчитывать на помощь операционной системы больше не приходится. Остаётся рассчитывать только на самих себя, поэтому без существенного изменения исходного кода нам никак не обойтись.
Вооружившись этой идеей, давайте ещё раз вспомним те две “долгих” функции из нашего самого первого примера в начале этого курса:
import time
from my_deco import time_counter
N = 5
DELAY = 0.5
def func1(n):
for i in range(n):
time.sleep(DELAY)
print(f'--- line #{i} from {n} is completed')
def func2(n):
for i in range(n):
time.sleep(DELAY)
print(f'=== line #{i} from {n} is completed')
@time_counter
def main():
func1(N)
func2(N)
print(f'All functions completed')
if __name__ == '__main__':
main()
Как мы уже хорошо знаем, при вызове функция func1(n), дальнейшее выполнение основной программы будет приостановлено ровно до тех пор, пока эта функция не выполнит все свои итерации до конца. И только после этого управление перейдёт к следующей строке кода.
Иными словами, обычная функция обладает свойством блокировать выполнение основного кода от момента её вызова и до момента её полного завершения.
Однако в Python есть замечательный объект “генератор”, который тоже можно рассматривать как, своего рода, функцию. Но функцию без блокировки. Функцию, которая способна выполняться “по частям” или “по шагам”. Которая при каждом вызове выполняется не до её полного завершения, а продвигается только на “один шаг”, на одну итерацию и не более, но при этом запоминает своё состояние, свой текущий шаг на котором она остановилась. Поэтому, при следующем к ней обращении, эта функция-генератор не начинает свою работу с самого начала, а продолжает её именно с того места, где в последний раз остановилась.
Генератор невероятно популярен в Python, поэтому нет никаких сомнений, что большинство читателей очень хорошо знают что это такое. Тем не менее, несколько вступительных слов на эту тему сказать, всё-таки, стоит.
Ниже приведён пример функции-генератора gen():
def gen(seq: iter):
for item in seq:
yield item
def main():
data = [0, 1, 2, 3]
for i in gen(data):
print(i)
if __name__ == '__main__':
main()
В данном случае оператор yield является той самой точной останова, на которой генератор временно прерывает свою работу, и с которой он её возобновляет при следующем вызове.
Следовательно, генератор нельзя как обычную функцию запустить один раз и ждать результата. Генератором необходимо управлять непрерывно. Чем, в нашем случае, и занимается функция main():
В этом примере, данные из генератора извлекаются с помощью цикла. Это, пожалуй, самый простой способ организовать работу с генератором. Впрочем, для нашего случая этот вариант не вполне подходящий, поскольку благодаря циклу, все элементы генератора извлекаются строго последовательно. Что в итоге делает работу этой конструкции (генератор + его управление из функции main) мало отличимой от работы цикла в обычной (блокирующей) функции. Поэтому, воспользуемся методом __next__() (или функцией next()), который (-ая) позволяет организовывать произвольный доступ к генератору:def gen(seq: iter):
for item in seq:
yield item
def main():
data = [0, 1, 2, 3]
while True:
print(next(gen(data)))
if __name__ == '__main__':
main()
Однако в этом случае мы получаем бесконечный цикл, в котором всякий раз возвращается одно и тоже начальное значение генератора - 0. Чтобы это исправить, генератор необходимо сначала инициализировать.
Инициализация генератора осуществляется путем вызова функции, содержащей ключевое слово yield. Когда функция-генератор вызывается в коде, она не выполняется непосредственно, а возвращает объект-генератор. Этот объект может быть использован для итерации по последовательности значений, генерируемых функцией-генератором:
def gen(seq: iter):
for item in seq:
yield item
def main():
data = [0, 1, 2, 3]
# initialization
g = gen(data)
while True:
print(next(g))
if __name__ == '__main__':
main()
Ну вот, почти всё получилось. Правда после исчерпания значений генератора выбрасывается исключение StopIteration, которое логично было бы перехватить:
def gen(seq: iter):
for item in seq:
yield item
def main():
data = [0, 1, 2, 3]
# initialization
g = gen(data)
while True:
try:
print(next(g))
except StopIteration:
print('the generator is exhausted')
break
if __name__ == '__main__':
main()
Ну вот, всё в порядке - теперь мы полностью контролируем процесс извлечения значений из генератора. И, при необходимости, можем последовательно извлекать значения сразу из нескольких функций-генераторов, что внешне будет выглядеть как параллельная работа этих самых функции. Ну чем не конкаренси?
В завершении краткого обзора темы генераторов стоит добавить два последних штриха:def gen(seq: iter):
yield from seq
def main():
data = range(4) # [0, 1, 2, 3] (not equal, but about the same in your case!)
# initialization
g = gen(data)
while True:
try:
print(next(g))
except StopIteration:
print('the generator is exhausted')
break
if __name__ == '__main__':
main()
Как мы только что узнали из предыдущего блока, мало заменить функции на генераторы, этими генераторами надо ещё и управлять.
Таким образом, возникает необходимость в ещё одной функции-диспетчере main(), которая управляет работой функций-генераторов. Можно также назвать его и Циклом событий (Event Loop), поскольку каждое событие получения от генератора нового значения рождается именно в недрах цикла событий.
Если генераторов будет два и более, то задача для цикла событий несколько усложняется, поскольку вызывать теперь приходится каждый генератор поочерёдно:
def gen(seq: iter):
yield from seq
def main():
data1 = range(5)
data2 = data1
g1 = gen(data1)
g2 = gen(data2)
while True:
try:
print(next(g1))
print(next(g2))
except StopIteration:
print('the generators are exhausted')
break
if __name__ == '__main__':
main()
Этот код уже очень напоминает наш недавний пример с потоками , поскольку функции-генераторы g1 и g2 ведут себя в нашем примере очень схожим образом, а именно: они не блокируют выполнения основной программы, а выполняются параллельно.
Правда, в этом примере цикл событий здесь выглядит несколько упрощённо, поскольку не учитывает, что генераторы могут принимать последовательность разной длины. Ниже предлагается скорректированный вариант, который устраняет этот недостаток:
def gen(seq: iter):
yield from seq
def main():
data1 = range(5)
data2 = range(15, 18)
g1 = gen(data1)
g2 = gen(data2)
g1_not_exhausted = True
g2_not_exhausted = True
while g1_not_exhausted or g2_not_exhausted:
if g1_not_exhausted:
try:
print(next(g1))
except StopIteration:
print('the generator 1 is exhausted')
g1_not_exhausted = False
if g2_not_exhausted:
try:
print(next(g2))
except StopIteration:
print('the generator 2 is exhausted')
g2_not_exhausted = False
Теперь мы можем провести рефакторинг нашего первоначального примера, в котором обычные функции func1() и func2() будут трансформированы в генераторы gen1() и gen2():
import time
from my_deco import time_counter
N = 5
DELAY = 0.5
def gen1(n):
for i in range(n):
yield
time.sleep(DELAY)
print(f'--- line #{i} from {n} is completed')
def gen2(n):
for i in range(n):
yield
time.sleep(DELAY)
print(f'=== line #{i} from {n} is completed')
@time_counter
def main():
g1 = gen1(N)
g2 = gen2(N)
g1_not_exhausted = True
g2_not_exhausted = True
while g1_not_exhausted or g2_not_exhausted:
if g1_not_exhausted:
try:
next(g1)
except StopIteration:
print('the generator 1 is exhausted')
g1_not_exhausted = False
if g2_not_exhausted:
try:
next(g2)
except StopIteration:
print('the generator 2 is exhausted')
g2_not_exhausted = False
if __name__ == '__main__':
main()
Этот код ещё больше напоминает предыдущий пример с потоками , поскольку модифицированные функции func1() и func2() (превратившиеся в генераторы gen1() и gen2()) выполняются фактически параллельно. Правда есть одно “но”: внутри каждой из функций осталась блокирующая задержка на 2 секунды. Решить и эту проблему нам позволит использование пакета asyncio. Но прежде, чем приступить к написанию первого (осмысленного!) асинхронного скрипта с помощью этого пакета, необходимо ознакомиться с его базовыми элементами, а именно: Корутиной (Coroutine), Задачами (task) и Циклом Событий (Event Loop).
Более подробно со всеми деталями этой темы вы сможете познакомиться из этого видео:
Потоки, которые мы использовали до этого, не взаимодействовали ни друг с другом, ни с основным потоком. Всё, они делали - просто выводили на печать свои собственные результаты.
Однако на практике полностью автономные потоки скорее исключение, чем правило. Чаще потокам приходится обмениваться друг с другом данными, либо совместно использовать (изменять) данные, находящиеся в основном потоке. И в этом случае возникает объективная необходимость синхронизировать между собой действия этих потоков.
Здесь особенно важно обратить внимание на следующее: применение примитивов синхронизации само по себе не делает асинхронный код синхронным (если, конечно, речь не идёт об ошибках программистов))).
Примитивы синхронизации лишь синхронизируют между собой отдельные потоки (либо отдельные процессы - для пакета multiprocessing, либо отдельные корутины в случае пакета asyncio), но отнюдь не превращают асинхронный код в синхронный!
Давайте рассмотрим простейший пример такого взаимодействия - одновременный совместный доступ нескольких потоков к одной единственной переменной из основного потока.
Как видно из следующего примера, несколько потоков в цикле увеличивают значение общей переменной val:
Если бы речь не шла о потоках, то эту конструкцию можно было бы рассматривать как два вложенных цикла: цикл внутри потока - как внутренний цикл и сами потоки - как внешний цикл. Исходя из этого, конечное значение переменной val должно равняться произведению числа итераций двух циклов, то есть числа потоков и числа внутренних циклов (в нашем случае это 100 * 100 = 10 000).
Это было бы действительно так, если бы операция += являлась бы потокобезопасной. Что на самом деле это далеко не так.
from threading import Thread
from my_deco import time_counter
val = 0
COUNT = 100
NUM_THREADS = 100
def increase():
global val
for _ in range(COUNT):
val += 1
@time_counter
def main():
threads = [Thread(target=increase) for _ in range(NUM_THREADS)]
for thread in threads:
thread.start()
for thread in threads:
thread.join()
diff = val - COUNT * NUM_THREADS
print(f'diff = {diff}')
if __name__ == '__main__':
main()
Во-первых, запись в одну строку на самом деле представляет собой 4 последовательных действия:
Поэтому существует далеко не нулевая вероятность, что между пунктами 1 и 4 может включиться другой поток, с другим значение val. И тогда, при перезаписи этой переменной, в 4-м пункте потеряется значение, которое увеличилось в одном из этих потоков. Подобное действие получило название “эффект гонки” или “состояние гонки” (race condition).
На нашем примере получить этот эффект достаточно сложно, поскольку величины исходных значений, а именно: число потоков COUNT, число итераций NUM_THREADS и интервал переключения между потоками, не достаточны для устойчивого проявления этого эффекта.
Кстати, дефолтный интервал переключения между потоками можно узнать с помощью метода getswitchinterval() из хорошо всем известного пакета sys:
import sys
interval = sys.getswitchinterval()
print(f'switchinterval = {interval}')
# switchinterval = 0.005
Значение интервала переключения мы можем изменить с помощью метода sys.setswitchinterval(new_interval), но, к сожалению, уменьшать его до значений, на котором проявится эффект гонки, мы не можем. Но зато мы можем программно изменить наш код так, чтобы инкремент значение val, выполнялся бы “не так быстро”. Для этого мы разделим:
И для пущей убедительности добавим между этими двумя вычислениям задержку в 0,001 секунды:
import time
from threading import Thread
from my_deco import time_counter
val = 0
COUNT = 100
NUM_THREADS = 100
def increase():
global val
for _ in range(COUNT):
new_val = val + 1
time.sleep(0.001)
val = new_val
@time_counter
def main():
threads = [Thread(target=increase) for _ in range(NUM_THREADS)]
for thread in threads:
thread.start()
for thread in threads:
thread.join()
diff = val - COUNT * NUM_THREADS
print(f'diff = {diff}')
if __name__ == '__main__':
main()
В данном случае разница diff будет уже очень сильно отличаться от нуля.
Таким образом, “потокоНЕбезопасность” обращения к общим переменным доказана. Как можно исправить эту ситуацию?
Для этой цели в языке Python существуют так называемые примитивы синхронизации (synchronization primitives).
Пожалуй, самой простой, базовой и наиболее часто используемым примитивом синхронизации является блокировка Lock(), которая работает по следующему алгоритму:
import time
from threading import Thread, Lock
from my_deco import time_counter
COUNT = 100
NUM_THREADS = 100
val = 0
val_lock = Lock()
def increase():
global val
for _ in range(COUNT):
val_lock.acquire()
new_val = val + 1
time.sleep(0.001)
val = new_val
val_lock.release()
@time_counter
def main():
threads = [Thread(target=increase) for _ in range(NUM_THREADS)]
for thread in threads:
thread.start()
for thread in threads:
thread.join()
diff = val - COUNT * NUM_THREADS
print(f'diff = {diff}')
if __name__ == '__main__':
main()
Как видим, применение блокировки Lock дало ожидаемый результаты.
(Кстати, иногда этот тип блокировки называют Mutex - примитив синхронизации, который используется для защиты общих ресурсов от одновременного доступа нескольких потоков. Он представляет собой блокировку, которую поток может удерживать или освобождать. Только один поток может удерживать мьютекс в любой момент времени, и все остальные потоки, которые попытаются захватить мьютекс, будут блокироваться до тех пор, пока он не будет освобожден.)
Есть ещё более удобный и компактный вариант использования блокировки Lock, позволяющий избегать применения указанных выше методов acquire() и release(). С этом способом, а также с другими примитивами синхронизации (ровно как и с примерами их использования) вы сможете познакомиться в расширенной версии этого курса.
Более подробно со всеми деталями этой темы вы сможете познакомиться из этого видео:
Пакет threading является частью стандартной библиотеки Python начиная с версии 1.5 (ноябрь 1997-го), поэтому предварительной установки не требует. Минимум, что нужно для запуска потока, это:
import time
from threading import Thread
def clock(delay):
time.sleep(delay)
print(f"Current time: {time.strftime('%X')}, delay {delay} sec.")
thread1 = Thread(target=clock, args=(2,))
thread2 = Thread(target=clock, args=(3,))
if __name__ == '__main__':
start = time.time()
print(f"Time start: {time.strftime('%X')}")
thread1.start()
thread2.start()
print(f"Time end: {time.strftime('%X')}")
print(f'======== Total time: {time.time() - start:0.2f} ========')
Результат:
Time start: 07:39:58
Time end: 07:39:58
======== Total time: 0.00 ========
Current time: 07:40:00, delay 2 sec.
Current time: 07:40:01, delay 3 sec.
Process finished with exit code
Как видно из примера, основной поток завершился мгновенно (0 sec), а дополнительные потоки - через 2 и через 3 секунды от времени запуска скрипта соответственно.
Как видим, в нашем случае дополнительные потоки работают без оглядки на время завершения основного потока и процесса в целом, что совершенно логично: процессор работает быстрее внешних устройств, следовательно перед завершение скрипта необходимо дождаться сигнала о корректном завершении работы внешних устройств, иначе возможна потеря данных, оставшихся в буфере этих устройств. Поэтому, если не приняты особые действия, то основной процесс будет ждать полного завершения всех своих потоков.
В тех же случаях, когда надо завершить работу дополнительного потока принудительно, одновременно с завершением всего процесса, то специальный параметр daemon устанавливается в значение True. Обратите внимание, что сделать это можно двумя способами:
import time
from threading import Thread
def clock(delay):
time.sleep(delay)
print(f"Current time: {time.strftime('%X')}, delay {delay} sec.")
thread1 = Thread(target=clock, args=(2,))
thread2 = Thread(target=clock, args=(3,), daemon=True)
if __name__ == '__main__':
start = time.time()
print(f"Time start: {time.strftime('%X')}")
thread1.start()
thread2.daemon = True
thread2.start()
print(f"Time end: {time.strftime('%X')}")
print(f'======== Total time: {time.time() - start:0.2f} ========')
Как видим, daemon второго потока был установлен в True дважды. Разумеется, хватило бы одного раза - второй был использован лишь для демонстрации ещё одного способа установки. Результат: второму потоку не хватило одной секунды для своего совершения, поэтому он прервался досрочно.
Time start: 07:54:41
Time end: 07:54:41
======== Total time: 0.00 ========
Current time: 07:54:43, delay 2 sec.
Process finished with exit code 0
(Кстати, проверить, является ли поток демоном, можно с помощью метода isDaemon(). Если является, то метод вернёт значение True.)
Чтобы узнать, когда именно второй поток прекратил свою работу, мы слегка изменим функцию clock и будем печатать информацию о её работе каждую секунду. Для этого тело функции превращаем в тело цикла с задержкой в 1 секунду и печатью результата. Тогда общее значение задержки будет определяться количеством повторений цикла.
def clock(delay: int):
for d in range(1, delay + 1):
time.sleep(1)
print(f"Current time: {time.strftime('%X')}, {d}; delay {delay} sec.")
Результат показывает, что последние данные о работе второго потока заканчиваются задержкой в 2 секунды. Результат работы в последнюю третью секунду он распечатать не успел, поскольку прервался вместе с основным процессом на 2-й секунде:
Time start: 17:20:42
Time end: 17:20:42
======== Total time: 0.00 ========
Current time: 17:20:43, 1; delay 2 sec.
Current time: 17:20:43, 1; delay 3 sec.
Current time: 17:20:44, 2; delay 2 sec.
Current time: 17:20:44, 2; delay 3 sec.
Process finished with exit code 0
Если и у второго потока сделать daemon=True, то общий процесс завершится мгновенно и оба потока не оставят абсолютно никаких следов своей активности:
Time start: 17:29:27
Time end: 17:29:27
======== Total time: 0.00 ========
Process finished with exit code 0
Таким образом, можно сделать вывод: общая длительность процесса определяется длительностью “самого долгого” потока, при условии, что он не daemon. В противном случае - длительностью основного потока.
Очень часто результаты вспомогательных потоков необходимо использовать в основном потоке. И тогда на помощь приходим метод join(), который приостанавливает работу основного потока до тех пор, пока не завершится поток присоединённый:
if __name__ == '__main__':
start = time.time()
print(f"Time start: {time.strftime('%X')}")
thread1.start()
thread1.join()
thread2.start()
print(f"Time end: {time.strftime('%X')}")
print(f'======== Total time: {time.time() - start:0.2f} ========')
Всё, что мы изменили в предыдущем скрипте - это присоединили первый поток к основному сразу же после его запуска. Что привело к следующему результату:
Time start: 17:53:40
Current time: 17:53:41, 1; delay 2 sec.
Current time: 17:53:42, 2; delay 2 sec.
Time end: 17:53:42
======== Total time: 2.00 ========
Process finished with exit code 0
Показательным здесь является то, что длительность основного потока увеличилась до 2-х секунд - ровно на время выполнения первого потока. Только дождавшись завершения первого поток, основной поток запустил второй, и тут же завершился. Остановив при этом весь процесс. Что привело к завершению и второго потока, которому так и не удалось поработать.
Но если мы сначала запустим оба потока, а только потом присоединим первый поток к основному потоку,
thread1.start()
thread2.start()
thread1.join()
то картина изменится:
Time start: 18:05:02
Current time: 18:05:03, 1; delay 2 sec.
Current time: 18:05:03, 1; delay 3 sec.
Current time: 18:05:04, 2; delay 2 sec.
Current time: 18:05:04, 2; delay 3 sec.
Time end: 18:05:04
======== Total time: 2.00 ========
Process finished with exit code 0
Важный вывод: сначала запуск всех потоков, а затем присоединение одного из потоков к основному потоку позволяет завершить все запущенные потоки. Потоки, время выполнения которых меньше или равно времени выполнения объединенного потока, будут иметь достаточно времени для завершения. Все остальные потоки будут работать только во время работы объединенного потока (в нашем случае это thread2).
Следовательно, если мы присоединим к основному потоку самый продолжительный поток, это позволит завершиться всем потокам. Общее время процесса в нашем случае будет определяться временем выполнения этого потока (в нашем случае 3 секунды):
Time start: 18:14:17
Current time: 18:14:18, 1; delay 2 sec.
Current time: 18:14:18, 1; delay 3 sec.
Current time: 18:14:19, 2; delay 2 sec.
Current time: 18:14:19, 2; delay 3 sec.
Current time: 18:14:20, 3; delay 3 sec.
Time end: 18:14:20
======== Total time: 3.00 ========
Process finished with exit code 0
А теперь небольшое задание - попробуйте самостоятельно ответить на вопрос: Что снанет с общем временем выполнения в этом
thread1.start()
thread2.start()
thread1.join()
thread2.join()
, в этом:
thread1.start()
thread2.start()
thread2.join()
thread1.join()
, и в этом случае?
thread1.start()
thread1.join()
thread2.start()
thread2.join()
И главное, попытайтесь объяснить, почему общее время работы изменилось именно так, а не иначе.
Пример кода с двумя функциями, который мы рассмотрели в самом начале, преследовал цель добиться максимальной асинхронности. Для чего мы отправили потенциально блокирующие функции в собственные потоки, откуда они уже больше никак не смогут помешать основному потоку. В этом, конечно, огромный плюс. Но есть и маленький минус - декоратор подсчёта времени работы остался в основном потоке (в функции main(), и сейчас мы не знаем, на сколько быстрее (или медленнее) стал работать код после добавления потоков.
Давайте, присоединим дополнительные потоки к основному и посмотрим, как изменилось время работы:
import time
from threading import Thread
from my_deco import time_counter
N = 5
DELAY = 0.5
def func1(n):
for i in range(n):
time.sleep(DELAY)
print(f'--- line #{i} from {n} is completed')
def func2(n):
for i in range(n):
time.sleep(DELAY)
print(f'=== line #{i} from {n} is completed')
@time_counter
def main():
thread1 = Thread(target=func1, args=(N,))
thread2 = Thread(target=func2, args=(N,))
thread1.start()
thread2.start()
thread1.join()
thread2.join()
print(f'All functions completed')
if __name__ == '__main__':
main()
======== Script started ========
--- line #0 from 5 is completed
=== line #0 from 5 is completed
--- line #1 from 5 is completed
=== line #1 from 5 is completed
. . . . . . . . .
All functions completed
======== Script execution time: 2.51 ========
Итого: 2,5 секунды - в 2 раза быстрее, чем обычный синхронный код. Что вполне объяснимо: теперь обе функции запускаются одновременно, поэтому время работы всего скрипта теперь равно времени каждой из них.
Есть ли способ ещё уменьшить это время? Вероятно - да, только для начала, хорошо было бы понять, в чём причина задержки.
А причина в том, что благодаря циклу внутри функции, каждая последующая итерация ника не может запуститься раньше предыдущей. То есть каждый раз, для запуска следующей итерации приходится ждать DELAY секунд задержки предыдущей.
Решение очевидно: в функции надо оставить только одну итерацию, а цикл перенести в функцию main(). Тогда можно создать число потоков, равное числу итераций двух функций. А из предыдущих примеров мы уже знаем, что если все эти потоки запустить одновременно, то из общее время выполнения будет равно времени самого долгого потока.
Ну, хорошо, сказано - сделано:
import time
from threading import Thread
from my_deco import time_counter
N = 5
DELAY = 0.5
def func1(i, n):
time.sleep(DELAY)
print(f'--- line #{i} from {n} is completed')
def func2(i, n):
time.sleep(DELAY)
print(f'=== line #{i} from {n} is completed')
@time_counter
def main():
threads = []
threads1 = [Thread(target=func1, args=(i, N)) for i in range(N)]
threads2 = [Thread(target=func2, args=(i, N)) for i in range(N)]
threads.extend(threads1)
threads.extend(threads2)
for thread in threads:
thread.start()
for thread in threads:
thread.join()
print(f'All functions completed')
if __name__ == '__main__':
main()
И время работы скрипта теперь сократилось до времени работы самого долгого потока:
======== Script started ========
--- line #0 from 5 is completed
=== line #0 from 5 is completed
--- line #1 from 5 is completed
=== line #1 from 5 is completed
. . . . . . . . .
All functions completed
======== Script execution time: 0.51 ========
В предыдущих примерах для создания и управления потоками были использованы функции. Разумеется, тоже самое можно сделать и с помощью классов.
В этом случае:
В следующем примере создаётся два daemon-потока. Благодаря тому, что самый долгий из них присоединяется к основному потоку, успевают закончится оба потока:
import time
from threading import Thread
class ClockThread(Thread):
def __init__(self, delay):
super().__init__()
self.delay = delay
self.daemon = True
def run(self):
time.sleep(self.delay)
print(f"Current time: {time.strftime('%X')}, delay {self.delay} sec.")
thread_1 = ClockThread(2)
thread_2 = ClockThread(3)
if __name__ == '__main__':
start = time.time()
print(f"Time start: {time.strftime('%X')}")
thread_1.start()
thread_2.start()
thread_2.join()
print(f"Time end: {time.strftime('%X')}")
print(f'======== Total time: {time.time() - start:0.2f} ========')
Скрипт, как и положено, работает 3 секунды:
Time start: 00:32:58
Current time: 00:33:00, delay 2 sec.
Current time: 00:33:01, delay 3 sec.
Time end: 00:33:01
======== Total time: 3.00 ========
Process finished with exit code 0
Как видим, никаких принципиальных изменений описание потоков с помощью классов не вызвало. По-прежнему, управление объекта потока осуществляется с помощью методов start() и join(), а параметр (атрибут) daemon можно сразу указать в __init__(), либо затем определить (переопределить) в экземпляре пользовательского класса.
Более подробно со всеми деталями этой темы вы сможете познакомиться из этого видео:
Принцип работы всех трёх способов получения асинхронного кода удобнее всего будет понять с помощью следующей схемы:
Все три перечисленных способа (потоки, мультипроцессы и пакет asyncio) по сути делают одно и тоже: позволяют запустить выполнение основной программы в параллельном режиме (зелёная и синяя линии) . Иными словами, некоторые (чаще всего “проблемные”) участки кода начинают выполняться как бы одновременно и независимо друг от друга. И тогда, если одна, или даже несколько ветвей этого параллельно процесса выполняются очень долго или вообще останавливаются, то на основной программе это никак не скажется — основная программа всё равно продолжит работать в обычном режиме.
Обратите внимание, что конструкция “как бы” использована здесь совсем не случайно - не всегда параллельные вычисления являются таковыми на самом деле. Просто иногда переключение между различными ветвями процесса происходит так быстро, что стороннему наблюдателю они кажутся параллельными. Примерно также, как 24 статических кадра, показанные в течении одной секунды, создают эффект непрерывного движения на киноэкране.
Собственно, в этом и заключается принципиальное различие между пакетом multiprocessing, где вычисления и в самом деле идут параллельно и двумя другими пакетами (threading и asyncio), где эффект параллелизма достигается очень быстрым переключением между несколькими “независимыми” частями программы в рамках одного процесса.
Технология реального параллельного вычисления получила название parallelism (параллелизм), а технология имитации параллельных вычислений посредством быстрых переключений — concurrency (конкаренси). Последний термин на русский язык тоже переводится как “параллелизм”, что согласитесь, крайне неудобно при одновременном употреблении с настоящим параллелизмом. (Особенно, если оба термина используются в рамках одного предложения!). Поэтому самым логичным в данной ситуации будет сохранение нативного термина — конкаренси.
Также, иногда можно встретить и другой перевод этого термина - “конкурентность”, что выглядит более чем спорным как с логической, так и с лингвистической точек зрения:
Параллельные вычисления, с точки зрения использования ресурсов компьютера, — удовольствие не из дешёвых, поскольку в этом случае задействуются несколько (или даже все!) ядра процессора, дополнительная оперативная память и т.д. Поэтому, такое решение оправдано только в случае выполнения задач сложных вычислений , где без постоянной и непрерывной работы процессора никак не обойтись (CPU-bound tasks).
На практике же, чаще всего приходится иметь дело с процессами относительно медленными: запросы к базе данных, сетевые взаимодействия и т.д. Это, так называемые, задачи, связанные с вводом-выводом (IO-bound tasks), в которых процессор, отправляя запрос к относительно медленному стороннему ресурсу, вынужден простаивать в ожидании ответа.
В этом случае совершенно логично будет использовать время ожидания процессора для чего-то более полезного. С этой целью используются две другие технологии (пакеты threading и asyncio), в которых время ожидания пробуждения “заснувшей задачи” используется на выполнения других задач.
Следует обратить особое внимание, что ресурсы машины в этом случае используются достаточно экономно — мы больше не создаём новые процессы, а рационально используем ресурсы в рамках одного процесса,
И здесь стоит отметить принципиальное технологическое отличие пакетов threading и asyncio.
В случае потоков (пакет threading) интерпретатор Python обращается за дополнительной помощью к операционной системе (ОС). Создавая новый поток он как бы говорит ей: “Теперь мне нужно, чтобы задача основного потока выполнялась бы одновременно с задачей нового потока до тех пор, пока этот новый поток не завершится”. И в этом случае ОС через строго равные промежутки времени попеременном переключается то на одну, то на другую задачу. Переключение длится доли секунды, поэтому для стороннего наблюдателя обе задачи выполняются как бы параллельно и одновременно.
Плюс этого метода очевиден: сам Python-код для каждого потока остаётся совершенно неизменным (здесь имеется в виду функция, которая передаётся в поток с параметром target). Сам поток — это всего лишь экземпляр класса Thread, а его управление осуществляется с помощью двух методов start() и join() (с точки зрения синтаксиса языка тоже ничего принципиально нового!).
Минусы не сразу бросаются в глаза, но они всё же есть:
Все вышеупомянутые недостатки отсутствуют в пакете asyncio. Здесь используется только один поток в рамках, разумеется, только одного процесса. Всё бы хорошо, если бы не один существенный недостаток: применение этого метода требует своего отдельного и принципиально нового кода, существенно отличающегося от привычного нам синтаксиса языка Python.
Впрочем, судите сами - вот, например, как будет выглядеть решение предыдущей задачи с помощью пакета asyncio:
import time
import asyncio
from my_deco import async_time_counter
N = 5
DELAY = 0.5
async def func1(n):
for i in range(n):
await asyncio.sleep(DELAY)
print(f'--- line #{i} from {n} is completed')
async def func2(n):
for i in range(n):
await asyncio.sleep(DELAY)
print(f'=== line #{i} from {n} is completed')
@async_time_counter
async def main():
print(f'All functions completed')
async def run():
task0 = asyncio.create_task(main())
task1 = asyncio.create_task(func1(N))
task2 = asyncio.create_task(func2(N))
await task0
await task1
await task2
if __name__ == '__main__':
asyncio.run(run())
Результат выполнения этого скрипта будет точно таким же, как и в предыдущем примере с потоками: управление на основную программа main() передаётся сразу же. И так как код в main() содержит только один принт, то эта функция завершается почти мгновенно, а результат от действия двух других функций виден уже после завершения основной программы:
======== Script started ========
All functions completed
======== Script execution time: 0.00 ========
--- line #0 from 5 is completed
=== line #0 from 5 is completed
--- line #1 from 5 is completed
=== line #1 from 5 is completed
. . . . . . . . . . . . . . . .
Process finished with exit code 0
Вероятно, для новичков, только что прошедших основы Python может возникнуть резонный вопрос: “А на каком языке написан этот код”? Собственно, в этом вопросе нет ничего удивительного, поскольку:
Получается, что и у этого способа тоже существуют свои минусы, и весьма значительные.
Таким образом, краткий обзор 3-х способов (технологий) создания асинхронного кода показал, что ни один из рассмотренных вариантов не имеет универсальных преимуществ перед двумя другими: у каждого есть как свои достоинства, так и недостатки. А значит все три способа имеют перспективы дальнейшего развития, совершенствования, и, конечно же, практического использования.
Поэтому ответ на вопрос “Какой вариант выбрать?” будет на удивление банальным: тот, что максимально подходит под конкретные требования вашей текущей задачи. Разумеется, при этом крайне важно одинаково хорошо владеть каждой из перечисленных технологий, чтобы в нужный момент остановить свой выбор именно на наиболее оптимальной, а не на той, которая лучше всех известна.
Вероятно, сейчас будет уместным вопрос: “А есть ли другие способы сделать код асинхронным, кроме тех трёх, что анонсированы в названии курса?”
Ответ - разумеется, да.
Пакет subprocess, позволяет создавать дополнительные процессы, в которых можно запускать различные программы, в том числе и Python-код.
Пакет concurrent.futures предоставляет удобный интерфейс асинхронных задач и параллельных вычислений. Он скрывая детали создания и управления потоками или процессами, поэтому может быть более предпочтителен в простых сценариях, где требуется простота использования и нет необходимости в прямом контроле над потоками или процессами. Однако, для более сложных сценариев или более низкоуровневого контроля, модули threading и multiprocessing могут предоставить большую гибкость.
Помимо пакетов, входящих в стандартную библиотеку Python, есть и другие, достаточно известные пакеты, которые туда не входят. Например, пакеты curio и trio для работы с корутинами.
Рассмотренные выше примеры можно отнести к разряду универсальных пакетов, способных практически любой синхронных код сделать асинхронном. Помимо этого, имеются также узкоспециализированные пакеты, позволяющие добиться асинхронности для вполне определённых программ и приложений. Например, пакет select используется для организации асинхронной работы сокетов (пакет socket).
Кроме того, в том же пакете socket существуют отдельные “асинхронные” методы и функции, входящие в состав обычных “синхронных” пакетов.
Разумеется, наш курс посвящён трём базовым “столпам” асинхронности, указанным в названии. Они - основа и база этого способа программирования. Однако, тема асинхронности в Python не будет до конца раскрыта без, пусть даже краткого, обзора некоторых дополнительных пакетов и методов, упомянутых выше. Чему и будет посвящён последний, пятый, урок этого курса.
Итак, переходим к подробному изучению трёх основных пакетов, перечисленных в названии курса. И начнём это изучение с потоков (пакет threading).
Более подробно со всеми деталями этой темы вы сможете познакомиться из этого видео:
Если попробовать перевести изначальное (английское) название нашего курса "Threading, Multiprocessing and Asyncio in Python" с помощью Google-переводчика, то мы получим: “потоки, многопроцессорность и асинхронность в Python”.
Увы, Google всего лишь подтверждает достаточно распространённое заблуждение: асинхронность, дескать, - это исключительно пакет asyncio, а всё остальное - вроде как не имеет к асинхронности никакого отношения.
На самом деле это не так. Пакет asyncio - всего лишь частный, но далеко не единственный способ добиться асинхронности кода. Поскольку и потоки, и мультипроцессинг делают ровно тоже самое - дают нам возможность превратить синхронный код в асинхронный.
Давайте прежде всего попробуем разобраться, в двух моментах:
Синхронный код — это код, в котором все инструкции выполняются строго последовательно, строка за строкой, и где переход к последующей строке кода возможен только в том случае, если полностью выполнена предыдущая.
Вот как раз это требование - “не выполнять следующую инструкцию, пока не завершена предыдущая”, и является главной проблемой синхронного кода. Поскольку, для программ, которые в процессе выполнения взаимодействуют с внешним миром (с другими программами), вполне типична ситуация, когда исполнении инструкции может внезапно потребовать значительно больше времени, чем обычно. Последствия вполне очевидны: программа начинает работать медленно, либо вообще “зависает”.
Чтобы этого не происходило, программный код должен работать “с оглядкой” на то, что происходит вокруг. И если следующая инструкция может затормозить выполнение основной программы, то её необходимо “запараллелить” с выполнением других, более быстрых инструкций, либо вообще отложить выполнение “долгой” инструкции до лучших времён.
Здесь особо следуют подчеркнуть, что совсем не обязательно встраивать в асинхронный код сложные алгоритмы оценки времени выполнения последующих инструкций. В подавляющем большинстве случаев достаточно всего лишь запараллелить выполнение проблемных участков, вывести их в фоновый режим, чтобы они не препятствовали быстрому выполнению основной программы.
Ну и теперь, когда мы уже (надеюсь!) получили некоторое представление об асинхронности, самое время дать строго научное определение этому явлению. Благо, в интернете их великое множество - одно непонятнее другого 😉. Лично мне понравилось определение асинхронности на сайте разработчиков Mozilla.org:
“Asynchronous programming is a technique that enables your program to start a potentially long-running task and still be able to be responsive to other events while that task runs, rather than having to wait until that task has finished.”
(Асинхронное программирование — это техника, которая позволяет вашей программе запустить потенциально длительную задачу и по-прежнему иметь возможность реагировать на другие события во время выполнения этой задачи, вместо того, чтобы ждать, пока эта задача завершится.)
https://developer.mozilla.org/en-US/docs/Learn/JavaScript/Asynchronous/Introducing
Таким образом, асинхронность — это то, что не даёт “зависнуть” вашей программе даже в том случае, если она добралась до блокирующих (или “очень долгих”) участков кода, поскольку эти участки кода будут выполняться параллельно (или почти параллельно) основной программе.
Собственно этим и объясняется невероятная популярность пакетов, способных превратить синхронный код в асинхронный.
Ну, и здесь, наверное, самое время разобрать какой-нибудь пример, позволяющий ещё лучше понять и закрепить всё выше сказанное.
Предположим, у нас есть две функции (func1 и func2), которые необходимо выполнить последовательно:
import time
N = 5
DELAY = 0.5
def func1(n):
for i in range(n):
time.sleep(DELAY)
print(f'--- line #{i} from {n} is completed')
def func2(n):
for i in range(n):
time.sleep(DELAY)
print(f'=== line #{i} from {n} is completed')
def main():
func1(N)
func2(N)
if __name__ == '__main__':
start = time.time()
main()
print(f'Total time: {time.time() - start}')
Главная управляющая функция здесь и дальше будет main(), а функции func1() и func2() вызываются в ней последовательно. Дополнительно, будет рассчитываться и общее время выполнения главной функции main(), очевидно равное времени выполнения всего скрипта.
В этом классическом примере синхронного кода видно, что управление второй функции func2() будет передано только после завершения первой функции func1(). Хорошо ещё, что в нашем примере значения числа повторений (N = 5) и времени задержки (DELAY = 0.5 секунды) выбраны относительно небольшими, поэтому программа успевает завершается за вполне короткое время, равное 5 секундам. Но что будет, если эти параметры будут с несколькими нулями в конце? Тогда начала выполнения функции func2 можно и не дождаться. Не говоря уже о появлении финального сообщения о завершении всех функции.
Похоже, что без асинхронного решения этой проблемы, когда-нибудь, в один далеко не прекрасный момент, мы можем оказать в очень непростой ситуации. Поэтому попробуем применить одну из 3-х техник, заявленных в названии курса, например, потоки.
Более, чем подробное объяснение работы этого и следующего примеров будут чуть дальше. Сейчас же мы просто, что называется, насладимся красотой и лёгкостью преобразований синхронного кода в асинхронный.
Но прежде всего давайте добавим в наш код одну полезную деталь, а именно: декоратор, который подсчитывает время работы нашей программы. Поскольку, все наши дальнейшие задачи так или иначе будут оцениваться с точки зрения времени выполнения кода. Поэтому, если смысл оптимизировать этот подсчёт с самого начала. Тем более, что для этого уж точно не надо знать методики асинхронного программирования. Достаточно будет наших обычных “синхронных” знаний.
Из основ языка Python многие из вас наверняка помнят, что повторяющийся код в теле функции логичнее всего вынести отдельно в качестве декоратора. Тем же, кто это пока ещё не знает, или же успел забыть, рекомендуются к просмотру эти два видео, в которых рассматриваются все 4 варианта создания декораторов:
Итак, код после точки входа переходит в декоратор, который размещается в новом модуле рабочего каталога my_deco.py:
def time_counter(func):
@functools.wraps(func)
def wrap():
start = time.time()
print("======== Script started ========")
func()
print(f"Time end: {time.strftime('%X')}")
print(f'======== Script execution time: {time.time() - start:0.2f} ========')
return wrap
А сам предыдущий скрипт дополняется импортом нового декоратора, и его добавлением к функции main():
import time
from my_deco import time_counter
N = 5
DELAY = 0.5
def func1(n):
for i in range(n):
time.sleep(DELAY)
print(f'--- line #{i} from {n} is completed')
def func2(n):
for i in range(n):
time.sleep(DELAY)
print(f'=== line #{i} from {n} is completed')
@time_counter
def main():
func1(N)
func2(N)
print(f'All functions completed')
if __name__ == '__main__':
main()
Ну вот, теперь можно добавлять потоки. Для этого прежде всего надо их импортировать:
from threading import Thread
И слегка изменить функцию main():
@time_counter
def main():
thread1 = Thread(target=func1, args=(N,))
thread2 = Thread(target=func2, args=(N,))
thread1.start()
thread2.start()
print(f'All functions completed')
Как видим, преобразования кода самые минимальные. Но зато какой результат!
======== Script started ========
All functions completed
======== Script execution time: 0.01 ========
--- line #0 from 5 is completed
=== line #0 from 5 is completed
--- line #1 from 5 is completed
=== line #1 from 5 is completed
. . . . . . . . . . . . . . . .
Process finished with exit code 0
Пожалуйста, обратите внимание, что строчка о завершении работы программы (======== Script execution time: 0.01 ========), ровно как и сообщение о завершении работы всех функций (All functions completed) идут раньше, чем информация, выдаваемая самими функциями. Это подтверждает тот факт, что потенциально блокирующие наш код функции func1() и func2() таковыми больше не являются - потоки позволяют легко их “перескочить” и передать управление коду, который находится дальше . А из этого следует, наш синхронный код превратился в асинхронный. И время его выполнения сократилось с 5 секунд (или даже с бесконечности!) до 0,01 секунды.
И в заключении рассмотрения этого примера давайте зафиксируем несколько наблюдений, которые очень пригодятся нам дальше при изучении потоков:
Таким образом, один из методов решения проблемы синхронности кода найден - это потоки. Очевидно, что такой замечательных инструмент требует дальнейшего и более глубокого изучения, чем мы и займёмся чуть дальше.
Но, как следуют из названия курса, имеется ещё, как минимум, два механизма (способа, метода) добиться асинхронности. Так есть ли смысл отвлекаться на изучение чего-то другого, если потоки и так позволили добиться нам такого впечатляющего результата?
Попробуем найти ответ на этот вопрос в следующей теме (статье).
Более подробно со всеми деталями этой темы вы сможете познакомиться из этого видео: