Сообщить об ошибке.

Примитивы синхронизации процессов модуля multiprocessing в Python

Синхронизация процессов при помощи блокировок

Как правило, примитивы синхронизации не так необходимы в программе, использующей несколько ядер процессора, как в многопоточной. Однако, модуль multiprocessing содержит эквиваленты всех примитивов синхронизации из модуля threading.

Обратите внимание, что можно также создавать примитивы синхронизации с помощью объекта Manager.

Примечание-1. В Mac OS X системная функция sem_timedwait() не поддерживается, поэтому вызов метода .acquire() с тайм-аутом, будет имитировать поведение sem_timedwait() с использованием цикла ожидания.

Примечание-2. Если поступает сигнал SIGINT, генерируемый Ctrl-C, когда основной поток заблокирован вызовом Lock.acquire(), RLock.acquire(), Semaphore.acquire(), BoundedSemaphore.acquire(), Condition.acquire() или Condition.wait(), то тогда вызов соответствующего метода будет немедленно прерван и поднимется исключение KeyboardInterrupt.

Это поведение отличается от поведения эквивалентных вызовов блокировок модуля threading, где SIGINT будет игнорироваться, пока выполняются вызовы вышеперечисленных методов.

Примечание-3. Для некоторых классов этого модуля требуется работающая реализация общего семафора в операционной системе хоста. Без него модуль multiprocessing.synchronize будет отключен, а попытки его импорта приведут к ошибке ImportError.

Модуль multiprocessing определяет следующие примитивы синхронизации:


multiprocessing.Barrier(parties[, action[, timeout]]):

Класс multiprocessing.Barrier() возвращает объект примитива синхронизации Barrier (барьер) и представляет собой полный клон объекта threading.Barrier.


multiprocessing.Semaphore([value]):

Класс multiprocessing.Semaphore() возвращает объект примитива синхронизации Semaphore (Семафор) и представляет собой близкий аналог объекта threading.BoundedSemaphore.

Существует единственное отличие от его близкого аналога: первый аргумент метода .acquire() называется block, что согласуется с методом Lock.acquire().


multiprocessing.BoundedSemaphore([value]):

Класс multiprocessing.BoundedSemaphore() возвращает объект примитива синхронизации BoundedSemaphore (Ограниченный семафор) и представляет собой близкий аналог объекта threading.BoundedSemaphore.

Существует единственное отличие от его близкого аналога: первый аргумент метода .acquire() называется block, что согласуется с методом Lock.acquire().

Обратите внимание, что в Mac OS X метод неотличим от семафора, поскольку системная функция sem_getvalue() не реализована на этой платформе.


multiprocessing.Condition([lock]):

Класс multiprocessing.Condition() возвращает объект примитива синхронизации Condition (Условие) и представляет собой псевдоним объекта threading.Condition.

Если указан аргумент lock, то это должен быть объект Lock или RLock из модуля multiprocessing.


multiprocessing.Event():

Класс multiprocessing.Event возвращает объект примитива синхронизации Event (Событие) и представляет собой полный клон объекта threading.Event.


multiprocessing.Lock():

Класс multiprocessing.Lock() возвращает объект простой блокировки: близкий аналог threading.Lock().

Как только процесс или поток получил блокировку, то последующие попытки получить ее от любого процесса или потока будут блокироваться до тех пор, пока она не будет снята. Любой процесс или поток может освободить его. Концепции и поведение объекта блокировки потока threading.Lock, реплицируются применительно к процессам или потокам в multiprocessing.Lock, за исключением случаев, указанных выше.

Обратите внимание, что объект Lock на самом деле является фабричным классом, который возвращает экземпляр multiprocessing.synchronize.Lock, инициализированный контекстом по умолчанию.

Объект Lock поддерживает протокол диспетчера контекста и поэтому может использоваться в операторах with.

Методы объекта Lock:

Lock.acquire(block=True, timeout=None):

Метод Lock.acquire() ставит блокирующую или неблокирующую блокировку.

Если аргумент block=True (по умолчанию), то вызов метода будет блокироваться до тех пор, пока блокировка не перейдет в разблокированное состояние unlock, затем установит для него значение lock заблокировано и вернет значение True. Обратите внимание, что имя этого аргумента отличается от имени в threading.Lock.acquire().

Если block=False, то вызов метода не блокируется. Если блокировка в настоящее время находится в состоянии lock, то метод вернет значение False, в противном случае - установит блокировку в состояние lock и вернет значение True.

Аргумент timeout:

  • Если для аргумента timeout (float) задано положительное значение, то метод блокируется на максимум количество секунд, заданное тайм-аутом, пока блокировка не будет получена.
  • Вызов с отрицательным значением timeout эквивалентно timeout=0.
  • Вызов со значением timeout=None (по умолчанию) устанавливает период timeout на бесконечность.

Метод будет возвращать True, если блокировка успешно получена или False, если время ожидания timeout истекло.

Обратите внимание, что обработка отрицательных значений или значений None аргумента timeout отличается от поведения, реализованного в threading.Lock.acquire(). Если аргумент block=False, то тайм-аут не имеет практического значения и следовательно игнорируется вообще.

Lock.release():

Метод Lock.release() снимает блокировку. Может быть вызван из любого процесса или потока, а не только из процесса или потока, который изначально получил блокировку.

Поведение такое же, как и в threading.Lock.release(), за исключением того, что при вызове метода для снятой блокировки возникает исключение ValueError.


multiprocessing.RLock():

Класс multiprocessing.RLock возвращает объект рекурсивной блокировки: близкий аналог threading.RLock.

Рекурсивная блокировка должна быть снята процессом или потоком, который ее получил. Как только процесс или поток получил рекурсивную блокировку, тот же процесс или поток может получить ее снова без блокировки. Этот-же процесс или поток должен освобождать блокировку один раз при каждом захвате.

Обратите внимание, что объект RLock представляет собой фабричный класс, который возвращает экземпляр multiprocessing.synchronize.RLock, который инициализируется контекстом по умолчанию.

Объект RLock поддерживает протокол диспетчера контекста и может использоваться в операторах with.

Методы объекта RLock:

RLock.acquire(block=True, timeout=None):

Метод RLock.acquire() устанавливает блокирующую или неблокирующую блокировку.

При вызове с аргументом block=True, блокируется до тех пор, пока блокировка не перейдет в состояние ullock (т. е. не будет принадлежать ни одному процессу или потоку), если только блокировка уже не принадлежит текущему процессу или потоку. Затем текущий процесс или поток становится владельцем блокировки, если он еще не владеет ей, а уровень рекурсии внутри блокировки увеличивается на единицу, в результате чего возвращается значение True.

Обратите внимание, что есть несколько отличий в поведении первого аргумента этого метода от реализации его в threading.RLock.acquire(), начиная с имени самого аргумента.

При вызове с аргументом block=False, не блокируется. Если блокировка уже была получена и принадлежит другому процессу или потоку, то текущий процесс или поток не становится владельцем и уровень рекурсии в блокировке не изменяется, в результате чего возвращается значение False. Если блокировка находится в состоянии unlock, то текущий процесс или поток становится ее владельцем и уровень рекурсии увеличивается, в результате чего возвращается значение True.

Использование и поведение аргумента timeout такие же, как и в Lock.acquire(). Обратите внимание, что некоторые из этих поведений timeout отличаются от поведений реализованных в threading.RLock.acquire().

RLock.release():

Метод RLock.release() опускает блокировку, уменьшив уровень рекурсии.

Если после уменьшения, уровень рекурсии равен нулю, то сбрасывает блокировку на уровень unlock, не принадлежащую ни одному процессу или потоку. При этом, если какие-либо другие процессы или потоки заблокированы и ждут состояния блокировки - unlock, то разрешает продолжить ровно одному из них. Если после уменьшения, уровень рекурсии все еще не равен нулю, то блокировка остается в состоянии lock и принадлежит вызывающему процессу или потоку.

Вызывайте этот метод только тогда, когда вызывающий процесс или поток владеет блокировкой. Если метод вызывается процессом или потоком, отличным от владельца блокировки или если блокировка находится в состоянии unlock, то вызывается исключение AssertionError.

Обратите внимание, что тип исключения, возникающего в этой ситуации, отличается от реализованного в threading.RLock.release().

Синхронизация процессов на примере доступа к общему ресурсу.

Допустим есть общий ресурс (файл, очередь, без разницы) и есть воркеры, которые выполняются разными процессами с разной нагрузкой, а результаты работы складываются в этот общий ресурс. Задача: не допустить смешивания данных, поступающие из разных процессов в общий ресурс, данные необходимо получить строго по порядку, т.е. сначала от одного процесса, потом другого и т.д.

Пример с очередью конечно надуманный, но он как нельзя лучше показывает, что такое синхронизация процессов и как она работает.

Реальные задачи синхронизации процессов стоят, например при объединении нескольких файлов в один, когда несколько процессов одновременно и построчно читают каждый свой файл, обрабатывают прочитанные строки (например что-то меняют) и сразу пытаются писать их (каждый процесс свою обработанную строку) в общий файл. Так вот если не использовать синхронизацию между читающими процессами, то строки из разных файлов в общем файле будут перемешаны/перепутаны. Смотрите пример с объектом threading.Lock и попробуйте повторит его с процессами.

import multiprocessing, time, random

def worker_lock(lock, lst):
    """worker с использованием блокировки""" 
    # PID процесса
    pid_proc = multiprocessing.current_process().pid
    # блокируем доступ к очереди, пока складываем в нее данные
    with lock:
        for n in range(3):
            # имитируем нагрузку, для того, что бы была 
            # конкуренция доступа к общему ресурсу (очереди)
            time.sleep(random.uniform(0.01, 0.1))
            # пока доступ из других процессов заблокирован, 
            # складываем данные - кортежи (pid, n)
            lst.put((f'PID-{pid_proc}', n))

def worker(lst):
    """worker без использования блокировки""" 
    pid_proc = multiprocessing.current_process().pid
    for n in range(3):
        time.sleep(random.uniform(0.01, 0.1))
        lst.put((f'PID-{pid_proc}', n))

if __name__ == '__main__':
    # кол-во процессов
    PROCESSES = 4

    # создаем объект блокировки
    lock = multiprocessing.Lock()
    # создаем очередь
    queue = multiprocessing.Queue()
    
    print('\nОЧЕРЕДЬ С БЛОКИРОВКОЙ:')
    procs = []
    for _ in range(PROCESSES):
        proc = multiprocessing.Process(target=worker_lock, args=(lock, queue))
        procs.append(proc)
        proc.start()

    # ждем результатов
    [proc.join() for proc in procs]
    # получаем данные из очереди, 
    # тем самым ее освобождаем 
    while not queue.empty():
        print(queue.get())
    
    # освобождаем ресурсы
    [proc.close() for proc in procs]

    print('\nОЧЕРЕДЬ БЕЗ БЛОКИРОВКИ:')
    procs = []
    for _ in range(PROCESSES):
        # что бы не использовать лишние ресурсы, будем 
        # использовать тот же объект очереди повторно 
        # (очередь освободили от предыдущих данных выше)
        proc = multiprocessing.Process(target=worker, args=(queue,))
        procs.append(proc)
        proc.start()
    
    # ждем результатов
    [proc.join() for proc in procs]
    # получаем данные из очереди
    while not queue.empty():
        print(queue.get())

    # освобождаем ресурсы
    [proc.close() for proc in procs]


# ОЧЕРЕДЬ С БЛОКИРОВКОЙ:
# ('PID-12443', 0)
# ('PID-12443', 1)
# ('PID-12443', 2)
# ('PID-12444', 0)
# ('PID-12444', 1)
# ('PID-12444', 2)
# ('PID-12445', 0)
# ('PID-12445', 1)
# ('PID-12445', 2)
# ('PID-12446', 0)
# ('PID-12446', 1)
# ('PID-12446', 2)

# ОЧЕРЕДЬ БЕЗ БЛОКИРОВКИ:
# ('PID-12456', 0)
# ('PID-12453', 0)
# ('PID-12454', 0)
# ('PID-12453', 1)
# ('PID-12454', 1)
# ('PID-12455', 0)
# ('PID-12454', 2)
# ('PID-12453', 2)
# ('PID-12455', 1)
# ('PID-12456', 1)
# ('PID-12456', 2)
# ('PID-12455', 2)

Из вывода программы видно, что там, где не использовалась блокировка - данные в очереди из разных процессов перемешаны.