Сообщить об ошибке.

Функции current_task() и all_tasks() модуля asyncio в Python

Получение текущей задачи или всех не завершенных задач

Синтаксис:

import asyncio

# получение текущей задачи
task = asyncio.current_task(loop=None)

# получение всех незавершенных задач
task_set = asyncio.all_tasks(loop=None)

Параметры:

  • loop=None - параметры цикла.

Возвращаемое значение:

  • asyncio.current_task(): текущий запущенный экземпляр Task
  • asyncio.all_tasks(): множество еще не завершенных задач.

Описание:

Функция asyncio.current_task() возвращает текущий запущенный экземпляр задачи asyncio.Task или None, если ни одна задача не запущена.

Функция asyncio.all_tasks() возвращает множество еще не завершенных задач asyncio.Task, запущенных циклом loop.

Если цикл loop=None, то для получения текущего цикла событий эти функции используют вызов низкоуровнего API asyncio.get_running_loop().

Данные функции были введены в Python 3.7

Пример использования asyncio.all_tasks:

Пример завершения всех незавершенных задач.

# псевдокод
...
pending = asyncio.all_tasks(loop)
for task in pending:
    task.cancel()
...

# или

...
tasks = [task for task in asyncio.all_tasks(loop) if not task.done()]
for task in tasks:
    task.cancel()
...

Пример получения текущий запущенной задачи:

Пример получения индификатора id текущий запущенной задачи.

loop = asyncio.get_event_loop()
if loop.is_running():
    task = asyncio.current_task()
    task_id = id(task)
    return task_id
else:
    return 0

Кросс-версионная реализация функции asyncio.current_task(). Возвращает None, если нет задачи.

def get_current_asyncio_task():
    try:
        if hasattr(asyncio, "current_task"):
            # Python 3.7 и выше
            return asyncio.current_task()
        else:
            # Python 3.6
            return asyncio.Task.current_task()
    except RuntimeError:
        return None