from multiprocessing import Pool pool = Pool([processes[, initializer [, initargs[, maxtasksperchild [, context]]]]])
processes - количество используемых рабочих процессов,initializer - вызываемый объект (функция),initargs - аргументами для initializermaxtasksperchild - количество задач рабочего процесса до обновления,context - контекста для запуска рабочих процессов.Класс Pool() модуля multiprocessing создает объект, управляющий пулом рабочих процессов, в который могут быть отправлены задания. Пул рабочих процессов поддерживает асинхронное выполнение задач с тайм-аутами и обратными вызовами и имеет параллельную реализацию.
Аргумент processes - это количество используемых рабочих процессов. Если аргумент processes не указан, то используется число, возвращаемое функцией os.cpu_count(). (Изменено в Python 3.13: теперь число по умолчанию определяется os.process_cpu_count().)
Если аргумент initializer не равен None, то при запуске каждый рабочий процесс будет выполнять вызываемый объект initializer с аргументами *initargs в виде: initializer(*initargs)
Аргумент maxtasksperchild - это количество задач, которые рабочий процесс может выполнить до того, как он выйдет и будет заменен новым рабочим процессом, чтобы освободить неиспользуемые ресурсы. По умолчанию для maxtasksperchild установлено значение None, это означает, что рабочие процессы будут жить столько же, сколько и пул.
Аргумент context можно использовать для указания контекста, используемого для запуска рабочих процессов. Обычно пул создается с помощью создания экземпляра класса multiprocessing.Pool() или объекта контекста context.Pool(), созданного функцией multiprocessing.get_context(). В обоих случаях контекст устанавливается соответствующим образом.
Обратите внимание, что методы объекта пула должны вызываться только процессом, создавшим пул. Проще говоря методы должны вызываться из кода, где был создан и запущен пул рабочих процессов.
Предупреждение. Объекты multiprocessing.Pool имеют внутренние ресурсы, которыми необходимо правильно управлять, используя пул с менеджером контекста или вручную вызывая методы пула Pool.close() и Pool.terminate(). Невыполнение этого требования может привести к зависанию процесса при завершении.
Обратите внимание, что неправильно полагаться на сборщик мусора для уничтожения пула, поскольку CPython не гарантирует, что будет вызван финализатор пула.
Примечание. Рабочие процессы в пуле обычно живут в течение всего срока рабочей очереди пула. Часто используемый в других системах (например, Apache, mod_wsgi и т. д.) шаблон для освобождения ресурсов, удерживаемых рабочими процессами, заключается в том, чтобы позволить им выполнить только заданный объем работы перед его выходом, очисткой и созданием нового процесса. Для пула процессов, эту возможность пользователю предоставляет аргумент maxtasksperchild.
Pool.Pool.apply() вызывает функцию с аргументами,Pool.apply_async() асинхронный вариант метода Pool.apply(),Pool.map() многопроцессорный эквивалент встроенной функции map(),Pool.map_async() асинхронный вариант метода Pool.map(),Pool.imap() более ленивая версия метода Pool.map(),Pool.imap_unordered() то же самое, что и Pool.imap(), только результаты идут по готовности,Pool.starmap() аналогичен методу Pool.map(), только другая передача аргументов,Pool.starmap_async() комбинация методов Pool.starmap() и Pool.map_async(),Pool.close() предотвращает отправку задач в пул,Pool.terminate() останавливает рабочие процессы,Pool.join() ждет, пока рабочие процессы закончатся,AsyncResult результат вызовов методов Pool.apply_async() и Pool.map_async()AsyncResult.get() возвращает результат, как только он придет,AsyncResult.wait() ждет, пока будет доступен результат,AsyncResult.ready() проверяет, завершился ли вызов,AsyncResult.successful() проверяет, был ли завершен вызов без исключения.Pool.Pool.apply(func[, args[, kwds]]):Метод Pool.apply() вызывает функцию func с аргументами args и ключевыми аргументами kwds. Блокирует, пока не будет готов результат.
С учетом этой блокировки, для параллельной работы лучше подходит метод Pool.apply_async(). Кроме того, аргумент метода func выполняется только в одном рабочем процессе пула.
Pool.apply_async(func[, args[, kwds[, callback[, error_callback]]]]):Метод Pool.apply_async() асинхронный вариант метода Pool.apply(), который возвращает объект AsyncResult.
Если указан обратный вызов callback, то это должен быть вызываемый объект, принимающий единственный аргумент. Когда результат становится готовым, к нему применяется этот обратный вызов. Если вызов callback не удался, то в этом случае вместо него применяется error_callback.
Если указан error_callback, то это должен быть вызываемый объект, который принимает единственный аргумент. Если целевая функция обратного вызова терпит неудачу, то вызывается error_callback с экземпляром исключения в качестве аргумента.
Обратные вызовы должны завершаться немедленно, в противном случае асинхронный поток, обрабатывающий результаты, будет заблокирован.
Pool.map(func, iterable[, chunksize]):Метод Pool.map() представляет собой многопроцессорный эквивалент встроенной функции map().
Аргумент метода iterable представляет из себя итерацию, элементы которой не распаковываются при передаче в функцию func. То есть функция func(x), может иметь только один аргумент x. Другими словами, если iterable=[(1, 2), (10, 20), ...], то запуск задач будет происходить следующим образом: [func((1, 2)), func((10, 20)), ...], где x будет равен кортежу, например (1, 2).
Если необходимо использовать функцию с несколькими аргументами, например func(x, y) и распаковывать итерации при запуске, например: [func(1, 2), func(10, 20), ...], то посмотрите в сторону метода Pool.starmap().
Метод блокирует выполнение программы до получения результатов работы всеми запущенными процессами.
Метод Pool.map() разбивает итерируемый объект на несколько частей, которые отправляет в пул процессов как отдельные задачи. Приблизительный размер этих фрагментов можно указать, задав для аргумента метода chunksize положительное целое число.
Обратите внимание, что Pool.map() может привести к высокому использованию памяти для очень длинных итераций. Для большей эффективности рассмотрите возможность использования метода Pool.imap() или Pool.imap_unordered() с явной опцией chunksize.
Pool.map_async(func, iterable[, chunksize[, callback[, error_callback]]]):Метод Pool.map_async() вариант метода Pool.map(), который возвращает объект AsyncResult.
Если указан обратный вызов callback, то это должен быть вызываемый объект, принимающий единственный аргумент. Когда результат становится готовым, к нему применяется этот обратный вызов. Если вызов callback не удался, то в этом случае вместо него применяется error_callback.
Если указан error_callback, то это должен быть вызываемый объект, который принимает единственный аргумент. Если целевая функция обратного вызова терпит неудачу, то вызывается error_callback с экземпляром исключения в качестве аргумента.
Обратные вызовы должны завершаться немедленно, в противном случае асинхронный поток, обрабатывающий результаты, будет заблокирован.
Pool.imap(func, iterable[, chunksize]):Метод Pool.imap() представляет из себя более ленивую версию метода Pool.map().
Аргумент метода chunksize совпадает с аргументом, используемым методом Pool.map(). Для очень длинных итераций использование большого значения chunksize может значительно ускорить выполнение задания, чем использование значения по умолчанию 1.
Также, если chunksize=1, то метод итератора it.next(), возвращаемый методом Pool.imap(), будет иметь необязательный параметр тайм-аута: .next(timeout). Этот метод итератора вызовет TimeoutError, если результат не может быть возвращен в течение timeout секунд.
Например:
from multiprocessing import Pool def worker(x): return x*x if __name__ == '__main__': # запускаем 4 рабочих процесса with Pool(processes=4) as pool: it = pool.imap(worker, range(10)) # использование встроенной функции next() print(next(it)) # выведет 0 print(next(it)) # выведет 1 # использование метода-итератора с аргументом `timeout` # выведет "4" если компьютер не очень медленный print(it.next(timeout=1))
Pool.imap_unordered(func, iterable[, chunksize]):Метод Pool.imap_unordered() то же самое, что и Pool.imap(), за исключением того, что порядок результатов возвращаемого итератора следует считать произвольным.
Только когда есть только один рабочий процесс, порядок гарантированно будет "правильным".
Pool.starmap(func, iterable[, chunksize]):Метод Pool.starmap() аналогичен методу Pool.map(), за исключением того, что элементы iterable, как ожидается, будут то же итерируемыми, которые будут распаковываться в качестве аргументов.
Например, если iterable=[(1, 2), (3, 4)], то результат передачи аргументов в вызываемый объект func будет таким: [func(1, 2), func(3, 4)].
Pool.starmap_async(func, iterable[, chunksize[, callback[, error_callback]]]):Метод Pool.starmap_async() представляет собой комбинацию методов Pool.starmap() и Pool.map_async().
Метод выполняет итерацию по iterable, распаковывает их и вызывает функцию func с распакованными аргументами из iterable. Возвращает объект результата.
Pool.close():Метод Pool.close() предотвращает отправку задач в пул. Как только все задачи будут выполнены, рабочие процессы завершатся.
Pool.terminate():Метод Pool.terminate() останавливает рабочие процессы немедленно, не давая завершить невыполненную работу. Метод будет вызван немедленно, когда объект пула будет обработан сборщиком мусора.
Pool.join():Метод Pool.join() ждет, пока запущенные рабочие процессы закончатся. Перед использованием Pool.join() необходимо вызвать Pool.close() или Pool.terminate().
AsyncResult.Объект AsyncResult представляет собой результат, возвращаемый методами Pool.apply_async() и Pool.map_async().
Объект AsyncResult определяет следующие методы:
AsyncResult.get() возвращает результат, как только он придет,AsyncResult.wait() ждет, пока будет доступен результат,AsyncResult.ready() проверяет, завершился ли вызов,AsyncResult.successful() проверяет, был ли завершен вызов без исключения.AsyncResult.get([timeout]):Метод AsyncResult.get() возвращает результат, как только он придет.
Если тайм-аут timeout не равен None и результат не приходит в течение этого тайм-аута, то возникает исключение TimeoutError. Если удаленный вызов вызвал исключение, это исключение будет вызвано повторно.
AsyncResult.wait([timeout]):Метод AsyncResult.wait() ждет, пока будет доступен результат или пока не пройдет время таймаута timeout.
AsyncResult.ready():Метод AsyncResult.ready() проверяет, завершился ли вызов.
AsyncResult.successful():Метод AsyncResult.successful() проверяет, был ли завершен вызов без возникновения исключения. Поднимет исключение ValueError, если результат не готов.
Изменено в Python 3.7: Если результат не готов, вместо AssertionError возникает ValueError.
В следующем примере показано использование пула:
from multiprocessing import Pool, TimeoutError, current_process import time, os def worker(x): print('WORKER() => ', current_process().name) return x*x if __name__ == '__main__': # запуск 4 рабочих процессов with Pool(processes=4) as pool: # результаты получим в # порядке поступления задач res = pool.map(worker, range(10)) print(res) # результаты получим в порядке их # готовности (могут быть не по порядку) for i in pool.imap_unordered(worker, range(10)): print(i, end=', ') print() # вычислит "worker(20)" асинхронно # запустится только один процесс res = pool.apply_async(worker, (20,)) # получение результата async_worker = res.get(timeout=1) print('1 процесс, worker(20) => ', async_worker) # вычислит "os.getpid()" асинхронно # запустится только один процесс res = pool.apply_async(os.getpid, ()) # получение результата async_getpid = res.get(timeout=1) print('1 процесс, os.getpid() => ', async_getpid) # запуск нескольких асинхронных вычислений # *может* использовать больше процессов multiple_results = [pool.apply_async(os.getpid, ()) for i in range(4)] # получение асинхронных результатов async_multi = [res.get(timeout=1) for res in multiple_results] print('4 асинхронных процесса, os.getpid():') print(async_multi) # заставим спать один рабочий в течение 10 секунд res = pool.apply_async(time.sleep, (10,)) try: # получение результата res_sleep = res.get(timeout=1) print(res_sleep) except TimeoutError: print("time.sleep(10) => получили multiprocessing.TimeoutError") print("На этот момент пул остается доступным для дополнительной работы") # выход из блока 'with' остановил пул print("Теперь пул закрыт и больше не доступен") # WORKER() => ForkPoolWorker-1 # WORKER() => ForkPoolWorker-2 # WORKER() => ForkPoolWorker-1 # WORKER() => ForkPoolWorker-3 # WORKER() => ForkPoolWorker-2 # WORKER() => ForkPoolWorker-2 # WORKER() => ForkPoolWorker-4 # WORKER() => ForkPoolWorker-1 # WORKER() => ForkPoolWorker-3 # WORKER() => ForkPoolWorker-2 # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81] # WORKER() => ForkPoolWorker-4 # WORKER() => ForkPoolWorker-3 # WORKER() => ForkPoolWorker-4 # WORKER() => ForkPoolWorker-1 # WORKER() => ForkPoolWorker-4 # WORKER() => ForkPoolWorker-2 # WORKER() => ForkPoolWorker-4 # WORKER() => ForkPoolWorker-1 # WORKER() => ForkPoolWorker-3 # WORKER() => ForkPoolWorker-2 # 0, 1, 4, 9, 16, 25, 49, 36, 81, 64, # WORKER() => ForkPoolWorker-4 # 1 процесс, worker(20) => 400 # 1 процесс, os.getpid() => 30330 # 4 асинхронных процесса, os.getpid(): # [30330, 30330, 30333, 30330] # time.sleep(10) => получили multiprocessing.TimeoutError # На этот момент пул остается доступным для дополнительной работы # Теперь пул закрыт и больше не доступен
Pool.Вывод теста основной функциональности и методов объекта Pool очень длинный и приводиться не будет. Кому интересно, скопируйте код и запустите сами.
# test.py import multiprocessing, random import time, sys ######################################## # Функции, используемые тестовым кодом # ######################################## def calculate(func, args): proc_name = multiprocessing.current_process().name proc_name = proc_name.replace('Fork', '') result = func(*args) return f'Процесс {proc_name}, вывод функции {func.__name__}{args} = {result}' def calculatestar(args): return calculate(*args) def mul(a, b): time.sleep(0.5 * random.random()) return a * b def plus(a, b): time.sleep(0.5 * random.random()) return a + b def zero(x): return 1.0 / (x - 5.0) ######################################################### # Тестирование основной функциональности объекта `Pool` # ######################################################### def test(): PROCESSES = 4 print(f'\nСОЗДАНИЕ ПУЛА С {PROCESSES} ПРОЦЕССАМИ...') with multiprocessing.Pool(PROCESSES) as pool: ############################## # ТЕСТЫ # ############################## TASKS = [(mul, (i, 7)) for i in range(10)] + \ [(plus, (i, 8)) for i in range(10)] results = [pool.apply_async(calculate, t) for t in TASKS] imap_it = pool.imap(calculatestar, TASKS) imap_unordered_it = pool.imap_unordered(calculatestar, TASKS) print('\nУпорядоченные результаты: pool.apply_async():') for r in results: print('\t', r.get()) print('\nУпорядоченные результаты: pool.imap():') for x in imap_it: print('\t', x) print('\nНеупорядоченные результаты: pool.imap_unordered():') for x in imap_unordered_it: print('\t', x) print('\nУпорядоченные результаты: pool.map() => блокировка до полного завершения:') for x in pool.map(calculatestar, TASKS): print('\t', x) ################################# # ТЕСТИРОВАНИЕ ОБРАБОТКИ ОШИБОК # ################################# print('Тестирования обработки ошибок:') try: print(pool.apply(zero, (5,))) except ZeroDivisionError: print('\tПолучено `ZeroDivisionError`, как ожидалось от pool.apply()') else: raise AssertionError('expected ZeroDivisionError') try: print(pool.map(zero, list(range(10)))) except ZeroDivisionError: print('\tПолучено `ZeroDivisionError`, как ожидалось от pool.map()') else: raise AssertionError('expected ZeroDivisionError') try: print(list(pool.imap(zero, list(range(10))))) except ZeroDivisionError: print('\tПолучено `ZeroDivisionError`, как ожидалось от list(pool.imap())') else: raise AssertionError('expected ZeroDivisionError') it = pool.imap(zero, list(range(10))) for i in range(10): try: x = next(it) except ZeroDivisionError: if i == 5: pass except StopIteration: break else: if i == 5: raise AssertionError('expected ZeroDivisionError') assert i == 9 print('\tПолучено `ZeroDivisionError`, как ожидалось от IMapIterator.next()\n') ########################### # ТЕСТИРОВАНИЕ ТАЙМ-АУТОВ # ########################### print('Тестирование объекта ApplyResult.get() с тайм-аутом:', end=' ') res = pool.apply_async(calculate, TASKS[0]) while 1: sys.stdout.flush() try: sys.stdout.write(f'\n\t{res.get(0.02)}') break except multiprocessing.TimeoutError: sys.stdout.write('.') print('\n\nТестирование объекта IMapIterator.next() с тайм-аутом:', end=' ') it = pool.imap(calculatestar, TASKS) while 1: sys.stdout.flush() try: sys.stdout.write(f'\n\t{it.next(0.02)}') except StopIteration: break except multiprocessing.TimeoutError: sys.stdout.write('.') print('\n') if __name__ == '__main__': test()