Цикл событий выполняется в потоке (обычно в основном потоке) и выполняет все обратные вызовы и задачи в своем потоке. Пока задача Task
выполняется в цикле событий, никакие другие задачи не могут выполняться в том же потоке. Когда задача Task
выполняется с оператором await
, то выполняющаяся задача приостанавливается, а цикл обработки событий выполняет следующую задачу.
Чтобы запланировать обратный вызов из другого потока ОС, следует использовать метод loop.call_soon_threadsafe()
. Пример:
loop.call_soon_threadsafe(callback, *args)
Почти все объекты модуля asyncio
не являются потокобезопасными, что обычно не является проблемой, если нет кода, который работает с ними извне. Если такой код необходим, то для вызова низкоуровневого API, следует использовать метод loop.call_soon_threadsafe()
, например:
loop.call_soon_threadsafe(future.cancel)
Чтобы запланировать объект сопрограммы из другого потока ОС, следует использовать функцию asyncio.run_coroutine_threadsafe()
. Она возвращает concurrent.futures.Future
для доступа к результату:
async def coro_func(): return await asyncio.sleep(1, 42) # Позже в другом потоке ОС: future = asyncio.run_coroutine_threadsafe(coro_func(), loop) # Ждите результата: result = future.result()
Для обработки сигналов и выполнения подпроцессов, цикл событий должен выполняться в основном потоке.
Метод loop.run_in_executor()
можно использовать с concurrent.futures.ThreadPoolExecutor
для выполнения блокирующего кода в другом потоке ОС без блокировки основного потока ОС, в котором выполняется цикл событий.
В настоящее время нет возможности запланировать сопрограммы или обратные вызовы непосредственно из другого процесса (например, запущенного с многопроцессорной обработкой). Однако, API-интерфейс модуля asyncio
для обслуживания subprocess позволяют запускать процесс и взаимодействовать с ним из цикла событий.
Наконец, вышеупомянутый метод loop.run_in_executor()
также можно использовать с concurrent.futures.ProcessPoolExecutor
для выполнения кода в другом процессе.
В примере функция worker() запускается явно в задаче в текущем цикле событий и отправляется при помощи функции asyncio.run_coroutine_threadsafe()
в новый цикл событий, созданный в отдельном потоке.
import asyncio, threading async def worker(name, delay): th_name = threading.current_thread().name print(f'Start {name}; ожидание {delay}; поток: {th_name}') res = await asyncio.sleep(delay, result=delay) print(f'Done {name}; ожидание {delay}') return name, res async def main(new_loop): results = [] # Функция `run_coroutine_threadsafe()` проталкивает # `worker()` в поток с циклом событий `new_loop` future1 = asyncio.run_coroutine_threadsafe(worker('Thread', 1), new_loop) results.append(future1) task = asyncio.create_task(worker('Task', 1.5)) future2 = await task results.append(future2) print('\nРезультаты:') for future in results: if type(future) == tuple: print(f'Задача {future[0]}; результат {future[1]}') else: res = future.result() print(f'Задача {res[0]}; результат {res[1]}') # останавливаем цикл событий `new_loop` new_loop.call_soon_threadsafe(new_loop.stop) if __name__ == '__main__': # получаем новый цикл событий new_loop = asyncio.new_event_loop() # создаем поток с запущенным новым циклом событий thread = threading.Thread(target=new_loop.run_forever) thread.start() asyncio.run(main(new_loop)) # Start Task; ожидание 1.5; поток: MainThread # Start Thread; ожидание 1; поток: Thread-1 # Done Thread; ожидание 1 # Done Task; ожидание 1.5 # Результаты: # Задача Thread; результат 1 # Задача Task; результат 1.5
Так как все функции для запуска подпроцесса модуля asyncio
являются асинхронными, то легко выполнять и контролировать несколько подпроцессов, выполняемых параллельно.
import asyncio async def run(cmd): proc = await asyncio.create_subprocess_shell( cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE) stdout, stderr = await proc.communicate() print(f'[{cmd!r} exited with {proc.returncode}]') if stdout: print(f'[stdout]\n{stdout.decode()}') if stderr: print(f'[stderr]\n{stderr.decode()}') async def main(): await asyncio.gather( run('ls /zzz'), run('sleep 1; echo "hello"')) asyncio.run(main()) # ['ls /zzz' завершилась с кодом 2] # [stderr] # ls: невозможно получить доступ к '/zzz': Нет такого файла или каталога # ['sleep 1; echo "Привет"' завершилась с кодом 0] # [stdout] # Привет