Пакет multiprocessing поддерживает порождение процессов с использованием API, аналогичного модулю threading.
Модуль многопроцессорной обработки данных предлагает как локальную, так и удаленную параллельную обработку данных, эффективно обходя GIL (глобальную блокировку интерпретатора) и используя ядра процессора вместо потоков. Благодаря этому, этот модуль позволяет программисту полностью использовать несколько процессоров на данной машине. Он работает как под Unix, так и под Windows.
Изменено в версии 3.14 На POSIX-платформах метод по умолчанию был изменён с
forkнаforkserver, чтобы сохранить производительность, но избежать распространённых проблем с многопоточностью.
multiprocessing и threading.multiprocessing.multiprocessing и threading.В модуле multiprocessing представлены API, не имеющие аналогов в модуле threading. Ярким примером этого является объект multiprocessing.Pool. Этот объект предлагает удобные средства параллельного выполнения функции для нескольких входных значений, автоматически распределяя их по ядрам процессора.
В следующем примере демонстрируется обычная практика определения таких функций в модуле, чтобы дочерние процессы могли успешно импортировать этот модуль. Этот базовый пример параллелизма данных с использованием пула ядер процессора.
import multiprocessing def worker(x): name_proc = multiprocessing.current_process().name res = x*x print(name_proc, res) return res data = range(3, 7) with multiprocessing.Pool(2) as pool: print('Результаты:') print(pool.map(worker, data)) # Результаты: # ForkPoolWorker-1 9 # ForkPoolWorker-2 16 # ForkPoolWorker-1 25 # ForkPoolWorker-1 36 # [9, 16, 25, 36]
Аналогичный пример с использованием API, аналогичного модулю threading:
import multiprocessing def worker(rear, write): while not read.empty(): name_proc = multiprocessing.current_process().name x = read.get() res = x*x print(name_proc, res) write.put(res) else: read.close() write.close() write = multiprocessing.Queue() read = multiprocessing.Queue() [read.put(x) for x in range(3, 7)] NUM_CORE = 2 procs = [] for i in range(NUM_CORE): p = multiprocessing.Process(target=worker, args=(read, write,)) procs.append(p) p.start() [proc.join() for proc in procs] print([write.get() for _ in range(write.qsize())]) # Process-1 9 # Process-1 16 # Process-2 25 # Process-1 36 # [9, 16, 36, 25]
В зависимости от платформы, модуль multiprocessing поддерживает три способа запуска процесса. Эти методы называются:
spawn:Изменено в версии 3.14. Метод
spawnбольше не используется по умолчанию ни на одной POSIX-платформе.
Родительский процесс запускает новый экземпляр интерпретатора Python. Дочерний процесс наследует только те ресурсы, которые необходимы для выполнения метода run() объекта процесса. В частности, ненужные дескрипторы файлов и дескрипторы системных объектов из родительского процесса не наследуются.
Запуск процесса этим методом работает медленнее по сравнению с fork или forkserver.
fork:Изменено в версии 3.14. Метод
forkбольше не является методом по умолчанию ни на одной платформе. Если он нужен, то его следует явно указывать черезget_context()илиset_start_method().
Родительский процесс использует функцию os.fork() для создания копии Python-интерпретатора. При запуске дочерний процесс практически идентичен родительскому. Все ресурсы родительского процесса наследуются дочерним.
Обратите внимание: безопасное использование fork в многопоточных приложениях может быть проблематичным.
Изменено в версии 3.12 Если Python определяет, что процесс содержит несколько потоков, внутренний вызов
os.fork()будет генерировать предупреждениеDeprecationWarning. Рекомендуется использовать другой метод запуска. Подробности см. в документации кos.fork().
forkserver:При запуске программы и выборе метода forkserver, создаётся серверный процесс. Затем, каждый раз, когда требуется создать новый процесс, родительский процесс соединяется с сервером и запрашивает создание нового процесса.
Серверный процесс однопоточный (если только системные библиотеки или импортированные модули не создают дополнительные потоки), поэтому использование os.fork() в нём считается безопасным. Ненужные ресурсы не наследуются.
Изменено в Python 3.8. На macOS метод
spawnстал использоваться по умолчанию. Использование методаforkтеперь считается небезопасным, так как может привести к сбоям дочерних процессов - системные библиотеки macOS могут запускать потоки.
Изменено в Python 3.14 На POSIX-платформах метод по умолчанию был изменён с
forkнаforkserver, чтобы сохранить производительность, но избежать распространённых проблем с многопоточностью.
С версии Python 3.14
"forkserver"вmultiprocessingаутентифицирует соединение через свой управляющий сокет, чтобы не полагаться только на права доступа к файлам в системе для ограничения возможности других процессов запускать дочерние процессы. Это улучшает безопасность.
В Unix использование методов запуска spawn или forkserver также запускает процесс отслеживания ресурсов, который отслеживает несвязанные именованные системные ресурсы (такие как именованные семафоры или объекты разделяемой памяти), созданные процессами программы. Когда все процессы завершены, трекер ресурсов отсоединяет все оставшиеся отслеживаемые объекты. Обычно их не должно быть, но если процесс был остановлен сигналом, могут быть "утечки" ресурсов. Ни семафоры, ни сегменты разделяемой памяти не будут автоматически разъединены до следующей перезагрузки. Это проблематично для обоих объектов, поскольку система допускает только ограниченное количество именованных семафоров, а сегменты разделяемой памяти занимают некоторое пространство в основной памяти.
Чтобы выбрать метод запуска, используете функцию модуля multiprocessing.set_start_method() в предложении if __name__ == '__main__' основного модуля. Функция multiprocessing.set_start_method() не должна использоваться в программе более одного раза.
import multiprocessing def worker(q): q.put('hello') if __name__ == '__main__': multiprocessing.set_start_method('spawn') q = multiprocessing.Queue() p = multiprocessing.Process(target=worker, args=(q,)) p.start() print(q.get()) p.join()
В качестве альтернативы можно использовать функцию multiprocessing.get_context() для получения объекта контекста. Объекты контекста имеют тот же API, что и модуль multiprocessing, и позволяют использовать несколько методов запуска в одной программе.
import multiprocessing def worker(q): q.put('hello') if __name__ == '__main__': ctx = multiprocessing.get_context('spawn') q = ctx.Queue() p = ctx.Process(target=worker, args=(q,)) p.start() print(q.get()) p.join()
Обратите внимание, что объекты, относящиеся к одному контексту, могут быть несовместимы с процессами для другого контекста. В частности, блокировки, созданные с использованием контекста fork, не могут быть переданы процессам, запущенным с помощью методов запуска spawn или forkserver.
Библиотека, которая хочет использовать определенный метод запуска, вероятно, должна использовать get_context(), чтобы не мешать выбору пользователя библиотеки.
Предупреждение В настоящее время методы запуска spawn и forkserver не могут использоваться с "замороженными" исполняемыми файлами. То есть с двоичными файлами, созданными такими пакетами, как pyInstaller и cx_Freeze в Unix. Метод запуска fork работает с такими файлами нормально.
При использовании нескольких процессов обычно используется передача сообщений для связи между процессами и избегается необходимости использования каких-либо примитивов синхронизации, таких как блокировки.
Модуль multiprocessing поддерживает два типа каналов связи между процессами.
Queues, в собственной реализации.Класс multiprocessing.Queue является почти клоном класса queue.Queue. Очереди безопасны для потоков в разных ядрах процессора.
import multiprocessing def worker(q): q.put([42, None, 'hello']) if __name__ == '__main__': q = multiprocessing.Queue() p = multiprocessing.Process(target=worker, args=(q,)) p.start() print(q.get()) p.join() # "[42, None, 'hello']"
Pipes.Класс multiprocessing.Pipe() возвращает пару объектов, соединенных каналом, которые по умолчанию является duplex двусторонним.
from multiprocessing import def worker(conn): conn.send([42, None, 'hello']) conn.close() if __name__ == '__main__': parent_conn, child_conn = multiprocessing.Pipe() p = multiprocessing.Process(target=worker, args=(child_conn,)) p.start() print(parent_conn.recv()) p.join() # "[42, None, 'hello']"
Два объекта соединения, возвращаемые multiprocessing.Pipe(), представляют два конца канала. Каждый объект подключения имеет методы Pipe.send() - посылает данные в канал и Pipe.recv() - читает данные из канала.
Обратите внимание, что данные в канале могут быть повреждены, если два процесса или потока попытаются читать или записывать в один и тот же конец канала одновременно. Конечно, нет риска повреждения из-за процессов, использующих разные концы канала одновременно.
Как правило, примитивы синхронизации не так необходимы в программе, использующей несколько ядер процессора, как в многопоточной.
Однако, модуль multiprocessing содержит эквиваленты всех примитивов синхронизации из модуля threading. Например, можно использовать блокировку Lock для обеспечения того, что только один процесс печатает на стандартный вывод за раз.
Без использования блокировки вывод различных процессов может все перемешать.
import multiprocessing def worker(lock, i): lock.acquire() try: print('hello world', i) finally: lock.release() if __name__ == '__main__': lock = multiprocessing.Lock() for num in range(10): multiprocessing.Process(target=worker, args=(lock, num)).start()
Как упоминалось выше, при параллельном программировании обычно лучше избегать использования общих ресурсов, насколько это возможно. Это особенно верно при использовании нескольких ядер процессора.
Но если все-же действительно необходимо использование каких-то общих данных, то модуль multiprocessing предоставляет несколько способов сделать это.
Данные могут быть сохранены на карте общей памяти с помощью multiprocessing.Value или multiprocessing.Array.
import multiprocessing def worker(num, arr): num.value = 3.1415927 for i in range(len(arr)): arr[i] = -arr[i] if __name__ == '__main__': num = multiprocessing.Value('d', 0.0) arr = multiprocessing.Array('i', range(10)) p = multiprocessing.Process(target=worker, args=(num, arr)) p.start() p.join() print(num.value) print(arr[:]) # 3.1415927 # [0, -1, -2, -3, -4, -5, -6, -7, -8, -9]
Аргументы 'd' и 'i', используемые при создании переменных num и arr, являются кодами типа, который используется модулем array: 'd' указывает на число с плавающей запятой двойной точности, а 'i' указывает на целое число со знаком. Эти общие объекты будут процессными и поточно-ориентированными.
Для большей гибкости в использовании разделяемой памяти можно использовать модуль multiprocessing.sharedctypes, который поддерживает создание произвольных объектов ctypes, выделенных из разделяемой памяти.
Объект SyncManager, возвращаемый multiprocessing.Manager(), управляет серверным процессом, который содержит объекты Python и позволяет другим процессам манипулировать ими с помощью прокси-объектов.
Например:
import multiprocessing def worker(d, l): d[1] = '1' d['2'] = 2 d[0.25] = None l.reverse() if __name__ == '__main__': with multiprocessing.Manager() as manager: d = manager.dict() l = manager.list(range(10)) p = multiprocessing.Process(target=worker, args=(d, l)) p.start() p.join() print(d) print(l) # {0.25: None, 1: '1', '2': 2} # [9, 8, 7, 6, 5, 4, 3, 2, 1, 0]
Менеджеры серверных процессов более гибкие, чем использование объектов общей памяти, т. к. могут быть созданы для поддержки произвольных типов объектов. Кроме того, один менеджер может использоваться совместно процессами на разных компьютерах в сети. Однако они медленнее, чем при использовании общей памяти.
multiprocessing.Существуют определенные правила и идиомы, которых следует придерживаться при использовании многопроцессорной обработки данных.
Избегайте общих ресурсов.
Насколько это возможно, нужно стараться избегать перемещения больших объемов данных между процессами. Вероятно, лучше придерживаться использования очередей или каналов для связи между процессами, чем использовать примитивы синхронизации более низкого уровня.
Picklability.
Убедитесь,что аргументы методов прокси-объектов являются упакованы модулем pickle.
Потоковая безопасность прокси.
Не используйте прокси-объект из более чем одного потока, если вы не защитите его блокировкой. Никогда не возникает проблем с разными процессами, использующими один и тот же прокси.
Присоединение к зомби-процессам.
В Unix, когда процесс завершается, но к нему не присоединяются, он становится зомби. Их никогда не должно быть очень много, потому что каждый раз, когда запускается новый процесс или вызывается active_children(), все завершенные процессы, которые еще не были присоединены, будут объединены. Также вызов метода Process.is_alive() завершенного процесса присоединится к процессу. Тем не менее, хорошей практикой является явное присоединение ко всем процессам, которые запускаются.
Лучше наследовать, чем pickle/unpickle.
При использовании методов запуска spawn или forkserver многие типы из multiprocessing должны быть упакованы модулем pickle, чтобы дочерние процессы могли их использовать. Обычно следует избегать отправки общих объектов другим процессам с использованием каналов или очередей.
В общем необходимо организовать программу так, чтобы процесс, которому требуется доступ к совместно используемому ресурсу, созданному где-то еще, мог унаследовать его от процесса-предка.
Избегайте завершения процессов.
Использование метода Process.terminate() для остановки процесса может привести к тому, что любые общие ресурсы, такие как блокировки, семафоры, каналы и очереди, в настоящее время используемые процессом, станут сломанными или недоступными для других процессов. Поэтому, вероятно, лучше всего использовать этот метод только для процессов, которые никогда не используют общие ресурсы.
Присоединение к процессам, использующим очереди.
Имейте в виду, что процесс, который поместил элементы в очередь, будет ждать перед завершением, пока все буферизованные элементы не будут переданы потоком "питателя" в нижележащий канал. Дочерний процесс может вызвать метод очереди Queue.cancel_join_thread, чтобы избежать такого поведения.
Это означает, что всякий раз, когда используется очередь, необходимо убедиться, что все элементы, помещенные в очередь, в конечном итоге будут удалены до присоединения к процессу. В противном случае, никто не может быть уверен, что процессы, поместившие элементы в очередь, завершатся. Помните также, что не демонические процессы будут присоединяться автоматически.
Вот пример тупиковой ситуации:
import multiprocessing def worker(q): q.put('X' * 1000000) if __name__ == '__main__': queue = multiprocessing.Queue() p = multiprocessing.Process(target=worker, args=(queue,)) p.start() p.join() # это тупик obj = queue.get()
Что бы исправить ситуацию в примере выше, нужно поменять местами последние две строки или просто удалить строку p.join().
Явная передача ресурсов дочерним процессам.
В Unix, использующем метод запуска fork, дочерний процесс может использовать общий ресурс, созданный в родительском процессе с использованием глобального ресурса. Лучше передать объект в качестве аргумента конструктору дочернего процесса.
Помимо обеспечения совместимости кода (потенциально) с Windows и другими методами запуска, это также гарантирует, что, пока дочерний процесс все еще жив, объект не будет собираться сборщиком мусора в родительском процессе. Это может быть важно, если какой-то ресурс освобождается при сборке мусора в родительском процессе.
Так например:
import multiprocessing def worker(): ... do something using "lock" ... if __name__ == '__main__': lock = multiprocessing.Lock() for i in range(10): multiprocessing.Process(target=worker).start()
следует переписать как:
import multiprocessing def worker(l): ... do something using "l" ... if __name__ == '__main__': lock = multiprocessing.Lock() for i in range(10): multiprocessing.Process(target=worker, args=(lock,)).start()
Остерегайтесь замены sys.stdin на файловый объект.
Опасность заключается в том, что если несколько процессов вызовут file.close() для этого файлового объекта, то такое поведение может привести к тому, что одни и те же данные будут сброшены в него несколько раз, что приведет к повреждению.
spawn и forkserver.Есть несколько дополнительных ограничений, которые не применяются к методу запуска fork.
Больше picklability.
Убедитесь, что все аргументы конструктора Process.__init__() являются picklable. Кроме того, если создается подкласс multiprocessing.Process(), необходимо убедится, что экземпляры будут picklable при вызове метода Process.start().
Глобальные переменные.
Имейте в виду, если код, выполняемый в дочернем процессе, пытается получить доступ к глобальной переменной, то значение, которое он видит (если оно есть), может не совпадать со значением в родительском процессе во время вызова метода Process.start().
Однако глобальные переменные, которые являются просто константами уровня модуля, не вызывают проблем.
Безопасный импорт основного модуля.
Убедитесь, что основной модуль может быть безопасно импортирован новым интерпретатором Python, не вызывая нежелательных побочных эффектов, таких как запуск нового процесса.
Например, при использовании метода запуска spawn или forkserver, выполняющего следующий модуль, произойдет сбой с исключением RuntimeError:
from multiprocessing import Process def worker(): print('hello') p = multiprocessing.Process(target=worker) p.start()
Вместо этого следует защитить точку входа программы, используя if __name__ == '__main__':
import multiprocessing def worker(): print('hello') if __name__ == '__main__': multiprocessing.freeze_support() multiprocessing.set_start_method('spawn') p = multiprocessing.Process(target=worker) p.start()
Строку multiprocessing.freeze_support() можно не указывать, если программа будет запускаться в обычном режиме, а не будет заморожена.
Это позволяет вновь созданному интерпретатору Python безопасно импортировать модуль и затем запускать функцию модуля worker().
Подобные ограничения применяются, если пул или менеджер создается в основном модуле.