import concurrent.futures as pool ex = pool.ThreadPoolExecutor(max_workers=None, thread_name_prefix='', initializer=None, initargs=())
max_workers=None
- максимальное количество создаваемых потоков,thread_name_prefix=''
- префикс для имени потока,initializer=None
- вызываемый объект (функция),initargs=()
- аргументы, передаваемые initializer
.Класс ThreadPoolExecutor()
модуля concurrent.futures
использует пул не более max_workers
потоков для асинхронного выполнения вызовов.
Аргумент initializer
- это необязательный вызываемый объект (функция), который вызывается в начале каждого рабочего потока;
Аргумент initargs
- это набор аргументов, переданных вызываемому объекту initializer
. Если initializer
вызывает исключение, то все текущие ожидающие задания вызовут BrokenThreadPool
, а также любые попытки отправить остальные задания в пул.
Если аргумент max_workers=None
или не задан, то по умолчанию будет использоваться количество процессорных ядер на машине, умноженное на 5, при условии, что ThreadPoolExecutor
часто используется для перекрытия ввода/вывода вместо работы ЦП, и количество потоков должно быть больше, чем количество потоков для ProcessPoolExecutor()
.
Новое в Python 3.6: добавлен аргумент thread_name_prefix
, позволяющий пользователям управлять потоками, для упрощения отладки.
Изменено в Python 3.7: Добавлены аргументы initializer
и initargs
.
Изменено в Python 3.8: Значение max_workers
по умолчанию изменено на min(32, os.cpu_count() + 4)
. Это значение по умолчанию сохраняет не менее 5 потоков для связанных задач ввода/вывода. Он использует не более 32 ядер ЦП для задач, связанных с ЦП, которые выпускают GIL, что позволяет избежать неявного использования очень больших ресурсов на многоядерных машинах.
Класс ThreadPoolExecutor()
теперь повторно использует неиспользуемые потоки перед запуском рабочих потоков max_workers
.
Примечание. Когда вызываемые объекты разных потоков ждут друг от друга результатов для дальнейшей работы, то могут возникать взаимоблокировки.
Пример возникновения взаимоблокировки:
import concurrent.futures as pool import time def wait_on_b(): time.sleep(5) # поток B никогда не завершится, потому что ждет A. print(b.result()) return 5 def wait_on_a(): time.sleep(5) # поток A никогда не завершится, потому что ждет B. print(a.result()) return 6 executor = pool.ThreadPoolExecutor(max_workers=2) A = executor.submit(wait_on_b) B = executor.submit(wait_on_a)
Еще пример взаимоблокировки:
def wait_on_future(): f = executor.submit(pow, 5, 2) # Этот код никогда не будет завершен, потому # что существует только один рабочий поток # и он выполняет эту функцию. print(f.result()) executor = ThreadPoolExecutor(max_workers=1) executor.submit(wait_on_future)
Executor
.Executor.submit()
планирует выполнение потоков,Executor.map()
работает аналогично функции map
,Executor.shutdown()
дает команду на освобождение ресурсов,Executor.submit(fn, /, *args, **kwargs)
:Метод Executor.submit()
планирует выполнение вызываемого объекта fn
как fn(* args ** kwargs)
и возвращает объект Future
, представляющий результаты выполнения вызываемого объекта.
with ThreadPoolExecutor(max_workers=1) as executor: future = executor.submit(pow, 323, 1235) print(future.result())
Executor.map(func, *iterables, timeout=None, chunksize=1)
:Метод Executor.map()
работает аналогично функции map(func, *iterables)
за исключением:
func
выполняется асинхронно, и несколько вызовов func
могут выполняться одновременно.Возвращенный итератор вызывает ошибку TimeoutError
, если вызывается __next__()
, а результат недоступен по истечении секунд ожидания timeout
в секундах после исходного вызова Executor.map()
.
Аргумент timeout
может быть целым числом или числом с плавающей запятой. Если timeout=None
или не указан, то время ожидания не ограничено.
Если вызов функции func
вызывает исключение, то это исключение будет вызвано, когда его значение будет получено из итератора.
Аргумент chunksize
для использования с классом ThreadPoolExecutor
не имеет никакого эффекта.
Executor.shutdown(wait=True, *, cancel_futures=False)
:Метод Executor.shutdown()
дает команду исполнителю на освобождение всех используемых ресурсов, когда ожидающие в настоящее время объекты Future
будут выполнены.
Вызовы методов Executor.submit()
и Executor.map()
, сделанные после завершения работы, вызовут исключение RuntimeError
.
Аргумент wait
:
wait
имеет значение True
, то этот метод не вернет результат, пока все ожидающие объекты Future
не будут выполнены и ресурсы, связанные с исполнителем, не будут освобождены. wait
имеет значение False
, то этот метод вернет результат немедленно и ресурсы, связанные с исполнителем, будут освобождены, когда все ожидающие Future
будут выполнены. wait
, вся программа Python не завершится, пока все ожидающие результата Future
не будут выполнены.Если аргумент cancel_futures
имеет значение True
, то этот метод отменит все отложенные объекты Future
, выполнение которых исполнитель еще не начал. Любые завершенные или запущенные Future
не будут отменены, независимо от значения cancel_futures
.
Если и cancel_futures
, и wait
имеют значение True
, то все объекты Future
, запущенные исполнителем, будут завершены до возвращения результата этим методом. Остальные Future
аннулируются.
Можно избежать явного вызова метода Executor.shutdown()
, если использовать оператор with
, который по завершении будет сам вызвать Executor.shutdown(wait=True)
(с wait
, установленным в True):
import shutil with ThreadPoolExecutor(max_workers=4) as e: e.submit(shutil.copy, 'src1.txt', 'dest1.txt') e.submit(shutil.copy, 'src2.txt', 'dest2.txt') e.submit(shutil.copy, 'src3.txt', 'dest3.txt') e.submit(shutil.copy, 'src4.txt', 'dest4.txt')
Изменено в Python 3.9: добавлено аргумент cancel_futures
.
В примере асинхронно выполняется HTTP запросы к 7-ми сайтам в 4 потока.
import concurrent.futures import urllib.request URLS = ['https://vk.com', 'https://russian.rt.com/', 'https://www.youtube.com/', 'https://mail.ru/', 'https://www.google.ru/', 'https://timeweb.com/ru/', 'https://yandex.ru/'] # скачивает одну страницу def load_url(url, timeout): with urllib.request.urlopen(url, timeout=timeout) as conn: return conn.read() # используем оператор `with` для обеспечения быстрой очистки потоков with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor: # загружаем и отмечаем каждый будущий результат своим URL-адресом future_to_url = {executor.submit(load_url, url, 60): url for url in URLS} for future in concurrent.futures.as_completed(future_to_url): url = future_to_url[future] try: data = future.result() except Exception as exc: print(f'{url} сгенерировано исключение: {exc}') else: print(f'{url} скачено {len(data)} bytes') # https://www.youtube.com/ скачено 483363 bytes # https://www.google.ru/ скачено 14518 bytes # https://vk.com скачено 30625 bytes # https://mail.ru/ скачено 312253 bytes # https://yandex.ru/ скачено 181901 bytes # https://timeweb.com/ru/ скачено 91670 bytes # https://russian.rt.com/ скачено 149219 bytes