Сообщить об ошибке.

Класс Event() модуля threading в Python

Синхронизация потоков при помощи событий

Синтаксис:

import threading

event = threading.Event()

Параметры:

  • нет

Возвращаемое значение:

Описание:

Класс Event() модуля threading, реализует объекты событий.

Класс threading.Event() управляет флагом, которому можно присвоить значение True с помощью метода Event.set() и сбросить в значение False с помощью метода Event.clear(). Метод метод Event.wait() блокируется до тех пор, пока флаг не станет истинным True.

По умолчанию флаг имеет значение False.

Это один из простейших механизмов связи между потоками: один поток сигнализирует о событии, а другие потоки ждут его.

Методы объекта threading.Event.


Event.is_set():

Метод Event.is_set() возвращает True тогда и только тогда, когда внутренний флаг имеет значение True.

Event.set():

Метод Event.set() устанавливает для внутреннего флага значение True.

Пробуждаются все потоки, ожидающие его выполнения. Потоки, которые вызывают метод Event.wait() после установки флага, не будут блокироваться вообще.

Event.clear():

Метод Event.clear() сбрасывает внутренний флаг на значение False.

Впоследствии потоки, вызывающие метод Event.wait(), будут блокироваться до тех пор, пока не будет вызван Event.set(), чтобы снова установить внутренний флаг в True.

Event.wait(timeout=None):

Метод Event.wait() блокирует выполнение до тех пор, пока внутренний флаг не станет истинным True.

Если внутренний флаг при входе True, то метод сразу возвращает результат. В противном случае блокирует до тех пор, пока другой поток не вызовет метод Event.set() для установки внутреннего флага в значение True или пока не наступит необязательный тайм-аут timeout.

Когда присутствует аргумент timeout не равный None, то он должен быть числом с плавающей запятой, указывающее тайм-аут для операции в секундах или его долях.

Метод Event.wait() ВОЗВРАЩАЕТ (не устанавливает флаг) True когда внутренний флаг установлен в True, либо до вызова ожидания, либо после начала ожидания, поэтому он всегда будет возвращать True, за исключением случаев, когда задан timeout и время ожидания операции истекло.

Пример синхронизации потоков при помощи threading.Event().

Для одновременного выполнения отдельных операций используются несколько потоков , однако бывают случаи, когда важно иметь возможность синхронизировать операции двух или более потоков. Использование объектов событий threading.Event() - это простой способ взаимодействия между потоками.

Событие управляет внутренним флагом, который вызывающие могут либо установить, либо очистить. Другие потоки могут ждать установки флага. Обратите внимание, что метод event.wait() блокирует выполнение кода до тех пор, пока внутренний флаг не станет истинным.

import threading, time

def wait_event():
    print('Старт WAIT_EVENT()')
    event.wait()
    print('Код обработки по событию в WAIT_EVENT()')

def wait_timeout(time_out):
    print('Старт WAIT_TIMEOUT() ')
    while not event.is_set():
        is_set = event.wait(timeout=time_out)
        print(f'TimeOut {time_out} секунды истек')
        if is_set:
            print('Код обработки по событию в WAIT_TIMEOUT()')
        else:
            print('Пока ждем события, код обработки чего-то другого')
            time.sleep(3)

# установка глобального события
event = threading.Event()

t1 = threading.Thread(name='blocking', 
                  target=wait_event)
t1.start()

t2 = threading.Thread(name='non-blocking', 
                  target=wait_timeout, 
                  args=(2,))
t2.start()

print('Ожидание перед вызовом Event.set()')
time.sleep(5)
event.set()
print('Установлено событие в основном потоке')


# Старт WAIT_EVENT()
# Старт WAIT_TIMEOUT() 
# Ожидание перед вызовом Event.set()
# TimeOut 2 секунды истек
# Пока ждем события, код обработки чего-то другого
# Установлено событие в основном потоке
# Код обработки по событию в WAIT_EVENT()
# TimeOut 2 секунды истек
# Код обработки по событию в WAIT_TIMEOUT()

В этом примере wait_timeout() проверяет статус события без блокировки на неопределенный срок, а в функции wait_event() вызов event.wait() блокирует исполнение кода, пока статус события не изменится.

  • Метод event.wait() принимает аргумент - количество секунд ожидания события перед тайм-аутом.
  • Метод event.is_set() можно использовать отдельно для события, не опасаясь блокировки.

Пример использования события при объединении файлов.

В этом примере собираются все текстовые файлы из директории test_dir и объединяются в один multi-thead-file.txt. Программа читает и обрабатывает файлы из каталога test_dir в 2 потока, а пишет в общий файл multi-thead-file.txt в 1 поток.

Предупреждение. Выполнение этого кода даст прирост производительности по сравнению с однопоточным режимом, если в функции reader() будут производиться существенные изменения файла, приводящие к задержке однопоточного кода более 1 микросекунды (если меньше, то нет смысла в дополнительных потоках, GIL сведет все на нет). Так как запись производится в общий ресурс (общий файл), то записывать данные в несколько потоков нет никакого смысла. Этот пример приведен чисто в учебных целях, что бы увидеть применение событий threading.Event() на практическом примере.

При осуществлении записи в несколько потоков пришлось бы обеспечивать целостность данных, поступающих из разных потоков в общий ресурс при помощи блокировки общего файла, при этом смысл записи в несколько потоков теряется.

Здесь событие threading.Event() используется, что бы прервать ожидание поступления данных от reader(), тем самым выйти из цикла в функции writer() и закончить программу.

Места в коде, где используется событие, помечены !!!!! с обоих концов комментария.

Что бы запустить данный пример, необходимо подготовить данные скриптом prepare-data.py, приведенным в обзорной статье к модулю threading или использовать свои текстовые файлы (измените переменную test_dir).

import threading, queue, pathlib

def writer(wr_file): # ПИСАТЕЛЬ
    while True:
        # ожидаем получение данных
        if data_queue.empty():
            # !!!!! проверяем: живы ли потоки читателей `reader()` !!!!!
            if  event_reader.is_set():
                # если очередь пуста и все 
                # читатели завершили работу ТО:
                print('Все файлы объединены.')
                # конец работы - завершаем цикл
                break
        else:
            # как только поступили данные 
            # извлекаем их и записываем
            r_file, data = data_queue.get()
            # пишем данные
            print(f'Пишем файл {r_file.name}')
            with open(wr_file, 'a+') as fw:
                # дописываем в начало файла его имя
                fw.write(f'\n\nИмя файла {r_file}\n\n')
                # пишем сами данные
                fw.write(data)

def reader(i): # ЧИТАТЕЛЬ
    # здесь читаем и обрабатываем данные файлов
    while True:
        # Проверяем, есть ли файлы в очереди
        if files_queue.empty():
            print(f'Поток {i} завершен.')
            # выходим из цикла
            break
        # Получаем имя файла из очереди
        r_file = files_queue.get()
        print(f'Th{i}: Читаем  файл {r_file.name}')
        with open(r_file, 'r') as fr:
            # читаем построчно
            data = []
            for line in fr:
                # обрабатываем данные построчно
                line = line.replace('у', '0').lower()
                # складываем обработанные строки в список
                data.append(line)
            # объединяем данные в текст
            data_all = ''.join(data)
            # Ставим в очередь кортеж (имя_файла, данные)
            data_queue.put((r_file, data_all)) 

# !!!!! включаем управление событиями !!!!!
event_reader = threading.Event()
# очередь с названием файлов
files_queue = queue.Queue()
# очередь с обработанными данными
data_queue = queue.Queue()

path = pathlib.Path('.')
# каталог с файлами
test_dir = 'test_dir'
path_dir = path.joinpath(test_dir)
# список файлов
list_files = path_dir.glob('*.txt')

# создаем и заполняем очередь именами файлов
for file in list_files:
    files_queue.put(file)

# общий файл c обработанными данными
write_file = 'multi-thead-file.txt'

if files_queue.empty():
    print('НЕТ файлов для обработки.') 
else:
    # пишем в 1 поток. Если данные писать в несколько потоков, 
    # то нужно еще использовать блокировщик threading.Lock() 
    # или данные в итоговом файле будут перемешаны.
    tw = threading.Thread(target=writer, args=(write_file,))
    tw.start()
    
    # читаем и обрабатываем в 2 потока
    threads = []
    for i in range(2):
        tr = threading.Thread(target=reader, args=(i+1,))
        threads.append(tr)
        tr.start()

    # ждем, когда все файлы прочитаются
    [thread.join() for thread in threads]
    # как все потоки reader() завершены
    # !!!!! скажем об этом writer() !!!!!
    event_reader.set()