Сообщить об ошибке.

Класс Pool() модуля multiprocessing в Python

Создание, запуск и получение результатов от пула процессов

Синтаксис:

from multiprocessing import Pool

pool = Pool([processes[, initializer
            [, initargs[, maxtasksperchild
            [, context]]]]])

Параметры:

  • processes - количество используемых рабочих процессов,
  • initializer - вызываемый объект (функция),
  • initargs - аргументами для initializer
  • maxtasksperchild - количество задач рабочего процесса до обновления,
  • context - контекста для запуска рабочих процессов.

Возвращаемое значение:

Описание:

Класс Pool() модуля multiprocessing создает объект, управляющий пулом рабочих процессов, в который могут быть отправлены задания. Пул рабочих процессов поддерживает асинхронное выполнение задач с тайм-аутами и обратными вызовами и имеет параллельную реализацию.

Аргумент processes - это количество используемых рабочих процессов. Если аргумент processes не указан, то используется число, возвращаемое функцией os.cpu_count().

Если аргумент initializer не равен None, то при запуске каждый рабочий процесс будет выполнять вызываемый объект initializer с аргументами *initargs в виде: initializer(*initargs)

Аргумент maxtasksperchild - это количество задач, которые рабочий процесс может выполнить до того, как он выйдет и будет заменен новым рабочим процессом, чтобы освободить неиспользуемые ресурсы. По умолчанию для maxtasksperchild установлено значение None, это означает, что рабочие процессы будут жить столько же, сколько и пул.

Аргумент context можно использовать для указания контекста, используемого для запуска рабочих процессов. Обычно пул создается с помощью создания экземпляра класса multiprocessing.Pool() или объекта контекста context.Pool(), созданного функцией multiprocessing.get_context(). В обоих случаях контекст устанавливается соответствующим образом.

Обратите внимание, что методы объекта пула должны вызываться только процессом, создавшим пул. Проще говоря методы должны вызываться из кода, где был создан и запущен пул рабочих процессов.

Предупреждение. Объекты multiprocessing.Pool имеют внутренние ресурсы, которыми необходимо правильно управлять, используя пул с менеджером контекста или вручную вызывая методы пула Pool.close() и Pool.terminate(). Невыполнение этого требования может привести к зависанию процесса при завершении.

Обратите внимание, что неправильно полагаться на сборщик мусора для уничтожения пула, поскольку CPython не гарантирует, что будет вызван финализатор пула.

Примечание. Рабочие процессы в пуле обычно живут в течение всего срока рабочей очереди пула. Часто используемый в других системах (например, Apache, mod_wsgi и т. д.) шаблон для освобождения ресурсов, удерживаемых рабочими процессами, заключается в том, чтобы позволить им выполнить только заданный объем работы перед его выходом, очисткой и созданием нового процесса. Для пула процессов, эту возможность пользователю предоставляет аргумент maxtasksperchild.

Методы объекта Pool.


Pool.apply(func[, args[, kwds]]):

Метод Pool.apply() вызывает функцию func с аргументами args и ключевыми аргументами kwds. Блокирует, пока не будет готов результат.

С учетом этой блокировки, для параллельной работы лучше подходит метод Pool.apply_async(). Кроме того, аргумент метода func выполняется только в одном рабочем процессе пула.

Pool.apply_async(func[, args[, kwds[, callback[, error_callback]]]]):

Метод Pool.apply_async() асинхронный вариант метода Pool.apply(), который возвращает объект AsyncResult.

Если указан обратный вызов callback, то это должен быть вызываемый объект, принимающий единственный аргумент. Когда результат становится готовым, к нему применяется этот обратный вызов. Если вызов callback не удался, то в этом случае вместо него применяется error_callback.

Если указан error_callback, то это должен быть вызываемый объект, который принимает единственный аргумент. Если целевая функция обратного вызова терпит неудачу, то вызывается error_callback с экземпляром исключения в качестве аргумента.

Обратные вызовы должны завершаться немедленно, в противном случае асинхронный поток, обрабатывающий результаты, будет заблокирован.

Pool.map(func, iterable[, chunksize]):

Метод Pool.map() представляет собой многопроцессорный эквивалент встроенной функции map().

Аргумент метода iterable представляет из себя итерацию, элементы которой не распаковываются при передаче в функцию func. То есть функция func(x), может иметь только один аргумент x. Другими словами, если iterable=[(1, 2), (10, 20), ...], то запуск задач будет происходить следующим образом: [func((1, 2)), func((10, 20)), ...], где x будет равен кортежу, например (1, 2).

Если необходимо использовать функцию с несколькими аргументами, например func(x, y) и распаковывать итерации при запуске, например: [func(1, 2), func(10, 20), ...], то посмотрите в сторону метода Pool.starmap().

Метод блокирует выполнение программы до получения результатов работы всеми запущенными процессами.

Метод Pool.map() разбивает итерируемый объект на несколько частей, которые отправляет в пул процессов как отдельные задачи. Приблизительный размер этих фрагментов можно указать, задав для аргумента метода chunksize положительное целое число.

Обратите внимание, что Pool.map() может привести к высокому использованию памяти для очень длинных итераций. Для большей эффективности рассмотрите возможность использования метода Pool.imap() или Pool.imap_unordered() с явной опцией chunksize.

Pool.map_async(func, iterable[, chunksize[, callback[, error_callback]]]):

Метод Pool.map_async() вариант метода Pool.map(), который возвращает объект AsyncResult.

Если указан обратный вызов callback, то это должен быть вызываемый объект, принимающий единственный аргумент. Когда результат становится готовым, к нему применяется этот обратный вызов. Если вызов callback не удался, то в этом случае вместо него применяется error_callback.

Если указан error_callback, то это должен быть вызываемый объект, который принимает единственный аргумент. Если целевая функция обратного вызова терпит неудачу, то вызывается error_callback с экземпляром исключения в качестве аргумента.

Обратные вызовы должны завершаться немедленно, в противном случае асинхронный поток, обрабатывающий результаты, будет заблокирован.

Pool.imap(func, iterable[, chunksize]):

Метод Pool.imap() представляет из себя более ленивую версию метода Pool.map().

Аргумент метода chunksize совпадает с аргументом, используемым методом Pool.map(). Для очень длинных итераций использование большого значения chunksize может значительно ускорить выполнение задания, чем использование значения по умолчанию 1.

Также, если chunksize=1, то метод итератора it.next(), возвращаемый методом Pool.imap(), будет иметь необязательный параметр тайм-аута: .next(timeout). Этот метод итератора вызовет TimeoutError, если результат не может быть возвращен в течение timeout секунд.

Например:

from multiprocessing import Pool

def worker(x):
    return x*x

if __name__ == '__main__':
    # запускаем 4 рабочих процесса
    with Pool(processes=4) as pool:
        it = pool.imap(worker, range(10))
        # использование встроенной функции next()
        print(next(it))    # выведет 0
        print(next(it))    # выведет 1
        # использование метода-итератора с аргументом `timeout`
        # выведет "4" если компьютер не очень медленный
        print(it.next(timeout=1))

Pool.imap_unordered(func, iterable[, chunksize]):

Метод Pool.imap_unordered() то же самое, что и Pool.imap(), за исключением того, что порядок результатов возвращаемого итератора следует считать произвольным.

Только когда есть только один рабочий процесс, порядок гарантированно будет "правильным".

Pool.starmap(func, iterable[, chunksize]):

Метод Pool.starmap() аналогичен методу Pool.map(), за исключением того, что элементы iterable, как ожидается, будут то же итерируемыми, которые будут распаковываться в качестве аргументов.

Например, если iterable=[(1, 2), (3, 4)], то результат передачи аргументов в вызываемый объект func будет таким: [func(1, 2), func(3, 4)].

Pool.starmap_async(func, iterable[, chunksize[, callback[, error_callback]]]):

Метод Pool.starmap_async() представляет собой комбинацию методов Pool.starmap() и Pool.map_async().

Метод выполняет итерацию по iterable, распаковывает их и вызывает функцию func с распакованными аргументами из iterable. Возвращает объект результата.

Pool.close():

Метод Pool.close() предотвращает отправку задач в пул. Как только все задачи будут выполнены, рабочие процессы завершатся.

Pool.terminate():

Метод Pool.terminate() останавливает рабочие процессы немедленно, не давая завершить невыполненную работу. Метод будет вызван немедленно, когда объект пула будет обработан сборщиком мусора.

Pool.join():

Метод Pool.join() ждет, пока запущенные рабочие процессы закончатся. Перед использованием Pool.join() необходимо вызвать Pool.close() или Pool.terminate().


Объект AsyncResult.

Объект AsyncResult представляет собой результат, возвращаемый методами Pool.apply_async() и Pool.map_async().

Объект AsyncResult определяет следующие методы:

AsyncResult.get([timeout]):

Метод AsyncResult.get() возвращает результат, как только он придет.

Если тайм-аут timeout не равен None и результат не приходит в течение этого тайм-аута, то возникает исключение TimeoutError. Если удаленный вызов вызвал исключение, это исключение будет вызвано повторно.

AsyncResult.wait([timeout]):

Метод AsyncResult.wait() ждет, пока будет доступен результат или пока не пройдет время таймаута timeout.

AsyncResult.ready():

Метод AsyncResult.ready() проверяет, завершился ли вызов.

AsyncResult.successful():

Метод AsyncResult.successful() проверяет, был ли завершен вызов без возникновения исключения. Поднимет исключение ValueError, если результат не готов.

Изменено в Python 3.7: Если результат не готов, вместо AssertionError возникает ValueError.


Пример создания, запуска и получение результатов от пула процессов:

В следующем примере показано использование пула:

from multiprocessing import Pool, TimeoutError, current_process
import time, os

def worker(x):
    print('WORKER() => ', current_process().name)
    return x*x

if __name__ == '__main__':
    # запуск 4 рабочих процессов
    with Pool(processes=4) as pool:
        # результаты получим в 
        # порядке поступления задач
        res = pool.map(worker, range(10))
        print(res)

        # результаты получим в порядке их 
        # готовности (могут быть не по порядку)
        for i in pool.imap_unordered(worker, range(10)):
            print(i, end=', ')
        print()
        
        # вычислит "worker(20)" асинхронно
        # запустится только один процесс
        res = pool.apply_async(worker, (20,))
        # получение результата 
        async_worker = res.get(timeout=1)
        print('1 процесс, worker(20) => ', async_worker)

        # вычислит "os.getpid()" асинхронно
        # запустится только один процесс
        res = pool.apply_async(os.getpid, ())
        # получение результата 
        async_getpid = res.get(timeout=1)
        print('1 процесс, os.getpid()  => ', async_getpid)

        # запуск нескольких асинхронных вычислений 
        # *может* использовать больше процессов
        multiple_results = [pool.apply_async(os.getpid, ()) for i in range(4)]
        # получение асинхронных результатов
        async_multi = [res.get(timeout=1) for res in multiple_results]
        print('4 асинхронных процесса, os.getpid():')
        print(async_multi)

        # заставим спать один рабочий в течение 10 секунд
        res = pool.apply_async(time.sleep, (10,))
        try:
            # получение результата 
            res_sleep = res.get(timeout=1)
            print(res_sleep)
        except TimeoutError:
            print("time.sleep(10) => получили multiprocessing.TimeoutError")

        print("На этот момент пул остается доступным для дополнительной работы")

    # выход из блока 'with' остановил пул
    print("Теперь пул закрыт и больше не доступен")
    
    
# WORKER() =>  ForkPoolWorker-1
# WORKER() =>  ForkPoolWorker-2
# WORKER() =>  ForkPoolWorker-1
# WORKER() =>  ForkPoolWorker-3
# WORKER() =>  ForkPoolWorker-2
# WORKER() =>  ForkPoolWorker-2
# WORKER() =>  ForkPoolWorker-4
# WORKER() =>  ForkPoolWorker-1
# WORKER() =>  ForkPoolWorker-3
# WORKER() =>  ForkPoolWorker-2
# [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
# WORKER() =>  ForkPoolWorker-4
# WORKER() =>  ForkPoolWorker-3
# WORKER() =>  ForkPoolWorker-4
# WORKER() =>  ForkPoolWorker-1
# WORKER() =>  ForkPoolWorker-4
# WORKER() =>  ForkPoolWorker-2
# WORKER() =>  ForkPoolWorker-4
# WORKER() =>  ForkPoolWorker-1
# WORKER() =>  ForkPoolWorker-3
# WORKER() =>  ForkPoolWorker-2
# 0, 1, 4, 9, 16, 25, 49, 36, 81, 64, 
# WORKER() =>  ForkPoolWorker-4
# 1 процесс, worker(20) =>  400
# 1 процесс, os.getpid()  =>  30330
# 4 асинхронных процесса, os.getpid():
# [30330, 30330, 30333, 30330]
# time.sleep(10) => получили multiprocessing.TimeoutError
# На этот момент пул остается доступным для дополнительной работы
# Теперь пул закрыт и больше не доступен

Тестирование основной функциональности объекта Pool.

Вывод теста основной функциональности и методов объекта Pool очень длинный и приводиться не будет. Кому интересно, скопируйте код и запустите сами.

# test.py
import multiprocessing, random
import time, sys

########################################
# Функции, используемые тестовым кодом #
########################################

def calculate(func, args):
    proc_name = multiprocessing.current_process().name
    proc_name = proc_name.replace('Fork', '')
    result = func(*args)
    return f'Процесс {proc_name}, вывод функции {func.__name__}{args} = {result}'

def calculatestar(args):
    return calculate(*args)

def mul(a, b):
    time.sleep(0.5 * random.random())
    return a * b

def plus(a, b):
    time.sleep(0.5 * random.random())
    return a + b

def zero(x):
    return 1.0 / (x - 5.0)

#########################################################
# Тестирование основной функциональности объекта `Pool` #
#########################################################

def test():
    PROCESSES = 4
    print(f'\nСОЗДАНИЕ ПУЛА С {PROCESSES} ПРОЦЕССАМИ...')
    with multiprocessing.Pool(PROCESSES) as pool:
        ##############################
        #            ТЕСТЫ           # 
        ##############################
        TASKS = [(mul, (i, 7)) for i in range(10)] + \
                [(plus, (i, 8)) for i in range(10)]

        results = [pool.apply_async(calculate, t) for t in TASKS]
        imap_it = pool.imap(calculatestar, TASKS)
        imap_unordered_it = pool.imap_unordered(calculatestar, TASKS)

        print('\nУпорядоченные результаты: pool.apply_async():')
        for r in results:
            print('\t', r.get())

        print('\nУпорядоченные результаты: pool.imap():')
        for x in imap_it:
            print('\t', x)

        print('\nНеупорядоченные результаты: pool.imap_unordered():')
        for x in imap_unordered_it:
            print('\t', x)

        print('\nУпорядоченные результаты: pool.map() => блокировка до полного завершения:')
        for x in pool.map(calculatestar, TASKS):
            print('\t', x)

        #################################
        # ТЕСТИРОВАНИЕ ОБРАБОТКИ ОШИБОК #
        #################################
        print('Тестирования обработки ошибок:')
        try:
            print(pool.apply(zero, (5,)))
        except ZeroDivisionError:
            print('\tПолучено `ZeroDivisionError`, как ожидалось от pool.apply()')
        else:
            raise AssertionError('expected ZeroDivisionError')

        try:
            print(pool.map(zero, list(range(10))))
        except ZeroDivisionError:
            print('\tПолучено `ZeroDivisionError`, как ожидалось от pool.map()')
        else:
            raise AssertionError('expected ZeroDivisionError')

        try:
            print(list(pool.imap(zero, list(range(10)))))
        except ZeroDivisionError:
            print('\tПолучено `ZeroDivisionError`, как ожидалось от list(pool.imap())')
        else:
            raise AssertionError('expected ZeroDivisionError')

        it = pool.imap(zero, list(range(10)))
        for i in range(10):
            try:
                x = next(it)
            except ZeroDivisionError:
                if i == 5:
                    pass
            except StopIteration:
                break
            else:
                if i == 5:
                    raise AssertionError('expected ZeroDivisionError')

        assert i == 9
        print('\tПолучено `ZeroDivisionError`, как ожидалось от IMapIterator.next()\n')

        ###########################
        # ТЕСТИРОВАНИЕ ТАЙМ-АУТОВ #
        ###########################
        print('Тестирование объекта ApplyResult.get() с тайм-аутом:', end=' ')
        res = pool.apply_async(calculate, TASKS[0])
        while 1:
            sys.stdout.flush()
            try:
                sys.stdout.write(f'\n\t{res.get(0.02)}')
                break
            except multiprocessing.TimeoutError:
                sys.stdout.write('.')

        print('\n\nТестирование объекта IMapIterator.next() с тайм-аутом:', end=' ')
        it = pool.imap(calculatestar, TASKS)
        while 1:
            sys.stdout.flush()
            try:
                sys.stdout.write(f'\n\t{it.next(0.02)}')
            except StopIteration:
                break
            except multiprocessing.TimeoutError:
                sys.stdout.write('.')
        print('\n')

if __name__ == '__main__':
    test()