import concurrent.futures as pool import time def worker(n): s = n / 10 time.sleep(s) return s with pool.ThreadPoolExecutor(max_workers=2) as executor: # создается объект `Future` future = executor.submit(worker, 5) print(future) # получение результата print(future.result()) # <Future at 0x7f11380b2fd0 state=running> # 0.5
Класс Future
модуля concurrent.futures
инкапсулирует асинхронное выполнение вызываемого объекта.
Экземпляры Future
создаются методом Executor.submit()
объектов ThreadPoolExecutor
или ProcessPoolExecutor
и не должны создаваться непосредственно, за исключением тестирования.
Объект Future
содержит в себе будущие результаты выполнения вызываемых объектов, запущенных в потоках/процессах и методы, которые с ними работают.
Future
.Future.cancel()
- пытается отменить вызов,Future.cancelled()
- проверка отмены вызова,Future.running()
- проверка состояния вызова,Future.done()
- проверка завершения вызова,Future.result()
- возвращает результаты каждого вызова,Future.exception()
- возвращает исключение,Future.add_done_callback()
- прикрепляет вызываемый объект к объекту `Future.Future.cancel()
:Метод Future.cancel()
пытается отменить вызов. Если вызов в данный момент выполняется или завершен и не может быть отменен, то метод возвращает False
, в противном случае вызов будет отменен, а метод вернет True
(пример).
Future.cancelled()
:Метод Future.cancelled()
возвращает True
, если вызов был успешно отменен (пример).
Future.running()
:Метод Future.running()
возвращает True
, если вызов в данный момент выполняется и не может быть отменен.
Future.done()
:Метод Future.done()
возвращает True
, если вызов был успешно отменен или завершен (пример).
Future.result(timeout=None)
:Метод Future.result()
возвращает значение, возвращаемое вызовом (пример).
Если вызов еще не завершен, то этот метод будет ждать до timeout
секунд.Если вызов не завершился за время ожидания в секундах, возникнет ошибка TimeoutError
.
Аргумент timeout
может быть int
или float
. Если тайм-аут timeout
не указан или равен None
, то время ожидания результатов не ограничено (до завершения всех вызовов во всех потоках).
Если Future
будет отменено Future.cancel()
до завершения, то будет вызвано исключение CancelledError
.
Если при вызове возникнет исключение, то метод Future.result()
вызовет то же исключение.
Future.exception(timeout=None)
:Метод Future.exception()
возвращает исключение, вызванное вызовом (пример).
timeout
секунд. TimeoutError
. Аргумент timeout
может быть int
или float
. Если тайм-аут timeout
не указан или равен None
, то время ожидания результатов не ограничено
Если Future
будет отменено Future.cancel()
до завершения, то будет вызвано исключение CancelledError
.
Если при вызове возникнет исключение, то метод Future.exception()
вызовет то же исключение.
Future.add_done_callback(fn)
:Метод Future.add_done_callback()
прикрепляет вызываемый объект fn
к объекту Future
каждого вызова в каждом потоке/процессе (пример).
Когда объект Future
завершит работу или будет отменен, то функция fn
будет вызвана с объектом Future
в качестве единственного аргумента.
Добавленные вызываемые объекты fn
вызываются в том порядке, в котором они были добавлены к объектам Future
, и всегда вызываются в потоке, принадлежащем процессу/потоку, который их добавил.
fn
вызывает подкласс исключения, то оно будет зарегистрировано и проигнорировано. fn
вызывает подкласс BaseException
, то его поведение не определено.Если объект Future
уже завершился с результатом или был отменен, то fn
вызывается немедленно.
Future
.Future.result()
,Future
.Future.result()
.Вызов метода Future.result()
блокируется до тех пор, пока задача не будет завершена, либо путем возврата значения или возникновения исключения, либо до отмены вызова.
Функция модуля as_completed()
используется для ожидания результатов работы функции worker()
по завершении каждого потока/процесса.
import concurrent.futures as pool import time def worker(a, b): time.sleep(0.5) return a / b # данные для worker() data = [(1, 0.3), (2, 0.7), (3, 9), (4, 3)] # создаем пул из 2-х потоков with pool.ThreadPoolExecutor(max_workers=3) as executor: # создаем список объектов 'Future' # (вызовы worker() с аргументами из data) futures = [executor.submit(worker, a, b) for a, b in data] # при помощи `as_completed` ждем # результатов от каждого 'Future' for future in pool.as_completed(futures): result = future.result() print(f'Результат {result}') # Результат 3.3333333333333335 # Результат 2.857142857142857 # Результат 0.3333333333333333 # Результат 1.3333333333333333
Future
.Чтобы выполнить какое-либо действие после завершения задачи, не дожидаясь явного результата, используйте метод Future.add_done_callback()
, чтобы указать новую функцию для вызова после завершения задачи в Future
.
Функция обратного вызова принимает только один аргумент - это экземпляр Future
. При помощи функции обратного вызова можно вести логи/журналы процессов/потоков. Если в ней ведутся дополнительные расчеты, то их можно складывать в потокобезопасную очередь, что бы потом где-то использовать.
import concurrent.futures as pool import time def worker(item): a, b = item time.sleep(0.1) return a / b # функция обратного вызова, принимает # только один аргумент - объект 'Future' def done(future): # с помощью атрибута мы # передали еще 2 аргумента a, b = future.attribute action = f'{a} / {b}' # теперь методами объекта 'Future' # проверяем его состояние и в зависимости, что # они возвращают, определяем что дальше делать if future.cancelled(): print(f'DONE: Действие {action} отменено.') elif future.done(): error = future.exception() if error: print(f'DONE: Действие {action} вызвало ошибку: {error}!') else: result = future.result() print(f'DONE: Действие {action} = {result}') # данные для worker() data = [(1, 0), (2, 1), (5, 2), (10, 3)] # создаем пул из 2-х потоков with pool.ThreadPoolExecutor(max_workers=2) as executor: # создаем список объектов 'Future' вызовами `executor.submit()` # (вызовы worker() с аргументами из data) futures = [executor.submit(worker, item) for item in data] # проходимся по объектам 'Future', # что бы управлять вызовами for i, future in enumerate(futures): # отменяем вызов с данными (10, 3) if data[i][0] == 10: future.cancel() # В Python, у объектов можно создавать атрибуты на лету # в учебных целях создадим на лету атрибуты у всех объекта # 'Future', что бы потом использовать их в функции done() future.attribute = data[i] # добавляем функцию обратного # вызова done() к каждому 'Future' future.add_done_callback(done) # ждем результатов for future in pool.as_completed(futures): try: # если нет исключения или вызов не отменен if future.result(): # получаем результат result = future.result() except Exception: pass else: print(f'Полученный результат {result}') # DONE: Действие 10 / 3 отменено. # DONE: Действие 1 / 0 вызвало ошибку: division by zero! # DONE: Действие 2 / 1 = 2.0 # Полученный результат 2.0 # DONE: Действие 5 / 2 = 2.5 # Полученный результат 2.5
Если задача вызывает необработанное исключение, то оно сохраняется в объекте Future
для этой задачи и становится доступным с помощью методов Future.result()
или Future.exception()
.
Если метод Future.result()
вызывается после возникновения необработанного исключения в функции задачи, то же исключение повторно возникает в текущем контексте.
import concurrent.futures as pool import time def worker(): time.sleep(1) raise ValueError(f'WORKER: Подняли ValueError!') # создаем потоки with pool.ThreadPoolExecutor(max_workers=2) as executor: print('Старт потоков') future = executor.submit(worker) error = future.exception() print(error) # обрабатываем исключение try: result = future.result() except ValueError as exc: print(f'Поймали ошибку "{exc}" при получении результата') # Старт потоков # WORKER: Подняли ValueError! # Поймали ошибку "WORKER: Подняли ValueError!" при получении результата
Задачу в объекте Future
можно отменить, если оно было отправлено, но не запущено, путем вызова его метода Future.cancel()
.
Метод Future.cancel()
возвращает логическое значение, указывающее, удалось ли отменить задачу.
import concurrent.futures as pool import time def worker(n): time.sleep(0.5) return n data = range(3, 10) # создаем потоки with pool.ThreadPoolExecutor(max_workers=2) as executor: print('Старт потоков') tasks = [ (n, executor.submit(worker, n)) for n in data ] for n, future in reversed(tasks): # отменяем задачи if future.cancel(): print(f'Задача с аргументом {n} отменена') else: # усли не получилось отменить result = future.result() print(f'Задача с аргументом {result} выполнена') # Задача с аргументом 9 отменена # Задача с аргументом 8 отменена # Задача с аргументом 7 отменена # Задача с аргументом 6 отменена # Задача с аргументом 5 отменена # Задача с аргументом 4 выполнена # Задача с аргументом 3 выполнена