Пример иллюстрирует ситуацию когда нужно распараллелить обработку элементов списка, при этом получить результат в котором положение элементов будут соответствовать изначальному. То есть элементы в результирующего списка располагаются в том же порядке.
В примере будем использовать очередь FIFO queue.Queue()
и очередь с приоритетом queue.PriorityQueue()
.
import queue, threading, time, random def get_time(): return time.strftime('%H:%M:%S', time.localtime()) # данные для очереди data = list(range(10,150,15)) print('--> Start list\n', data) class Worker(threading.Thread): # counter - счетчик, являющийся индексом приоритетов выходной очереди counter=-1 def __init__(self, queue_in, queue_out): super(Worker, self).__init__() self.setDaemon(True) self._queue_in = queue_in self._queue_out = queue_out def run(self): """ Основной код здесь. """ while True: # Получаем задание из входящей очереди job = self._queue_in.get() # Увеличиваем счетчик Worker.counter +=1 # В num хранится порядок элемента массива, # взятого в текущей момент из очереди. num = Worker.counter print(f'{get_time()} {self.getName()} получил {job}') # Иммитируем занятость на случайное значение секунд. # В результате порядок обрабатываемых задач очереди перемешается time.sleep(random.randint(0, 15)) print(f'{get_time()} {self.getName()} выполнил {job}') # Помещаем кортеж из индекса и задания в результирующую очередь self._queue_out.put((num, job)) # Сообщаем входящей очереди что задача выполнена self._queue_in.task_done() # Создаем входящую FIFO очередь q = queue.Queue() # Создаем результирующую приоритетную очередь rez = queue.PriorityQueue() # Заполняем входящую очередь данными for i in data: q.put(i) # Создаем и запускаем потоки for i in range(3): w = Worker(q, rez) w.start() # Блокируем дальнейшее выполнение программы # до тех пор пока потоки не обслужат все элементы очереди q.join() # Формируем список как результат обработки изначального списка out = [] while not rez.empty(): out.append(rez.get()[1]) print('--> End list\n', out)
В результате выполнения получим вывод:
--> Start list [10, 25, 40, 55, 70, 85, 100, 115, 130, 145] 14:27:37 Thread-1 получил 10 14:27:37 Thread-2 получил 25 14:27:37 Thread-3 получил 40 14:27:43 Thread-3 выполнил 40 14:27:43 Thread-3 получил 55 14:27:44 Thread-1 выполнил 10 14:27:44 Thread-1 получил 70 14:27:44 Thread-2 выполнил 25 14:27:44 Thread-2 получил 85 14:27:44 Thread-3 выполнил 55 14:27:44 Thread-3 получил 100 14:27:47 Thread-2 выполнил 85 14:27:47 Thread-2 получил 115 14:27:58 Thread-2 выполнил 115 14:27:58 Thread-2 получил 130 14:27:59 Thread-1 выполнил 70 14:27:59 Thread-1 получил 145 14:27:59 Thread-3 выполнил 100 14:28:08 Thread-2 выполнил 130 14:28:12 Thread-1 выполнил 145 --> End list [10, 25, 40, 55, 70, 85, 100, 115, 130, 145]