Сообщить об ошибке.

Класс PriorityQueue() модуля queue в Python

Многопоточная очередь с приоритетом

Синтаксис:

import queue

q = queue.PriorityQueue(maxsize=0)

Параметры:

  • maxsize - максимальное количество элементов в очереди.

Возвращаемое значение:

Описание:

Класс PriorityQueue() модуля queue представляет собой конструктор для очереди с приоритетом.

Аргумент maxsize - это целое число, которое устанавливает верхний предел для количества элементов, которые могут быть помещены в очередь. Вставка будет блокироваться после достижения этого размера, пока элементы очереди не будут использованы.

Если maxsize меньше или равен нулю, то размер очереди будет бесконечен.

Записи с наименьшим значением извлекаются первыми. Запись с наименьшим значением - это запись, возвращенная sorted(list(entries))[0]). Типичным шаблоном для записей является кортеж в форме: (priority_number, data).

Если элементы данных несопоставимы, то данные могут быть помещены в класс, который игнорирует элемент данных и сравнивает только номер приоритета:

from dataclasses import dataclass, field
from typing import Any

@dataclass(order=True)
class PrioritizedItem:
    priority: int
    item: Any=field(compare=False)

Примеры использования класса queue.PriorityQueue():

Симулирование процесса обновления программного обеспечения на серверах. Особенность в том что обновления происходит в порядке определенной очереди, дабы исключить простой в обслуживании клиентов.

import queue, threading, time, random

def get_time():
    return time.strftime('%H:%M:%S', time.localtime())

workers=[(0, 'Балансировщик нагрузки'),
         (1, 'Сервер БД master'),
         (1, 'Сервер БД slave'),
         (50, 'Сервер APP-1'),
         (60, 'Сервер APP-2'),
         (70, 'Сервер web-1'),
         (75, 'Сервер web-2'),
         (99, 'Сервер логов')]

# Перемешиваем список серверов в случайном порядке
random.shuffle(workers)

def worker(queue):
    """
    основной код
    """
    while True:
        # получаем задание из очереди в виде кортежа (приоритет, сервер)
        job = queue.get()
        # Выводим информацию о начале обслуживания
        print (f'[+] {get_time()} START upgrade {job[1]}')
        # Иммитируем процесс обновления на случайное значение секунд.
        time.sleep(random.randint(5, 30))
        # Выводим информацию об окончании обслуживания сотрудника
        print (f'[-] {get_time()} END upgrade {job[1]}')
        # Сообщаем что задание выполнено
        queue.task_done()

# Создаем приоритетную очередь и наполним ее заданиями
q = queue.PriorityQueue()
for item in workers:
    q.put(item)

# Создаем 2 потока которые будут обслуживать очередь.
for i in range(2):
    # В конструктор потока передаем функцию worker() 
    # и последовательность аргументов этой функции.
    t = threading.Thread(target=worker, args=(q,))
    t.setDaemon(True)
    t.start()

# Блокируем выполнение программы до 
# выполнения всех заданий в очереди
q.join()

В результате получим вывод:

[+] 14:02:38 START upgrade Балансировщик нагрузки
[+] 14:02:38 START upgrade Сервер БД master
[-] 14:02:43 END upgrade Сервер БД master
[+] 14:02:43 START upgrade Сервер БД slave
[-] 14:02:48 END upgrade Балансировщик нагрузки
[+] 14:02:48 START upgrade Сервер APP-1
[-] 14:03:00 END upgrade Сервер БД slave
[+] 14:03:00 START upgrade Сервер APP-2
[-] 14:03:02 END upgrade Сервер APP-1
[+] 14:03:02 START upgrade Сервер web-1
[-] 14:03:11 END upgrade Сервер APP-2
[+] 14:03:11 START upgrade Сервер web-2
[-] 14:03:15 END upgrade Сервер web-1
[+] 14:03:15 START upgrade Сервер логов
[-] 14:03:19 END upgrade Сервер web-2
[-] 14:03:24 END upgrade Сервер логов