Модуль multiprocessing
определяет три вида очередей Queue
, SimpleQueue
и JoinableQueue
- это очереди FIFO с несколькими производителями и несколькими потребителями, смоделированные на основе обычной многопоточной очереди FIFO queue.Queue
модуля стандартной библиотеки queue
.
import multiprocessing # очередь FIFO queue = multiprocessing.Queue([maxsize]) # очередь `Queue` с методами `.task_done()` и `.join()` queue_join = multiprocessing.JoinableQueue([maxsize]) # упрощенная очередь, похожа на объект `Pipe` queue_simple = multiprocessing.SimpleQueue()
maxsize
- максимальное количество элементов в очереди.Класс Queue()
модуля multiprocessing
возвращает общую очередь процесса, реализованную с помощью канала и нескольких блокировок/семафоров. Когда процесс впервые помещает элемент в очередь, запускается поток фидера, который переносит объекты из буфера в канал.
Класс multiprocessing.Queue()
реализует все методы многопоточной очереди FIFO queue.Queue
, за исключением ее методов .task_done()
и .join()
.
Если вы используете multiprocessing.JoinableQueue
, то необходимо вызывать метод JoinableQueue.task_done()
для каждой задачи, удаленной из очереди, иначе семафор, используемый для подсчета количества незавершенных задач, может в конечном итоге переполниться, вызвав исключение.
Примечание-1. Исключения queue.Empty и queue.Full
обычного модуля стандартной библиотеки queue
вызываются для сигнализации тайм-аутов. Эти исключения недоступны в пространстве имен модуля multiprocessing
, следовательно, для перехвата этих исключений необходимо импортировать.
Примечание-2. Когда объект помещается в очередь, то он обрабатывается, а фоновый поток позже сбрасывает обработанные данные в нижележащий канал. Это имеет некоторые последствия, которые немного удивительны, но не должны вызывать каких-либо практических трудностей - если они действительно беспокоят, то можно использовать очередь, созданную с помощью диспетчера multiprocessing.Manager
.
Queue.empty()
вернет False
, а метод Queue.get_nowait()
сможет вернуть результат, не поднимая исключения queue.Empty
.Предупреждение-1: если процесс убит, при попытке использовать очередь, с помощью метода Process.terminate()
или функции os.kill()
, то данные в очереди, скорее всего, будут повреждены. Это может привести к тому, что любой другой процесс, при попытке использовать очередь позже получит исключение.
Предупреждение-2. Как упоминалось выше, если дочерний процесс поместил элементы в очередь и не использовал метод Queue.cancel_join_thread()
, то этот процесс не завершится, пока все буферизованные элементы не будут сброшены в конвейер.
Это означает, что если попытаться присоединиться к этому процессу и все элементы, помещенные в очередь были израсходованы, то можно попасть в тупик. Точно так же, если дочерний процесс не является демоническим, тогда родительский процесс может зависнуть при выходе, когда он попытается присоединиться ко всем своим не демоническим дочерним процессам.
Queue
.Queue.qsize()
возвращает примерный размер очереди,Queue.empty()
проверяет, что очередь пуста,Queue.full()
проверяет, очередь заполнена,Queue.put()
ставит элемент в очередь.,Queue.put_nowait()
эквивалент метода Queue.put(obj, False)
,Queue.get()
удаляет и возвращает элемент из очереди,Queue.get_nowait()
эквивалент метода Queue.get(False)
,Queue.close()
закрывает очередь,Queue.join_thread()
присоединяется к фоновому потоку,Queue.cancel_join_thread()
предотвращает блокировку Queue.join_thread()
,Queue.qsize()
:Метод Queue.qsize()
возвращает примерный размер очереди. Из-за семантики многопоточности/многопроцессорности это число не является надежным.
Обратите внимание, что метод может вызвать исключение NotImplementedError
на платформах Unix, таких как Mac OS X, где системный вызов sem_getvalue()
не реализован.
Queue.empty()
:Метод Queue.empty()
возвращает True
, если очередь пуста и False
в противном случае. Из-за многопоточной/многопроцессорной семантики вызов метода ненадежен.
Queue.full()
:Метод Queue.full()
возвращает True
, если очередь заполнена и False
в противном случае. Из-за многопоточной/многопроцессорной семантики вызов метода ненадежен.
Queue.put(obj[, block[, timeout]])
:Метод Queue.put()
ставит obj
в очередь.
block=True
(по умолчанию), а аргумент timeout=None
(по умолчанию), то при необходимости блокируется, пока не станет доступен свободный слот. timeout
является положительным числом, то он блокируется не более timeout
секунд и поднимает исключение queue.Full
, если в течение этого времени не стал доступен свободный слот. block=False
и если свободный слот доступен, то сразу поместит элемент в очередь, иначе вызовет исключение queue.Full
. В этом случае тайм-аут timeout
игнорируется.Изменено в Python 3.8: Если очередь закрыта, то вместо AssertionError
вызывается исключение ValueError
.
Queue.put_nowait(obj)
:Метод Queue.put_nowait()
представляет собой эквивалент метода Queue.put(obj, False)
.
Queue.get([block[, timeout]])
:Метод Queue.get()
удаляет и возвращает элемент из очереди.
block=True
(по умолчанию), а аргумент timeout=None
(по умолчанию), то при необходимости блокируйтся, пока элемент не станет доступным. timeout
является положительным числом, то он блокируется не более timeout
секунд и поднимает исключение queue.Empty
, если в течение этого времени ни один элемент не был доступен. block=False
и если элемент доступен, то сразу извлечет его, иначе вызовет исключение queue.Empty
. В этом случае тайм-аут timeout
игнорируется.Изменено в версии 3.8: Если очередь закрыта, то вместо OSError
вызывается исключение ValueError
.
Queue.get_nowait()
:Метод Queue.put_nowait()
представляет собой эквивалент метода Queue.get(False)
.
Queue.close()
:Метод Queue.close()
указывает, что текущий процесс больше не будет помещать данные в эту очередь. Фоновый поток завершится, как только он сбросит все буферизованные данные в очередь, в нижележащий канал.
Метод вызывается автоматически, когда очередь обрабатывается сборщиком мусора.
Queue.join_thread()
:Метод Queue.join_thread()
присоединяется к фоновому потоку.
Метод можно использовать только после вызова метода Queue.close()
. Он блокируется до тех пор, пока фоновый поток не выйдет, гарантируя, что все данные в буфере были сброшены в очередь, в нижележащий канал.
По умолчанию, если процесс не является создателем очереди, то при выходе он попытается присоединиться к фоновому потоку очереди. Процесс может вызвать метод Queue.cancel_join_thread()
, чтобы метод Queue.join_thread()
не использовался.
Queue.cancel_join_thread()
:Метод Queue.cancel_join_thread()
предотвращает операцию блокировки методом Queue.join_thread()
. В частности, этот метод предотвращает автоматическое присоединение фонового потока при выходе из процесса.
Лучшим именем для этого метода может быть allow_exit_without_flush()
. Это, скорее всего, приведет к потере данных в очереди, которые почти наверняка не придется их использовать. Метод должен вызываться только в том случае, если необходимо, чтобы текущий процесс немедленно завершился, не дожидаясь отправки данных в очередь в нижележащий канал.
Примечание. Функциональность этого класса требует наличия работающей реализации общего семафора в операционной системе хоста. Без него функции этого класса будут отключены, а попытки создать экземпляр multiprocessing.Queue
приведут к ошибке ImportError
. То же самое верно для любого из специализированных типов очередей, перечисленных в начале материала.
JoinableQueue
.Объект очереди JoinableQueue
наследует все методы объекта Queue
и дополнительно имеет методы .task_done()
и .join()
.
JoinableQueue.task_done()
указывает, что текущая задача выполнена,JoinableQueue.join()
блокирует, очередь обрабатывается.JoinableQueue.task_done()
:Метод JoinableQueue.task_done()
указывает, что задача, ранее поставленная в очередь, завершена. Используется потребителями очереди.
Для каждого метода Queue.get()
, используемого для извлечения задачи из очереди, последующий вызов JoinableQueue.task_done()
сообщает очереди, что обработка задачи завершена.
Если метод JoinableQueue.join()
в настоящее время блокируется, то он возобновится, когда все элементы будут обработаны. Это означает, что вызов метода JoinableQueue.task_done()
был получен для каждого элемента, который был помещен Queue.put()
в очередь.
Если вызывается больше, чем было помещено элементов в очередь, то вызывает ошибку ValueError
.
JoinableQueue.join()
:Метод JoinableQueue.join()
блокирует, пока все элементы в очереди не будут получены и обработаны.
Счетчик незавершенных задач увеличивается всякий раз, когда элемент добавляется в очередь.
Счетчик уменьшается всякий раз, когда потребитель вызывает метод JoinableQueue.task_done()
, тем самым сообщая, что элемент был получен и вся работа над ним завершена. Когда количество незавершенных задач упадет до нуля, метод JoinableQueue.join()
разблокирует дальнейший ход программы.
SimpleQueue
.Объект SimpleQueue
- это упрощенный тип очереди, по поведению очень похожий на объект канала multiprocessing.Pipe
, работающий в однонаправленном режиме duplex=False
.
SimpleQueue.close()
закрывает очередь,SimpleQueue.empty()
проверяет, что очередь пуста,SimpleQueue.get()
возвращает и удаляет элемент из очереди,SimpleQueue.put()
ставит элемент в очередь,SimpleQueue.close()
:Метод SimpleQueue.close()
закрывает очередь и освобождает внутренние ресурсы.
Очередь не может больше использоваться после ее закрытия. Например, нельзя больше вызывать методы SimpleQueue.get()
, SimpleQueue.put()
и SimpleQueue.empty()
.
Новое в Python 3.9.
SimpleQueue.empty()
:Метод SimpleQueue.empty()
возвращает True
, если очередь пуста и False
в противном случае.
SimpleQueue.get()
:Метод SimpleQueue.get()
возвращает, а потом удаляет элемент из очереди.
SimpleQueue.put(item)
:Метод SimpleQueue.put()
ставит элемент item
в очередь.
import multiprocessing import time, random def worker(input, output): """Функция, выполняемая рабочими процессами""" for func, args in iter(input.get, 'STOP'): result = calculate(func, args) output.put(result) def calculate(func, args): """Функция, используемая для вычисления результата""" proc_name = multiprocessing.current_process().name result = func(*args) return f'{proc_name}, результат функции {func.__name__}{args} = {result}' ######################################## # Функции, на которые ссылаются задачи # ######################################## def mul(a, b): time.sleep(0.5*random.random()) return a * b def plus(a, b): time.sleep(0.5*random.random()) return a + b def test(): NUMBER_OF_PROCESSES = 4 TASKS1 = [(mul, (i, 7)) for i in range(20)] TASKS2 = [(plus, (i, 8)) for i in range(10)] # Создание очередей task_queue = multiprocessing.Queue() done_queue = multiprocessing.Queue() # Заполнение очереди заданий for task in TASKS1: task_queue.put(task) # Запуск рабочих процессов for i in range(NUMBER_OF_PROCESSES): multiprocessing.Process(target=worker, args=(task_queue, done_queue)).start() # Получение и печать результатов print('НЕУПОРЯДОЧЕННЫЕ РЕЗУЛЬТАТЫ:\n') print('TASKS1:\n') for i in range(len(TASKS1)): print('\t', done_queue.get()) # Добавляем больше задач с помощью метода `put()` for task in TASKS2: task_queue.put(task) # Выводим еще несколько результатов print('TASKS2:\n') for i in range(len(TASKS2)): print('\t', done_queue.get()) # Говорим дочерним процессам остановиться print('STOP.') for i in range(NUMBER_OF_PROCESSES): task_queue.put('STOP') if __name__ == '__main__': test()