import threading
barrier = threading.Barrier(parties, action=None, timeout=None)
parties
- количества потоков,action=None
- вызываемый объект,timeout=None
- тайм-аута по умолчанию для метода Barrier.wait()
.Класс Barrier()
модуля threading
создает объект-барьер для количества parties
потоков.
Аргумент action
, если он предусмотрен, вызывается одним из потоков при их освобождении.
Аргумент timeout
- это значение тайм-аута по умолчанию, если он не указан для метода Barrier.wait()
.
Этот класс предоставляет простой примитив синхронизации для использования фиксированным числом потоков, которым необходимо ждать друг друга. Каждый из потоков пытается преодолеть барьер, вызывая метод Barrier.wait()
, и будет блокироваться, пока все потоки не выполнят свои вызовы Barrier.wait()
. Как только все потоки выполнили свои вызовы, то в этот момент потоки освобождаются одновременно.
Барьер можно использовать повторно любое количество раз для одного и того же количества потоков.
В качестве общего примера приведем простой способ синхронизации клиентского и серверного потоков:
import threading
barrier = threading.Barrier(2, timeout=5)
def server():
start_server()
barrier.wait()
while True:
connection = accept_connection()
process_server_connection(connection)
def client():
barrier.wait()
while True:
connection = make_connection()
process_client_connection(connection)
Объекты барьеров threading.Barrier
также поддерживают протокол управления контекстом.
Barrier
.Barrier.wait()
ждет точку прохождения барьера,Barrier.reset()
возвращает барьер в пустое состояние,Barrier.abort()
ставит барьер в нерабочее состояние,Barrier.parties
количество потоков, необходимых для прохождения барьера,Barrier.n_waiting
количество потоков, ждущих прохождения барьера,Barrier.broken
проверяет барьер на нерабочее состояние,Barrier.wait(timeout=None)
:Метод Barrier.wait()
ждет точку прохождения барьера.
Когда все потоки, участвующие в барьере, вызвали эту функцию, все они освобождаются одновременно. Если предоставляется аргумент timeout
тайм-аут, то он используется вместо одноименного аргумента конструктора класса.
Возвращаемое значение - это целое число в диапазоне от 0 до количества участвующих сторон parties
- 1, различное для каждого потока. Такое поведение можно использовать для выбора потока для выполнения некоторых специальных операций, например:
i = barrier.wait()
if i == 0:
# Только один поток должен это вывести
print("барьер пройден")
Если конструктору было предоставлен аргумент action
, один из потоков вызовет его перед освобождением. Если этот вызов вызывает ошибку, то барьер переводится в неработающее состояние.
Если время вызова истекло, то барьер переводится в нерабочее состояние.
Метод Barrier.wait()
может вызвать исключение threading.BrokenBarrierError
, если барьер нерабочий или сброшен во время ожидания потока.
Barrier.reset()
:Метод Barrier.reset()
возвращает барьер в пустое состояние. Все ожидающие его потоки получат исключение threading.BrokenBarrierError
.
Обратите внимание, если есть другие потоки, состояние которых неизвестно, то для использования этой функции может потребоваться некоторая внешняя синхронизация. Если барьер в нерабочем состоянии, то может быть лучше оставить его и создать новый.
Barrier.abort()
:Метод Barrier.abort()
ставит барьер в нерабочее состояние.
Это приводит к тому, что любые активные или будущие вызовы метода Barrier.wait()
завершаются ошибкой threading.BrokenBarrierError
. Используйте это, например, если один из потоков должен прерваться, чтобы избежать тупиковой блокировки приложения.
Возможно, предпочтительнее просто создать барьер с разумным значением тайм-аута, чтобы автоматически защитить один из потоков от сбоя.
Barrier.parties
:Атрибут Barrier.parties
возвращает количество потоков, необходимых для прохождения барьера.
Barrier.n_waiting
:Атрибут Barrier.n_waiting
возвращает количество потоков, ждущих в данный момент прохождения барьера.
Barrier.broken
:Атрибут Barrier.broken
возвращает логическое значение True
, если барьер находится в нерабочем состоянии.
Барьер устанавливает контрольную точку, и все участвующие потоки блокируются, пока все участвующие "стороны" не достигнут этой точки. Это позволяет потокам запускаться по отдельности, а затем приостанавливать работу, пока все они не будут готовы к продолжению.
В этом примере объект Barrier
будет блокировать запускаемые потоки, до тех пор, пока все они не встанут в режим ожидания. Когда условие будет выполнено, все потоки освобождаются в контрольной точке одновременно. Возвращаемое значение Barrier.wait()
указывает номер освобождаемой стороны и может быть использовано для ограничения некоторых потоков от выполнения таких действий, как очистка общего ресурса.
import threading, time
def worker(barrier):
th_name = threading.current_thread().name
print(f'{th_name} в ожидании барьера с {barrier.n_waiting} другими')
worker_id = barrier.wait()
print(f'{th_name} прохождение барьера {worker_id}')
# число потоков, при использовании
# барьеров оно должно быть постоянным
NUM_THREADS = 3
# установка барьера
barrier = threading.Barrier(NUM_THREADS)
threads = []
# создаем и запускаем потоки
for i in range(NUM_THREADS):
th = threading.Thread(name=f'Worker-{i}',
target=worker,
args=(barrier,),
)
threads.append(th)
print(f'Запуск {th.name}')
th.start()
time.sleep(0.3)
# блокируем основной поток программы
# до завершения работы всех потоков
for thread in threads:
thread.join()
# Запуск Worker-0
# Worker-0 в ожидании барьера с 0 другими
# Запуск Worker-1
# Worker-1 в ожидании барьера с 1 другими
# Запуск Worker-2
# Worker-2 в ожидании барьера с 2 другими
# Worker-2 прохождение барьера 2
# Worker-0 прохождение барьера 0
# Worker-1 прохождение барьера 1
Метод Barrier.abort()
экземпляра Barrier
заставляет все ожидающие потоки получать исключение threading.BrokenBarrierError
.
Такое поведение позволяет потокам очищаться, если обработка остановлена, пока они заблокированы ожиданием контрольной точки методом Barrier.wait()
.
import threading, time
def worker(barrier):
th_name = threading.current_thread().name
print(f'{th_name} в ожидании барьера с {barrier.n_waiting} другими')
try:
worker_id = barrier.wait()
except threading.BrokenBarrierError:
print(f'{th_name} сброшен')
else:
print(f'{th_name} прохождение барьера {worker_id}')
# число потоков, при использовании
# барьеров оно должно быть постоянным
NUM_THREADS = 3
# обратите внимание происходит установка
# барьера на 1 больше чем запускается потоков
barrier = threading.Barrier(NUM_THREADS + 1)
threads = []
# создаем и запускаем потоки
for i in range(NUM_THREADS):
th = threading.Thread(name=f'Worker-{i}',
target=worker,
args=(barrier,),
)
threads.append(th)
print(f'Запуск {th.name}')
th.start()
time.sleep(0.3)
# сброс барьера
barrier.abort()
# блокируем основной поток программы
# до завершения работы всех потоков
for thread in threads:
thread.join()
# Запуск Worker-0
# Worker-0 в ожидании барьера с 0 другими
# Запуск Worker-1
# Worker-1 в ожидании барьера с 1 другими
# Запуск Worker-2
# Worker-2 в ожидании барьера с 2 другими
# Worker-0 сброшен
# Worker-2 сброшен
# Worker-1 сброшен