Сообщить об ошибке.

Объект Future модуля concurrent.futures в Python

Получение результатов из потоков/процессов, управление потоками

Синтаксис:

import concurrent.futures as pool
import time

def worker(n):
    s = n / 10
    time.sleep(s)
    return s

with pool.ThreadPoolExecutor(max_workers=2) as executor:
    # создается объект `Future`
    future = executor.submit(worker, 5)
    print(future)
    # получение результата
    print(future.result())

# <Future at 0x7f11380b2fd0 state=running>
# 0.5

Параметры:

  • нет.

Возвращаемое значение:

  • результаты выполнения вызываемых объектов (функций), запущенных в потоках/процессах.

Описание:

Класс Future модуля concurrent.futures инкапсулирует асинхронное выполнение вызываемого объекта.

Экземпляры Future создаются методом Executor.submit() объектов ThreadPoolExecutor или ProcessPoolExecutor и не должны создаваться непосредственно, за исключением тестирования.

Объект Future содержит в себе будущие результаты выполнения вызываемых объектов, запущенных в потоках/процессах и методы, которые с ними работают.

Методы объекта Future.


Future.cancel():

Метод Future.cancel() пытается отменить вызов. Если вызов в данный момент выполняется или завершен и не может быть отменен, то метод возвращает False, в противном случае вызов будет отменен, а метод вернет True (пример).

Future.cancelled():

Метод Future.cancelled() возвращает True, если вызов был успешно отменен (пример).

Future.running():

Метод Future.running() возвращает True, если вызов в данный момент выполняется и не может быть отменен.

Future.done():

Метод Future.done() возвращает True, если вызов был успешно отменен или завершен (пример).

Future.result(timeout=None):

Метод Future.result() возвращает значение, возвращаемое вызовом (пример).

Если вызов еще не завершен, то этот метод будет ждать до timeout секунд.Если вызов не завершился за время ожидания в секундах, возникнет ошибка TimeoutError.

Аргумент timeout может быть int или float. Если тайм-аут timeout не указан или равен None, то время ожидания результатов не ограничено (до завершения всех вызовов во всех потоках).

Если Future будет отменено Future.cancel() до завершения, то будет вызвано исключение CancelledError.

Если при вызове возникнет исключение, то метод Future.result() вызовет то же исключение.

Future.exception(timeout=None):

Метод Future.exception() возвращает исключение, вызванное вызовом (пример).

  • Если вызов еще не завершен, то этот метод будет ждать до timeout секунд.
  • Если вызов не завершился за время ожидания в секундах, возникнет ошибка TimeoutError.

Аргумент timeout может быть int или float. Если тайм-аут timeout не указан или равен None, то время ожидания результатов не ограничено

Если Future будет отменено Future.cancel() до завершения, то будет вызвано исключение CancelledError.

Если при вызове возникнет исключение, то метод Future.exception() вызовет то же исключение.

Future.add_done_callback(fn):

Метод Future.add_done_callback() прикрепляет вызываемый объект fn к объекту Future каждого вызова в каждом потоке/процессе (пример).

Когда объект Future завершит работу или будет отменен, то функция fn будет вызвана с объектом Future в качестве единственного аргумента.

Добавленные вызываемые объекты fn вызываются в том порядке, в котором они были добавлены к объектам Future, и всегда вызываются в потоке, принадлежащем процессу/потоку, который их добавил.

  • Если fn вызывает подкласс исключения, то оно будет зарегистрировано и проигнорировано.
  • Если fn вызывает подкласс BaseException, то его поведение не определено.

Если объект Future уже завершился с результатом или был отменен, то fn вызывается немедленно.


Примеры использования методов объекта Future.


Пример ожидания результатов методом Future.result().

Вызов метода Future.result() блокируется до тех пор, пока задача не будет завершена, либо путем возврата значения или возникновения исключения, либо до отмены вызова.

Функция модуля as_completed() используется для ожидания результатов работы функции worker() по завершении каждого потока/процесса.

import concurrent.futures as pool
import time

def worker(a, b):
    time.sleep(0.5)
    return a / b 

# данные для worker()
data = [(1, 0.3), (2, 0.7), (3, 9), (4, 3)]

# создаем пул из 2-х потоков
with pool.ThreadPoolExecutor(max_workers=3) as executor:
    # создаем список объектов 'Future' 
    # (вызовы worker() с аргументами из data)
    futures = [executor.submit(worker, a, b) for a, b in data]

    # при помощи `as_completed` ждем 
    # результатов от каждого 'Future'
    for future in pool.as_completed(futures):
        result = future.result()
        print(f'Результат {result}')
            
# Результат 3.3333333333333335
# Результат 2.857142857142857
# Результат 0.3333333333333333
# Результат 1.3333333333333333

Пример с обратным вызовом объекта Future.

Чтобы выполнить какое-либо действие после завершения задачи, не дожидаясь явного результата, используйте метод Future.add_done_callback(), чтобы указать новую функцию для вызова после завершения задачи в Future.

Функция обратного вызова принимает только один аргумент - это экземпляр Future. При помощи функции обратного вызова можно вести логи/журналы процессов/потоков. Если в ней ведутся дополнительные расчеты, то их можно складывать в потокобезопасную очередь, что бы потом где-то использовать.

import concurrent.futures as pool
import time

def worker(item):
    a, b = item
    time.sleep(0.1)
    return a / b 

# функция обратного вызова, принимает 
# только один аргумент - объект 'Future'
def done(future):
    # с помощью атрибута мы 
    # передали еще 2 аргумента
    a, b = future.attribute
    action = f'{a} / {b}'
    # теперь методами объекта 'Future'
    # проверяем его состояние и в зависимости, что 
    # они возвращают, определяем что дальше делать
    if future.cancelled():
        print(f'DONE: Действие {action} отменено.')
    elif future.done():
        error = future.exception()
        if error:
            print(f'DONE: Действие {action} вызвало ошибку: {error}!')
        else:
            result = future.result()
            print(f'DONE: Действие {action} = {result}')

# данные для worker()
data = [(1, 0), (2, 1), (5, 2), (10, 3)]

# создаем пул из 2-х потоков
with pool.ThreadPoolExecutor(max_workers=2) as executor:
    # создаем список объектов 'Future' вызовами `executor.submit()`
    # (вызовы worker() с аргументами из data)
    futures = [executor.submit(worker, item) for item in data]
    # проходимся по объектам 'Future', 
    # что бы управлять вызовами
    for i, future in enumerate(futures):
        # отменяем вызов с данными (10, 3)
        if data[i][0] == 10:
            future.cancel()    
        # В Python, у объектов можно создавать атрибуты на лету
        # в учебных целях создадим на лету атрибуты у всех объекта
        # 'Future', что бы потом использовать их в функции done()
        future.attribute = data[i]
        # добавляем функцию обратного
        # вызова done() к каждому 'Future'
        future.add_done_callback(done)

    # ждем результатов        
    for future in pool.as_completed(futures):
        try:
            # если нет исключения или вызов не отменен
            if future.result():
                # получаем результат
                result = future.result()
        except Exception:
            pass
        else:
            print(f'Полученный результат {result}')

# DONE: Действие 10 / 3 отменено.
# DONE: Действие 1 / 0 вызвало ошибку: division by zero!
# DONE: Действие 2 / 1 = 2.0
# Полученный результат 2.0
# DONE: Действие 5 / 2 = 2.5
# Полученный результат 2.5

Пример с исключением в задачах потоков/процессов.

Если задача вызывает необработанное исключение, то оно сохраняется в объекте Future для этой задачи и становится доступным с помощью методов Future.result() или Future.exception().

Если метод Future.result() вызывается после возникновения необработанного исключения в функции задачи, то же исключение повторно возникает в текущем контексте.

import concurrent.futures as pool
import time

def worker():
    time.sleep(1)
    raise ValueError(f'WORKER: Подняли ValueError!')

# создаем потоки
with pool.ThreadPoolExecutor(max_workers=2) as executor:
    print('Старт потоков')
    future = executor.submit(worker)
    error = future.exception()
    print(error)

# обрабатываем исключение
try:
    result = future.result()
except ValueError as exc:
    print(f'Поймали ошибку "{exc}" при получении результата')
    

# Старт потоков
# WORKER: Подняли ValueError!
# Поймали ошибку "WORKER: Подняли ValueError!" при получении результата

Пример с отменой заданий в потоках/процессах.

Задачу в объекте Future можно отменить, если оно было отправлено, но не запущено, путем вызова его метода Future.cancel().

Метод Future.cancel() возвращает логическое значение, указывающее, удалось ли отменить задачу.

import concurrent.futures as pool
import time

def worker(n):
    time.sleep(0.5)
    return n

data = range(3, 10)

# создаем потоки
with pool.ThreadPoolExecutor(max_workers=2) as executor:
    print('Старт потоков')
    tasks = [
        (n, executor.submit(worker, n)) 
        for n in data
        ]

    for n, future in reversed(tasks):
        # отменяем задачи
        if future.cancel():
            print(f'Задача с аргументом {n} отменена')
        else:
            # усли не получилось отменить
            result = future.result()
            print(f'Задача с аргументом {result} выполнена')

# Задача с аргументом 9 отменена
# Задача с аргументом 8 отменена
# Задача с аргументом 7 отменена
# Задача с аргументом 6 отменена
# Задача с аргументом 5 отменена
# Задача с аргументом 4 выполнена
# Задача с аргументом 3 выполнена