import threading lock = threading.Lock()
Класс Lock()
модуля threading
, реализует примитивные объекты блокировки. Как только поток получил блокировку, то последующие попытки получить его блокируются, пока поток не будет разблокирован. Любой поток может снять блокировку.
Обратите внимание, что класс threading.Lock()
на самом деле является фабричным, который возвращает экземпляр наиболее эффективной версии конкретного класса threading.Lock()
, поддерживаемого платформой.
Примитивная блокировка потока - это примитив синхронизации, который при блокировке не принадлежит конкретному потоку. В Python в настоящее время это самый низкий из доступных примитивов синхронизации.
Примитивная блокировка находится в одном из двух состояний: "locked" или "unlocked". Экземпляр класса создается в разблокированном "unlocked" состоянии. У него есть два основных метода: Lock.acquire()
и Lock.release()
.
Когда состояние "unlocked", то метод Lock.acquire()
изменяет состояние на "locked" и немедленно возвращает результат экземпляру. Когда состояние "locked", то вызов метода Lock.acquire()
блокируется до тех пор, пока вызов метода Lock.release()
в другом потоке не изменит его на "unlocked", затем вызов Lock.acquire()
сбрасывает его в "locked" и снова возвращает результат экземпляру.
Метод Lock.release()
следует вызывать только когда экземпляр Lock
находится в состоянии "locked". Этот метод меняет состояние экземпляра Lock
на "unlocked" и немедленно возвращает результат экземпляру. Если будет сделана попытка вызвать Lock.release()
для состояния "unlocked", то будет вызвана ошибка RuntimeError
.
Класс threading.Lock()
также поддерживают протокол управления контекстом.
Когда в Lock.acquire()
блокируется более одного потока, ожидающего перехода состояния в "unlocked", то после вызова Lock.release()
только один поток переходит в состояние "unlocked". Какой из ожидающих потоков начинает работать, не определено и может варьироваться в разных реализациях.
threading.Lock
.Все методы выполняются атомарно.
Lock.acquire()
устанавливает блокировку,Lock.release()
снимает блокировку,Lock.locked()
проверяет состояние блокировки,Lock.acquire(blocking=True, timeout=-1)
:Метод Lock.acquire()
устанавливает блокировку, блокирующую или неблокирующую.
При вызове метода с аргументом blocking
, установленным в True
(по умолчанию) - блокирует потоки до тех пор, пока блокировка не будет снята, затем снова установит ее в состояние "locked" и вернет True
.
При вызове метода с аргументом blocking
, установленным в False
, не ставит блокировку, а проверит, сможет ли метод с blocking=True
поставить блокировку, если нет, то немедленно вернет False
, в противном случае установит блокировку и возвратит True
.
При вызове с аргументом timeout
(значение может быть float
), установленным в положительное значение, будет блокировать выполнение кода не более чем на количество секунд, заданное величиной timeout
и до тех пор, пока блокировка не будет получена.
Аргумент тайм-аута timeout
, равный -1, указывает на неограниченное ожидание. Запрещается указывать тайм-аут timeout
при ложной блокировке.
Метод Lock.acquire()
возвращает значение True
, если блокировка получена успешно и False
, если нет (например, если истекло время ожидания timeout
).
Lock.release()
:Метод Lock.release()
снимает блокировку. Метод может быть вызван из любого потока, а не только из потока, который получил блокировку.
Когда блокировка включена, этот метод сбрасывает его до состояния "unlocked" и возвращает результат своему экземпляру. Если какие-либо другие потоки заблокированы, ожидая снятия блокировки, разрешает выполнение ровно одному из них.
При вызове метода при снятой блокировки возникает исключение RuntimeError
.
Метод ничего не возвращает.
Lock.locked()
:Метод Lock.locked()
возвращает True
, если блокировка получена.
threading.Lock()
.Важно иметь возможность контролировать доступ к общим ресурсам для предотвращения повреждения или пропуска данных. Встроенные в Python структуры данных (списки, словари и т. д.) являются поточно-ориентированными, поскольку глобальная блокировка интерпретатора GIL, используемая для защиты внутренних структур данных Python, не снимается в середине обновления.
Другие структуры данных, реализованные в Python или более простые типы, такие как целые числа и числа с плавающей запятой, не имеют такой защиты. Для защиты от одновременного доступа к объекту нескольких потоков используйте объект threading.Lock()
.
В примере функция worker()
увеличивает экземпляр счетчика, который управляет блокировкой, чтобы предотвратить одновременное изменение внутреннего состояния двух потоков. Если бы блокировка не использовалась, то была бы вероятность пропуска изменения атрибута value
.
import threading, random, time class Counter(): def __init__(self, start=0): self.lock = threading.Lock() self.value = start def increment(self): th_name = threading.current_thread().name print(f'Th: {th_name} - ждет блокировку') self.lock.acquire() try: print(f'Th: {th_name} - получил блокировку') self.value = self.value + 1 finally: self.lock.release() def worker(c): for i in range(2): pause = random.random() th_name = threading.current_thread().name print(f'Th: {th_name} - заснул на {pause:0.02f}') time.sleep(pause) c.increment() print(f'Th: {th_name} - сделано.') counter = Counter() for i in range(2): t = threading.Thread(target=worker, args=(counter,)) t.start() print('Ожидание рабочих потоков') main_thread = threading.main_thread() for t in threading.enumerate(): if t is not main_thread: t.join() print(f'Счетчик: {counter.value}') # Th: Thread-1 - заснул на 0.34 # Th: Thread-2 - заснул на 0.26 # Ожидание рабочих потоков # Th: Thread-2 - ждет блокировку # Th: Thread-2 - получил блокировку # Th: Thread-2 - заснул на 0.34 # Th: Thread-1 - ждет блокировку # Th: Thread-1 - получил блокировку # Th: Thread-1 - заснул на 0.33 # Th: Thread-2 - ждет блокировку # Th: Thread-2 - получил блокировку # Th: Thread-2 - сделано. # Th: Thread-1 - ждет блокировку # Th: Thread-1 - получил блокировку # Th: Thread-1 - сделано. # Счетчик: 4
Чтобы узнать, получил ли другой поток блокировку, не задерживая текущий поток, необходимо передать False
для аргумента blocking
в методе Lock.acquire()
. В следующем примере worker()
пытается получить блокировку три раза и подсчитывает, сколько для этого нужно сделать попыток. Между тем, lock_holder()
циклически переключает блокировку между удержанием и снятием блокировки с короткими паузами в каждом состоянии, для имитации нагрузки.
Функции worker()
требуется более трех итераций, чтобы получить блокировку три отдельных раза.
import logging, threading, time def lock_holder(lock): logging.debug('Запуск') while True: lock.acquire() try: logging.debug('Нагрузка...') time.sleep(0.5) finally: logging.debug('Работа закончена') lock.release() time.sleep(0.5) def worker(lock): logging.debug('Запуск') num_tries = 0 num_acquires = 0 while num_acquires < 3: time.sleep(0.5) logging.debug('Попытка блокировки ресурса') have_it = lock.acquire(blocking=False) try: num_tries += 1 if have_it: logging.debug(f'Попытка №{num_tries}: ПОЛУЧИЛОСЬ') num_acquires += 1 else: logging.debug(f'Попытка №{num_tries}: НЕ ПОЛУЧИЛОСЬ') finally: if have_it: lock.release() logging.debug('Ресурс успешно блокирован 3 раза, после {num_tries} попыток...') logging.basicConfig( level=logging.DEBUG, format='(%(threadName)-10s) %(message)s', ) lock = threading.Lock() holder = threading.Thread( target=lock_holder, args=(lock,), name='LockHolder', daemon=True, ) holder.start() worker = threading.Thread( target=worker, args=(lock,), name='Worker', ) worker.start() # (LockHolder) Запуск # (LockHolder) Нагрузка... # (Worker ) Запуск # (LockHolder) Работа закончена # (Worker ) Попытка блокировки ресурса # (Worker ) Попытка №1: ПОЛУЧИЛОСЬ # (LockHolder) Нагрузка... # (Worker ) Попытка блокировки ресурса # (Worker ) Попытка №2: НЕ ПОЛУЧИЛОСЬ # (LockHolder) Работа закончена # (Worker ) Попытка блокировки ресурса # (Worker ) Попытка №3: ПОЛУЧИЛОСЬ # (LockHolder) Нагрузка... # (Worker ) Попытка блокировки ресурса # (Worker ) Попытка №4: НЕ ПОЛУЧИЛОСЬ # (LockHolder) Работа закончена # (Worker ) Попытка блокировки ресурса # (Worker ) Попытка №5: ПОЛУЧИЛОСЬ # (Worker ) Ресурс успешно блокирован 3 раза, после 5 попыток...
В этом примере выбираются все текстовые файлы из директории test_dir
и объединяются в один multi-thead-file.txt
. Программа читает и обрабатывает файлы из каталога test_dir
и пишет в общий файл multi-thead-file.txt
в несколько потоков.
Предупреждение. Выполнение этого кода не даст прирост производительности по сравнению с однопоточным режимом, т.к. запись в общий ресурс блокируется и по сути программа становиться однопоточной. Этот пример приведен чисто в учебных целях, что бы понять как организовать доступ к общему ресурсу из разных потоков не нарушая его целостности и увидеть применение класса threading.Lock()
на практическом примере.
Здесь блокировка threading.Lock()
используется, для того, что бы предотвратить одновременный доступ из других потоков к файлу с общими данными multi-thead-file.txt
. Если убрать блокировку, то в итоговом файле можно увидеть, что строки из разных файлов перемешаны.
Места в коде, где используется блокировка, помечены !!!!!
с обоих концов комментария.
Что бы запустить данный пример, необходимо подготовить данные скриптом prepare-data.py
, приведенным в обзорной статье к модулю threading
или использовать свои текстовые файлы (измените переменную test_dir
).
import pathlib, threading, time, queue class Worker(threading.Thread): def __init__(self, que, write_file, lock): super().__init__() self.daemon = True self.files_queue = files_queue self.write_file = write_file self.lock = lock # переопределяем метод def run(self): while True: # !!!!! блокируем доступ к файлу из других потоков, # !!!!! что бы строки не писались вперемешку из других # !!!!! открытых файлов. Для блокировки/разблокировки # !!!!! используем менеджер контекста with self.lock: # Получаем задание (имя файла) из очереди job = self.files_queue.get() print(f'Th:{self.name} обработка {job.name}') # открываем файл `job` на чтение, а `write_file` на # ДОБАВЛЕНИЕ 'a+' данных к файлу с общими данными with open(job, 'r') as fread, open(self.write_file, 'a+') as fwrite: # пишем имя открытого файла fwrite.write(f'\n\nДанные из файла: {job.name}\n\n') # читаем данные построчно (экономим память) for line in fread: # здесь обрабатываем строку, # например, заменим букву у на 0 line = line.replace('у', '0') # потом пишем в файл fwrite.write(line) # Сообщаем очереди, что задача выполнена self.files_queue.task_done() path = pathlib.Path('.') # каталог с файлами test_dir = 'test_dir' path_dir = path.joinpath(test_dir) # список файлов list_files = path_dir.glob('*.txt') # создаем и заполняем очередь именами файлов files_queue = queue.Queue() for file in list_files: files_queue.put(file) # общий файл данных write_file = 'multi-thead-file.txt' if files_queue.empty(): print('НЕТ файлов для обработки.') else: # !!!!! создаем блокировщик !!!!! lock = threading.Lock() # Создаем и запускаем, например 3 потока for _ in range(3): th = Worker(files_queue, write_file, lock) th.start() # Блокируем выполнение программы до тех пор пока # потоки не обслужат все элементы очереди files_queue.join()