Сообщить об ошибке.

Функция run_coroutine_threadsafe() модуля asyncio в Python

Запуск сопрограмм в другом, заданном цикле событий

Синтаксис:

import asyncio

await asyncio.run_coroutine_threadsafe(coro, loop)

Параметры:

Возвращаемое значение:

Описание:

Функция run_coroutine_threadsafe() модуля asyncio отправляет сопрограмму coro в заданный цикл событий loop. Функция поточно-ориентирована.

Возвращает объект concurrent.futures.Future(), результаты которого можно дождаться и получить с помощью метода future.result().

Функция asyncio.run_coroutine_threadsafe() предназначена для вызова сопрограмм в потоке, отличным от того, в котором запущен основной цикл событий. Для этой цели необходимо явное создание отдельного потока.

# Создать сопрограмму
coro = asyncio.sleep(1, result=3)

# Отправить сопрограмму в заданный цикл
future = asyncio.run_coroutine_threadsafe(coro, loop)

# Ждать результата с необязательным аргументом тайм-аута
assert future.result(timeout) == 3

Если в сопрограмме возникает исключение, то будет уведомлено возвращенное concurrent.futures.Future. Функцию asyncio.run_coroutine_threadsafe() также можно использовать для отмены задачи в цикле событий:

try:
    result = future.result(timeout)
except asyncio.TimeoutError:
    print('Сопрограмма заняла много времени, отменив задачу...')
    future.cancel()
except Exception as exc:
    print(f'Сопрограмма вызвала исключение: {exc:!r}')
else:
    print(f'Сопрограмма вернула результат: {result:!r}')

В отличие от других функций модуля asyncio, эта функция требует явной передачи аргумента цикла событий loop.

При помощи этой функции можно запустить сопрограмму в цикле событий, созданным в другом потоке, как это делает функция asyncio.to_thread(), доступная с версии Python 3.9. Только для рассматриваемой функции необходимо явное создание потока.

Запустить сопрограмму в отдельном потоке можно так же методом цикла событий loop.run_in_executor() при помощи низкоуровнего API.

Пример запуска асинхронной задачи в заданном цикле событий.

В примере функция worker() запускается явно в задаче в текущем цикле событий и отправляется при помощи функции asyncio.run_coroutine_threadsafe() в новый цикл событий, созданный в отдельном потоке.

import asyncio, threading

async def worker(name, delay):
    """
    Асинхронная функция, которую будем запускать 
    в основном и отдельном потоке
    """    
    # получаем имя потока 
    th_name = threading.current_thread().name
    print(f'Start {name}; ожидание {delay}; поток: {th_name}')
    res = await asyncio.sleep(delay, result=delay)
    print(f'Done {name}; ожидание {delay}')
    return name, res

async def main(new_loop):
    # список с будущими результатами
    results = []
    # запускаем сопрограмму worker('Thread', 1) в цикле событий 
    # `new_loop` другого потока. Функция `run_coroutine_threadsafe()`
    # проталкивает `worker()` в поток с циклом событий `new_loop`
    future1 = asyncio.run_coroutine_threadsafe(worker('Thread', 1), new_loop)
    # `future1` добавляем в список с результатами
    results.append(future1)
    # создаем асинхронную задачу в текущем цикле событий 
    task = asyncio.create_task(worker('Task', 1.5))
    # ожидаем результат от асинхронной задачи 
    future2 = await task
    # `future2` добавляем в список с результатами
    results.append(future2)

    print('\nРезультаты:')
    # проходимся по списку с результатами `results`
    for future in results:
        # результаты потоков и цикла событий не 
        # совместимы, по этому извлекаем их по разному
        if type(future) == tuple:
            # результаты цикла событий 
            print(f'Задача {future[0]}; результат {future[1]}')        
        else:
            # результаты потока
            res = future.result()
            print(f'Задача {res[0]}; результат {res[1]}')
            # останавливаем цикл событий `new_loop` 
            # в отдельном потоке
            new_loop.call_soon_threadsafe(new_loop.stop)

if __name__ == '__main__':
    # получаем новый цикл событий
    new_loop = asyncio.new_event_loop()
    # создаем поток с запущенным новым циклом событий
    thread = threading.Thread(target=new_loop.run_forever)
    # запускаем поток
    thread.start()
    # Запускаем основной цикл событий и передаем 
    # в точку входа `main()` новый цикл событий `new_loop`
    # для использования в `run_coroutine_threadsafe()`
    asyncio.run(main(new_loop))


# Start Task; ожидание 1.5; поток: MainThread
# Start Thread; ожидание 1; поток: Thread-1
# Done Thread; ожидание 1
# Done Task; ожидание 1.5

# Результаты:
# Задача Thread; результат 1
# Задача Task; результат 1.5