Сообщить об ошибке.

Параллелизм и многопоточность низкоуровнего кода asyncio в Python

Цикл событий выполняется в потоке (обычно в основном потоке) и выполняет все обратные вызовы и задачи в своем потоке. Пока задача Task выполняется в цикле событий, никакие другие задачи не могут выполняться в том же потоке. Когда задача Task выполняется с оператором await, то выполняющаяся задача приостанавливается, а цикл обработки событий выполняет следующую задачу.

Чтобы запланировать обратный вызов из другого потока ОС, следует использовать метод loop.call_soon_threadsafe(). Пример:

loop.call_soon_threadsafe(callback, *args)

Почти все объекты модуля asyncio не являются потокобезопасными, что обычно не является проблемой, если нет кода, который работает с ними извне. Если такой код необходим, то для вызова низкоуровневого API, следует использовать метод loop.call_soon_threadsafe(), например:

loop.call_soon_threadsafe(future.cancel)

Чтобы запланировать объект сопрограммы из другого потока ОС, следует использовать функцию asyncio.run_coroutine_threadsafe(). Она возвращает concurrent.futures.Future для доступа к результату:

async def coro_func():
     return await asyncio.sleep(1, 42)

# Позже в другом потоке ОС:
future = asyncio.run_coroutine_threadsafe(coro_func(), loop)
# Ждите результата:
result = future.result()

Для обработки сигналов и выполнения подпроцессов, цикл событий должен выполняться в основном потоке.

Метод loop.run_in_executor() можно использовать с concurrent.futures.ThreadPoolExecutor для выполнения блокирующего кода в другом потоке ОС без блокировки основного потока ОС, в котором выполняется цикл событий.

В настоящее время нет возможности запланировать сопрограммы или обратные вызовы непосредственно из другого процесса (например, запущенного с многопроцессорной обработкой). Однако, API-интерфейс модуля asyncio для обслуживания subprocess позволяют запускать процесс и взаимодействовать с ним из цикла событий.

Наконец, вышеупомянутый метод loop.run_in_executor() также можно использовать с concurrent.futures.ProcessPoolExecutor для выполнения кода в другом процессе.

Пример запуска сопрограммы в другом потоке:

В примере функция worker() запускается явно в задаче в текущем цикле событий и отправляется при помощи функции asyncio.run_coroutine_threadsafe() в новый цикл событий, созданный в отдельном потоке.

import asyncio, threading

async def worker(name, delay):
    th_name = threading.current_thread().name
    print(f'Start {name}; ожидание {delay}; поток: {th_name}')
    res = await asyncio.sleep(delay, result=delay)
    print(f'Done {name}; ожидание {delay}')
    return name, res

async def main(new_loop):
    results = []
    # Функция `run_coroutine_threadsafe()` проталкивает 
    # `worker()` в поток с циклом событий `new_loop`
    future1 = asyncio.run_coroutine_threadsafe(worker('Thread', 1), new_loop)
    results.append(future1)
    task = asyncio.create_task(worker('Task', 1.5))
    future2 = await task
    results.append(future2)

    print('\nРезультаты:')
    for future in results:
        if type(future) == tuple:
            print(f'Задача {future[0]}; результат {future[1]}')        
        else:
            res = future.result()
            print(f'Задача {res[0]}; результат {res[1]}')
            # останавливаем цикл событий `new_loop` 
            new_loop.call_soon_threadsafe(new_loop.stop)

if __name__ == '__main__':
    # получаем новый цикл событий
    new_loop = asyncio.new_event_loop()
    # создаем поток с запущенным новым циклом событий
    thread = threading.Thread(target=new_loop.run_forever)
    thread.start()
    asyncio.run(main(new_loop))


# Start Task; ожидание 1.5; поток: MainThread
# Start Thread; ожидание 1; поток: Thread-1
# Done Thread; ожидание 1
# Done Task; ожидание 1.5

# Результаты:
# Задача Thread; результат 1
# Задача Task; результат 1.5

Пример запуска нескольких команд в терминале из асинхронного кода.

Так как все функции для запуска подпроцесса модуля asyncio являются асинхронными, то легко выполнять и контролировать несколько подпроцессов, выполняемых параллельно.

import asyncio

async def run(cmd):
    proc = await asyncio.create_subprocess_shell(
        cmd,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE)

    stdout, stderr = await proc.communicate()
    print(f'[{cmd!r} exited with {proc.returncode}]')
    if stdout:
        print(f'[stdout]\n{stdout.decode()}')
    if stderr:
        print(f'[stderr]\n{stderr.decode()}')

async def main():
    await asyncio.gather(
        run('ls /zzz'),
        run('sleep 1; echo "hello"'))

asyncio.run(main())


# ['ls /zzz' завершилась с кодом 2]
# [stderr]
# ls: невозможно получить доступ к '/zzz': Нет такого файла или каталога

# ['sleep 1; echo "Привет"' завершилась с кодом 0]
# [stdout]
# Привет