Очереди queues
модуля asyncio
разработаны так, чтобы быть похожими на классы модуля queue
. Хотя очереди asyncio
не являются потокобезопасными, они предназначены специально для использования в коде, использующим синтаксис async/await
.
Обратите внимание, что методы очереди asyncio
не имеют параметра timeout
тайм-аута. Используйте функцию asyncio.wait_for()
для выполнения операций для очереди Queue
с таймаутом.
Очереди можно использовать для распределения нагрузки между несколькими параллельными задачами.
Смотрите примеры с очередью Queue
ниже.
asyncio.Queue()
,asyncio.PriorityQueue
,asyncio.LifoQueue
,asyncio.Queue
,asyncio
.asyncio.Queue(maxsize=0, *, loop=None)
:Класс asyncio.Queue()
представляет из себя простую очередь FIFO "первым пришел - первым вышел".
Если maxsize
меньше или равен нулю, то размер очереди бесконечен. Если maxsize
целое число больше нуля, то вызов метода await queue.put()
блокируется, когда очередь достигает maxsize
, пока элемент не будет удален методом queue.get()
.
В отличие от стандартного потокобезопасного модуля queue
, размер очереди всегда известен и может быть возвращен путем вызова метода queue.qsize()
.
Параметр цикла loop
устарел и не рекомендуется к использованию с версии Python 3.8, будет удален в версии Python 3.10
asyncio.Queue
.queue.maxsize
:Атрибут queue.maxsize
возвращает количество элементов, которые могут поместится в очередь.
queue.empty()
:Метод queue.empty()
возвращает True
, если очередь пуста и False
в противном случае.
queue.full()
:Метод queue.full()
возвращает True
, если в очереди превышает максимальный размер queue.maxsize
.
Если очередь была инициализирована с maxsize=0
(по умолчанию), то метод queue.full()
никогда не возвращает True
.
queue.get()
:Метод queue.get()
удаляет и возвращает элемент из очереди. Если очередь пуста, то ждет, пока не станет доступен элемент.
Представляет собой сопрограмму, используется с оператором await
и может быть обернута в задачу.
queue.get_nowait()
:Метод queue.get_nowait()
возвращает элемент, если он доступен сразу, иначе поднимает исключение asyncio.QueueEmpty
.
queue.join()
:Метод queue.join()
блокирует выполнение программы до тех пор, пока все элементы в очереди не будут получены и обработаны.
Представляет собой сопрограмму, используется с оператором await
и может быть обернута в задачу.
queue.task_done()
для указания, что элемент был получен и вся работа над ним завершена. queue.join()
разблокирует ход выполнения программы.queue.put(item)
:Метод queue.put()
размещает задачу item
в очереди. Если очередь заполнена, то дожидается, пока освободится место, прежде чем добавлять элемент.
Представляет собой сопрограмму, используется с оператором await
и может быть обернута в задачу.
queue.put_nowait(item)
:Метод queue.put_nowait()
размещает задачу item
в очереди без блокировки. Если нет свободного места, то поднимает исключение asyncio.QueueFull
.
queue.qsize()
:Метод queue.qsize()
возвращает количество элементов в очереди.
queue.task_done()
:Метод queue.task_done()
указывает, что задача, ранее поставленная в очередь, завершена. Используется потребителями очереди.
Для каждого метода queue.get()
, используемого для выборки задачи, последующий вызов queue.task_done()
сообщает очереди, что обработка задачи завершена.
Если queue.join()
в настоящее время блокируется, то он возобновится, когда все элементы будут обработаны (это означает, что вызов queue.task_done()
был получен для каждого элемента, который был queue.put()
в очереди).
Вызывает ошибку ValueError
, если вызывается больше раз, чем было помещено в очередь.
asyncio
.asyncio.PriorityQueue
:Класс asyncio.PriorityQueue
представляет собой вариант очереди Queue
, которая извлекает записи в порядке приоритета (самый низкий приоритет - первый).
Записи обычно представляют собой кортежи формы (priority_number, data)
.
API очереди, такой же как и у простой очереди FIFO.
asyncio
.asyncio.LifoQueue
:Класс asyncio.LifoQueue
представляет собой вариант очереди Queue
, которая сначала извлекает самые последние добавленные записи (last in, first out)
.
API очереди, такой же как и у простой очереди FIFO.
asyncio
.asyncio.QueueEmpty
:Исключение asyncio.QueueEmpty
возникает, когда метод queue.get_nowait()
вызывается в пустой очередью.
asyncio.QueueFull
:Исключение asyncio.QueueFull
возникает, когда метод queue.put_nowait()
вызывается в очереди, которая достигла своего максимального размера.
Очереди можно использовать для распределения нагрузки между несколькими параллельными задачами:
import asyncio import random import time async def worker(name, queue): while True: # Вытаскиваем 'рабочий элемент' из очереди. sleep_for = await queue.get() # Задержка на 'sleep_for' секунд. await asyncio.sleep(sleep_for) # Сообщаем очереди, что 'рабочий элемент' обработан. queue.task_done() print(f'{name} has slept for {sleep_for:.2f} seconds') async def main(): # Создаем очередь, которую будем использовать # для хранения рабочей нагрузки. queue = asyncio.Queue() total_sleep_time = 0 # создание данных для очереди for _ in range(20): # Генерируем случайные тайминги sleep_for = random.uniform(0.05, 1.0) total_sleep_time += sleep_for # заполняем очередь. queue.put_nowait(sleep_for) # Создаем три рабочие задачи для одновременной обработки очереди. tasks = [] for i in range(3): task = asyncio.create_task(worker(f'worker-{i}', queue)) tasks.append(task) started_at = time.monotonic() # Запускаем обработку очереди и ожидаем, # пока элементы не закончатся. await queue.join() total_slept_for = time.monotonic() - started_at # После того как очередь израсходована # останавливаем задачи for task in tasks: task.cancel() # Ждем, остановку задач. await asyncio.gather(*tasks, return_exceptions=True) print('====') print(f'3 workers slept in parallel for {total_slept_for:.2f} seconds') print(f'total expected sleep time: {total_sleep_time:.2f} seconds') asyncio.run(main()) # worker-1 has slept for 0.15 seconds # worker-0 has slept for 0.29 seconds # worker-2 has slept for 0.38 seconds # worker-0 has slept for 0.32 seconds # worker-1 has slept for 0.48 seconds # worker-0 has slept for 0.30 seconds # worker-2 has slept for 0.73 seconds # worker-1 has slept for 0.50 seconds # worker-2 has slept for 0.11 seconds # worker-0 has slept for 0.54 seconds # worker-1 has slept for 0.45 seconds # worker-2 has slept for 0.43 seconds # worker-0 has slept for 0.20 seconds # worker-0 has slept for 0.31 seconds # worker-2 has slept for 0.33 seconds # worker-1 has slept for 0.80 seconds # worker-2 has slept for 0.58 seconds # worker-1 has slept for 0.28 seconds # worker-0 has slept for 0.98 seconds # worker-2 has slept for 0.77 seconds # ==== # 3 workers slept in parallel for 3.34 seconds # total expected sleep time: 8.95 seconds