Сообщить об ошибке.

Класс Condition() модуля threading в Python

Доступ потоков к общим ресурсам по условию

Синтаксис:

import threading

cond_var = threading.Condition(lock=None)

Параметры:

Возвращаемое значение:

Описание:

Класс Condition() модуля threading реализует объекты условий, которые позволяют одному или нескольким потокам ждать, пока они не будут уведомлены другим потоком о его выполнении. Типичный пример использования условий это использование блокировки для синхронизации доступа потоков к общему состоянию.

Если задан аргумент lock не равный None, то это должен быть объект threading.Lock() или threading.RLock(), который будет использоваться в качестве базовой блокировки. В противном случае в качестве базовой блокировки будет создан новый объект threading.RLock.

Условие Condition всегда связано с какой-то блокировкой, оно может быть передано или создано по умолчанию. Передача одной из них полезна, когда несколько условий должны совместно использовать одну и ту же блокировку. Блокировка является частью объекта Condition и не нужно отслеживать его отдельно.

Переменная условия подчиняется [протоколу управления контекстом]: оператор with получает соответствующую блокировку на время действия своего блока кода. Методы Condition.acquire() и Condition.release() также вызывают соответствующие методы связанной блокировки.

Другие методы объекта Condition() должны вызываться с удерживаемой связанной блокировкой. Метод .wait() снимает блокировку, а затем блокирует выполнение кода, до тех пор, пока другой поток не разбудит его, вызвав метод .notify() или .notify_all(). После пробуждения .wait() повторно получает блокировку и возвращает значение, так же этот метод поддерживает аргумент тайм-аута.

Метод .notify() пробуждает один из потоков, ожидающих переменной условия, если таковые ожидают. Метод .notify_all() пробуждает все потоки, ожидающие переменной условия.

Примечание: методы .notify() и .notify_all() не снимают блокировку. Это означает, что пробужденный поток или потоки не вернутся из своего вызова .wait() сразу, а только тогда, когда поток, который вызвал .notify() или .notify_all() окончательно снимет/откажется от блокировки.

Типичный стиль программирования с использованием переменных условий использует блокировку для синхронизации доступа к некоторому общему состоянию. Потоки, которые заинтересованы в конкретном изменении этого состояния, повторно вызывают метод .wait(), пока не увидят желаемое состояние, в то время как потоки, которые изменяют состояние, вызывают методы .notify() или .notify_all(), когда изменят состояние таким образом, чтобы оно могло быть приемлемо для одного из ждущих потоков. Например, следующий код представляет собой общую ситуацию производитель/потребитель с неограниченной емкостью буфера:

import threading
cond_var = threading.Condition(lock=None)

# Потребитель порции данных
with cond_var:
    while not an_item_is_available():
        cond_var.wait()
    get_an_available_item()

# Производитель порции данных
with cond_var:
    make_an_item_available()
    cond_var.notify()

Проверка цикла while на условие необходима приложению, потому что метод .wait() может возвращать результат через произвольно долгое время, а условие, вызвавшее метод .notify(), может больше не выполнится. Это присуще многопоточному программированию. Метод .wait_for() может использоваться для автоматизации проверки условий и упрощает вычисление тайм-аутов:

import threading
cond_var = threading.Condition(lock=None)

# Потребитель порции данных
with cond_var:
    cond_var.wait_for(an_item_is_available)
    get_an_available_item()

пробужденЧтобы выбрать между использованием методов .notify() и .notify_all(), необходимо понять, может ли одно изменение состояния быть интересным только для одного или нескольких ожидающих потоков. Например, в типичной ситуации производитель/потребитель, добавление одного элемента в буфер обмена требует пробуждения только одного потока-потребителя.

Методы объекта threading.Condition.


Condition.acquire(*args):

Метод Condition.acquire() устанавливает базовую блокировку. Этот метод вызывает соответствующий метод базовой блокировки. Возвращаемое значение - это то, что возвращает вызываемый метод.

Condition.release():

Метод Condition.release() снимает базовую блокировку. Этот метод вызывает соответствующий метод базовой блокировки. Возвращаемое значение отсутствует.

Condition.wait(timeout=None):

Метод Condition.wait() ждет, пока не появится уведомление или пока не истечет время ожидания timeout. Если поток вызвавший этот метод ранее не получил блокировку, то возникает ошибка RuntimeError.

Этот метод освобождает базовую блокировку, а затем блокирует выполнение кода до тех пор, пока он не будет разбужен вызовом Condition.notify() или Condition.notify_all() для того же условия в другом потоке или пока не наступит необязательный тайм-аут. После пробуждения или истечения времени ожидания он повторно устанавливает блокировку выполнения кода и возвращает результат.

Когда присутствует аргумент timeout, не равный None, то это должно быть число float, указывающее тайм-аут для операции в секундах или его долях.

Когда, в качестве базовой блокировкой используется threading.RLock(), блокировка не снимается с помощью его метода RLock.release(), если она до этого была получена несколько раз рекурсивно. Вместо этого используется внутренний интерфейс класса threading.RLock, который действительно разблокирует его, даже если он был рекурсивно получен несколько раз. Затем используется другой внутренний интерфейс для восстановления уровня рекурсии при повторной блокировке.

Метод Condition.wait() возвращает значение True, и если не истек заданный тайм-аут timeout, то False .

Condition.wait_for(predicate, timeout=None):

Метод Condition.wait_for() ждет, пока условие predicate не станет истинным. Передаваемое условие predicate в качестве аргумента должно быть вызываемым объектом, результат которого будет интерпретироваться как логическое значение. Может быть предусмотрен тайм-аут timeout, дающий максимальное время ожидания.

Этот служебный метод может повторно вызывать Condition.wait(), пока не будет выполнено условие predicate или пока не истечет время ожидания timeout. Возвращаемое значение является последним возвращаемым значением predicate и будет оцениваться как False, если время ожидания метода истекло.

Игнорируя функцию тайм-аута, вызов метода Condition.wait_for() примерно эквивалентен псевдокоду:

while not predicate():
    cond_var.wait()

Следовательно, применяются те же правила, что и к методу Condition.wait(): блокировка должна удерживаться при вызове и повторно захвачена при возврате результата. Условие predicate оценивается с удерживаемой блокировкой.

Condition.notify(n=1):

По умолчанию метод Condition.notify() пробуждает один поток, ожидающий этого условия, если таковой имеется. Если поток, вызывающий этот метод, ранее не получил блокировку, то возникает ошибка RuntimeError.

Метод Condition.notify() пробуждает не более n потоков, ожидающих переменной условия, это не работает, если нет ожидающих потоков.

Текущая реализация пробуждает ровно n потоков, если их ожидает как минимум n потоков. Однако полагаться на такое поведение небезопасно. Будущая оптимизированная реализация может иногда вызывать более n потоков.

Примечание: пробужденный поток фактически не возвращается к работе из своего вызова Condition.wait() до тех пор, пока он не сможет повторно получить блокировку. Метод Condition.notify() не снимает блокировку, это должен сделать поток вызвавший этот метод.

Condition.notify_all():

Метод Condition.notify_all() пробуждает все потоки, ожидающие этого условия. Этот метод действует как Condition.notify(), но пробуждает все ожидающие потоки вместо одного. Если поток, вызвавший этот метод, ранее не получил блокировку, то возникает исключение RuntimeError.

Пример получения блокировки, связанной с условием.

В примере, потоки используют контекстный менеджер with для получения блокировки, связанной с условием.

import threading, time, queue

def consumer(cond, queue):
    """ждет определенного состояния для использования ресурсов"""
    th_name = threading.current_thread().name
    print(f'Запуск потока потребителя {th_name}')
    with cond:
        cond.wait()
        print(f'Обработка ресурса потребителем {th_name}')
        time.sleep(0.3)


def producer(cond, queue):
    """подготовка ресурса, для использования потребителями"""
    th_name = threading.current_thread().name
    print(f'Запуск потока производителя {th_name}')
    with cond:
        print(f'{th_name} готовит ресурс для потребителей')
        time.sleep(0.5)
        print(f'{th_name} ресурс ГОТОВ!')
        cond.notify_all()


# установка переменной условия
condition = threading.Condition()

# создание потоков потребителей
c1 = threading.Thread(name='Consumer-1', 
                      target=consumer,
                      args=(condition,))
c2 = threading.Thread(name='Consumer-2', 
                      target=consumer,
                      args=(condition,))
# создание потока потребителей производителя
p = threading.Thread(name='PRODUCER', 
                     target=producer,
                     args=(condition,))
c1.start()
c2.start()
p.start()


# Запуск потока потребителя Consumer-1
# Запуск потока потребителя Consumer-2
# Запуск потока производителя PRODUCER
# PRODUCER готовит ресурс для потребителей
# PRODUCER ресурс ГОТОВ!
# Обработка ресурса потребителем Consumer-2
# Обработка ресурса потребителем Consumer-1