Менеджеры предоставляют способ создания данных, которые могут совместно использоваться разными процессами, включая обмен по сети между процессами, запущенными на разных машинах. Объект-менеджер управляет серверным процессом, который управляет общими объектами. Другие процессы могут получить доступ к общим объектам с помощью прокси
.
import multiprocessing proxy = multiprocessing.Manager()
Класс Manager()
модуля multiprocessing
возвращает запущенный объект SyncManager
, который можно использовать для совместного использования объектов между процессами.
Возвращенный объект-менеджер SyncManager
соответствует порожденному дочернему процессу и имеет методы, которые будут создавать общие объекты и возвращать соответствующие прокси объекты
.
Процессы диспетчера будут завершены, как только они будут собраны сборщиком мусора или их родительский процесс завершится.
SyncManager
.Объект-менеджер SyncManager
представляет собой подкласс BaseManager
, который может использоваться для синхронизации процессов. Объекты этого типа возвращаются классом multiprocessing.Manager()
.
Методы объект-менеджера SyncManager
создают и возвращают прокси-объекты
для ряда часто используемых типов данных, которые должны быть синхронизированы между процессами. Эти типы, в частности, включают в себя обычные списки и словари Python.
Изменено в Python 3.6: общие объекты могут быть вложенными. Например, общий контейнерный объект, такой как общий список, может содержать другие общие объекты, которые будут управляться и синхронизироваться SyncManager
.
SyncManager
.SyncManager.Barrier()
возвращает прокси для threading.Barrier
,SyncManager.BoundedSemaphore()
возвращает прокси для threading.BoundedSemaphore
,SyncManager.Condition()
возвращает прокси для threading.Condition
,SyncManager.Event()
возвращает прокси для threading.Event
,SyncManager.Lock()
возвращает прокси для threading.Lock
,SyncManager.RLock()
возвращает прокси для threading.RLock
,SyncManager.Semaphore()
возвращает прокси для threading.Semaphore
,SyncManager.Queue()
возвращает прокси для queue.Queue
,SyncManager.Array()
возвращает прокси для multiprocessing.Array
,SyncManager.Value()
возвращает прокси для multiprocessing.Value
,SyncManager.Namespace()
возвращает прокси для Namespace
,SyncManager.dict()
возвращает прокси обычного словаря Python,SyncManager.list()
возвращает прокси для обычного списка Python,SyncManager.Barrier(parties[, action[, timeout]])
:Объект SyncManager.Barrier()
создает общий объект синхронизации threading.Barrier
и возвращает для него прокси.
SyncManager.BoundedSemaphore([value])
:Объект SyncManager.BoundedSemaphore()
создает общий объект синхронизации threading.BoundedSemaphore
и возвращает для него прокси.
SyncManager.Condition([lock])
:Объект SyncManager.Condition()
создает общий объект синхронизации threading.Condition
и возвращает для него прокси.
Если предоставляется аргумент lock
, то это должен быть прокси-объект для объекта threading.Lock
или threading.RLock
.
SyncManager.Event()
:Объект SyncManager.Event()
создает общий объект синхронизации threading.Event
и возвращает для него прокси.
SyncManager.Lock()
:Объект SyncManager.Lock()
создает общий объект синхронизации threading.Lock
и возвращает для него прокси.
SyncManager.RLock()
:Объект SyncManager.RLock()
создает общий объект синхронизации threading.RLock
и возвращает для него прокси.
SyncManager.Semaphore([value])
:Объект SyncManager.Semaphore()
создает общий объект синхронизации threading.Semaphore
и возвращает для него прокси.
SyncManager.Queue([maxsize])
:Объект SyncManager.Queue()
создает общий объект очереди queue.Queue
и возвращает для него прокси.
SyncManager.Array(typecode, sequence)
:Объект SyncManager.Array()
создает синхронизированный массив multiprocessing.Array
и возвращает для него прокси.
SyncManager.Value(typecode, value)
:Объект SyncManager.Value()
создает синхронизированный объект с атрибутом записываемого значения multiprocessing.Value
и возвращает для него прокси.
SyncManager.Namespace()
:Объект SyncManager.Namespace()
создает общий объект пространства имен Namespace
и возвращает для него прокси.
Объект пространства имен Namespace
- это тип, который можно зарегистрировать в SyncManager
.
Объект Namespace
не имеет общедоступных методов, но имеет атрибуты с возможностью записи в них. Его представление показывает значения его атрибутов.
Однако при использовании прокси для объекта пространства имен атрибут, начинающийся с '_', будет атрибутом прокси, а не атрибутом референта:
>>> manager = multiprocessing.Manager() >>> Global = manager.Namespace() >>> Global.x = 10 >>> Global.y = 'hello' # это атрибут прокси-сервера >>> Global._z = 12.3 >>> print(Global) Namespace(x=10, y='hello')
SyncManager.dict()
SyncManager.dict(mapping)
SyncManager.dict(sequence)
:Объект SyncManager.dict()
создает общий объект словаря dict
и возвращает для него прокси.
SyncManager.list()
SyncManager.list(sequence)
:Объект SyncManager.list()
создает общий объект списка list
и возвращает для него прокси.
Примечание. Изменения значений или элементов прокси-объектов dict
и list
не будут распространяться через диспетчер на другие потоки, поскольку прокси-сервер не имеет возможности узнать, когда его значения или элементы были изменены. Чтобы изменения вступили в силу, необходимо повторно присвоить значение изменяемому объекту прокси контейнера:
# создаем прокси-объект списка и добавляем # изменяемый объект (словарь) lproxy = manager.list() lproxy.append({}) # теперь изменяем словарь d = lproxy[0] d['a'] = 1 d['b'] = 2 # на данный момент изменения в словаре `d` еще не # синхронизированы, но при переназначении словаря # прокси-сервер получает уведомление об этом изменении lproxy[0] = d
Если есть объект управляемого списка manager.list()
, то любые изменения в этом списке будут распространяться на все остальные процессы. Но если использовать список не управляемый менеджером (например список внутри управляемого списка), то любые его изменения не распространяются на другие процессы, потому что менеджер процессов не может обнаружить это изменение.
Чтобы изменения вступили в силу, необходимо повторно присвоить значение изменяемому объекту прокси контейнера:
import multiprocessing import time def worker(ns, lst_mgr, dict_mgr): # Изеняем пространство имен, # с ним все нормально ns.x += 1 ns.y[0] += 1 ns_z = ns.z ns_z[0] += 1 ns.z = ns_z # Меняем список lst_mgr[0] += 1 # прямые изменения простого списка # внутри прокси-списка не распространяться # на другие процессы lst_mgr[1][0] += 1 # изменения распространяться на другие # потоки только после прямого присвоения # нового списка элементу прокси-списка temp_list = lst_mgr[2] temp_list[0] += 10 temp_list.append('100') lst_mgr[2] = temp_list # а если изменять прокси-список внутри # прокси-списка, то будет все нормально lst_mgr[3][0] += 5 lst_mgr[3].insert(0, 'вложенный прокси-список') # то же самое будет и со словарями dict_mgr[0] += 1 # изменения не увидим dict_mgr[1][0] += 1 # изменяем простой список (значение 2 ключа словаря) dict_val_temp = dict_mgr[2] dict_val_temp[0] += 10 dict_val_temp.append('100') dict_mgr[2] = dict_val_temp dict_mgr[3][0] += 5 dict_mgr[3].insert(0, 'прокси-список, прокси-словаря') if __name__ == '__main__': manager = multiprocessing.Manager() ns_mgr = manager.Namespace() ns_mgr.x = 1 ns_mgr.y = [1] ns_mgr.z = [1] lst_mgr = manager.list([1, [1], [1], manager.list([1])]) dict_mgr = manager.dict({0: 1, 1: [1], 2: [1], 3: manager.list([1])}) print('\nДо запуска процесса:') print('-' * 25) print('Пространство имен:', ns_mgr) print('Прокси-список:', lst_mgr) print('lst_mgr[3]:', list(lst_mgr[3])) print('Прокси-словарь:', dict_mgr) print('dict_mgr[3]:', list(dict_mgr[3])) p = multiprocessing.Process(target=worker, args=(ns_mgr, lst_mgr, dict_mgr)) p.start() p.join() print('\nПосле запуска процесса:') print('-' * 50) print('Пространство имен:', ns_mgr) print('Прокси-список:', lst_mgr) print('lst_mgr[3]:', list(lst_mgr[3])) print('Прокси-словарь:', dict_mgr) print('dict_mgr[3]:', list(dict_mgr[3])) # До запуска процесса: # ------------------------- # Пространство имен: Namespace(x=1, y=[1], z=[1]) # Прокси-список: [1, [1], [1], <ListProxy object, typeid 'list' at 0x7f34cedde040>] # lst_mgr[3]: [1] # Прокси-словарь: {0: 1, 1: [1], 2: [1], 3: <ListProxy object, typeid 'list' at 0x7f34cedde3a0>} # dict_mgr[3]: [1] # После запуска процесса: # -------------------------------------------------- # Пространство имен: Namespace(x=2, y=[1], z=[2]) # Прокси-список: [2, [1], [11, '100'], <ListProxy object, typeid 'list' at 0x7f34cedde040>] # lst_mgr[3]: ['вложенный прокси-список', 6] # Прокси-словарь: {0: 2, 1: [1], 2: [11, '100'], 3: <ListProxy object, typeid 'list' at 0x7f34cedde3a0>} # dict_mgr[3]: ['прокси-список, прокси-словаря', 6]
В примере показывается, как создавать и использовать общие ресурсы на основе менеджеров.
Будем осуществлять поочередное (для процессов) добавление элементов в общий ресурс - синхронизируемый список, блокировать доступ к списку из других потоков на время добавления будем объектом блокировки SyncManager.Lock
, созданным при помощи менеджера процессов.
import multiprocessing, time, random def worker(lst, dkt, lock): pid_proc = multiprocessing.current_process().pid proc_name = multiprocessing.current_process().name # блокируем доступ к прокси-списку из других потоков lock.acquire() try: for _ in range(3): # имитируем нагрузку, для того, что бы была # конкуренция доступа к общему ресурсу (списку) time.sleep(random.uniform(0.01, 0.1)) # добавляем PID процесса 3 раза lst.append(pid_proc) # если в словаре нет записи о # процессе, то добавляем if not dkt.get(pid_proc, None): dkt[pid_proc] = proc_name finally: # завершаем процесс и разрешаем # доступ к массиву другим процессам lock.release() if __name__ == '__main__': manager = multiprocessing.Manager() # создаем прокси список lst = manager.list() # создаем прокси словарь dkt = manager.dict() # создаем прокси блокировщик lock = manager.Lock() procs = [] for _ in range(3): proc = multiprocessing.Process(target=worker, args=(lst, dkt, lock)) procs.append(proc) proc.start() # Ждем результатов [proc.join() for proc in procs] print('Вывод результатов:') print(lst) print(dkt) # очищаем используемые ресурсы [proc.close() for proc in procs] # Вывод результатов: # [86138, 86138, 86138, 86139, 86139, 86139, 86140, 86140, 86140] # {86138: 'Process-2', 86139: 'Process-3', 86140: 'Process-4'}