Сообщить об ошибке.

Класс Lock() модуля threading в Python

Синхронизация потоков при помощи блокировок

Синтаксис:

import threading

lock = threading.Lock()

Параметры:

  • нет.

Возвращаемое значение:

Описание:

Класс Lock() модуля threading, реализует примитивные объекты блокировки. Как только поток получил блокировку, то последующие попытки получить его блокируются, пока поток не будет разблокирован. Любой поток может снять блокировку.

Обратите внимание, что класс threading.Lock() на самом деле является фабричным, который возвращает экземпляр наиболее эффективной версии конкретного класса threading.Lock(), поддерживаемого платформой.

Примитивная блокировка потока - это примитив синхронизации, который при блокировке не принадлежит конкретному потоку. В Python в настоящее время это самый низкий из доступных примитивов синхронизации.

Примитивная блокировка находится в одном из двух состояний: "locked" или "unlocked". Экземпляр класса создается в разблокированном "unlocked" состоянии. У него есть два основных метода: Lock.acquire() и Lock.release().

Когда состояние "unlocked", то метод Lock.acquire() изменяет состояние на "locked" и немедленно возвращает результат экземпляру. Когда состояние "locked", то вызов метода Lock.acquire() блокируется до тех пор, пока вызов метода Lock.release() в другом потоке не изменит его на "unlocked", затем вызов Lock.acquire() сбрасывает его в "locked" и снова возвращает результат экземпляру.

Метод Lock.release() следует вызывать только когда экземпляр Lock находится в состоянии "locked". Этот метод меняет состояние экземпляра Lock на "unlocked" и немедленно возвращает результат экземпляру. Если будет сделана попытка вызвать Lock.release() для состояния "unlocked", то будет вызвана ошибка RuntimeError.

Класс threading.Lock() также поддерживают протокол управления контекстом.

Когда в Lock.acquire() блокируется более одного потока, ожидающего перехода состояния в "unlocked", то после вызова Lock.release() только один поток переходит в состояние "unlocked". Какой из ожидающих потоков начинает работать, не определено и может варьироваться в разных реализациях.

Методы объекта threading.Lock.

Все методы выполняются атомарно.


Lock.acquire(blocking=True, timeout=-1):

Метод Lock.acquire() устанавливает блокировку, блокирующую или неблокирующую.

При вызове метода с аргументом blocking, установленным в True (по умолчанию) - блокирует потоки до тех пор, пока блокировка не будет снята, затем снова установит ее в состояние "locked" и вернет True.

При вызове метода с аргументом blocking, установленным в False, не ставит блокировку, а проверит, сможет ли метод с blocking=True поставить блокировку, если нет, то немедленно вернет False, в противном случае установит блокировку и возвратит True.

При вызове с аргументом timeout (значение может быть float), установленным в положительное значение, будет блокировать выполнение кода не более чем на количество секунд, заданное величиной timeout и до тех пор, пока блокировка не будет получена.

Аргумент тайм-аута timeout, равный -1, указывает на неограниченное ожидание. Запрещается указывать тайм-аут timeout при ложной блокировке.

Метод Lock.acquire() возвращает значение True, если блокировка получена успешно и False, если нет (например, если истекло время ожидания timeout).

Lock.release():

Метод Lock.release() снимает блокировку. Метод может быть вызван из любого потока, а не только из потока, который получил блокировку.

Когда блокировка включена, этот метод сбрасывает его до состояния "unlocked" и возвращает результат своему экземпляру. Если какие-либо другие потоки заблокированы, ожидая снятия блокировки, разрешает выполнение ровно одному из них.

При вызове метода при снятой блокировки возникает исключение RuntimeError.

Метод ничего не возвращает.

Lock.locked():

Метод Lock.locked() возвращает True, если блокировка получена.


Общие примеры использования блокировок threading.Lock().

Важно иметь возможность контролировать доступ к общим ресурсам для предотвращения повреждения или пропуска данных. Встроенные в Python структуры данных (списки, словари и т. д.) являются поточно-ориентированными, поскольку глобальная блокировка интерпретатора GIL, используемая для защиты внутренних структур данных Python, не снимается в середине обновления.

Другие структуры данных, реализованные в Python или более простые типы, такие как целые числа и числа с плавающей запятой, не имеют такой защиты. Для защиты от одновременного доступа к объекту нескольких потоков используйте объект threading.Lock().

В примере функция worker() увеличивает экземпляр счетчика, который управляет блокировкой, чтобы предотвратить одновременное изменение внутреннего состояния двух потоков. Если бы блокировка не использовалась, то была бы вероятность пропуска изменения атрибута value.

import threading, random, time

class Counter():

    def __init__(self, start=0):
        self.lock = threading.Lock()
        self.value = start

    def increment(self):
        th_name = threading.current_thread().name
        print(f'Th: {th_name} - ждет блокировку')
        self.lock.acquire()
        try:
            print(f'Th: {th_name} - получил блокировку')
            self.value = self.value + 1
        finally:
            self.lock.release()


def worker(c):
    for i in range(2):
        pause = random.random()
        th_name = threading.current_thread().name
        print(f'Th: {th_name} - заснул на {pause:0.02f}')
        time.sleep(pause)
        c.increment()
    print(f'Th: {th_name} - сделано.')


counter = Counter()
for i in range(2):
    t = threading.Thread(target=worker, args=(counter,))
    t.start()

print('Ожидание рабочих потоков')
main_thread = threading.main_thread()
for t in threading.enumerate():
    if t is not main_thread:
        t.join()
print(f'Счетчик: {counter.value}')


# Th: Thread-1 - заснул на 0.34
# Th: Thread-2 - заснул на 0.26
# Ожидание рабочих потоков
# Th: Thread-2 - ждет блокировку
# Th: Thread-2 - получил блокировку
# Th: Thread-2 - заснул на 0.34
# Th: Thread-1 - ждет блокировку
# Th: Thread-1 - получил блокировку
# Th: Thread-1 - заснул на 0.33
# Th: Thread-2 - ждет блокировку
# Th: Thread-2 - получил блокировку
# Th: Thread-2 - сделано.
# Th: Thread-1 - ждет блокировку
# Th: Thread-1 - получил блокировку
# Th: Thread-1 - сделано.
# Счетчик: 4

Чтобы узнать, получил ли другой поток блокировку, не задерживая текущий поток, необходимо передать False для аргумента blocking в методе Lock.acquire(). В следующем примере worker() пытается получить блокировку три раза и подсчитывает, сколько для этого нужно сделать попыток. Между тем, lock_holder() циклически переключает блокировку между удержанием и снятием блокировки с короткими паузами в каждом состоянии, для имитации нагрузки.

Функции worker() требуется более трех итераций, чтобы получить блокировку три отдельных раза.

import logging, threading, time


def lock_holder(lock):
    logging.debug('Запуск')
    while True:
        lock.acquire()
        try:
            logging.debug('Нагрузка...')
            time.sleep(0.5)
        finally:
            logging.debug('Работа закончена')
            lock.release()
        time.sleep(0.5)


def worker(lock):
    logging.debug('Запуск')
    num_tries = 0
    num_acquires = 0
    while num_acquires < 3:
        time.sleep(0.5)
        logging.debug('Попытка блокировки ресурса')
        have_it = lock.acquire(blocking=False)
        try:
            num_tries += 1
            if have_it:
                logging.debug(f'Попытка №{num_tries}: ПОЛУЧИЛОСЬ')
                num_acquires += 1
            else:
                logging.debug(f'Попытка №{num_tries}: НЕ ПОЛУЧИЛОСЬ')
        finally:
            if have_it:
                lock.release()
    logging.debug('Ресурс успешно блокирован 3 раза, после {num_tries} попыток...')


logging.basicConfig(
    level=logging.DEBUG,
    format='(%(threadName)-10s) %(message)s',
)

lock = threading.Lock()

holder = threading.Thread(
    target=lock_holder,
    args=(lock,),
    name='LockHolder',
    daemon=True,
)
holder.start()

worker = threading.Thread(
    target=worker,
    args=(lock,),
    name='Worker',
)
worker.start()


# (LockHolder) Запуск
# (LockHolder) Нагрузка...
# (Worker    ) Запуск
# (LockHolder) Работа закончена
# (Worker    ) Попытка блокировки ресурса
# (Worker    ) Попытка №1: ПОЛУЧИЛОСЬ
# (LockHolder) Нагрузка...
# (Worker    ) Попытка блокировки ресурса
# (Worker    ) Попытка №2: НЕ ПОЛУЧИЛОСЬ
# (LockHolder) Работа закончена
# (Worker    ) Попытка блокировки ресурса
# (Worker    ) Попытка №3: ПОЛУЧИЛОСЬ
# (LockHolder) Нагрузка...
# (Worker    ) Попытка блокировки ресурса
# (Worker    ) Попытка №4: НЕ ПОЛУЧИЛОСЬ
# (LockHolder) Работа закончена
# (Worker    ) Попытка блокировки ресурса
# (Worker    ) Попытка №5: ПОЛУЧИЛОСЬ
# (Worker    ) Ресурс успешно блокирован 3 раза, после 5 попыток...

Пример блокировки общего ресурса при многопоточном объединении файлов.

В этом примере выбираются все текстовые файлы из директории test_dir и объединяются в один multi-thead-file.txt. Программа читает и обрабатывает файлы из каталога test_dir и пишет в общий файл multi-thead-file.txt в несколько потоков.

Предупреждение. Выполнение этого кода не даст прирост производительности по сравнению с однопоточным режимом, т.к. запись в общий ресурс блокируется и по сути программа становиться однопоточной. Этот пример приведен чисто в учебных целях, что бы понять как организовать доступ к общему ресурсу из разных потоков не нарушая его целостности и увидеть применение класса threading.Lock() на практическом примере.

Здесь блокировка threading.Lock() используется, для того, что бы предотвратить одновременный доступ из других потоков к файлу с общими данными multi-thead-file.txt. Если убрать блокировку, то в итоговом файле можно увидеть, что строки из разных файлов перемешаны.

Места в коде, где используется блокировка, помечены !!!!! с обоих концов комментария.

Что бы запустить данный пример, необходимо подготовить данные скриптом prepare-data.py, приведенным в обзорной статье к модулю threading или использовать свои текстовые файлы (измените переменную test_dir).

import pathlib, threading, time, queue

class Worker(threading.Thread):

    def __init__(self, que, write_file, lock):
        super().__init__()
        self.daemon = True
        self.files_queue = files_queue
        self.write_file = write_file
        self.lock = lock
        
    # переопределяем метод
    def run(self):
        while True:
            # !!!!! блокируем доступ к файлу из других потоков, 
            # !!!!! что бы строки не писались вперемешку из других
            # !!!!! открытых файлов. Для блокировки/разблокировки
            # !!!!! используем менеджер контекста
            with self.lock: 
                # Получаем задание (имя файла) из очереди
                job = self.files_queue.get()
                print(f'Th:{self.name} обработка {job.name}')
                # открываем файл `job` на чтение, а `write_file` на 
                # ДОБАВЛЕНИЕ 'a+' данных к файлу с общими данными 
                with open(job, 'r') as fread, open(self.write_file, 'a+') as fwrite:
                    # пишем имя открытого файла
                    fwrite.write(f'\n\nДанные из файла: {job.name}\n\n')
                    # читаем данные построчно (экономим память)
                    for line in fread:
                        # здесь обрабатываем строку, 
                        # например, заменим букву у на 0
                        line = line.replace('у', '0')
                        # потом пишем в файл
                        fwrite.write(line)

            # Сообщаем очереди, что задача выполнена
            self.files_queue.task_done()


path = pathlib.Path('.')
# каталог с файлами
test_dir = 'test_dir'
path_dir = path.joinpath(test_dir)
# список файлов
list_files = path_dir.glob('*.txt')

# создаем и заполняем очередь именами файлов
files_queue = queue.Queue()
for file in list_files:
    files_queue.put(file)

# общий файл данных
write_file = 'multi-thead-file.txt'

if files_queue.empty():
    print('НЕТ файлов для обработки.') 
else:
    # !!!!! создаем блокировщик !!!!! 
    lock = threading.Lock()
    # Создаем и запускаем, например 3 потока
    for _ in range(3):
        th = Worker(files_queue, write_file, lock)
        th.start()

    # Блокируем выполнение программы до тех пор пока
    # потоки не обслужат все элементы очереди
    files_queue.join()