Сообщить об ошибке.

Обработка очереди в несколько потоков

Пример иллюстрирует ситуацию когда нужно распараллелить обработку элементов списка, при этом получить результат в котором положение элементов будут соответствовать изначальному. То есть элементы в результирующего списка располагаются в том же порядке.

В примере будем использовать очередь FIFO queue.Queue() и очередь с приоритетом queue.PriorityQueue().

import queue, threading, time, random

def get_time():
    return time.strftime('%H:%M:%S', time.localtime())

# данные для очереди
data = list(range(10,150,15))
print('--> Start list\n', data)

class Worker(threading.Thread):
    # counter - счетчик, являющийся индексом приоритетов выходной очереди
    counter=-1
    
    def __init__(self, queue_in, queue_out):
        super(Worker, self).__init__()
        self.setDaemon(True)
        self._queue_in = queue_in
        self._queue_out = queue_out
        
    def run(self):
        """
        Основной код здесь.
        """
        
        while True:
            # Получаем задание из входящей очереди
            job = self._queue_in.get()
            
            # Увеличиваем счетчик
            Worker.counter +=1
            # В num хранится порядок элемента массива, 
            # взятого в текущей момент из очереди.
            num = Worker.counter
            
            print(f'{get_time()} {self.getName()} получил {job}')

            # Иммитируем занятость на случайное значение секунд.
            # В результате порядок обрабатываемых задач очереди перемешается
            time.sleep(random.randint(0, 15))
            print(f'{get_time()} {self.getName()} выполнил {job}')

            # Помещаем кортеж из индекса и задания в результирующую очередь
            self._queue_out.put((num, job))
            # Сообщаем входящей очереди что задача выполнена
            self._queue_in.task_done()

# Создаем входящую FIFO очередь
q = queue.Queue()
# Создаем результирующую приоритетную очередь
rez = queue.PriorityQueue()

# Заполняем входящую очередь данными
for i in data:
    q.put(i)

# Создаем и запускаем потоки
for i in range(3):
    w = Worker(q, rez)
    w.start()

# Блокируем дальнейшее выполнение программы 
# до тех пор пока потоки не обслужат все элементы очереди
q.join()

# Формируем список как результат обработки изначального списка
out = []
while not rez.empty():
    out.append(rez.get()[1])

print('--> End list\n', out)

В результате выполнения получим вывод:

--> Start list
 [10, 25, 40, 55, 70, 85, 100, 115, 130, 145]

14:27:37 Thread-1 получил 10
14:27:37 Thread-2 получил 25
14:27:37 Thread-3 получил 40
14:27:43 Thread-3 выполнил 40
14:27:43 Thread-3 получил 55
14:27:44 Thread-1 выполнил 10
14:27:44 Thread-1 получил 70
14:27:44 Thread-2 выполнил 25
14:27:44 Thread-2 получил 85
14:27:44 Thread-3 выполнил 55
14:27:44 Thread-3 получил 100
14:27:47 Thread-2 выполнил 85
14:27:47 Thread-2 получил 115
14:27:58 Thread-2 выполнил 115
14:27:58 Thread-2 получил 130
14:27:59 Thread-1 выполнил 70
14:27:59 Thread-1 получил 145
14:27:59 Thread-3 выполнил 100
14:28:08 Thread-2 выполнил 130
14:28:12 Thread-1 выполнил 145

--> End list
 [10, 25, 40, 55, 70, 85, 100, 115, 130, 145]