import threading barrier = threading.Barrier(parties, action=None, timeout=None)
parties
- количества потоков,action=None
- вызываемый объект,timeout=None
- тайм-аута по умолчанию для метода Barrier.wait()
.Класс Barrier()
модуля threading
создает объект-барьер для количества parties
потоков.
Аргумент action
, если он предусмотрен, вызывается одним из потоков при их освобождении.
Аргумент timeout
- это значение тайм-аута по умолчанию, если он не указан для метода Barrier.wait()
.
Этот класс предоставляет простой примитив синхронизации для использования фиксированным числом потоков, которым необходимо ждать друг друга. Каждый из потоков пытается преодолеть барьер, вызывая метод Barrier.wait()
, и будет блокироваться, пока все потоки не выполнят свои вызовы Barrier.wait()
. Как только все потоки выполнили свои вызовы, то в этот момент потоки освобождаются одновременно.
Барьер можно использовать повторно любое количество раз для одного и того же количества потоков.
В качестве общего примера приведем простой способ синхронизации клиентского и серверного потоков:
import threading barrier = threading.Barrier(2, timeout=5) def server(): start_server() barrier.wait() while True: connection = accept_connection() process_server_connection(connection) def client(): barrier.wait() while True: connection = make_connection() process_client_connection(connection)
Объекты барьеров threading.Barrier
также поддерживают протокол управления контекстом.
Barrier
.Barrier.wait()
ждет точку прохождения барьера,Barrier.reset()
возвращает барьер в пустое состояние,Barrier.abort()
ставит барьер в нерабочее состояние,Barrier.parties
количество потоков, необходимых для прохождения барьера,Barrier.n_waiting
количество потоков, ждущих прохождения барьера,Barrier.broken
проверяет барьер на нерабочее состояние,Barrier.wait(timeout=None)
:Метод Barrier.wait()
ждет точку прохождения барьера.
Когда все потоки, участвующие в барьере, вызвали эту функцию, все они освобождаются одновременно. Если предоставляется аргумент timeout
тайм-аут, то он используется вместо одноименного аргумента конструктора класса.
Возвращаемое значение - это целое число в диапазоне от 0 до количества участвующих сторон parties
- 1, различное для каждого потока. Такое поведение можно использовать для выбора потока для выполнения некоторых специальных операций, например:
i = barrier.wait() if i == 0: # Только один поток должен это вывести print("барьер пройден")
Если конструктору было предоставлен аргумент action
, один из потоков вызовет его перед освобождением. Если этот вызов вызывает ошибку, то барьер переводится в неработающее состояние.
Если время вызова истекло, то барьер переводится в нерабочее состояние.
Метод Barrier.wait()
может вызвать исключение threading.BrokenBarrierError
, если барьер нерабочий или сброшен во время ожидания потока.
Barrier.reset()
:Метод Barrier.reset()
возвращает барьер в пустое состояние. Все ожидающие его потоки получат исключение threading.BrokenBarrierError
.
Обратите внимание, если есть другие потоки, состояние которых неизвестно, то для использования этой функции может потребоваться некоторая внешняя синхронизация. Если барьер в нерабочем состоянии, то может быть лучше оставить его и создать новый.
Barrier.abort()
:Метод Barrier.abort()
ставит барьер в нерабочее состояние.
Это приводит к тому, что любые активные или будущие вызовы метода Barrier.wait()
завершаются ошибкой threading.BrokenBarrierError
. Используйте это, например, если один из потоков должен прерваться, чтобы избежать тупиковой блокировки приложения.
Возможно, предпочтительнее просто создать барьер с разумным значением тайм-аута, чтобы автоматически защитить один из потоков от сбоя.
Barrier.parties
:Атрибут Barrier.parties
возвращает количество потоков, необходимых для прохождения барьера.
Barrier.n_waiting
:Атрибут Barrier.n_waiting
возвращает количество потоков, ждущих в данный момент прохождения барьера.
Barrier.broken
:Атрибут Barrier.broken
возвращает логическое значение True
, если барьер находится в нерабочем состоянии.
Барьер устанавливает контрольную точку, и все участвующие потоки блокируются, пока все участвующие "стороны" не достигнут этой точки. Это позволяет потокам запускаться по отдельности, а затем приостанавливать работу, пока все они не будут готовы к продолжению.
В этом примере объект Barrier
будет блокировать запускаемые потоки, до тех пор, пока все они не встанут в режим ожидания. Когда условие будет выполнено, все потоки освобождаются в контрольной точке одновременно. Возвращаемое значение Barrier.wait()
указывает номер освобождаемой стороны и может быть использовано для ограничения некоторых потоков от выполнения таких действий, как очистка общего ресурса.
import threading, time def worker(barrier): th_name = threading.current_thread().name print(f'{th_name} в ожидании барьера с {barrier.n_waiting} другими') worker_id = barrier.wait() print(f'{th_name} прохождение барьера {worker_id}') # число потоков, при использовании # барьеров оно должно быть постоянным NUM_THREADS = 3 # установка барьера barrier = threading.Barrier(NUM_THREADS) threads = [] # создаем и запускаем потоки for i in range(NUM_THREADS): th = threading.Thread(name=f'Worker-{i}', target=worker, args=(barrier,), ) threads.append(th) print(f'Запуск {th.name}') th.start() time.sleep(0.3) # блокируем основной поток программы # до завершения работы всех потоков for thread in threads: thread.join() # Запуск Worker-0 # Worker-0 в ожидании барьера с 0 другими # Запуск Worker-1 # Worker-1 в ожидании барьера с 1 другими # Запуск Worker-2 # Worker-2 в ожидании барьера с 2 другими # Worker-2 прохождение барьера 2 # Worker-0 прохождение барьера 0 # Worker-1 прохождение барьера 1
Метод Barrier.abort()
экземпляра Barrier
заставляет все ожидающие потоки получать исключение threading.BrokenBarrierError
.
Такое поведение позволяет потокам очищаться, если обработка остановлена, пока они заблокированы ожиданием контрольной точки методом Barrier.wait()
.
import threading, time def worker(barrier): th_name = threading.current_thread().name print(f'{th_name} в ожидании барьера с {barrier.n_waiting} другими') try: worker_id = barrier.wait() except threading.BrokenBarrierError: print(f'{th_name} сброшен') else: print(f'{th_name} прохождение барьера {worker_id}') # число потоков, при использовании # барьеров оно должно быть постоянным NUM_THREADS = 3 # обратите внимание происходит установка # барьера на 1 больше чем запускается потоков barrier = threading.Barrier(NUM_THREADS + 1) threads = [] # создаем и запускаем потоки for i in range(NUM_THREADS): th = threading.Thread(name=f'Worker-{i}', target=worker, args=(barrier,), ) threads.append(th) print(f'Запуск {th.name}') th.start() time.sleep(0.3) # сброс барьера barrier.abort() # блокируем основной поток программы # до завершения работы всех потоков for thread in threads: thread.join() # Запуск Worker-0 # Worker-0 в ожидании барьера с 0 другими # Запуск Worker-1 # Worker-1 в ожидании барьера с 1 другими # Запуск Worker-2 # Worker-2 в ожидании барьера с 2 другими # Worker-0 сброшен # Worker-2 сброшен # Worker-1 сброшен