Сообщить об ошибке.

Класс ProcessPoolExecutor() модуля concurrent.futures в Python

Создает пул из ядер процессора для асинхронного выполнения вызовов

Синтаксис:

import concurrent.futures as pool

ex = pool.ProcessPoolExecutor(max_workers=None, 
                              mp_context=None, 
                              initializer=None, 
                              initargs=())

Параметры:

  • max_workers=None - максимальное количество создаваемых процессов,
  • mp_context=None -
  • initializer=None - вызываемый объект (функция),
  • initargs=() - аргументы, передаваемые initializer.

Возвращаемое значение:

Описание:

Класс ProcessPoolExecutor() модуля concurrent.futures использует пул не более чем max_workers ядер процессора для асинхронного выполнения вызовов.

Аргумент max_workers:

  • Если max_workers равно None или не задано, то по умолчанию будет установлено количество ядер процессора на машине.
  • Если max_workers меньше или равно 0, то возникает ошибка ValueError.
  • В Windows max_workers должно быть меньше или равно 61. Если это не так, то возникает ошибка ValueError. Если в Windows max_workers равно None, то по умолчанию будет выбрано не более 61, даже если доступно больше процессоров.

Аргумент mp_context может быть контекстом многопроцессорности или None. Он будет использоваться для запуска рабочих процессов. Если mp_context=None или не задан, то используется контекст многопроцессорной обработки по умолчанию.

Аргумент initializer - это необязательный вызываемый объект, который вызывается в начале каждого рабочего процесса. Если вызываемый объект вызывает исключение, то все текущие ожидающие задания вызовут BrokenProcessPool, а также любые попытки отправить остальные задания в пул.

Аргумент initargs - это набор аргументов, переданных вызываемому объекту initializer.

При внезапном завершении одного из рабочих процессов возникает ошибка BrokenProcessPool. Раньше поведение было не определено, но операции с исполнителем или его объектами Future часто зависали или блокировались.

Изменено в Python 3.7:

  • добавлен аргумент mpcontext, позволяющий пользователям управлять startmethod для рабочих процессов, созданных пулом.
  • добавлены аргументы initializer и initargs.

Методы объекта Executor.


Executor.submit(fn, /, *args, **kwargs):

Метод Executor.submit() планирует выполнение вызываемого объекта fn как fn(* args ** kwargs) и возвращает объект Future, представляющий результаты выполнения вызываемого объекта.

with ThreadPoolExecutor(max_workers=1) as executor:
    future = executor.submit(pow, 323, 1235)
    print(future.result())

Executor.map(func, *iterables, timeout=None, chunksize=1):

Метод Executor.map() работает аналогично функции map(func, *iterables) за исключением:

  • итерации собираются немедленно, а не лениво;
  • функции func выполняется асинхронно, и несколько вызовов func могут выполняться одновременно.

Возвращенный итератор вызывает ошибку TimeoutError, если вызывается __next__(), а результат недоступен по истечении секунд ожидания timeout в секундах после исходного вызова Executor.map().

Аргумент timeout может быть целым числом или числом с плавающей запятой. Если timeout=None или не указан, то время ожидания не ограничено.

Если вызов функции func вызывает исключение, то это исключение будет вызвано, когда его значение будет получено из итератора.

Метод Executor.map() разбивает итерируемые объекты на несколько фрагментов, которые он отправляет в пул как отдельные задачи. Приблизительный размер этих фрагментов можно указать, задав для chunksize положительное целое число. Для очень длинных итераций использование большого значения chunksize может значительно улучшить производительность по сравнению с размером по умолчанию равным 1.

Executor.shutdown(wait=True, *, cancel_futures=False):

Метод Executor.shutdown() дает команду исполнителю на освобождение всех используемых ресурсов, когда ожидающие в настоящее время объекты Future будут выполнены.

Вызовы методов Executor.submit() и Executor.map(), сделанные после завершения работы, вызовут исключение RuntimeError.

Аргумент wait:

  • Если аргумент wait имеет значение True, то этот метод не вернет результат, пока все ожидающие объекты Future не будут выполнены и ресурсы, связанные с исполнителем, не будут освобождены.
  • Если wait имеет значение False, то этот метод вернет результат немедленно и ресурсы, связанные с исполнителем, будут освобождены, когда все ожидающие Future будут выполнены.
  • Независимо от значения wait, вся программа Python не завершится, пока все ожидающие результата Future не будут выполнены.

Если аргумент cancel_futures имеет значение True, то этот метод отменит все отложенные объекты Future, выполнение которых исполнитель еще не начал. Любые завершенные или запущенные Future не будут отменены, независимо от значения cancel_futures.

Если и cancel_futures, и wait имеют значение True, то все объекты Future, запущенные исполнителем, будут завершены до возвращения результата этим методом. Остальные Future аннулируются.

Можно избежать явного вызова метода Executor.shutdown(), если использовать оператор with, который по завершении будет сам вызвать Executor.shutdown(wait=True)wait, установленным в True):

import shutil

with ThreadPoolExecutor(max_workers=4) as e:
    e.submit(shutil.copy, 'src1.txt', 'dest1.txt')
    e.submit(shutil.copy, 'src2.txt', 'dest2.txt')
    e.submit(shutil.copy, 'src3.txt', 'dest3.txt')
    e.submit(shutil.copy, 'src4.txt', 'dest4.txt')

Изменено в Python 3.9: добавлено аргумент cancel_futures.


Пример использования пула ядер процессора для асинхронных вычислений.

import concurrent.futures
import math

PRIMES = [
    112272535095293,
    112582705942171,
    112272535095293,
    115280095190773,
    115797848077099,
    1099726899285419]

def is_prime(n):
    if n < 2:
        return False
    if n == 2:
        return True
    if n % 2 == 0:
        return False

    sqrt_n = int(math.floor(math.sqrt(n)))
    for i in range(3, sqrt_n + 1, 2):
        if n % i == 0:
            return False
    return True

with concurrent.futures.ProcessPoolExecutor() as executor:
    for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)):
        print(f'{number} is prime: {prime}')
        
# 112272535095293 is prime: True
# 112582705942171 is prime: True
# 112272535095293 is prime: True
# 115280095190773 is prime: True
# 115797848077099 is prime: True
# 1099726899285419 is prime: False