import asyncio await asyncio.run_coroutine_threadsafe(coro, loop)
coro
- сопрограмма,loop
- цикл событий.concurrent.futures.Future()
.Функция run_coroutine_threadsafe()
модуля asyncio
отправляет сопрограмму coro
в заданный цикл событий loop
. Функция поточно-ориентирована.
Возвращает объект concurrent.futures.Future()
, результаты которого можно дождаться и получить с помощью метода future.result()
.
Функция asyncio.run_coroutine_threadsafe()
предназначена для вызова сопрограмм в потоке, отличным от того, в котором запущен основной цикл событий. Для этой цели необходимо явное создание отдельного потока.
# Создать сопрограмму coro = asyncio.sleep(1, result=3) # Отправить сопрограмму в заданный цикл future = asyncio.run_coroutine_threadsafe(coro, loop) # Ждать результата с необязательным аргументом тайм-аута assert future.result(timeout) == 3
Если в сопрограмме возникает исключение, то будет уведомлено возвращенное concurrent.futures.Future
. Функцию asyncio.run_coroutine_threadsafe()
также можно использовать для отмены задачи в цикле событий:
try: result = future.result(timeout) except asyncio.TimeoutError: print('Сопрограмма заняла много времени, отменив задачу...') future.cancel() except Exception as exc: print(f'Сопрограмма вызвала исключение: {exc:!r}') else: print(f'Сопрограмма вернула результат: {result:!r}')
В отличие от других функций модуля asyncio
, эта функция требует явной передачи аргумента цикла событий loop
.
При помощи этой функции можно запустить сопрограмму в цикле событий, созданным в другом потоке, как это делает функция asyncio.to_thread()
, доступная с версии Python 3.9. Только для рассматриваемой функции необходимо явное создание потока.
Запустить сопрограмму в отдельном потоке можно так же методом цикла событий loop.run_in_executor()
при помощи низкоуровнего API.
В примере функция worker()
запускается явно в задаче в текущем цикле событий и отправляется при помощи функции asyncio.run_coroutine_threadsafe()
в новый цикл событий, созданный в отдельном потоке.
import asyncio, threading async def worker(name, delay): """ Асинхронная функция, которую будем запускать в основном и отдельном потоке """ # получаем имя потока th_name = threading.current_thread().name print(f'Start {name}; ожидание {delay}; поток: {th_name}') res = await asyncio.sleep(delay, result=delay) print(f'Done {name}; ожидание {delay}') return name, res async def main(new_loop): # список с будущими результатами results = [] # запускаем сопрограмму worker('Thread', 1) в цикле событий # `new_loop` другого потока. Функция `run_coroutine_threadsafe()` # проталкивает `worker()` в поток с циклом событий `new_loop` future1 = asyncio.run_coroutine_threadsafe(worker('Thread', 1), new_loop) # `future1` добавляем в список с результатами results.append(future1) # создаем асинхронную задачу в текущем цикле событий task = asyncio.create_task(worker('Task', 1.5)) # ожидаем результат от асинхронной задачи future2 = await task # `future2` добавляем в список с результатами results.append(future2) print('\nРезультаты:') # проходимся по списку с результатами `results` for future in results: # результаты потоков и цикла событий не # совместимы, по этому извлекаем их по разному if type(future) == tuple: # результаты цикла событий print(f'Задача {future[0]}; результат {future[1]}') else: # результаты потока res = future.result() print(f'Задача {res[0]}; результат {res[1]}') # останавливаем цикл событий `new_loop` # в отдельном потоке new_loop.call_soon_threadsafe(new_loop.stop) if __name__ == '__main__': # получаем новый цикл событий new_loop = asyncio.new_event_loop() # создаем поток с запущенным новым циклом событий thread = threading.Thread(target=new_loop.run_forever) # запускаем поток thread.start() # Запускаем основной цикл событий и передаем # в точку входа `main()` новый цикл событий `new_loop` # для использования в `run_coroutine_threadsafe()` asyncio.run(main(new_loop)) # Start Task; ожидание 1.5; поток: MainThread # Start Thread; ожидание 1; поток: Thread-1 # Done Thread; ожидание 1 # Done Task; ожидание 1.5 # Результаты: # Задача Thread; результат 1 # Задача Task; результат 1.5