import threading cond_var = threading.Condition(lock=None)
lock=None
- объект блокировки threading.Lock()
или threading.RLock()
, который будет использоваться условием.Класс Condition()
модуля threading
реализует объекты условий, которые позволяют одному или нескольким потокам ждать, пока они не будут уведомлены другим потоком о его выполнении. Типичный пример использования условий это использование блокировки для синхронизации доступа потоков к общему состоянию.
Если задан аргумент lock
не равный None
, то это должен быть объект threading.Lock()
или threading.RLock()
, который будет использоваться в качестве базовой блокировки. В противном случае в качестве базовой блокировки будет создан новый объект threading.RLock
.
Условие Condition
всегда связано с какой-то блокировкой, оно может быть передано или создано по умолчанию. Передача одной из них полезна, когда несколько условий должны совместно использовать одну и ту же блокировку. Блокировка является частью объекта Condition
и не нужно отслеживать его отдельно.
Переменная условия подчиняется [протоколу управления контекстом]: оператор with
получает соответствующую блокировку на время действия своего блока кода. Методы Condition.acquire()
и Condition.release()
также вызывают соответствующие методы связанной блокировки.
Другие методы объекта Condition()
должны вызываться с удерживаемой связанной блокировкой. Метод .wait()
снимает блокировку, а затем блокирует выполнение кода, до тех пор, пока другой поток не разбудит его, вызвав метод .notify()
или .notify_all()
. После пробуждения .wait()
повторно получает блокировку и возвращает значение, так же этот метод поддерживает аргумент тайм-аута.
Метод .notify()
пробуждает один из потоков, ожидающих переменной условия, если таковые ожидают. Метод .notify_all()
пробуждает все потоки, ожидающие переменной условия.
Примечание: методы .notify()
и .notify_all()
не снимают блокировку. Это означает, что пробужденный поток или потоки не вернутся из своего вызова .wait()
сразу, а только тогда, когда поток, который вызвал .notify()
или .notify_all()
окончательно снимет/откажется от блокировки.
Типичный стиль программирования с использованием переменных условий использует блокировку для синхронизации доступа к некоторому общему состоянию. Потоки, которые заинтересованы в конкретном изменении этого состояния, повторно вызывают метод .wait()
, пока не увидят желаемое состояние, в то время как потоки, которые изменяют состояние, вызывают методы .notify()
или .notify_all()
, когда изменят состояние таким образом, чтобы оно могло быть приемлемо для одного из ждущих потоков. Например, следующий код представляет собой общую ситуацию производитель/потребитель с неограниченной емкостью буфера:
import threading cond_var = threading.Condition(lock=None) # Потребитель порции данных with cond_var: while not an_item_is_available(): cond_var.wait() get_an_available_item() # Производитель порции данных with cond_var: make_an_item_available() cond_var.notify()
Проверка цикла while
на условие необходима приложению, потому что метод .wait()
может возвращать результат через произвольно долгое время, а условие, вызвавшее метод .notify()
, может больше не выполнится. Это присуще многопоточному программированию. Метод .wait_for()
может использоваться для автоматизации проверки условий и упрощает вычисление тайм-аутов:
import threading cond_var = threading.Condition(lock=None) # Потребитель порции данных with cond_var: cond_var.wait_for(an_item_is_available) get_an_available_item()
пробужденЧтобы выбрать между использованием методов .notify()
и .notify_all()
, необходимо понять, может ли одно изменение состояния быть интересным только для одного или нескольких ожидающих потоков. Например, в типичной ситуации производитель/потребитель, добавление одного элемента в буфер обмена требует пробуждения только одного потока-потребителя.
threading.Condition
.Condition.acquire()
устанавливает базовую блокировку,Condition.release()
снимает базовую блокировку,Condition.wait()
ждет, пока не появится уведомление,Condition.wait_for()
ждет, пока внутреннее условие не станет истинным,Condition.notify()
пробуждает один поток, ожидающий условия,Condition.notify_all()
пробуждает все потоки, ожидающие условия,Condition.acquire(*args)
:Метод Condition.acquire()
устанавливает базовую блокировку. Этот метод вызывает соответствующий метод базовой блокировки. Возвращаемое значение - это то, что возвращает вызываемый метод.
Condition.release()
:Метод Condition.release()
снимает базовую блокировку. Этот метод вызывает соответствующий метод базовой блокировки. Возвращаемое значение отсутствует.
Condition.wait(timeout=None)
:Метод Condition.wait()
ждет, пока не появится уведомление или пока не истечет время ожидания timeout
. Если поток вызвавший этот метод ранее не получил блокировку, то возникает ошибка RuntimeError
.
Этот метод освобождает базовую блокировку, а затем блокирует выполнение кода до тех пор, пока он не будет разбужен вызовом Condition.notify()
или Condition.notify_all()
для того же условия в другом потоке или пока не наступит необязательный тайм-аут. После пробуждения или истечения времени ожидания он повторно устанавливает блокировку выполнения кода и возвращает результат.
Когда присутствует аргумент timeout
, не равный None
, то это должно быть число float
, указывающее тайм-аут для операции в секундах или его долях.
Когда, в качестве базовой блокировкой используется threading.RLock()
, блокировка не снимается с помощью его метода RLock.release()
, если она до этого была получена несколько раз рекурсивно. Вместо этого используется внутренний интерфейс класса threading.RLock
, который действительно разблокирует его, даже если он был рекурсивно получен несколько раз. Затем используется другой внутренний интерфейс для восстановления уровня рекурсии при повторной блокировке.
Метод Condition.wait()
возвращает значение True
, и если не истек заданный тайм-аут timeout
, то False
.
Condition.wait_for(predicate, timeout=None)
:Метод Condition.wait_for()
ждет, пока условие predicate
не станет истинным. Передаваемое условие predicate
в качестве аргумента должно быть вызываемым объектом, результат которого будет интерпретироваться как логическое значение. Может быть предусмотрен тайм-аут timeout
, дающий максимальное время ожидания.
Этот служебный метод может повторно вызывать Condition.wait()
, пока не будет выполнено условие predicate
или пока не истечет время ожидания timeout
. Возвращаемое значение является последним возвращаемым значением predicate
и будет оцениваться как False
, если время ожидания метода истекло.
Игнорируя функцию тайм-аута, вызов метода Condition.wait_for()
примерно эквивалентен псевдокоду:
while not predicate(): cond_var.wait()
Следовательно, применяются те же правила, что и к методу Condition.wait()
: блокировка должна удерживаться при вызове и повторно захвачена при возврате результата. Условие predicate
оценивается с удерживаемой блокировкой.
Condition.notify(n=1)
:По умолчанию метод Condition.notify()
пробуждает один поток, ожидающий этого условия, если таковой имеется. Если поток, вызывающий этот метод, ранее не получил блокировку, то возникает ошибка RuntimeError
.
Метод Condition.notify()
пробуждает не более n
потоков, ожидающих переменной условия, это не работает, если нет ожидающих потоков.
Текущая реализация пробуждает ровно n
потоков, если их ожидает как минимум n
потоков. Однако полагаться на такое поведение небезопасно. Будущая оптимизированная реализация может иногда вызывать более n
потоков.
Примечание: пробужденный поток фактически не возвращается к работе из своего вызова Condition.wait()
до тех пор, пока он не сможет повторно получить блокировку. Метод Condition.notify()
не снимает блокировку, это должен сделать поток вызвавший этот метод.
Condition.notify_all()
:Метод Condition.notify_all()
пробуждает все потоки, ожидающие этого условия. Этот метод действует как Condition.notify()
, но пробуждает все ожидающие потоки вместо одного. Если поток, вызвавший этот метод, ранее не получил блокировку, то возникает исключение RuntimeError
.
В примере, потоки используют контекстный менеджер with
для получения блокировки, связанной с условием.
import threading, time, queue def consumer(cond, queue): """ждет определенного состояния для использования ресурсов""" th_name = threading.current_thread().name print(f'Запуск потока потребителя {th_name}') with cond: cond.wait() print(f'Обработка ресурса потребителем {th_name}') time.sleep(0.3) def producer(cond, queue): """подготовка ресурса, для использования потребителями""" th_name = threading.current_thread().name print(f'Запуск потока производителя {th_name}') with cond: print(f'{th_name} готовит ресурс для потребителей') time.sleep(0.5) print(f'{th_name} ресурс ГОТОВ!') cond.notify_all() # установка переменной условия condition = threading.Condition() # создание потоков потребителей c1 = threading.Thread(name='Consumer-1', target=consumer, args=(condition,)) c2 = threading.Thread(name='Consumer-2', target=consumer, args=(condition,)) # создание потока потребителей производителя p = threading.Thread(name='PRODUCER', target=producer, args=(condition,)) c1.start() c2.start() p.start() # Запуск потока потребителя Consumer-1 # Запуск потока потребителя Consumer-2 # Запуск потока производителя PRODUCER # PRODUCER готовит ресурс для потребителей # PRODUCER ресурс ГОТОВ! # Обработка ресурса потребителем Consumer-2 # Обработка ресурса потребителем Consumer-1