Сообщить об ошибке.

Создание пулов потоков и процессов из цикла событий asyncio в Python.

Запуск блокирующих операций в пулах потоков или процессов.

В разделе рассмотрены методы низкоуровнего API модуля asyncio, позволяющие запускать блокирующие операции в указанном исполнителе.

Например, файловые операции (например, ведение журнала), блокирующие цикл событий можно запустить в пуле потоков, а операции, связанные с процессором, которые так же могут заблокировать цикл событий - в пуле процессов.

Прежде чем что-то делать с циклом событий, его необходимо создать или получить функциями, описанными в разделе "Создание, запуск и получение цикла событий".

Содержание:


loop.run_in_executor(executor, func, *args):

Метод loop.run_in_executor() организовывает вызов функции func в указанном исполнителе executor. Представляет собой объект ожидания результатов awaitable.

Аргумент executor должен быть экземпляром concurrent.futures.Executor. Если исполнителем является None, то используется исполнитель по умолчанию.

Этот метод возвращает объект asyncio.Future.

Используйте функцию functools.partial(), чтобы передавать ключевые аргументы функцию func.

Метод loop.run_in_executor() не настраивает количество запускаемых потоков в пуле и оставляет его на усмотрение исполнителя пула потоков ThreadPoolExecutor.

loop.set_default_executor(executor):

Метод loop.set_default_executor() устанавливает executor в качестве исполнителя по умолчанию, используемого методом loop.run_in_executor().

Исполнитель должен быть экземпляром concurrent.futures.ThreadPoolExecutor.

Начиная с Python 3.8. не рекомендуется устанавливать исполнителя, не являющегося экземпляром ThreadPoolExecutor, устарело и вызовет ошибку в Python 3.9.

Пример запуска блокирующих операций из асинхронного кода.

import asyncio
import concurrent.futures

def blocking_io():
    # Файловые операции(например, ведение журнала) могут
    # блокировать цикл событий: запустим их в пуле потоков.
    with open('/dev/urandom', 'rb') as f:
        res = f.read(100)
        return res

def cpu_bound():
    # Операции, связанные с процессором, заблокируют цикл событий:
    # в общем случае предпочтительно запускать их в пуле процессов.
    return sum(i * i for i in range(10 ** 7))

async def main():
    loop = asyncio.get_running_loop()

    # Запуск в исполнителе по умолчанию:
    result = await loop.run_in_executor(None, blocking_io)
    print('\nПул потоков (по умолчанию):', result)

    # Запуск в пользовательском пуле потоков:
    with concurrent.futures.ThreadPoolExecutor() as pool:
        result = await loop.run_in_executor(pool, blocking_io)
        print('\nЯвно установленный пул потоков:', result)

    # Запуск в настраиваемом пуле процессов:
    with concurrent.futures.ProcessPoolExecutor() as pool:
        result = await loop.run_in_executor(
            pool, cpu_bound)
        print('\nПользовательский пул процессов:', result)

if __name__ == '__main__':
    asyncio.run(main())