Пакет multiprocessing
поддерживает порождение процессов с использованием API, аналогичного модулю threading
.
Модуль многопроцессорной обработки данных предлагает как локальную, так и удаленную параллельную обработку данных, эффективно обходя GIL (глобальную блокировку интерпретатора) и используя ядра процессора вместо потоков. Благодаря этому, этот модуль позволяет программисту полностью использовать несколько процессоров на данной машине. Он работает как под Unix, так и под Windows.
В Python 3.14 метод запуска многопроцессорной обработки по умолчанию будет изменен на более безопасный в Linux, BSDs и других платформах, отличных от macOS POSIX, где в настоящее время по умолчанию используется
"fork"
. Добавление предупреждения об этом во время выполнения было сочтено слишком разрушительным, так как ожидается, что большинству кода это будет безразлично.
multiprocessing
и threading
.multiprocessing
.multiprocessing
и threading
.В модуле multiprocessing
представлены API, не имеющие аналогов в модуле threading
. Ярким примером этого является объект multiprocessing.Pool
. Этот объект предлагает удобные средства параллельного выполнения функции для нескольких входных значений, автоматически распределяя их по ядрам процессора.
В следующем примере демонстрируется обычная практика определения таких функций в модуле, чтобы дочерние процессы могли успешно импортировать этот модуль. Этот базовый пример параллелизма данных с использованием пула ядер процессора.
import multiprocessing def worker(x): name_proc = multiprocessing.current_process().name res = x*x print(name_proc, res) return res data = range(3, 7) with multiprocessing.Pool(2) as pool: print('Результаты:') print(pool.map(worker, data)) # Результаты: # ForkPoolWorker-1 9 # ForkPoolWorker-2 16 # ForkPoolWorker-1 25 # ForkPoolWorker-1 36 # [9, 16, 25, 36]
Аналогичный пример с использованием API, аналогичного модулю threading
:
import multiprocessing def worker(rear, write): while not read.empty(): name_proc = multiprocessing.current_process().name x = read.get() res = x*x print(name_proc, res) write.put(res) else: read.close() write.close() write = multiprocessing.Queue() read = multiprocessing.Queue() [read.put(x) for x in range(3, 7)] NUM_CORE = 2 procs = [] for i in range(NUM_CORE): p = multiprocessing.Process(target=worker, args=(read, write,)) procs.append(p) p.start() [proc.join() for proc in procs] print([write.get() for _ in range(write.qsize())]) # Process-1 9 # Process-1 16 # Process-2 25 # Process-1 36 # [9, 16, 36, 25]
В зависимости от платформы модуль multiprocessing
поддерживает три способа запуска процесса.
Методы запуска:
spawn
:Родительский процесс запускает новый процесс интерпретатора Python. Дочерний процесс унаследует только те ресурсы, которые необходимы для запуска метода Process.run()
объекта multiprocessing.Process
. В частности, ненужные файловые дескрипторы и дескрипторы родительского процесса не будут унаследованы. Запуск процесса с использованием этого метода довольно медленный по сравнению с использованием fork
или forkserver
.
Изменено в Python 3.8: В macOS метод запуска spawn
теперь используется по умолчанию. Метод запуска fork
следует считать небезопасным, так как он может привести к сбоям подпроцесса.
Доступно в Unix и Windows. По умолчанию в Windows и macOS.
fork
:Изменено в Python 3.12: Если Python способен обнаружить, что процесс имеет несколько потоков, то функция
os.fork()
, которую этот метод запуска вызывает внутренне, выдаст предупреждение об устареванииDeprecationWarning
. Когда это предупреждение появляется, то исправление заключается в использовании другого метода запуска многопроцессорной обработки, такого как"spawn"
или"forkserver"
.
Родительский процесс использует os.fork()
для разветвления интерпретатора Python. Дочерний процесс, когда он начинается, фактически идентичен родительскому процессу. Все ресурсы родительского процесса наследуются дочерним процессом. Обратите внимание, что безопасное разветвление многопоточного процесса проблематично.
Обратите внимание, что метод запуска по умолчанию будет изменен с fork
в Python 3.14. Код, требующий fork
, должен явно указывать это с помощью get_context()
или set_start_method()
.
Доступно в системах POSIX. В настоящее время используется по умолчанию в POSIX, за исключением macOS.
forkserver
:Когда программа запускается и выбирает метод запуска forkserver
, запускается процесс сервера. С этого момента всякий раз, когда программе требуется новый процесс, родительский процесс подключается к серверу и запрашивает его разветвление для нового процесса. Процесс сервера является однопоточным, поэтому использование os.fork()
безопасно. Никакие ненужные ресурсы не наследуются.
Доступно на платформах Unix, которые поддерживают передачу дескрипторов файлов по каналам Unix.
В Unix использование методов запуска spawn
или forkserver
также запускает процесс отслеживания ресурсов, который отслеживает несвязанные именованные системные ресурсы (такие как именованные семафоры или объекты разделяемой памяти), созданные процессами программы. Когда все процессы завершены, трекер ресурсов отсоединяет все оставшиеся отслеживаемые объекты. Обычно их не должно быть, но если процесс был остановлен сигналом, могут быть "утечки" ресурсов. Ни семафоры, ни сегменты разделяемой памяти не будут автоматически разъединены до следующей перезагрузки. Это проблематично для обоих объектов, поскольку система допускает только ограниченное количество именованных семафоров, а сегменты разделяемой памяти занимают некоторое пространство в основной памяти.
Чтобы выбрать метод запуска, используете функцию модуля multiprocessing.set_start_method()
в предложении if __name__ == '__main__'
основного модуля. Функция multiprocessing.set_start_method()
не должна использоваться в программе более одного раза.
import multiprocessing def worker(q): q.put('hello') if __name__ == '__main__': multiprocessing.set_start_method('spawn') q = multiprocessing.Queue() p = multiprocessing.Process(target=worker, args=(q,)) p.start() print(q.get()) p.join()
В качестве альтернативы можно использовать функцию multiprocessing.get_context()
для получения объекта контекста. Объекты контекста имеют тот же API, что и модуль multiprocessing
, и позволяют использовать несколько методов запуска в одной программе.
import multiprocessing def worker(q): q.put('hello') if __name__ == '__main__': ctx = multiprocessing.get_context('spawn') q = ctx.Queue() p = ctx.Process(target=worker, args=(q,)) p.start() print(q.get()) p.join()
Обратите внимание, что объекты, относящиеся к одному контексту, могут быть несовместимы с процессами для другого контекста. В частности, блокировки, созданные с использованием контекста fork
, не могут быть переданы процессам, запущенным с помощью методов запуска spawn
или forkserver
.
Библиотека, которая хочет использовать определенный метод запуска, вероятно, должна использовать get_context()
, чтобы не мешать выбору пользователя библиотеки.
Предупреждение В настоящее время методы запуска spawn
и forkserver
не могут использоваться с "замороженными" исполняемыми файлами. То есть с двоичными файлами, созданными такими пакетами, как pyInstaller
и cx_Freeze
в Unix. Метод запуска fork
работает с такими файлами нормально.
При использовании нескольких процессов обычно используется передача сообщений для связи между процессами и избегается необходимости использования каких-либо примитивов синхронизации, таких как блокировки.
Модуль multiprocessing
поддерживает два типа каналов связи между процессами.
Queues
, в собственной реализации.Класс multiprocessing.Queue
является почти клоном класса queue.Queue
. Очереди безопасны для потоков в разных ядрах процессора.
import multiprocessing def worker(q): q.put([42, None, 'hello']) if __name__ == '__main__': q = multiprocessing.Queue() p = multiprocessing.Process(target=worker, args=(q,)) p.start() print(q.get()) p.join() # "[42, None, 'hello']"
Pipes
.Класс multiprocessing.Pipe()
возвращает пару объектов, соединенных каналом, которые по умолчанию является duplex
двусторонним.
from multiprocessing import def worker(conn): conn.send([42, None, 'hello']) conn.close() if __name__ == '__main__': parent_conn, child_conn = multiprocessing.Pipe() p = multiprocessing.Process(target=worker, args=(child_conn,)) p.start() print(parent_conn.recv()) p.join() # "[42, None, 'hello']"
Два объекта соединения, возвращаемые multiprocessing.Pipe()
, представляют два конца канала. Каждый объект подключения имеет методы Pipe.send()
- посылает данные в канал и Pipe.recv()
- читает данные из канала.
Обратите внимание, что данные в канале могут быть повреждены, если два процесса или потока попытаются читать или записывать в один и тот же конец канала одновременно. Конечно, нет риска повреждения из-за процессов, использующих разные концы канала одновременно.
Как правило, примитивы синхронизации не так необходимы в программе, использующей несколько ядер процессора, как в многопоточной.
Однако, модуль multiprocessing
содержит эквиваленты всех примитивов синхронизации из модуля threading
. Например, можно использовать блокировку Lock
для обеспечения того, что только один процесс печатает на стандартный вывод за раз.
Без использования блокировки вывод различных процессов может все перемешать.
import multiprocessing def worker(lock, i): lock.acquire() try: print('hello world', i) finally: lock.release() if __name__ == '__main__': lock = multiprocessing.Lock() for num in range(10): multiprocessing.Process(target=worker, args=(lock, num)).start()
Как упоминалось выше, при параллельном программировании обычно лучше избегать использования общих ресурсов, насколько это возможно. Это особенно верно при использовании нескольких ядер процессора.
Но если все-же действительно необходимо использование каких-то общих данных, то модуль multiprocessing
предоставляет несколько способов сделать это.
Данные могут быть сохранены на карте общей памяти с помощью multiprocessing.Value
или multiprocessing.Array
.
import multiprocessing def worker(num, arr): num.value = 3.1415927 for i in range(len(arr)): arr[i] = -arr[i] if __name__ == '__main__': num = multiprocessing.Value('d', 0.0) arr = multiprocessing.Array('i', range(10)) p = multiprocessing.Process(target=worker, args=(num, arr)) p.start() p.join() print(num.value) print(arr[:]) # 3.1415927 # [0, -1, -2, -3, -4, -5, -6, -7, -8, -9]
Аргументы 'd'
и 'i'
, используемые при создании переменных num
и arr
, являются кодами типа, который используется модулем array
: 'd'
указывает на число с плавающей запятой двойной точности, а 'i'
указывает на целое число со знаком. Эти общие объекты будут процессными и поточно-ориентированными.
Для большей гибкости в использовании разделяемой памяти можно использовать модуль multiprocessing.sharedctypes
, который поддерживает создание произвольных объектов ctypes
, выделенных из разделяемой памяти.
Объект SyncManager
, возвращаемый multiprocessing.Manager()
, управляет серверным процессом, который содержит объекты Python и позволяет другим процессам манипулировать ими с помощью прокси-объектов
.
Например:
import multiprocessing def worker(d, l): d[1] = '1' d['2'] = 2 d[0.25] = None l.reverse() if __name__ == '__main__': with multiprocessing.Manager() as manager: d = manager.dict() l = manager.list(range(10)) p = multiprocessing.Process(target=worker, args=(d, l)) p.start() p.join() print(d) print(l) # {0.25: None, 1: '1', '2': 2} # [9, 8, 7, 6, 5, 4, 3, 2, 1, 0]
Менеджеры серверных процессов более гибкие, чем использование объектов общей памяти, т. к. могут быть созданы для поддержки произвольных типов объектов. Кроме того, один менеджер может использоваться совместно процессами на разных компьютерах в сети. Однако они медленнее, чем при использовании общей памяти.
multiprocessing
.Существуют определенные правила и идиомы, которых следует придерживаться при использовании многопроцессорной обработки данных.
Избегайте общих ресурсов.
Насколько это возможно, нужно стараться избегать перемещения больших объемов данных между процессами. Вероятно, лучше придерживаться использования очередей или каналов для связи между процессами, чем использовать примитивы синхронизации более низкого уровня.
Picklability.
Убедитесь,что аргументы методов прокси-объектов
являются упакованы модулем pickle
.
Потоковая безопасность прокси.
Не используйте прокси-объект из более чем одного потока, если вы не защитите его блокировкой. Никогда не возникает проблем с разными процессами, использующими один и тот же прокси.
Присоединение к зомби-процессам.
В Unix, когда процесс завершается, но к нему не присоединяются, он становится зомби. Их никогда не должно быть очень много, потому что каждый раз, когда запускается новый процесс или вызывается active_children()
, все завершенные процессы, которые еще не были присоединены, будут объединены. Также вызов метода Process.is_alive()
завершенного процесса присоединится к процессу. Тем не менее, хорошей практикой является явное присоединение ко всем процессам, которые запускаются.
Лучше наследовать, чем pickle
/unpickle
.
При использовании методов запуска spawn
или forkserver
многие типы из multiprocessing
должны быть упакованы модулем pickle
, чтобы дочерние процессы могли их использовать. Обычно следует избегать отправки общих объектов другим процессам с использованием каналов или очередей.
В общем необходимо организовать программу так, чтобы процесс, которому требуется доступ к совместно используемому ресурсу, созданному где-то еще, мог унаследовать его от процесса-предка.
Избегайте завершения процессов.
Использование метода Process.terminate()
для остановки процесса может привести к тому, что любые общие ресурсы, такие как блокировки, семафоры, каналы и очереди, в настоящее время используемые процессом, станут сломанными или недоступными для других процессов. Поэтому, вероятно, лучше всего использовать этот метод только для процессов, которые никогда не используют общие ресурсы.
Присоединение к процессам, использующим очереди.
Имейте в виду, что процесс, который поместил элементы в очередь, будет ждать перед завершением, пока все буферизованные элементы не будут переданы потоком "питателя" в нижележащий канал. Дочерний процесс может вызвать метод очереди Queue.cancel_join_thread
, чтобы избежать такого поведения.
Это означает, что всякий раз, когда используется очередь, необходимо убедиться, что все элементы, помещенные в очередь, в конечном итоге будут удалены до присоединения к процессу. В противном случае, никто не может быть уверен, что процессы, поместившие элементы в очередь, завершатся. Помните также, что не демонические процессы будут присоединяться автоматически.
Вот пример тупиковой ситуации:
import multiprocessing def worker(q): q.put('X' * 1000000) if __name__ == '__main__': queue = multiprocessing.Queue() p = multiprocessing.Process(target=worker, args=(queue,)) p.start() p.join() # это тупик obj = queue.get()
Что бы исправить ситуацию в примере выше, нужно поменять местами последние две строки или просто удалить строку p.join()
.
Явная передача ресурсов дочерним процессам.
В Unix, использующем метод запуска fork
, дочерний процесс может использовать общий ресурс, созданный в родительском процессе с использованием глобального ресурса. Лучше передать объект в качестве аргумента конструктору дочернего процесса.
Помимо обеспечения совместимости кода (потенциально) с Windows и другими методами запуска, это также гарантирует, что, пока дочерний процесс все еще жив, объект не будет собираться сборщиком мусора в родительском процессе. Это может быть важно, если какой-то ресурс освобождается при сборке мусора в родительском процессе.
Так например:
import multiprocessing def worker(): ... do something using "lock" ... if __name__ == '__main__': lock = multiprocessing.Lock() for i in range(10): multiprocessing.Process(target=worker).start()
следует переписать как:
import multiprocessing def worker(l): ... do something using "l" ... if __name__ == '__main__': lock = multiprocessing.Lock() for i in range(10): multiprocessing.Process(target=worker, args=(lock,)).start()
Остерегайтесь замены sys.stdin
на файловый объект.
Опасность заключается в том, что если несколько процессов вызовут file.close()
для этого файлового объекта, то такое поведение может привести к тому, что одни и те же данные будут сброшены в него несколько раз, что приведет к повреждению.
spawn
и forkserver
.Есть несколько дополнительных ограничений, которые не применяются к методу запуска fork
.
Больше picklability
.
Убедитесь, что все аргументы конструктора Process.__init__()
являются picklable
. Кроме того, если создается подкласс multiprocessing.Process()
, необходимо убедится, что экземпляры будут picklable
при вызове метода Process.start()
.
Глобальные переменные.
Имейте в виду, если код, выполняемый в дочернем процессе, пытается получить доступ к глобальной переменной, то значение, которое он видит (если оно есть), может не совпадать со значением в родительском процессе во время вызова метода Process.start()
.
Однако глобальные переменные, которые являются просто константами уровня модуля, не вызывают проблем.
Безопасный импорт основного модуля.
Убедитесь, что основной модуль может быть безопасно импортирован новым интерпретатором Python, не вызывая нежелательных побочных эффектов, таких как запуск нового процесса.
Например, при использовании метода запуска spawn
или forkserver
, выполняющего следующий модуль, произойдет сбой с исключением RuntimeError
:
from multiprocessing import Process def worker(): print('hello') p = multiprocessing.Process(target=worker) p.start()
Вместо этого следует защитить точку входа программы, используя if __name__ == '__main__'
:
import multiprocessing def worker(): print('hello') if __name__ == '__main__': multiprocessing.freeze_support() multiprocessing.set_start_method('spawn') p = multiprocessing.Process(target=worker) p.start()
Строку multiprocessing.freeze_support()
можно не указывать, если программа будет запускаться в обычном режиме, а не будет заморожена.
Это позволяет вновь созданному интерпретатору Python безопасно импортировать модуль и затем запускать функцию модуля worker()
.
Подобные ограничения применяются, если пул или менеджер создается в основном модуле.