Примитивы синхронизации модуля asyncio
очень похожи на примитивы синхронизации модуля threading
с двумя важными оговорками:
asyncio
не являются потокобезопасными, поэтому их не следует использовать для синхронизации потоков ОС;asyncio.wait_for()
.asyncio
:asyncio.Lock
- монопольный доступ к общему ресурсу,asyncio.Event
- доступ к ресурсу по событию,asyncio.Condition
- сочетает в себе Lock
и Event
,asyncio.Semaphore
- управляет внутренним счетчиком,asyncio.BoundedSemaphore
- ограниченный объект Semaphore
,asyncio.Barrier
- ждет завершения ожидания определенного количество задач.Изменено в Python 3.9: было удалено получение блокировки с помощью await lock
или with (yield from lock)
. Вместо этого используйте async lock
.
asyncio.Lock(*, loop=None)
:Класс asyncio.Lock()
реализует блокировку взаимное исполнения критических участков кода для задач asyncio
. Не потокобезопасный.
Блокировка asyncio
может использоваться, чтобы гарантировать монопольный доступ к общему ресурсу.
Изменено в Python 3.10 аргумент loop
удален.
Предпочтительный способ использования asyncio.Lock()
- это асинхронный оператор async with
:
# получаем объект блокировки lock = asyncio.Lock() async with lock: # доступ к общему состоянию # Что эквивалентно: await lock.acquire() try: # доступ к общему состоянию finally: lock.release()
Lock
:Lock.acquire()
:Метод Lock.acquire()
получает блокировку. Для других - устанавливает блокировку в состояние locked и возвращает True
.
Метод представляет собой сопрограмму.
Если более чем одна сопрограмма блокируется методом Lock.acquire
, то продолжать работу будет только одна сопрограмма, которая получила блокировку, в то время как другие сопрограммы будут ждать, пока блокировка не будет снята.
Получение блокировки справедливо: выполняющаяся сопрограмма будет первой сопрограммой, которая начала ожидать блокировки.
Lock.release()
:Метод Lock.release()
сбрасывает полученную блокировку в состояние unlocked и возвращает результат.
Если блокировка уже разблокирована, то возникает исключение RuntimeError
.
Lock.locked()
:Метод Lock.locked()
возвращает True
, если блокировка установлена.
asyncio.Event(*, loop=None)
:Класс asyncio.Event()
представляет собой объект события. Не потокобезопасный.
Объект события asyncio
можно использовать для уведомления нескольких задач asyncio
о том, что произошло какое-то событие.
Объект Event
управляет внутренним флагом, которому можно присвоить значение True
с помощью метода Event.set()
и сбросить значение в False
с помощью метода Event.clear()
.
Метод Event.wait()
блокируется, пока флаг не будет установлен в значение True
. Первоначально флаг установлен в значение False
.
Изменено в Python 3.10 аргумент loop
удален.
async def waiter(event): print('waiting for it ...') await event.wait() print('... got it!') async def main(): # Create an Event object. event = asyncio.Event() # Spawn a Task to wait until 'event' is set. waiter_task = asyncio.create_task(waiter(event)) # Sleep for 1 second and set the event. await asyncio.sleep(1) event.set() # Wait until the waiter task is finished. await waiter_task asyncio.run(main())
Event
:Event.wait()
:Метод Event.wait()
ждет, пока объект события будет установлено в True
. Метод представляет собой сопрограмму.
Если событие установлено, то немедленно возвращает True
. В противном случае блокирует, пока другая задача не вызовет метод Event.set()
.
Event.set()
:Метод Event.set()
устанавливает событие.
Все задачи, ожидающие установки события, будут немедленно разбужены.
Event.clear()
:Метод Event.clear()
очищает/сбрасывает событие.
Задачи, ожидающие методом Event.wait()
будут блокироваться до тех пор, пока метод `Event.set() снова не установит объект события.
Event.is_set()
:Метод Event.is_set()
возвращает True
, если событие установлено.
asyncio.Condition(lock=None, *, loop=None)
:Класс asyncio.Condition()
представляет собой объект какого-то условного события. Не потокобезопасный.
Примитив синхронизации по условию Condition
модуля asyncio
может использоваться задачей для ожидания некоторого события и затем получения монопольного доступа к общему ресурсу.
По сути, объект Condition
сочетает в себе функции события Event
и блокировки Lock
. Можно иметь несколько объектов Condition
, совместно использующих одну Lock
, что позволяет координировать монопольный доступ к общему ресурсу между различными задачами, заинтересованными в определенных состояниях этого общего ресурса.
Необязательный аргумент lock
должен быть объектом asyncio.Lock
или None
. В последнем случае автоматически создается новый объект Lock
.
Изменено в Python 3.10 аргумент loop
удален.
Предпочтительный способ использования asyncio.Condition()
- это асинхронный оператор async with
:
# получаем объект условия cond = asyncio.Condition() async with cond: await cond.wait() # Что эквивалентно: await cond.acquire() try: await cond.wait() finally: cond.release()
Condition
:Condition.acquire()
:Метод Condition.acquire()
получает базовую блокировку. Для других - устанавливает состояние блокировки и возвращает True
.
Представляет собой сопрограмму. Метод будет ждать, пока базовая блокировка не будет снята.
Condition.notify(n=1)
:Метод Condition.notify()
будит не более n
задач (по умолчанию 1), ожидающих этого условия. Метод не работает, если нет ожидающих задач.
Блокировка должна быть получена до вызова этого метода и снимается только после ее получения. Если базовая блокировка не получена, то при вызове этого метода возникает ошибка RuntimeError
.
Condition.locked()
:Метод Condition.is_set()
возвращает True
, если основная блокировка получена.
Condition.notify_all()
:Метод Condition.notify_all()
будит все задачи, ожидающие этого условия.
Этот метод действует как Condition.notify()
, но пробуждает все ожидающие задачи.
Блокировка должна быть получена до вызова этого метода и снимается только после ее получения. Если базовая блокировка не получена, то при вызове этого метода возникает ошибка RuntimeError
.
Condition.release()
:Метод Condition.release()
снимает базовую блокировку.
Если базовая блокировка не получена, то при вызове этого метода возникает ошибка RuntimeError
.
Condition.wait()
:Метод Condition.wait()
будет ждать уведомления. Представляет собой сопрограмму.
Если вызывающая задача не получила блокировку при вызове этого метода, возникает ошибка RuntimeError.
Этот метод снимает базовую блокировку, а затем блокирует, пока не будет разбужен вызовом Condition.notify()
или Condition.notify_all()
. После пробуждения объект Condition
повторно получает свою блокировку, а этот метод возвращает True
.
Condition.wait_for(predicate)
:Метод Condition.wait_for()
будет ждать, пока аргумент predicate
станет истинным. Представляет собой сопрограмму.
Предикат predicate
должен быть вызываемым объектом, результат которого будет интерпретироваться как логическое значение. Конечное значение - это возвращаемое значение.
asyncio.Semaphore(value=1, *, loop=None)
:Класс asyncio.Semaphore()
представляет собой объект семафора. Не потокобезопасный.
Семафор управляет внутренним счетчиком, который уменьшается при каждом вызове метода Semaphore.acquire()
и увеличивается при каждом вызове Semaphore.release()
. Счетчик никогда не может опуститься ниже нуля; когда Semaphore.acquire()
обнаруживает, что он равен нулю, то блокируется, ожидая, пока какая-либо задача не вызовет Semaphore.release()
.
Необязательный аргумент value
(по умолчанию 1) задает начальное значение для внутреннего счетчика. Если заданное значение меньше 0,то возникает ошибка ValueError
.
Изменено в Python 3.10 аргумент loop
удален.
Предпочтительный способ использования asyncio.Semaphore()
- это асинхронный оператор async with
:
# получаем объект семафора sem = asyncio.Semaphore(10) async with sem: # работа с общим ресурсом # что эквивалентно: await sem.acquire() try: # работа с общим ресурсом finally: sem.release()
Semaphore.acquire()
:Метод Semaphore.acquire()
приобретает семафор. Представляет собой сопрограмму.
Если внутренний счетчик больше нуля, то уменьшает его на единицу и немедленно возвращает True
. Если счетчик равен нулю, то ждет вызова Semaphore.release()
и возвращает True
.
Semaphore.locked()
:Метод Semaphore.locked()
возвращает True
, если семафор не может быть получен немедленно.
Semaphore.release()
:Метод Semaphore.release()
освобождает семафор, увеличив внутренний счетчик на единицу. Может разбудить задачу, ожидающую получения семафора.
В отличие от BoundedSemaphore
, объект Semaphore
позволяет делать больше вызовов Semaphore.release()
, чем Semaphore.acquire()
.
asyncio.BoundedSemaphore(value=1, *, loop=None)
:Класс asyncio.BoundedSemaphore()
представляет собой ограниченный объект семафора, рассмотренного выше. Не потокобезопасный.
Класс asyncio.BoundedSemaphore()
- это версия asyncio.Semaphore
, которая вызывает исключение ValueError
в при вызове метода Semaphore.release()
, если увеличивает внутренний счетчик выше начального значения value
.
Изменено в Python 3.10 аргумент loop
удален.
asyncio.Barrier(parties)
:Класс asyncio.Barrier()
(добавлен в Python 3.11) представляет собой объект барьера. Непотокобезопасен.
Объект Barrier
- это простой примитив синхронизации, который позволяет блокировать ход выполнения кода до тех пор, пока на нем не будет ожидаться определенное количество задач parties
. Задачи могут ожидать с помощью метода Barrier.wait()
и будут заблокированы до тех пор, пока указанное количество задач не завершит ожидание с помощью Barrier.wait()
. В этот момент все ожидающие задачи будут разблокированы одновременно.
Можно использовать async with
как альтернативу ожиданию при вызове Barrier.wait()
.
Класс asyncio.Barrier
можно использовать повторно любое количество раз.
Новое в Python 3.11.
Пример:
async def example_barrier(): # создаем барьер с 3 задачами b = asyncio.Barrier(3) # 2 новые ожидающие задачи asyncio.create_task(b.wait()) asyncio.create_task(b.wait()) await asyncio.sleep(0) print(b) # Третий вызов .wait() проходит барьер await b.wait() print(b) print("=> барьер пройден") await asyncio.sleep(0) print(b) asyncio.run(example_barrier())
Результат этого примера:
<asyncio.locks.Barrier object at 0x... [filling, waiters:2/3]>
<asyncio.locks.Barrier object at 0x... [draining, waiters:0/3]>
=> барьер пройден
<asyncio.locks.Barrier object at 0x... [filling, waiters:0/3]>
Barrier
.Barrier.wait()
:Метод Barrier.wait()
представляет собой сопрограмму. Проходит барьер, когда все задачи, участвующие в барьере, вызвали эту функцию, все они разблокируются одновременно.
Когда ожидающая или заблокированная задача в барьере отменяется, то эта задача выходит из барьера, который остается в том же состоянии. Если состояние барьера - "заполнение", то количество ожидающих задач уменьшается на 1.
Возвращаемое значение - целое число в диапазоне от 0 до party-1
, разное для каждой задачи. Такое поведение можно использовать для выбора задачи для выполнения какой-либо специальной уборки, например:
... async with barrier as position: if position == 0: # Только одна задача печатает это print('Конец "фазы слива"')
Этот метод может вызвать исключение asyncio.BrokenBarrierError
, если барьер сломан или сброшен во время ожидания задачи. Метод также может вызвать asyncio.CancelledError
, если задача будет отменена.
Barrier.reset()
:Метод Barrier.reset()
представляет собой сопрограмму. Возвращает барьер в пустое состояние по умолчанию. Любые ожидающие его задачи получат исключение asyncio.BrokenBarrierError
.
Если барьер сломан, возможно, лучше просто оставить его и создать новый.
Barrier.abort()
:Метод Barrier.abort()
представляет собой сопрограмму. Этот метод приводит барьер в нерабочее состояние. Что приводит к сбою любых активных или будущих вызовов Barrier.wait()
с ошибкой BrokenBarrierError
.
Используйте Barrier.abort()
, например, если одну из задач необходимо прервать, чтобы избежать бесконечно ожидающих задач.
Barrier.parties
:Свойство Barrier.parties
возвращает количество задач, необходимых для прохождения барьера.
Barrier.n_waiting
:Свойство Barrier.parties
возвращает количество задач, ожидающих в данный момент в барьере при заполнении.
Barrier.broken
:Свойство Barrier.parties
возвращает логическое значение, которое имеет значение True
, если барьер находится в сломанном состоянии.
asyncio.BrokenBarrierError
:Исключение asyncio.BrokenBarrierError
(добавлено в Python 3.11) - это подкласс RuntimeError
, возникает, когда объект Barrier
сбрасывается или ломается.