Менеджеры предоставляют способ создания данных, которые могут совместно использоваться разными процессами, включая обмен по сети между процессами, запущенными на разных машинах. Объект-менеджер управляет серверным процессом, который управляет общими объектами. Другие процессы могут получить доступ к общим объектам с помощью прокси.
import multiprocessing proxy = multiprocessing.Manager()
Класс Manager() модуля multiprocessing возвращает запущенный объект SyncManager, который можно использовать для совместного использования объектов между процессами.
Возвращенный объект-менеджер SyncManager соответствует порожденному дочернему процессу и имеет методы, которые будут создавать общие объекты и возвращать соответствующие прокси объекты.
Процессы диспетчера будут завершены, как только они будут собраны сборщиком мусора или их родительский процесс завершится.
SyncManager.Объект-менеджер SyncManager представляет собой подкласс BaseManager, который может использоваться для синхронизации процессов. Объекты этого типа возвращаются классом multiprocessing.Manager().
Методы объект-менеджера SyncManager создают и возвращают прокси-объекты для ряда часто используемых типов данных, которые должны быть синхронизированы между процессами. Эти типы, в частности, включают в себя обычные списки и словари Python.
Изменено в Python 3.6: общие объекты могут быть вложенными. Например, общий контейнерный объект, такой как общий список, может содержать другие общие объекты, которые будут управляться и синхронизироваться SyncManager.
SyncManager.SyncManager.Barrier() возвращает прокси для threading.Barrier,SyncManager.BoundedSemaphore() возвращает прокси для threading.BoundedSemaphore,SyncManager.Condition() возвращает прокси для threading.Condition,SyncManager.Event() возвращает прокси для threading.Event,SyncManager.Lock() возвращает прокси для threading.Lock,SyncManager.RLock() возвращает прокси для threading.RLock,SyncManager.Semaphore() возвращает прокси для threading.Semaphore,SyncManager.Queue() возвращает прокси для queue.Queue,SyncManager.Array() возвращает прокси для multiprocessing.Array,SyncManager.Value() возвращает прокси для multiprocessing.Value,SyncManager.Namespace() возвращает прокси для Namespace,SyncManager.dict() возвращает прокси обычного словаря Python,SyncManager.list() возвращает прокси для обычного списка Python,SyncManager.set() возвращает прокси для обычного множества Python (добавлено в Python 3.14),SyncManager.Barrier(parties[, action[, timeout]]):Объект SyncManager.Barrier() создает общий объект синхронизации threading.Barrier и возвращает для него прокси.
SyncManager.BoundedSemaphore([value]):Объект SyncManager.BoundedSemaphore() создает общий объект синхронизации threading.BoundedSemaphore и возвращает для него прокси.
SyncManager.Condition([lock]):Объект SyncManager.Condition() создает общий объект синхронизации threading.Condition и возвращает для него прокси.
Если предоставляется аргумент lock, то это должен быть прокси-объект для объекта threading.Lock или threading.RLock.
SyncManager.Event():Объект SyncManager.Event() создает общий объект синхронизации threading.Event и возвращает для него прокси.
SyncManager.Lock():Объект SyncManager.Lock() создает общий объект синхронизации threading.Lock и возвращает для него прокси.
SyncManager.RLock():Объект SyncManager.RLock() создает общий объект синхронизации threading.RLock и возвращает для него прокси.
SyncManager.Semaphore([value]):Объект SyncManager.Semaphore() создает общий объект синхронизации threading.Semaphore и возвращает для него прокси.
SyncManager.Queue([maxsize]):Объект SyncManager.Queue() создает общий объект очереди queue.Queue и возвращает для него прокси.
SyncManager.Array(typecode, sequence):Объект SyncManager.Array() создает синхронизированный массив multiprocessing.Array и возвращает для него прокси.
SyncManager.Value(typecode, value):Объект SyncManager.Value() создает синхронизированный объект с атрибутом записываемого значения multiprocessing.Value и возвращает для него прокси.
SyncManager.Namespace():SyncManager.Namespace() создает прокси-объект пространства имен, который позволяет безопасно обмениваться данными между процессами через атрибуты объекта.
Создание пространства имен
with Manager() as manager: # Пустое пространство имен shared_ns = manager.Namespace() # С начальными атрибутами shared_ns = manager.Namespace(x=1, y=2, data=[1,2,3])
Пример использования
def worker(ns, value): ns.result = value * 2 # Добавляем новый атрибут ns.x += 1 # Модифицируем существующий if __name__ == '__main__': with Manager() as manager: ns = manager.Namespace(x=0, y=0) procs = [Process(target=worker, args=(ns, i)) for i in range(5)] for p in procs: p.start() for p in procs: p.join() print(f"Результат: {ns.result}, x={ns.x}") # Возможный вывод: Результат: 8, x=5
Характеристики
| Тип | multiprocessing.managers.NamespaceProxy |
| Синхронизация | Автоматическая между процессами |
| Доступ к атрибутам | Через точку: ns.attribute |
| Изменяемость | Атрибуты можно изменять/добавлять |
| Вложенность | Может содержать другие прокси-объекты |
| Сериализация | Поддержка pickle |
| Производительность | Медленнее обычных объектов |
Важные особенности
Динамические атрибуты:
ns = manager.Namespace() ns.new_attr = "value" # Можно добавлять новые атрибуты
Ограничения типов:
ns.valid = manager.list() # OK ns.invalid = lambda x: x+1 # Несериализуемые объекты
Сравнение объектов:
ns1 = manager.Namespace(x=1) ns2 = manager.Namespace(x=1) ns1 == ns2 # False (сравниваются прокси, не данные)
Пример конфигурации процессов
config = manager.Namespace( debug=True, timeout=30, services=manager.list(['auth', 'db']) ) def worker(conf): if conf.debug: print(f"Timeout: {conf.timeout}")
Пример сбора статистики
stats = manager.Namespace( requests=0, errors=0, processing_time=0.0 ) def update_stats(s, time_taken): s.requests += 1 s.processing_time += time_taken
Советы по использованию
Manager.dict()with Manager() as manager: # Автоматическое закрытие ns = manager.Namespace() # работа с пространством имен
Ограничения
del ns.attrSyncManager.dict()SyncManager.dict(mapping)SyncManager.dict(sequence):Объект SyncManager.dict() создаёт прокси-объект, ссылающийся на общий словарь dict, находящийся в другом процессе (внутри SyncManager). Этот словарь синхронизирован, то есть его можно безопасно использовать из нескольких процессов.
Пример использования SyncManager.dict()
from multiprocessing import Manager, Process def worker(d): d['count'] += 1 if __name__ == '__main__': with Manager() as manager: shared_dict = manager.dict({'count': 0}) # Создаём общий словарь print("До:", shared_dict) # До: {'count': 0} # Запускаем несколько процессов processes = [Process(target=worker, args=(shared_dict,)) for _ in range(5)] for p in processes: p.start() for p in processes: p.join() print("После:", shared_dict) # После: {'count': 5}
Характеристики SyncManager.dict()
| Параметр | Описание |
|---|---|
| Создание | manager.dict() или manager.dict({'a': 1}) |
| Тип | multiprocessing.managers.DictProxy |
| Синхронизация | Автоматически между процессами |
| Изменяемость | Да, можно добавлять/удалять/изменять |
| Поддержка методов | Полная, включая Python 3.14: fromkeys(), reversed(), |, |= |
| Передача между процессами | Да, через аргументы |
| Сериализация | Поддерживается через pickle |
Вложенные структуры
Можно хранить в словаре и другие прокси-объекты:
with Manager() as manager: d = manager.dict() d['list'] = manager.list([1, 2, 3]) d['name'] = 'Alice' print(d) # {'list': [1, 2, 3], 'name': 'Alice'} d['list'].append(4) print(d['list']) # [1, 2, 3, 4]
Важно. Изменения вложенных обычных объектов не синхронизируются автоматически. Например, если ты сохранишь обычный словарь в прокси-словарь, изменение его полей не будет отражено в других процессах, пока ты не обновишь его через
__setitem__.
SyncManager.list()SyncManager.list(sequence):Объект SyncManager.list() создает общий объект списка list и возвращает для него прокси. Это способ создать общий список, который работает между процессами в multiprocessing. Этот список синхронизируется через SyncManager - специальный серверный процесс, который управляет общими данными.
Как создать?
from multiprocessing import Manager with Manager() as manager: # пустой список shared_list = manager.list() # или с начальными данными shared_list = manager.list([1, 2, 3])
Пример использования
from multiprocessing import Process, Manager def add_item(lst, value): lst.append(value) if __name__ == '__main__': with Manager() as manager: data = manager.list() # Запускаем несколько процессов processes = [Process(target=add_item, args=(data, i)) for i in range(5)] for p in processes: p.start() for p in processes: p.join() print(data) # Вывод: [0, 1, 2, 3, 4]
Характеристики
| Параметр | Описание |
|---|---|
| Создание | manager.list() или manager.list([1, 2, 3]) |
| Тип | multiprocessing.managers.ListProxy |
| Синхронизация | Автоматически между процессами |
| Изменяемость | Да, можно добавлять/удалять/изменять |
| Поддержка методов | Полная, включая Python 3.14: clear(), copy() |
| Передача между процессами | Да, через аргументы |
| Сериализация | Поддерживается через pickle |
Важно знать
Это не обычный список, это прокси-объект.
Все изменения синхронизируются между процессами.
Можно вкладывать другие proxy-объекты (manager.dict(), manager.list() и т.д.).
Нельзя сравнивать напрямую с обычными списками:
shared_list == [1, 2, 3] # всегда False
Нужно делать копию:
list(shared_list) == [1, 2, 3] # True
SyncManager.set()SyncManager.set(sequence)SyncManager.set(mapping):Добавлено в Python 3.14: добавлена поддержка
set.
Объект SyncManager.list() создает общий объект множества set и возвращает для него прокси.
SyncManager.set() создает прокси-объект множества, который может безопасно использоваться между процессами. Это аналог обычного set(), но с автоматической синхронизацией.
Создание множества
with Manager() as manager: # Пустое множество shared_set = manager.set() # С начальными значениями (Python 3.14+) shared_set = manager.set({1, 2, 3}) # или manager.set([1, 2, 3])
Пример использования
def process_task(s, item): s.add(item ** 2) # Добавляем квадрат числа if len(s) > 5: s.clear() # Очищаем при превышении размера if __name__ == '__main__': with Manager() as manager: numbers = manager.set() # Запускаем процессы procs = [Process(target=process_task, args=(numbers, i)) for i in range(10)] for p in procs: p.start() for p in procs: p.join() print(f"Итоговый набор: {numbers}") # Возможный вывод: {0, 1, 4, 9, 16, 25, 36, 49, 64, 81}
Таблица характеристик
| Тип | multiprocessing.managers.SetProxy |
| Синхронизация | Автоматическая между процессами |
| Методы | add(), remove(), pop(), clear() и др. |
| Изменяемость | Да, из любого процесса |
| Вложенность | Только с другими прокси |
| Сериализация | Через pickle |
| Производительность | Медленнее обычного set |
| Сравнение | set(proxy_set) == {1,2,3} |
| Потокобезопасность | Все операции атомарны |
| Python 3.14+ | Поддержка |, &, - |
Важные особенности
Не является обычным множеством:
type(manager.set()) # <class 'multiprocessing.managers.SetProxy'>
Сравнение требует преобразования:
shared = manager.set({1, 2, 3}) shared == {1, 2, 3} # False (сравнивается прокси, а не данные) set(shared) == {1, 2, 3} # True (правильный способ)
Потокобезопасные операции:
# Все операции атомарны: shared.add(x) # ✓ Безопасно shared.remove(x) # ✓ Безопасно if x in shared: # ✓ Безопасно shared.discard(x)
Ограничения вложенности:
# Можно: shared.add(manager.list([1, 2])) # ✓ # Нельзя (не будет синхронизироваться): shared.add([1, 2]) # ×
Советы по использованию
Всегда используйте менеджер как контекст:
with Manager() as manager: # Автоматическое закрытие shared = manager.set() # работа с множеством
Для частых операций используйте Queue или Pipe:
from multiprocessing import Queue q = Queue() # Быстрее для частого обмена данными
Альтернативы для сложных случаев:
from multiprocessing.shared_memory import ShareableList # Или внешние решения: Redis, Memcached
Новые методы в Python 3.14+
# Поддержка операторов множеств s1 = manager.set({1, 2}) s2 = manager.set({2, 3}) s1 |= s2 # Объединение (аналог update()) s1 & s2 # Пересечение s1 - s2 # Разность
Если есть объект управляемого списка manager.list(), то любые изменения в этом списке будут распространяться на все остальные процессы. Но если использовать список не управляемый менеджером (например список внутри управляемого списка), то любые его изменения не распространяются на другие процессы, потому что менеджер процессов не может обнаружить это изменение.
Чтобы изменения вступили в силу, необходимо повторно присвоить значение изменяемому объекту прокси контейнера:
import multiprocessing import time def worker(ns, lst_mgr, dict_mgr): # Изеняем пространство имен, # с ним все нормально ns.x += 1 ns.y[0] += 1 ns_z = ns.z ns_z[0] += 1 ns.z = ns_z # Меняем список lst_mgr[0] += 1 # прямые изменения простого списка # внутри прокси-списка не распространяться # на другие процессы lst_mgr[1][0] += 1 # изменения распространяться на другие # потоки только после прямого присвоения # нового списка элементу прокси-списка temp_list = lst_mgr[2] temp_list[0] += 10 temp_list.append('100') lst_mgr[2] = temp_list # а если изменять прокси-список внутри # прокси-списка, то будет все нормально lst_mgr[3][0] += 5 lst_mgr[3].insert(0, 'вложенный прокси-список') # то же самое будет и со словарями dict_mgr[0] += 1 # изменения не увидим dict_mgr[1][0] += 1 # изменяем простой список (значение 2 ключа словаря) dict_val_temp = dict_mgr[2] dict_val_temp[0] += 10 dict_val_temp.append('100') dict_mgr[2] = dict_val_temp dict_mgr[3][0] += 5 dict_mgr[3].insert(0, 'прокси-список, прокси-словаря') if __name__ == '__main__': manager = multiprocessing.Manager() ns_mgr = manager.Namespace() ns_mgr.x = 1 ns_mgr.y = [1] ns_mgr.z = [1] lst_mgr = manager.list([1, [1], [1], manager.list([1])]) dict_mgr = manager.dict({0: 1, 1: [1], 2: [1], 3: manager.list([1])}) print('\nДо запуска процесса:') print('-' * 25) print('Пространство имен:', ns_mgr) print('Прокси-список:', lst_mgr) print('lst_mgr[3]:', list(lst_mgr[3])) print('Прокси-словарь:', dict_mgr) print('dict_mgr[3]:', list(dict_mgr[3])) p = multiprocessing.Process(target=worker, args=(ns_mgr, lst_mgr, dict_mgr)) p.start() p.join() print('\nПосле запуска процесса:') print('-' * 50) print('Пространство имен:', ns_mgr) print('Прокси-список:', lst_mgr) print('lst_mgr[3]:', list(lst_mgr[3])) print('Прокси-словарь:', dict_mgr) print('dict_mgr[3]:', list(dict_mgr[3])) # До запуска процесса: # ------------------------- # Пространство имен: Namespace(x=1, y=[1], z=[1]) # Прокси-список: [1, [1], [1], <ListProxy object, typeid 'list' at 0x7f34cedde040>] # lst_mgr[3]: [1] # Прокси-словарь: {0: 1, 1: [1], 2: [1], 3: <ListProxy object, typeid 'list' at 0x7f34cedde3a0>} # dict_mgr[3]: [1] # После запуска процесса: # -------------------------------------------------- # Пространство имен: Namespace(x=2, y=[1], z=[2]) # Прокси-список: [2, [1], [11, '100'], <ListProxy object, typeid 'list' at 0x7f34cedde040>] # lst_mgr[3]: ['вложенный прокси-список', 6] # Прокси-словарь: {0: 2, 1: [1], 2: [11, '100'], 3: <ListProxy object, typeid 'list' at 0x7f34cedde3a0>} # dict_mgr[3]: ['прокси-список, прокси-словаря', 6]
В примере показывается, как создавать и использовать общие ресурсы на основе менеджеров.
Будем осуществлять поочередное (для процессов) добавление элементов в общий ресурс - синхронизируемый список, блокировать доступ к списку из других потоков на время добавления будем объектом блокировки SyncManager.Lock, созданным при помощи менеджера процессов.
import multiprocessing, time, random def worker(lst, dkt, lock): pid_proc = multiprocessing.current_process().pid proc_name = multiprocessing.current_process().name # блокируем доступ к прокси-списку из других потоков lock.acquire() try: for _ in range(3): # имитируем нагрузку, для того, что бы была # конкуренция доступа к общему ресурсу (списку) time.sleep(random.uniform(0.01, 0.1)) # добавляем PID процесса 3 раза lst.append(pid_proc) # если в словаре нет записи о # процессе, то добавляем if not dkt.get(pid_proc, None): dkt[pid_proc] = proc_name finally: # завершаем процесс и разрешаем # доступ к массиву другим процессам lock.release() if __name__ == '__main__': manager = multiprocessing.Manager() # создаем прокси список lst = manager.list() # создаем прокси словарь dkt = manager.dict() # создаем прокси блокировщик lock = manager.Lock() procs = [] for _ in range(3): proc = multiprocessing.Process(target=worker, args=(lst, dkt, lock)) procs.append(proc) proc.start() # Ждем результатов [proc.join() for proc in procs] print('Вывод результатов:') print(lst) print(dkt) # очищаем используемые ресурсы [proc.close() for proc in procs] # Вывод результатов: # [86138, 86138, 86138, 86139, 86139, 86139, 86140, 86140, 86140] # {86138: 'Process-2', 86139: 'Process-3', 86140: 'Process-4'}