Сообщить об ошибке.

Класс Queue() модуля multiprocessing в Python

Обмен данными между процессами при помощи очередей

Модуль multiprocessing определяет три вида очередей Queue, SimpleQueue и JoinableQueue - это очереди FIFO с несколькими производителями и несколькими потребителями, смоделированные на основе обычной многопоточной очереди FIFO queue.Queue модуля стандартной библиотеки queue.

Синтаксис:

import multiprocessing

# очередь FIFO 
queue = multiprocessing.Queue([maxsize])

# очередь `Queue` с методами `.task_done()` и `.join()`
queue_join = multiprocessing.JoinableQueue([maxsize])

# упрощенная очередь, похожа на объект `Pipe`
queue_simple = multiprocessing.SimpleQueue()

Параметры:

  • maxsize - максимальное количество элементов в очереди.

Возвращаемое значение:

  1. объект Queue.
  2. объект JoinableQueue.
  3. объект SimpleQueue.

Описание:

Класс Queue() модуля multiprocessing возвращает общую очередь процесса, реализованную с помощью канала и нескольких блокировок/семафоров. Когда процесс впервые помещает элемент в очередь, запускается поток фидера, который переносит объекты из буфера в канал.

Класс multiprocessing.Queue() реализует все методы многопоточной очереди FIFO queue.Queue, за исключением ее методов .task_done() и .join().

Если вы используете multiprocessing.JoinableQueue, то необходимо вызывать метод JoinableQueue.task_done() для каждой задачи, удаленной из очереди, иначе семафор, используемый для подсчета количества незавершенных задач, может в конечном итоге переполниться, вызвав исключение.

Примечание-1. Исключения queue.Empty и queue.Full обычного модуля стандартной библиотеки queue вызываются для сигнализации тайм-аутов. Эти исключения недоступны в пространстве имен модуля multiprocessing, следовательно, для перехвата этих исключений необходимо импортировать.

Примечание-2. Когда объект помещается в очередь, то он обрабатывается, а фоновый поток позже сбрасывает обработанные данные в нижележащий канал. Это имеет некоторые последствия, которые немного удивительны, но не должны вызывать каких-либо практических трудностей - если они действительно беспокоят, то можно использовать очередь, созданную с помощью диспетчера multiprocessing.Manager.

  1. После помещения объекта в пустую очередь может возникнуть бесконечно малая задержка перед тем, как метод очереди Queue.empty() вернет False, а метод Queue.get_nowait() сможет вернуть результат, не поднимая исключения queue.Empty.
  2. Если несколько процессов ставят объекты в очередь, возможно, что объекты будут получены на другом конце не по порядку. Но если объекты ставятся в очередь одним и тем же процессом, то всегда будут располагаться в ожидаемом порядке относительно друг друга.

Предупреждение-1: если процесс убит, при попытке использовать очередь, с помощью метода Process.terminate() или функции os.kill(), то данные в очереди, скорее всего, будут повреждены. Это может привести к тому, что любой другой процесс, при попытке использовать очередь позже получит исключение.

Предупреждение-2. Как упоминалось выше, если дочерний процесс поместил элементы в очередь и не использовал метод Queue.cancel_join_thread(), то этот процесс не завершится, пока все буферизованные элементы не будут сброшены в конвейер.
Это означает, что если попытаться присоединиться к этому процессу и все элементы, помещенные в очередь были израсходованы, то можно попасть в тупик. Точно так же, если дочерний процесс не является демоническим, тогда родительский процесс может зависнуть при выходе, когда он попытается присоединиться ко всем своим не демоническим дочерним процессам.


Методы объекта Queue.

Queue.qsize():

Метод Queue.qsize() возвращает примерный размер очереди. Из-за семантики многопоточности/многопроцессорности это число не является надежным.

Обратите внимание, что метод может вызвать исключение NotImplementedError на платформах Unix, таких как Mac OS X, где системный вызов sem_getvalue() не реализован.

Queue.empty():

Метод Queue.empty() возвращает True, если очередь пуста и False в противном случае. Из-за многопоточной/многопроцессорной семантики вызов метода ненадежен.

Queue.full():

Метод Queue.full() возвращает True, если очередь заполнена и False в противном случае. Из-за многопоточной/многопроцессорной семантики вызов метода ненадежен.

Queue.put(obj[, block[, timeout]]):

Метод Queue.put() ставит obj в очередь.

  • Если необязательный аргумент block=True (по умолчанию), а аргумент timeout=None (по умолчанию), то при необходимости блокируется, пока не станет доступен свободный слот.
  • Если значение timeout является положительным числом, то он блокируется не более timeout секунд и поднимает исключение queue.Full, если в течение этого времени не стал доступен свободный слот.
  • В случае block=False и если свободный слот доступен, то сразу поместит элемент в очередь, иначе вызовет исключение queue.Full. В этом случае тайм-аут timeout игнорируется.

Изменено в Python 3.8: Если очередь закрыта, то вместо AssertionError вызывается исключение ValueError.

Queue.put_nowait(obj):

Метод Queue.put_nowait() представляет собой эквивалент метода Queue.put(obj, False).

Queue.get([block[, timeout]]):

Метод Queue.get() удаляет и возвращает элемент из очереди.

  • Если необязательный аргумент block=True (по умолчанию), а аргумент timeout=None(по умолчанию), то при необходимости блокируйтся, пока элемент не станет доступным.
  • Если значение timeout является положительным числом, то он блокируется не более timeout секунд и поднимает исключение queue.Empty, если в течение этого времени ни один элемент не был доступен.
  • В случае block=False и если элемент доступен, то сразу извлечет его, иначе вызовет исключение queue.Empty. В этом случае тайм-аут timeout игнорируется.

Изменено в версии 3.8: Если очередь закрыта, то вместо OSError вызывается исключение ValueError.

Queue.get_nowait():

Метод Queue.put_nowait() представляет собой эквивалент метода Queue.get(False).

Queue.close():

Метод Queue.close() указывает, что текущий процесс больше не будет помещать данные в эту очередь. Фоновый поток завершится, как только он сбросит все буферизованные данные в очередь, в нижележащий канал.

Метод вызывается автоматически, когда очередь обрабатывается сборщиком мусора.

Queue.join_thread():

Метод Queue.join_thread() присоединяется к фоновому потоку.

Метод можно использовать только после вызова метода Queue.close(). Он блокируется до тех пор, пока фоновый поток не выйдет, гарантируя, что все данные в буфере были сброшены в очередь, в нижележащий канал.

По умолчанию, если процесс не является создателем очереди, то при выходе он попытается присоединиться к фоновому потоку очереди. Процесс может вызвать метод Queue.cancel_join_thread(), чтобы метод Queue.join_thread() не использовался.

Queue.cancel_join_thread():

Метод Queue.cancel_join_thread() предотвращает операцию блокировки методом Queue.join_thread(). В частности, этот метод предотвращает автоматическое присоединение фонового потока при выходе из процесса.

Лучшим именем для этого метода может быть allow_exit_without_flush(). Это, скорее всего, приведет к потере данных в очереди, которые почти наверняка не придется их использовать. Метод должен вызываться только в том случае, если необходимо, чтобы текущий процесс немедленно завершился, не дожидаясь отправки данных в очередь в нижележащий канал.

Примечание. Функциональность этого класса требует наличия работающей реализации общего семафора в операционной системе хоста. Без него функции этого класса будут отключены, а попытки создать экземпляр multiprocessing.Queue приведут к ошибке ImportError. То же самое верно для любого из специализированных типов очередей, перечисленных в начале материала.


Методы объекта JoinableQueue.

Объект очереди JoinableQueue наследует все методы объекта Queue и дополнительно имеет методы .task_done() и .join().

JoinableQueue.task_done():

Метод JoinableQueue.task_done() указывает, что задача, ранее поставленная в очередь, завершена. Используется потребителями очереди.

Для каждого метода Queue.get(), используемого для извлечения задачи из очереди, последующий вызов JoinableQueue.task_done() сообщает очереди, что обработка задачи завершена.

Если метод JoinableQueue.join() в настоящее время блокируется, то он возобновится, когда все элементы будут обработаны. Это означает, что вызов метода JoinableQueue.task_done() был получен для каждого элемента, который был помещен Queue.put() в очередь.

Если вызывается больше, чем было помещено элементов в очередь, то вызывает ошибку ValueError.

JoinableQueue.join():

Метод JoinableQueue.join() блокирует, пока все элементы в очереди не будут получены и обработаны.

Счетчик незавершенных задач увеличивается всякий раз, когда элемент добавляется в очередь.

Счетчик уменьшается всякий раз, когда потребитель вызывает метод JoinableQueue.task_done(), тем самым сообщая, что элемент был получен и вся работа над ним завершена. Когда количество незавершенных задач упадет до нуля, метод JoinableQueue.join() разблокирует дальнейший ход программы.

Методы объекта SimpleQueue.

Объект SimpleQueue - это упрощенный тип очереди, по поведению очень похожий на объект канала multiprocessing.Pipe, работающий в однонаправленном режиме duplex=False.

SimpleQueue.close():

Метод SimpleQueue.close() закрывает очередь и освобождает внутренние ресурсы.

Очередь не может больше использоваться после ее закрытия. Например, нельзя больше вызывать методы SimpleQueue.get(), SimpleQueue.put() и SimpleQueue.empty().

Новое в Python 3.9.

SimpleQueue.empty():

Метод SimpleQueue.empty() возвращает True, если очередь пуста и False в противном случае.

SimpleQueue.get():

Метод SimpleQueue.get() возвращает, а потом удаляет элемент из очереди.

SimpleQueue.put(item):

Метод SimpleQueue.put() ставит элемент item в очередь.


Пример обмена данными между процессами при помощи очередей:

import multiprocessing
import time, random

def worker(input, output):
    """Функция, выполняемая рабочими процессами"""
    for func, args in iter(input.get, 'STOP'):
        result = calculate(func, args)
        output.put(result)

def calculate(func, args):
    """Функция, используемая для вычисления результата"""
    proc_name = multiprocessing.current_process().name
    result = func(*args)
    return f'{proc_name}, результат функции {func.__name__}{args} = {result}' 

########################################
# Функции, на которые ссылаются задачи #
########################################
def mul(a, b):
    time.sleep(0.5*random.random())
    return a * b

def plus(a, b):
    time.sleep(0.5*random.random())
    return a + b

def test():
    NUMBER_OF_PROCESSES = 4
    TASKS1 = [(mul, (i, 7)) for i in range(20)]
    TASKS2 = [(plus, (i, 8)) for i in range(10)]

    # Создание очередей
    task_queue = multiprocessing.Queue()
    done_queue = multiprocessing.Queue()

    # Заполнение очереди заданий
    for task in TASKS1:
        task_queue.put(task)

    # Запуск рабочих процессов
    for i in range(NUMBER_OF_PROCESSES):
        multiprocessing.Process(target=worker, args=(task_queue, done_queue)).start()

    # Получение и печать результатов
    print('НЕУПОРЯДОЧЕННЫЕ РЕЗУЛЬТАТЫ:\n')
    print('TASKS1:\n')
    for i in range(len(TASKS1)):
        print('\t', done_queue.get())

    # Добавляем больше задач с помощью метода `put()`
    for task in TASKS2:
        task_queue.put(task)

    # Выводим еще несколько результатов
    print('TASKS2:\n')
    for i in range(len(TASKS2)):
        print('\t', done_queue.get())

    # Говорим дочерним процессам остановиться
    print('STOP.')
    for i in range(NUMBER_OF_PROCESSES):
        task_queue.put('STOP')

if __name__ == '__main__':
    test()