Сообщить об ошибке.

Класс ThreadPoolExecutor() модуля concurrent.futures в Python

Создает пул потоков для асинхронного выполнения вызовов

Синтаксис:

import concurrent.futures as pool

ex = pool.ThreadPoolExecutor(max_workers=None, 
                             thread_name_prefix='', 
                             initializer=None, initargs=())

Параметры:

  • max_workers=None - максимальное количество создаваемых потоков,
  • thread_name_prefix='' - префикс для имени потока,
  • initializer=None - вызываемый объект (функция),
  • initargs=() - аргументы, передаваемые initializer.

Возвращаемое значение:

Описание:

Класс ThreadPoolExecutor() модуля concurrent.futures использует пул не более max_workers потоков для асинхронного выполнения вызовов.

Аргумент initializer - это необязательный вызываемый объект (функция), который вызывается в начале каждого рабочего потока;

Аргумент initargs - это набор аргументов, переданных вызываемому объекту initializer. Если initializer вызывает исключение, то все текущие ожидающие задания вызовут BrokenThreadPool, а также любые попытки отправить остальные задания в пул.

Если аргумент max_workers=None или не задан, то по умолчанию будет использоваться количество процессорных ядер на машине, умноженное на 5, при условии, что ThreadPoolExecutor часто используется для перекрытия ввода/вывода вместо работы ЦП, и количество потоков должно быть больше, чем количество потоков для ProcessPoolExecutor().

Новое в Python 3.6: добавлен аргумент thread_name_prefix, позволяющий пользователям управлять потоками, для упрощения отладки.

Изменено в Python 3.7: Добавлены аргументы initializer и initargs.

Изменено в Python 3.8: Значение max_workers по умолчанию изменено на min(32, os.cpu_count() + 4). Это значение по умолчанию сохраняет не менее 5 потоков для связанных задач ввода/вывода. Он использует не более 32 ядер ЦП для задач, связанных с ЦП, которые выпускают GIL, что позволяет избежать неявного использования очень больших ресурсов на многоядерных машинах.

Класс ThreadPoolExecutor() теперь повторно использует неиспользуемые потоки перед запуском рабочих потоков max_workers.

Примечание. Когда вызываемые объекты разных потоков ждут друг от друга результатов для дальнейшей работы, то могут возникать взаимоблокировки.

Пример возникновения взаимоблокировки:

import concurrent.futures as pool
import time

def wait_on_b():
    time.sleep(5)
    # поток B никогда не завершится, потому что ждет A.
    print(b.result()) 
    return 5

def wait_on_a():
    time.sleep(5)
    # поток A никогда не завершится, потому что ждет B.
    print(a.result())  
    return 6

executor = pool.ThreadPoolExecutor(max_workers=2)
A = executor.submit(wait_on_b)
B = executor.submit(wait_on_a)

Еще пример взаимоблокировки:

def wait_on_future():
    f = executor.submit(pow, 5, 2)
    # Этот код никогда не будет завершен, потому
    # что существует только один рабочий поток 
    # и он выполняет эту функцию.
    print(f.result())

executor = ThreadPoolExecutor(max_workers=1)
executor.submit(wait_on_future)

Методы объекта Executor.


Executor.submit(fn, /, *args, **kwargs):

Метод Executor.submit() планирует выполнение вызываемого объекта fn как fn(* args ** kwargs) и возвращает объект Future, представляющий результаты выполнения вызываемого объекта.

with ThreadPoolExecutor(max_workers=1) as executor:
    future = executor.submit(pow, 323, 1235)
    print(future.result())

Executor.map(func, *iterables, timeout=None, chunksize=1):

Метод Executor.map() работает аналогично функции map(func, *iterables) за исключением:

  • итерации собираются немедленно, а не лениво;
  • функции func выполняется асинхронно, и несколько вызовов func могут выполняться одновременно.

Возвращенный итератор вызывает ошибку TimeoutError, если вызывается __next__(), а результат недоступен по истечении секунд ожидания timeout в секундах после исходного вызова Executor.map().

Аргумент timeout может быть целым числом или числом с плавающей запятой. Если timeout=None или не указан, то время ожидания не ограничено.

Если вызов функции func вызывает исключение, то это исключение будет вызвано, когда его значение будет получено из итератора.

Аргумент chunksize для использования с классом ThreadPoolExecutor не имеет никакого эффекта.

Executor.shutdown(wait=True, *, cancel_futures=False):

Метод Executor.shutdown() дает команду исполнителю на освобождение всех используемых ресурсов, когда ожидающие в настоящее время объекты Future будут выполнены.

Вызовы методов Executor.submit() и Executor.map(), сделанные после завершения работы, вызовут исключение RuntimeError.

Аргумент wait:

  • Если аргумент wait имеет значение True, то этот метод не вернет результат, пока все ожидающие объекты Future не будут выполнены и ресурсы, связанные с исполнителем, не будут освобождены.
  • Если wait имеет значение False, то этот метод вернет результат немедленно и ресурсы, связанные с исполнителем, будут освобождены, когда все ожидающие Future будут выполнены.
  • Независимо от значения wait, вся программа Python не завершится, пока все ожидающие результата Future не будут выполнены.

Если аргумент cancel_futures имеет значение True, то этот метод отменит все отложенные объекты Future, выполнение которых исполнитель еще не начал. Любые завершенные или запущенные Future не будут отменены, независимо от значения cancel_futures.

Если и cancel_futures, и wait имеют значение True, то все объекты Future, запущенные исполнителем, будут завершены до возвращения результата этим методом. Остальные Future аннулируются.

Можно избежать явного вызова метода Executor.shutdown(), если использовать оператор with, который по завершении будет сам вызвать Executor.shutdown(wait=True)wait, установленным в True):

import shutil

with ThreadPoolExecutor(max_workers=4) as e:
    e.submit(shutil.copy, 'src1.txt', 'dest1.txt')
    e.submit(shutil.copy, 'src2.txt', 'dest2.txt')
    e.submit(shutil.copy, 'src3.txt', 'dest3.txt')
    e.submit(shutil.copy, 'src4.txt', 'dest4.txt')

Изменено в Python 3.9: добавлено аргумент cancel_futures.


Пример использования пула потоков для асинхронного выполнения HTTP запросов.

В примере асинхронно выполняется HTTP запросы к 7-ми сайтам в 4 потока.

import concurrent.futures
import urllib.request

URLS = ['https://vk.com',
        'https://russian.rt.com/',
        'https://www.youtube.com/',
        'https://mail.ru/',
        'https://www.google.ru/',
        'https://timeweb.com/ru/',
        'https://yandex.ru/']

# скачивает одну страницу 
def load_url(url, timeout):
    with urllib.request.urlopen(url, timeout=timeout) as conn:
        return conn.read()

# используем оператор `with` для обеспечения быстрой очистки потоков
with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
    # загружаем и отмечаем каждый будущий результат своим URL-адресом
    future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
    for future in concurrent.futures.as_completed(future_to_url):
        url = future_to_url[future]
        try:
            data = future.result()
        except Exception as exc:
            print(f'{url} сгенерировано исключение: {exc}')
        else:
            print(f'{url} скачено {len(data)} bytes')
            
            
# https://www.youtube.com/ скачено 483363 bytes
# https://www.google.ru/ скачено 14518 bytes
# https://vk.com скачено 30625 bytes
# https://mail.ru/ скачено 312253 bytes
# https://yandex.ru/ скачено 181901 bytes
# https://timeweb.com/ru/ скачено 91670 bytes
# https://russian.rt.com/ скачено 149219 bytes