Сообщить об ошибке.

Примитивы синхронизации задач в asyncio в Python

Примитивы Lock, Event, Condition и Semaphore в asyncio

Примитивы синхронизации модуля asyncio очень похожи на примитивы синхронизации модуля threading с двумя важными оговорками:

  • примитивы синхронизации asyncio не являются потокобезопасными, поэтому их не следует использовать для синхронизации потоков ОС;
  • методы этих примитивов синхронизации не принимают аргумент таймаута. Для выполнения операций с таймаутами используйте функцию asyncio.wait_for() .

Базовые примитивы синхронизации asyncio:

Изменено в Python 3.9: было удалено получение блокировки с помощью await lock или with (yield from lock). Вместо этого используйте async lock.

asyncio.Lock(*, loop=None):

Класс asyncio.Lock() реализует блокировку взаимное исполнения критических участков кода для задач asyncio. Не потокобезопасный.

Блокировка asyncio может использоваться, чтобы гарантировать монопольный доступ к общему ресурсу.

Изменено в Python 3.10 аргумент loop удален.

Предпочтительный способ использования asyncio.Lock() - это асинхронный оператор async with:

# получаем объект блокировки
lock = asyncio.Lock()

async with lock:
    # доступ к общему состоянию

# Что эквивалентно:
await lock.acquire()
try:
    # доступ к общему состоянию
finally:
    lock.release()

Методы объекта Lock:

Lock.acquire():

Метод Lock.acquire() получает блокировку. Для других - устанавливает блокировку в состояние locked и возвращает True.

Метод представляет собой сопрограмму.

Если более чем одна сопрограмма блокируется методом Lock.acquire, то продолжать работу будет только одна сопрограмма, которая получила блокировку, в то время как другие сопрограммы будут ждать, пока блокировка не будет снята.

Получение блокировки справедливо: выполняющаяся сопрограмма будет первой сопрограммой, которая начала ожидать блокировки.

Lock.release():

Метод Lock.release() сбрасывает полученную блокировку в состояние unlocked и возвращает результат.

Если блокировка уже разблокирована, то возникает исключение RuntimeError.

Lock.locked():

Метод Lock.locked() возвращает True, если блокировка установлена.


asyncio.Event(*, loop=None):

Класс asyncio.Event() представляет собой объект события. Не потокобезопасный.

Объект события asyncio можно использовать для уведомления нескольких задач asyncio о том, что произошло какое-то событие.

Объект Event управляет внутренним флагом, которому можно присвоить значение True с помощью метода Event.set() и сбросить значение в False с помощью метода Event.clear().

Метод Event.wait() блокируется, пока флаг не будет установлен в значение True. Первоначально флаг установлен в значение False.

Изменено в Python 3.10 аргумент loop удален.

async def waiter(event):
    print('waiting for it ...')
    await event.wait()
    print('... got it!')

async def main():
    # Create an Event object.
    event = asyncio.Event()

    # Spawn a Task to wait until 'event' is set.
    waiter_task = asyncio.create_task(waiter(event))

    # Sleep for 1 second and set the event.
    await asyncio.sleep(1)
    event.set()

    # Wait until the waiter task is finished.
    await waiter_task

asyncio.run(main())

Методы объекта Event:

Event.wait():

Метод Event.wait() ждет, пока объект события будет установлено в True. Метод представляет собой сопрограмму.

Если событие установлено, то немедленно возвращает True. В противном случае блокирует, пока другая задача не вызовет метод Event.set().

Event.set():

Метод Event.set() устанавливает событие.

Все задачи, ожидающие установки события, будут немедленно разбужены.

Event.clear():

Метод Event.clear() очищает/сбрасывает событие.

Задачи, ожидающие методом Event.wait() будут блокироваться до тех пор, пока метод `Event.set() снова не установит объект события.

Event.is_set():

Метод Event.is_set() возвращает True, если событие установлено.


asyncio.Condition(lock=None, *, loop=None):

Класс asyncio.Condition() представляет собой объект какого-то условного события. Не потокобезопасный.

Примитив синхронизации по условию Condition модуля asyncio может использоваться задачей для ожидания некоторого события и затем получения монопольного доступа к общему ресурсу.

По сути, объект Condition сочетает в себе функции события Event и блокировки Lock. Можно иметь несколько объектов Condition, совместно использующих одну Lock, что позволяет координировать монопольный доступ к общему ресурсу между различными задачами, заинтересованными в определенных состояниях этого общего ресурса.

Необязательный аргумент lock должен быть объектом asyncio.Lock или None. В последнем случае автоматически создается новый объект Lock.

Изменено в Python 3.10 аргумент loop удален.

Предпочтительный способ использования asyncio.Condition() - это асинхронный оператор async with:

# получаем объект условия 
cond = asyncio.Condition()

async with cond:
    await cond.wait()

# Что эквивалентно:

await cond.acquire()
try:
    await cond.wait()
finally:
    cond.release()

Методы объекта Condition:

Condition.acquire():

Метод Condition.acquire() получает базовую блокировку. Для других - устанавливает состояние блокировки и возвращает True.

Представляет собой сопрограмму. Метод будет ждать, пока базовая блокировка не будет снята.

Condition.notify(n=1):

Метод Condition.notify() будит не более n задач (по умолчанию 1), ожидающих этого условия. Метод не работает, если нет ожидающих задач.

Блокировка должна быть получена до вызова этого метода и снимается только после ее получения. Если базовая блокировка не получена, то при вызове этого метода возникает ошибка RuntimeError.

Condition.locked():

Метод Condition.is_set() возвращает True, если основная блокировка получена.

Condition.notify_all():

Метод Condition.notify_all() будит все задачи, ожидающие этого условия.

Этот метод действует как Condition.notify(), но пробуждает все ожидающие задачи.

Блокировка должна быть получена до вызова этого метода и снимается только после ее получения. Если базовая блокировка не получена, то при вызове этого метода возникает ошибка RuntimeError.

Condition.release():

Метод Condition.release() снимает базовую блокировку.

Если базовая блокировка не получена, то при вызове этого метода возникает ошибка RuntimeError.

Condition.wait():

Метод Condition.wait() будет ждать уведомления. Представляет собой сопрограмму.

Если вызывающая задача не получила блокировку при вызове этого метода, возникает ошибка RuntimeError.

Этот метод снимает базовую блокировку, а затем блокирует, пока не будет разбужен вызовом Condition.notify() или Condition.notify_all(). После пробуждения объект Condition повторно получает свою блокировку, а этот метод возвращает True.

Condition.wait_for(predicate):

Метод Condition.wait_for() будет ждать, пока аргумент predicate станет истинным. Представляет собой сопрограмму.

Предикат predicate должен быть вызываемым объектом, результат которого будет интерпретироваться как логическое значение. Конечное значение - это возвращаемое значение.


asyncio.Semaphore(value=1, *, loop=None):

Класс asyncio.Semaphore() представляет собой объект семафора. Не потокобезопасный.

Семафор управляет внутренним счетчиком, который уменьшается при каждом вызове метода Semaphore.acquire() и увеличивается при каждом вызове Semaphore.release(). Счетчик никогда не может опуститься ниже нуля; когда Semaphore.acquire() обнаруживает, что он равен нулю, то блокируется, ожидая, пока какая-либо задача не вызовет Semaphore.release().

Необязательный аргумент value (по умолчанию 1) задает начальное значение для внутреннего счетчика. Если заданное значение меньше 0,то возникает ошибка ValueError.

Изменено в Python 3.10 аргумент loop удален.

Предпочтительный способ использования asyncio.Semaphore() - это асинхронный оператор async with:

# получаем объект семафора
sem = asyncio.Semaphore(10)

async with sem:
    # работа с общим ресурсом

# что эквивалентно:

await sem.acquire()
try:
    # работа с общим ресурсом
finally:
    sem.release()

Semaphore.acquire():

Метод Semaphore.acquire() приобретает семафор. Представляет собой сопрограмму.

Если внутренний счетчик больше нуля, то уменьшает его на единицу и немедленно возвращает True. Если счетчик равен нулю, то ждет вызова Semaphore.release() и возвращает True.

Semaphore.locked():

Метод Semaphore.locked() возвращает True, если семафор не может быть получен немедленно.

Semaphore.release():

Метод Semaphore.release() освобождает семафор, увеличив внутренний счетчик на единицу. Может разбудить задачу, ожидающую получения семафора.

В отличие от BoundedSemaphore, объект Semaphore позволяет делать больше вызовов Semaphore.release(), чем Semaphore.acquire().

asyncio.BoundedSemaphore(value=1, *, loop=None):

Класс asyncio.BoundedSemaphore() представляет собой ограниченный объект семафора, рассмотренного выше. Не потокобезопасный.

Класс asyncio.BoundedSemaphore() - это версия asyncio.Semaphore, которая вызывает исключение ValueError в при вызове метода Semaphore.release(), если увеличивает внутренний счетчик выше начального значения value.

Изменено в Python 3.10 аргумент loop удален.

asyncio.Barrier(parties):

Класс asyncio.Barrier() (добавлен в Python 3.11) представляет собой объект барьера. Непотокобезопасен.

Объект Barrier - это простой примитив синхронизации, который позволяет блокировать ход выполнения кода до тех пор, пока на нем не будет ожидаться определенное количество задач parties. Задачи могут ожидать с помощью метода Barrier.wait() и будут заблокированы до тех пор, пока указанное количество задач не завершит ожидание с помощью Barrier.wait(). В этот момент все ожидающие задачи будут разблокированы одновременно.

Можно использовать async with как альтернативу ожиданию при вызове Barrier.wait().

Класс asyncio.Barrier можно использовать повторно любое количество раз.

Новое в Python 3.11.

Пример:

async def example_barrier():
   # создаем барьер с 3 задачами
   b = asyncio.Barrier(3)

   # 2 новые ожидающие задачи 
   asyncio.create_task(b.wait())
   asyncio.create_task(b.wait())

   await asyncio.sleep(0)
   print(b)

   # Третий вызов .wait() проходит барьер
   await b.wait()
   print(b)
   print("=> барьер пройден")

   await asyncio.sleep(0)
   print(b)

asyncio.run(example_barrier())

Результат этого примера:

<asyncio.locks.Barrier object at 0x... [filling, waiters:2/3]>
<asyncio.locks.Barrier object at 0x... [draining, waiters:0/3]>
=> барьер пройден
<asyncio.locks.Barrier object at 0x... [filling, waiters:0/3]>

Методы объекта Barrier.

Barrier.wait():

Метод Barrier.wait() представляет собой сопрограмму. Проходит барьер, когда все задачи, участвующие в барьере, вызвали эту функцию, все они разблокируются одновременно.

Когда ожидающая или заблокированная задача в барьере отменяется, то эта задача выходит из барьера, который остается в том же состоянии. Если состояние барьера - "заполнение", то количество ожидающих задач уменьшается на 1.

Возвращаемое значение - целое число в диапазоне от 0 до party-1, разное для каждой задачи. Такое поведение можно использовать для выбора задачи для выполнения какой-либо специальной уборки, например:

...
async with barrier as position:
   if position == 0:
      # Только одна задача печатает это
      print('Конец "фазы слива"')

Этот метод может вызвать исключение asyncio.BrokenBarrierError, если барьер сломан или сброшен во время ожидания задачи. Метод также может вызвать asyncio.CancelledError, если задача будет отменена.

Barrier.reset():

Метод Barrier.reset() представляет собой сопрограмму. Возвращает барьер в пустое состояние по умолчанию. Любые ожидающие его задачи получат исключение asyncio.BrokenBarrierError.

Если барьер сломан, возможно, лучше просто оставить его и создать новый.

Barrier.abort():

Метод Barrier.abort() представляет собой сопрограмму. Этот метод приводит барьер в нерабочее состояние. Что приводит к сбою любых активных или будущих вызовов Barrier.wait() с ошибкой BrokenBarrierError.

Используйте Barrier.abort(), например, если одну из задач необходимо прервать, чтобы избежать бесконечно ожидающих задач.

Barrier.parties:

Свойство Barrier.parties возвращает количество задач, необходимых для прохождения барьера.

Barrier.n_waiting:

Свойство Barrier.parties возвращает количество задач, ожидающих в данный момент в барьере при заполнении.

Barrier.broken:

Свойство Barrier.parties возвращает логическое значение, которое имеет значение True, если барьер находится в сломанном состоянии.

asyncio.BrokenBarrierError:

Исключение asyncio.BrokenBarrierError (добавлено в Python 3.11) - это подкласс RuntimeError, возникает, когда объект Barrier сбрасывается или ломается.