Сообщить об ошибке.

Использование объекта очереди asyncio.Queue

Очереди queues модуля asyncio разработаны так, чтобы быть похожими на классы модуля queue. Хотя очереди asyncio не являются потокобезопасными, они предназначены специально для использования в коде, использующим синтаксис async/await.

Обратите внимание, что методы очереди asyncio не имеют параметра timeout тайм-аута. Используйте функцию asyncio.wait_for() для выполнения операций для очереди Queue с таймаутом.

Очереди можно использовать для распределения нагрузки между несколькими параллельными задачами.

Смотрите примеры с очередью Queue ниже.

Содержание:


Простая очередь FIFO модуля asyncio.

asyncio.Queue(maxsize=0, *, loop=None):

Класс asyncio.Queue() представляет из себя простую очередь FIFO "первым пришел - первым вышел".

Если maxsize меньше или равен нулю, то размер очереди бесконечен. Если maxsize целое число больше нуля, то вызов метода await queue.put() блокируется, когда очередь достигает maxsize, пока элемент не будет удален методом queue.get().

В отличие от стандартного потокобезопасного модуля queue, размер очереди всегда известен и может быть возвращен путем вызова метода queue.qsize().

Параметр цикла loop устарел и не рекомендуется к использованию с версии Python 3.8, будет удален в версии Python 3.10

Атрибуты и методы очереди asyncio.Queue.

queue.maxsize:

Атрибут queue.maxsize возвращает количество элементов, которые могут поместится в очередь.

queue.empty():

Метод queue.empty() возвращает True, если очередь пуста и False в противном случае.

queue.full():

Метод queue.full() возвращает True, если очередь заполнена, в противном случае - False.

Если queue.full() возвращает True, это не гарантирует, что последующий вызов queue.get() не заблокируется. Аналогично, если queue.full() возвращает False, это не гарантирует, что последующий вызов queue.put() не заблокируется.

queue.get():

Метод queue.get() удаляет и возвращает удаленный элемент из очереди. Если очередь пуста, то ждет, пока не станет доступен элемент.

Представляет собой сопрограмму, используется с оператором await и может быть обернут в задачу.

queue.get_nowait():

Метод queue.get_nowait() возвращает элемент, если он доступен сразу, иначе поднимает исключение asyncio.QueueEmpty.

queue.join():

Метод queue.join() блокирует выполнение программы до тех пор, пока все элементы в очереди не будут получены и обработаны.

Представляет собой сопрограмму, используется с оператором await и может быть обернута в задачу.

  • Количество незавершенных задач увеличивается всякий раз, когда элемент добавляется в очередь.
  • Счетчик уменьшается всякий раз, когда сопрограмма-потребитель вызывает queue.task_done() для указания, что элемент был получен и вся работа над ним завершена.
  • Когда количество незавершенных задач падает до нуля, то метод queue.join() разблокирует ход выполнения программы.

queue.put(item):

Метод queue.put() размещает задачу item в очереди. Если очередь заполнена, то дожидается, пока освободится место, прежде чем добавлять элемент.

Представляет собой сопрограмму, используется с оператором await и может быть обернута в задачу.

queue.put_nowait(item):

Метод queue.put_nowait() размещает задачу item в очереди без блокировки. Если нет свободного места, то поднимает исключение asyncio.QueueFull.

queue.qsize():

Метод queue.qsize() возвращает приблизительный размер очереди. Обратите внимание: queue.qsize() > 0 не гарантирует, что последующий вызов queue.get() не будет блокироваться, а queue.qsize() < queue.maxsize не гарантирует, что queue.put() не будет блокироваться.

queue.task_done():

Метод queue.task_done() указывает, что задача, ранее поставленная в очередь, завершена. Используется потребителями очереди.

Для каждого метода queue.get(), используемого для выборки задачи, последующий вызов queue.task_done() сообщает очереди, что обработка задачи завершена.

Если queue.join() в настоящее время блокируется, то он возобновится, когда все элементы будут обработаны (это означает, что вызов queue.task_done() был получен для каждого элемента, который был queue.put() в очереди).

Вызывает ошибку ValueError, если вызывается больше раз, чем было помещено в очередь.

Приоритетная очередь модуля asyncio.

asyncio.PriorityQueue:

Класс asyncio.PriorityQueue представляет собой вариант очереди Queue, которая извлекает записи в порядке приоритета (самый низкий приоритет - первый).

Записи обычно представляют собой кортежи формы (priority_number, data).

API очереди, такой же как и у простой очереди FIFO.

Очередь LIFO модуля asyncio.

asyncio.LifoQueue:

Класс asyncio.LifoQueue представляет собой вариант очереди Queue, которая сначала извлекает самые последние добавленные записи (last in, first out).

API очереди, такой же как и у простой очереди FIFO.

Ошибки и исключения при работе с очередями модуля asyncio.

asyncio.QueueEmpty:

Исключение asyncio.QueueEmpty возникает, когда метод queue.get_nowait() вызывается в пустой очередью.

asyncio.QueueFull:

Исключение asyncio.QueueFull возникает, когда метод queue.put_nowait() вызывается в очереди, которая достигла своего максимального размера.


Пример передачи параметров задачам из очереди:

Очереди можно использовать для распределения нагрузки между несколькими параллельными задачами:

import asyncio
import random
import time

async def worker(name, queue):
    while True:
        # Вытаскиваем 'рабочий элемент' из очереди.
        sleep_for = await queue.get()
        # Задержка на 'sleep_for' секунд.
        await asyncio.sleep(sleep_for)
        # Сообщаем очереди, что 'рабочий элемент' обработан.
        queue.task_done()
        print(f'{name} has slept for {sleep_for:.2f} seconds')

async def main():
    # Создаем очередь, которую будем использовать
    # для хранения рабочей нагрузки.
    queue = asyncio.Queue()
    total_sleep_time = 0
    # создание данных для очереди
    for _ in range(20):
        # Генерируем случайные тайминги
        sleep_for = random.uniform(0.05, 1.0)
        total_sleep_time += sleep_for
        # заполняем очередь.
        queue.put_nowait(sleep_for)

    # Создаем три рабочие задачи для одновременной обработки очереди.
    tasks = []
    for i in range(3):
        task = asyncio.create_task(worker(f'worker-{i}', queue))
        tasks.append(task)

    started_at = time.monotonic()
    # Запускаем обработку очереди и ожидаем, 
    # пока элементы не закончатся.
    await queue.join()
    total_slept_for = time.monotonic() - started_at

    # После того как очередь израсходована
    # останавливаем задачи
    for task in tasks:
        task.cancel()
    # Ждем, остановку задач.
    await asyncio.gather(*tasks, return_exceptions=True)

    print('====')
    print(f'3 workers slept in parallel for {total_slept_for:.2f} seconds')
    print(f'total expected sleep time: {total_sleep_time:.2f} seconds')

asyncio.run(main())


# worker-1 has slept for 0.15 seconds
# worker-0 has slept for 0.29 seconds
# worker-2 has slept for 0.38 seconds
# worker-0 has slept for 0.32 seconds
# worker-1 has slept for 0.48 seconds
# worker-0 has slept for 0.30 seconds
# worker-2 has slept for 0.73 seconds
# worker-1 has slept for 0.50 seconds
# worker-2 has slept for 0.11 seconds
# worker-0 has slept for 0.54 seconds
# worker-1 has slept for 0.45 seconds
# worker-2 has slept for 0.43 seconds
# worker-0 has slept for 0.20 seconds
# worker-0 has slept for 0.31 seconds
# worker-2 has slept for 0.33 seconds
# worker-1 has slept for 0.80 seconds
# worker-2 has slept for 0.58 seconds
# worker-1 has slept for 0.28 seconds
# worker-0 has slept for 0.98 seconds
# worker-2 has slept for 0.77 seconds
# ====
# 3 workers slept in parallel for 3.34 seconds
# total expected sleep time: 8.95 seconds