Модуль concurrent.futures
предоставляет высокоуровневый интерфейс для асинхронного выполнения вызываемых объектов, с использованием пулов потоков threads
или ядер процессора process
.
Плюсы модуля concurrent.futures
:
ThreadPoolExecutor
, задачи выполняются в потоках;ProcessPoolExecutor
, задачи выполняются на ядрах процессора;concurrent.futures.Executor
(смотрите ниже по тексту), поэтому приложения могут переключаться между потоками и процессами с минимальными изменениями;asyncio
, для запуска блокирующих операции в отдельных потоках/процессах.Минусы модуля concurrent.futures
:
threading
и multiprocessing
.Чтобы использовать пул потоков/процессов с задачами, приложение создает экземпляр соответствующего класса исполнителя, а затем отправляет задачи для их запуска. При запуске каждой задачи создается экземпляр класса Future
этой задачи, который ставится в очередь на выполнение в пуле потоков/процессов.
Для получения результатов задач, приложение может использовать объект Future
для блокировки дальнейшего выполнения программы, пока результат задачи в пуле потоков/процессов не станет доступен. При этом не нужно напрямую управлять объектами Future
, для удобства ожидания обработки данных, модулем предоставляются различные API-интерфейсы.
Модуль concurrent.futures
определяет абстрактный класс concurrent.futures.Executor()
, который предоставляет основные методы для асинхронного выполнения вызовов. Его следует использовать не напрямую, а через его конкретные подклассы, о которых говорилось выше. В данной документации к модулю про этот класс больше говорится не будет, а его методы (т.к. они наследуются) будут рассмотрены в конкретных классах.
ThreadPoolExecutor.map()
, который ждет выполнения всех задач запущенных в пуле и по окончании возвращает результаты в виде итератора. Результаты обработки данных всегда будут возвращаться в том порядке, в каком они поступали при запуске пула потоков/процессов. Смотрите пример.as_completed(Future)
, а потом для каждого объекта Future
вызвать метод Future.result()
с результатами его работы. Используйте данный подход, если не имеет значения, в каком порядке будут поступать результаты обработки данных. Смотрите пример.ThreadPoolExecutor.map()
для получения результатов работы.Первый пример разберем на объект ThreadPoolExecutor
, который управляет набором рабочих потоков, передавая им задачи, когда они становятся доступными для дополнительной работы.
В примере используется метод ThreadPoolExecutor.map()
для одновременного создания множества результатов из итерируемого ввода. Для демонстрации того, что этот метод всегда возвращает значения в порядке, основанном на вводе, будем приостанавливать выполнение задач в потоках на разное количество времени при помощи функции time.sleep()
.
Возвращаемое значение из метода ThreadPoolExecutor.map()
на самом деле является особым типом итератора, который ждет результаты выполнения от всех запущенных задач в пуле потоков/процессов.
import concurrent.futures as pool import threading, time def worker(n): # имя конкретного рабочего потока th_name = threading.current_thread().name th_name = th_name.replace('PoolExecutor-0_', '-') print(f'{th_name}: обработка данных => {n}') result = n / 10 time.sleep(result) print(f'{th_name}: обработка закончена => {n}') return result # данные для обработки в пуле потоков data = list(range(1, 10, 2)) # имя основного потока программы th_name = threading.current_thread().name print(f'{th_name}: запущен...') # создаем и запускаем пул из 3 потоков with pool.ThreadPoolExecutor(max_workers=3) as executer: # в метод map передаем функцию # worker() и данные для обработки res = executer.map(worker, data) # получаем результаты в виде списка results = list(res) print(f'{th_name}: результаты => {results}') # MainThread: запущен... # Thread-0: обработка данных => 1 # Thread-1: обработка данных => 3 # Thread-2: обработка данных => 5 # Thread-0: обработка закончена => 1 # Thread-0: обработка данных => 7 # Thread-1: обработка закончена => 3 # Thread-1: обработка данных => 9 # Thread-2: обработка закончена => 5 # Thread-0: обработка закончена => 7 # Thread-1: обработка закончена => 9 # MainThread: результаты => [0.1, 0.3, 0.5, 0.7, 0.9]
Future.result()
для получения результатов работы.Этот пример аналогичен первому, только будет основываться на пуле ядер процессора (не потоков), а получение результатов обработки данных будет основываться на методе Future.result()
.
Вызов метода Future.result()
объекта Future
блокируется до тех пор, пока задача не будет завершена: либо путем возврата значения или возникновения исключения, либо до отмены самого задания. Функция модуля as_completed()
возвращает объекты Future
по мере их завершения, из которых уже можно получить результат работы методом Future.result()
.
Обратите внимание, что результаты будут возвращаться не по порядку поступления данных из data
или очередности запуска процессов в пуле, а по окончанию обработки поступившей порции данных в worker()
в каждом процессе. Для демонстрации этого поведения, функция worker()
останавливает выполнение задач в потоках на разное время (строка с time.sleep(result)
).
import concurrent.futures as pool import multiprocessing, time def worker(n): # имя конкретного рабочего процесса core_name = multiprocessing.current_process().name print(f'{core_name}: обработка данных => {n}') result = n / 10 time.sleep(result) print(f'{core_name}: обработка закончена => {n}') return result # данные для обработки в пуле процессов data = list(range(1, 10, 2)) # имя основного процесса программы core_name = multiprocessing.current_process().name print(f'{core_name}: запущен...') # создаем и запускаем пул с использованием # 3-х ядер процессора with pool.ProcessPoolExecutor(max_workers=3) as executer: wait_complete = [] # проходимся по списку с данными для обработки for task in data: # запускаем worker() для каждого элемента данных future = executer.submit(worker, task) # в список складываем объекты 'future' # с будущими результатами wait_complete.append(future) print(f'{core_name}: результаты работы пула:') # ждем окончания выполнения пула процессов for res in pool.as_completed(wait_complete): # возвращаем обработанные данные print(f':=> {res.result()}') # MainProcess: запущен... # Process-1: обработка данных => 1 # Process-2: обработка данных => 3 # Process-3: обработка данных => 5 # Process-1: обработка закончена => 1 # Process-1: обработка данных => 7 # Process-2: обработка закончена => 3 # Process-2: обработка данных => 9 # Process-3: обработка закончена => 5 # Process-1: обработка закончена => 7 # Process-2: обработка закончена => 9 # MainProcess: результаты работы пула: # :=> 0.3 # :=> 0.1 # :=> 0.9 # :=> 0.5 # :=> 0.7