from multiprocessing import Pool pool = Pool([processes[, initializer [, initargs[, maxtasksperchild [, context]]]]])
processes
- количество используемых рабочих процессов,initializer
- вызываемый объект (функция),initargs
- аргументами для initializer
maxtasksperchild
- количество задач рабочего процесса до обновления,context
- контекста для запуска рабочих процессов.Класс Pool()
модуля multiprocessing
создает объект, управляющий пулом рабочих процессов, в который могут быть отправлены задания. Пул рабочих процессов поддерживает асинхронное выполнение задач с тайм-аутами и обратными вызовами и имеет параллельную реализацию.
Аргумент processes
- это количество используемых рабочих процессов. Если аргумент processes
не указан, то используется число, возвращаемое функцией os.cpu_count()
.
Если аргумент initializer
не равен None
, то при запуске каждый рабочий процесс будет выполнять вызываемый объект initializer
с аргументами *initargs
в виде: initializer(*initargs)
Аргумент maxtasksperchild
- это количество задач, которые рабочий процесс может выполнить до того, как он выйдет и будет заменен новым рабочим процессом, чтобы освободить неиспользуемые ресурсы. По умолчанию для maxtasksperchild
установлено значение None
, это означает, что рабочие процессы будут жить столько же, сколько и пул.
Аргумент context
можно использовать для указания контекста, используемого для запуска рабочих процессов. Обычно пул создается с помощью создания экземпляра класса multiprocessing.Pool()
или объекта контекста context.Pool()
, созданного функцией multiprocessing.get_context()
. В обоих случаях контекст устанавливается соответствующим образом.
Обратите внимание, что методы объекта пула должны вызываться только процессом, создавшим пул. Проще говоря методы должны вызываться из кода, где был создан и запущен пул рабочих процессов.
Предупреждение. Объекты multiprocessing.Pool
имеют внутренние ресурсы, которыми необходимо правильно управлять, используя пул с менеджером контекста или вручную вызывая методы пула Pool.close()
и Pool.terminate()
. Невыполнение этого требования может привести к зависанию процесса при завершении.
Обратите внимание, что неправильно полагаться на сборщик мусора для уничтожения пула, поскольку CPython не гарантирует, что будет вызван финализатор пула.
Примечание. Рабочие процессы в пуле обычно живут в течение всего срока рабочей очереди пула. Часто используемый в других системах (например, Apache, mod_wsgi и т. д.) шаблон для освобождения ресурсов, удерживаемых рабочими процессами, заключается в том, чтобы позволить им выполнить только заданный объем работы перед его выходом, очисткой и созданием нового процесса. Для пула процессов, эту возможность пользователю предоставляет аргумент maxtasksperchild
.
Pool
.Pool.apply()
вызывает функцию с аргументами,Pool.apply_async()
асинхронный вариант метода Pool.apply()
,Pool.map()
многопроцессорный эквивалент встроенной функции map()
,Pool.map_async()
асинхронный вариант метода Pool.map()
,Pool.imap()
более ленивая версия метода Pool.map()
,Pool.imap_unordered()
то же самое, что и Pool.imap()
, только результаты идут по готовности,Pool.starmap()
аналогичен методу Pool.map()
, только другая передача аргументов,Pool.starmap_async()
комбинация методов Pool.starmap()
и Pool.map_async()
,Pool.close()
предотвращает отправку задач в пул,Pool.terminate()
останавливает рабочие процессы,Pool.join()
ждет, пока рабочие процессы закончатся,AsyncResult
результат вызовов методов Pool.apply_async()
и Pool.map_async()
AsyncResult.get()
возвращает результат, как только он придет,AsyncResult.wait()
ждет, пока будет доступен результат,AsyncResult.ready()
проверяет, завершился ли вызов,AsyncResult.successful()
проверяет, был ли завершен вызов без исключения.Pool
.Pool.apply(func[, args[, kwds]])
:Метод Pool.apply()
вызывает функцию func
с аргументами args
и ключевыми аргументами kwds
. Блокирует, пока не будет готов результат.
С учетом этой блокировки, для параллельной работы лучше подходит метод Pool.apply_async()
. Кроме того, аргумент метода func
выполняется только в одном рабочем процессе пула.
Pool.apply_async(func[, args[, kwds[, callback[, error_callback]]]])
:Метод Pool.apply_async()
асинхронный вариант метода Pool.apply()
, который возвращает объект AsyncResult
.
Если указан обратный вызов callback
, то это должен быть вызываемый объект, принимающий единственный аргумент. Когда результат становится готовым, к нему применяется этот обратный вызов. Если вызов callback
не удался, то в этом случае вместо него применяется error_callback
.
Если указан error_callback
, то это должен быть вызываемый объект, который принимает единственный аргумент. Если целевая функция обратного вызова терпит неудачу, то вызывается error_callback
с экземпляром исключения в качестве аргумента.
Обратные вызовы должны завершаться немедленно, в противном случае асинхронный поток, обрабатывающий результаты, будет заблокирован.
Pool.map(func, iterable[, chunksize])
:Метод Pool.map()
представляет собой многопроцессорный эквивалент встроенной функции map()
.
Аргумент метода iterable
представляет из себя итерацию, элементы которой не распаковываются при передаче в функцию func
. То есть функция func(x)
, может иметь только один аргумент x
. Другими словами, если iterable=[(1, 2), (10, 20), ...]
, то запуск задач будет происходить следующим образом: [func((1, 2)), func((10, 20)), ...]
, где x
будет равен кортежу, например (1, 2)
.
Если необходимо использовать функцию с несколькими аргументами, например func(x, y)
и распаковывать итерации при запуске, например: [func(1, 2), func(10, 20), ...]
, то посмотрите в сторону метода Pool.starmap()
.
Метод блокирует выполнение программы до получения результатов работы всеми запущенными процессами.
Метод Pool.map()
разбивает итерируемый объект на несколько частей, которые отправляет в пул процессов как отдельные задачи. Приблизительный размер этих фрагментов можно указать, задав для аргумента метода chunksize
положительное целое число.
Обратите внимание, что Pool.map()
может привести к высокому использованию памяти для очень длинных итераций. Для большей эффективности рассмотрите возможность использования метода Pool.imap()
или Pool.imap_unordered()
с явной опцией chunksize
.
Pool.map_async(func, iterable[, chunksize[, callback[, error_callback]]])
:Метод Pool.map_async()
вариант метода Pool.map()
, который возвращает объект AsyncResult
.
Если указан обратный вызов callback
, то это должен быть вызываемый объект, принимающий единственный аргумент. Когда результат становится готовым, к нему применяется этот обратный вызов. Если вызов callback
не удался, то в этом случае вместо него применяется error_callback
.
Если указан error_callback
, то это должен быть вызываемый объект, который принимает единственный аргумент. Если целевая функция обратного вызова терпит неудачу, то вызывается error_callback
с экземпляром исключения в качестве аргумента.
Обратные вызовы должны завершаться немедленно, в противном случае асинхронный поток, обрабатывающий результаты, будет заблокирован.
Pool.imap(func, iterable[, chunksize])
:Метод Pool.imap()
представляет из себя более ленивую версию метода Pool.map()
.
Аргумент метода chunksize
совпадает с аргументом, используемым методом Pool.map()
. Для очень длинных итераций использование большого значения chunksize
может значительно ускорить выполнение задания, чем использование значения по умолчанию 1.
Также, если chunksize=1
, то метод итератора it.next()
, возвращаемый методом Pool.imap()
, будет иметь необязательный параметр тайм-аута: .next(timeout)
. Этот метод итератора вызовет TimeoutError
, если результат не может быть возвращен в течение timeout
секунд.
Например:
from multiprocessing import Pool def worker(x): return x*x if __name__ == '__main__': # запускаем 4 рабочих процесса with Pool(processes=4) as pool: it = pool.imap(worker, range(10)) # использование встроенной функции next() print(next(it)) # выведет 0 print(next(it)) # выведет 1 # использование метода-итератора с аргументом `timeout` # выведет "4" если компьютер не очень медленный print(it.next(timeout=1))
Pool.imap_unordered(func, iterable[, chunksize])
:Метод Pool.imap_unordered()
то же самое, что и Pool.imap()
, за исключением того, что порядок результатов возвращаемого итератора следует считать произвольным.
Только когда есть только один рабочий процесс, порядок гарантированно будет "правильным".
Pool.starmap(func, iterable[, chunksize])
:Метод Pool.starmap()
аналогичен методу Pool.map()
, за исключением того, что элементы iterable
, как ожидается, будут то же итерируемыми, которые будут распаковываться в качестве аргументов.
Например, если iterable=[(1, 2), (3, 4)]
, то результат передачи аргументов в вызываемый объект func
будет таким: [func(1, 2), func(3, 4)]
.
Pool.starmap_async(func, iterable[, chunksize[, callback[, error_callback]]])
:Метод Pool.starmap_async()
представляет собой комбинацию методов Pool.starmap()
и Pool.map_async()
.
Метод выполняет итерацию по iterable
, распаковывает их и вызывает функцию func
с распакованными аргументами из iterable
. Возвращает объект результата.
Pool.close()
:Метод Pool.close()
предотвращает отправку задач в пул. Как только все задачи будут выполнены, рабочие процессы завершатся.
Pool.terminate()
:Метод Pool.terminate()
останавливает рабочие процессы немедленно, не давая завершить невыполненную работу. Метод будет вызван немедленно, когда объект пула будет обработан сборщиком мусора.
Pool.join()
:Метод Pool.join()
ждет, пока запущенные рабочие процессы закончатся. Перед использованием Pool.join()
необходимо вызвать Pool.close()
или Pool.terminate()
.
AsyncResult
.Объект AsyncResult
представляет собой результат, возвращаемый методами Pool.apply_async()
и Pool.map_async()
.
Объект AsyncResult
определяет следующие методы:
AsyncResult.get()
возвращает результат, как только он придет,AsyncResult.wait()
ждет, пока будет доступен результат,AsyncResult.ready()
проверяет, завершился ли вызов,AsyncResult.successful()
проверяет, был ли завершен вызов без исключения.AsyncResult.get([timeout])
:Метод AsyncResult.get()
возвращает результат, как только он придет.
Если тайм-аут timeout
не равен None
и результат не приходит в течение этого тайм-аута, то возникает исключение TimeoutError
. Если удаленный вызов вызвал исключение, это исключение будет вызвано повторно.
AsyncResult.wait([timeout])
:Метод AsyncResult.wait()
ждет, пока будет доступен результат или пока не пройдет время таймаута timeout
.
AsyncResult.ready()
:Метод AsyncResult.ready()
проверяет, завершился ли вызов.
AsyncResult.successful()
:Метод AsyncResult.successful()
проверяет, был ли завершен вызов без возникновения исключения. Поднимет исключение ValueError
, если результат не готов.
Изменено в Python 3.7: Если результат не готов, вместо AssertionError
возникает ValueError
.
В следующем примере показано использование пула:
from multiprocessing import Pool, TimeoutError, current_process import time, os def worker(x): print('WORKER() => ', current_process().name) return x*x if __name__ == '__main__': # запуск 4 рабочих процессов with Pool(processes=4) as pool: # результаты получим в # порядке поступления задач res = pool.map(worker, range(10)) print(res) # результаты получим в порядке их # готовности (могут быть не по порядку) for i in pool.imap_unordered(worker, range(10)): print(i, end=', ') print() # вычислит "worker(20)" асинхронно # запустится только один процесс res = pool.apply_async(worker, (20,)) # получение результата async_worker = res.get(timeout=1) print('1 процесс, worker(20) => ', async_worker) # вычислит "os.getpid()" асинхронно # запустится только один процесс res = pool.apply_async(os.getpid, ()) # получение результата async_getpid = res.get(timeout=1) print('1 процесс, os.getpid() => ', async_getpid) # запуск нескольких асинхронных вычислений # *может* использовать больше процессов multiple_results = [pool.apply_async(os.getpid, ()) for i in range(4)] # получение асинхронных результатов async_multi = [res.get(timeout=1) for res in multiple_results] print('4 асинхронных процесса, os.getpid():') print(async_multi) # заставим спать один рабочий в течение 10 секунд res = pool.apply_async(time.sleep, (10,)) try: # получение результата res_sleep = res.get(timeout=1) print(res_sleep) except TimeoutError: print("time.sleep(10) => получили multiprocessing.TimeoutError") print("На этот момент пул остается доступным для дополнительной работы") # выход из блока 'with' остановил пул print("Теперь пул закрыт и больше не доступен") # WORKER() => ForkPoolWorker-1 # WORKER() => ForkPoolWorker-2 # WORKER() => ForkPoolWorker-1 # WORKER() => ForkPoolWorker-3 # WORKER() => ForkPoolWorker-2 # WORKER() => ForkPoolWorker-2 # WORKER() => ForkPoolWorker-4 # WORKER() => ForkPoolWorker-1 # WORKER() => ForkPoolWorker-3 # WORKER() => ForkPoolWorker-2 # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81] # WORKER() => ForkPoolWorker-4 # WORKER() => ForkPoolWorker-3 # WORKER() => ForkPoolWorker-4 # WORKER() => ForkPoolWorker-1 # WORKER() => ForkPoolWorker-4 # WORKER() => ForkPoolWorker-2 # WORKER() => ForkPoolWorker-4 # WORKER() => ForkPoolWorker-1 # WORKER() => ForkPoolWorker-3 # WORKER() => ForkPoolWorker-2 # 0, 1, 4, 9, 16, 25, 49, 36, 81, 64, # WORKER() => ForkPoolWorker-4 # 1 процесс, worker(20) => 400 # 1 процесс, os.getpid() => 30330 # 4 асинхронных процесса, os.getpid(): # [30330, 30330, 30333, 30330] # time.sleep(10) => получили multiprocessing.TimeoutError # На этот момент пул остается доступным для дополнительной работы # Теперь пул закрыт и больше не доступен
Pool
.Вывод теста основной функциональности и методов объекта Pool
очень длинный и приводиться не будет. Кому интересно, скопируйте код и запустите сами.
# test.py import multiprocessing, random import time, sys ######################################## # Функции, используемые тестовым кодом # ######################################## def calculate(func, args): proc_name = multiprocessing.current_process().name proc_name = proc_name.replace('Fork', '') result = func(*args) return f'Процесс {proc_name}, вывод функции {func.__name__}{args} = {result}' def calculatestar(args): return calculate(*args) def mul(a, b): time.sleep(0.5 * random.random()) return a * b def plus(a, b): time.sleep(0.5 * random.random()) return a + b def zero(x): return 1.0 / (x - 5.0) ######################################################### # Тестирование основной функциональности объекта `Pool` # ######################################################### def test(): PROCESSES = 4 print(f'\nСОЗДАНИЕ ПУЛА С {PROCESSES} ПРОЦЕССАМИ...') with multiprocessing.Pool(PROCESSES) as pool: ############################## # ТЕСТЫ # ############################## TASKS = [(mul, (i, 7)) for i in range(10)] + \ [(plus, (i, 8)) for i in range(10)] results = [pool.apply_async(calculate, t) for t in TASKS] imap_it = pool.imap(calculatestar, TASKS) imap_unordered_it = pool.imap_unordered(calculatestar, TASKS) print('\nУпорядоченные результаты: pool.apply_async():') for r in results: print('\t', r.get()) print('\nУпорядоченные результаты: pool.imap():') for x in imap_it: print('\t', x) print('\nНеупорядоченные результаты: pool.imap_unordered():') for x in imap_unordered_it: print('\t', x) print('\nУпорядоченные результаты: pool.map() => блокировка до полного завершения:') for x in pool.map(calculatestar, TASKS): print('\t', x) ################################# # ТЕСТИРОВАНИЕ ОБРАБОТКИ ОШИБОК # ################################# print('Тестирования обработки ошибок:') try: print(pool.apply(zero, (5,))) except ZeroDivisionError: print('\tПолучено `ZeroDivisionError`, как ожидалось от pool.apply()') else: raise AssertionError('expected ZeroDivisionError') try: print(pool.map(zero, list(range(10)))) except ZeroDivisionError: print('\tПолучено `ZeroDivisionError`, как ожидалось от pool.map()') else: raise AssertionError('expected ZeroDivisionError') try: print(list(pool.imap(zero, list(range(10))))) except ZeroDivisionError: print('\tПолучено `ZeroDivisionError`, как ожидалось от list(pool.imap())') else: raise AssertionError('expected ZeroDivisionError') it = pool.imap(zero, list(range(10))) for i in range(10): try: x = next(it) except ZeroDivisionError: if i == 5: pass except StopIteration: break else: if i == 5: raise AssertionError('expected ZeroDivisionError') assert i == 9 print('\tПолучено `ZeroDivisionError`, как ожидалось от IMapIterator.next()\n') ########################### # ТЕСТИРОВАНИЕ ТАЙМ-АУТОВ # ########################### print('Тестирование объекта ApplyResult.get() с тайм-аутом:', end=' ') res = pool.apply_async(calculate, TASKS[0]) while 1: sys.stdout.flush() try: sys.stdout.write(f'\n\t{res.get(0.02)}') break except multiprocessing.TimeoutError: sys.stdout.write('.') print('\n\nТестирование объекта IMapIterator.next() с тайм-аутом:', end=' ') it = pool.imap(calculatestar, TASKS) while 1: sys.stdout.flush() try: sys.stdout.write(f'\n\t{it.next(0.02)}') except StopIteration: break except multiprocessing.TimeoutError: sys.stdout.write('.') print('\n') if __name__ == '__main__': test()