Сообщить об ошибке.

Класс Barrier() модуля threading в Python

Синхронизация потоков при помощи барьеров

Синтаксис:

import threading

barrier = threading.Barrier(parties, action=None, timeout=None)

Параметры:

  • parties - количества потоков,
  • action=None - вызываемый объект,
  • timeout=None - тайм-аута по умолчанию для метода Barrier.wait().

Возвращаемое значение:

Описание:

Класс Barrier() модуля threading создает объект-барьер для количества parties потоков.

Аргумент action, если он предусмотрен, вызывается одним из потоков при их освобождении.

Аргумент timeout - это значение тайм-аута по умолчанию, если он не указан для метода Barrier.wait().

Этот класс предоставляет простой примитив синхронизации для использования фиксированным числом потоков, которым необходимо ждать друг друга. Каждый из потоков пытается преодолеть барьер, вызывая метод Barrier.wait(), и будет блокироваться, пока все потоки не выполнят свои вызовы Barrier.wait(). Как только все потоки выполнили свои вызовы, то в этот момент потоки освобождаются одновременно.

Барьер можно использовать повторно любое количество раз для одного и того же количества потоков.

В качестве общего примера приведем простой способ синхронизации клиентского и серверного потоков:

import threading

barrier = threading.Barrier(2, timeout=5)

def server():
    start_server()
    barrier.wait()
    while True:
        connection = accept_connection()
        process_server_connection(connection)

def client():
    barrier.wait()
    while True:
        connection = make_connection()
        process_client_connection(connection)

Объекты барьеров threading.Barrier также поддерживают протокол управления контекстом.

Атрибуты и методы объекта Barrier.


Barrier.wait(timeout=None):

Метод Barrier.wait() ждет точку прохождения барьера.

Когда все потоки, участвующие в барьере, вызвали эту функцию, все они освобождаются одновременно. Если предоставляется аргумент timeout тайм-аут, то он используется вместо одноименного аргумента конструктора класса.

Возвращаемое значение - это целое число в диапазоне от 0 до количества участвующих сторон parties - 1, различное для каждого потока. Такое поведение можно использовать для выбора потока для выполнения некоторых специальных операций, например:

i = barrier.wait()
if i == 0:
    # Только один поток должен это вывести
    print("барьер пройден")

Если конструктору было предоставлен аргумент action, один из потоков вызовет его перед освобождением. Если этот вызов вызывает ошибку, то барьер переводится в неработающее состояние.

Если время вызова истекло, то барьер переводится в нерабочее состояние.

Метод Barrier.wait() может вызвать исключение threading.BrokenBarrierError, если барьер нерабочий или сброшен во время ожидания потока.

Barrier.reset():

Метод Barrier.reset() возвращает барьер в пустое состояние. Все ожидающие его потоки получат исключение threading.BrokenBarrierError.

Обратите внимание, если есть другие потоки, состояние которых неизвестно, то для использования этой функции может потребоваться некоторая внешняя синхронизация. Если барьер в нерабочем состоянии, то может быть лучше оставить его и создать новый.

Barrier.abort():

Метод Barrier.abort() ставит барьер в нерабочее состояние.

Это приводит к тому, что любые активные или будущие вызовы метода Barrier.wait() завершаются ошибкой threading.BrokenBarrierError. Используйте это, например, если один из потоков должен прерваться, чтобы избежать тупиковой блокировки приложения.

Возможно, предпочтительнее просто создать барьер с разумным значением тайм-аута, чтобы автоматически защитить один из потоков от сбоя.

Barrier.parties:

Атрибут Barrier.parties возвращает количество потоков, необходимых для прохождения барьера.

Barrier.n_waiting:

Атрибут Barrier.n_waiting возвращает количество потоков, ждущих в данный момент прохождения барьера.

Barrier.broken:

Атрибут Barrier.broken возвращает логическое значение True, если барьер находится в нерабочем состоянии.


Примеры установки барьеров для потоков.

Барьер устанавливает контрольную точку, и все участвующие потоки блокируются, пока все участвующие "стороны" не достигнут этой точки. Это позволяет потокам запускаться по отдельности, а затем приостанавливать работу, пока все они не будут готовы к продолжению.

В этом примере объект Barrier будет блокировать запускаемые потоки, до тех пор, пока все они не встанут в режим ожидания. Когда условие будет выполнено, все потоки освобождаются в контрольной точке одновременно. Возвращаемое значение Barrier.wait() указывает номер освобождаемой стороны и может быть использовано для ограничения некоторых потоков от выполнения таких действий, как очистка общего ресурса.

import threading, time

def worker(barrier):
    th_name = threading.current_thread().name
    print(f'{th_name} в ожидании барьера с {barrier.n_waiting} другими')
    worker_id = barrier.wait()
    print(f'{th_name} прохождение барьера {worker_id}')


# число потоков, при использовании
# барьеров оно должно быть постоянным
NUM_THREADS = 3

# установка барьера
barrier = threading.Barrier(NUM_THREADS)

threads = []
# создаем и запускаем потоки
for i in range(NUM_THREADS):
    th = threading.Thread(name=f'Worker-{i}', 
                          target=worker,
                          args=(barrier,),
                         )
    threads.append(th)
    print(f'Запуск {th.name}')
    th.start()
    time.sleep(0.3)

# блокируем основной поток программы 
# до завершения работы всех потоков
for thread in threads:
    thread.join()
    
# Запуск Worker-0
# Worker-0 в ожидании барьера с 0 другими
# Запуск Worker-1
# Worker-1 в ожидании барьера с 1 другими
# Запуск Worker-2
# Worker-2 в ожидании барьера с 2 другими
# Worker-2 прохождение барьера 2
# Worker-0 прохождение барьера 0
# Worker-1 прохождение барьера 1

Метод Barrier.abort() экземпляра Barrier заставляет все ожидающие потоки получать исключение threading.BrokenBarrierError.

Такое поведение позволяет потокам очищаться, если обработка остановлена, пока они заблокированы ожиданием контрольной точки методом Barrier.wait().

import threading, time

def worker(barrier):
    th_name = threading.current_thread().name
    print(f'{th_name} в ожидании барьера с {barrier.n_waiting} другими')
    try:
        worker_id = barrier.wait()
    except threading.BrokenBarrierError:
        print(f'{th_name} сброшен')
    else:
        print(f'{th_name} прохождение барьера {worker_id}')

# число потоков, при использовании
# барьеров оно должно быть постоянным
NUM_THREADS = 3

# обратите внимание происходит установка 
# барьера на 1 больше чем запускается потоков
barrier = threading.Barrier(NUM_THREADS + 1)

threads = []
# создаем и запускаем потоки
for i in range(NUM_THREADS):
    th = threading.Thread(name=f'Worker-{i}', 
                          target=worker,
                          args=(barrier,),
                         )
    threads.append(th)
    print(f'Запуск {th.name}')
    th.start()
    time.sleep(0.3)

# сброс барьера
barrier.abort()

# блокируем основной поток программы 
# до завершения работы всех потоков
for thread in threads:
    thread.join()
    
# Запуск Worker-0
# Worker-0 в ожидании барьера с 0 другими
# Запуск Worker-1
# Worker-1 в ожидании барьера с 1 другими
# Запуск Worker-2
# Worker-2 в ожидании барьера с 2 другими
# Worker-0 сброшен
# Worker-2 сброшен
# Worker-1 сброшен