Сообщить об ошибке.

Класс Manager() модуля multiprocessing в Python

Создание данных, которые могут использовать разные процессы

Менеджеры предоставляют способ создания данных, которые могут совместно использоваться разными процессами, включая обмен по сети между процессами, запущенными на разных машинах. Объект-менеджер управляет серверным процессом, который управляет общими объектами. Другие процессы могут получить доступ к общим объектам с помощью прокси.

Синтаксис:

import multiprocessing

proxy = multiprocessing.Manager()

Параметры:

  • нет.

Возвращаемое значение:

Описание:

Класс Manager() модуля multiprocessing возвращает запущенный объект SyncManager, который можно использовать для совместного использования объектов между процессами.

Возвращенный объект-менеджер SyncManager соответствует порожденному дочернему процессу и имеет методы, которые будут создавать общие объекты и возвращать соответствующие прокси объекты.

Процессы диспетчера будут завершены, как только они будут собраны сборщиком мусора или их родительский процесс завершится.

Объект-менеджер SyncManager.

Объект-менеджер SyncManager представляет собой подкласс BaseManager, который может использоваться для синхронизации процессов. Объекты этого типа возвращаются классом multiprocessing.Manager().

Методы объект-менеджера SyncManager создают и возвращают прокси-объекты для ряда часто используемых типов данных, которые должны быть синхронизированы между процессами. Эти типы, в частности, включают в себя обычные списки и словари Python.

Изменено в Python 3.6: общие объекты могут быть вложенными. Например, общий контейнерный объект, такой как общий список, может содержать другие общие объекты, которые будут управляться и синхронизироваться SyncManager.

Прокси-объекты, возвращаемые объект-менеджером SyncManager.


SyncManager.Barrier(parties[, action[, timeout]]):

Объект SyncManager.Barrier() создает общий объект синхронизации threading.Barrier и возвращает для него прокси.

SyncManager.BoundedSemaphore([value]):

Объект SyncManager.BoundedSemaphore() создает общий объект синхронизации threading.BoundedSemaphore и возвращает для него прокси.

SyncManager.Condition([lock]):

Объект SyncManager.Condition() создает общий объект синхронизации threading.Condition и возвращает для него прокси.

Если предоставляется аргумент lock, то это должен быть прокси-объект для объекта threading.Lock или threading.RLock.

SyncManager.Event():

Объект SyncManager.Event() создает общий объект синхронизации threading.Event и возвращает для него прокси.

SyncManager.Lock():

Объект SyncManager.Lock() создает общий объект синхронизации threading.Lock и возвращает для него прокси.

SyncManager.RLock():

Объект SyncManager.RLock() создает общий объект синхронизации threading.RLock и возвращает для него прокси.

SyncManager.Semaphore([value]):

Объект SyncManager.Semaphore() создает общий объект синхронизации threading.Semaphore и возвращает для него прокси.

SyncManager.Queue([maxsize]):

Объект SyncManager.Queue() создает общий объект очереди queue.Queue и возвращает для него прокси.

SyncManager.Array(typecode, sequence):

Объект SyncManager.Array() создает синхронизированный массив multiprocessing.Array и возвращает для него прокси.

SyncManager.Value(typecode, value):

Объект SyncManager.Value() создает синхронизированный объект с атрибутом записываемого значения multiprocessing.Value и возвращает для него прокси.

SyncManager.Namespace():

SyncManager.Namespace() создает прокси-объект пространства имен, который позволяет безопасно обмениваться данными между процессами через атрибуты объекта.

Создание пространства имен

with Manager() as manager:
    # Пустое пространство имен
    shared_ns = manager.Namespace()
    
    # С начальными атрибутами
    shared_ns = manager.Namespace(x=1, y=2, data=[1,2,3])

Пример использования

def worker(ns, value):
    ns.result = value * 2  # Добавляем новый атрибут
    ns.x += 1  # Модифицируем существующий

if __name__ == '__main__':
    with Manager() as manager:
        ns = manager.Namespace(x=0, y=0)
        
        procs = [Process(target=worker, args=(ns, i)) 
                for i in range(5)]
        
        for p in procs:
            p.start()
        for p in procs:
            p.join()
            
        print(f"Результат: {ns.result}, x={ns.x}")
        # Возможный вывод: Результат: 8, x=5

Характеристики

Типmultiprocessing.managers.NamespaceProxy
СинхронизацияАвтоматическая между процессами
Доступ к атрибутамЧерез точку: ns.attribute
ИзменяемостьАтрибуты можно изменять/добавлять
ВложенностьМожет содержать другие прокси-объекты
СериализацияПоддержка pickle
ПроизводительностьМедленнее обычных объектов

Важные особенности

  1. Динамические атрибуты:

    ns = manager.Namespace()
    ns.new_attr = "value"  # Можно добавлять новые атрибуты
    
  2. Ограничения типов:

    ns.valid = manager.list()  # OK
    ns.invalid = lambda x: x+1  # Несериализуемые объекты
    
  3. Сравнение объектов:

    ns1 = manager.Namespace(x=1)
    ns2 = manager.Namespace(x=1)
    ns1 == ns2  # False (сравниваются прокси, не данные)
    

Пример конфигурации процессов

config = manager.Namespace(
    debug=True,
    timeout=30,
    services=manager.list(['auth', 'db'])
)

def worker(conf):
    if conf.debug:
        print(f"Timeout: {conf.timeout}")

Пример сбора статистики

stats = manager.Namespace(
    requests=0,
    errors=0,
    processing_time=0.0
)

def update_stats(s, time_taken):
    s.requests += 1
    s.processing_time += time_taken

Советы по использованию

  1. Для простых данных предпочтительнее Manager.dict()
  2. Для сложных структур Namespace обеспечивает лучшую читаемость
  3. Избегайте частых обновлений атрибутов в циклах
  4. Закрывайте менеджер после использования:
with Manager() as manager:  # Автоматическое закрытие
    ns = manager.Namespace()
    # работа с пространством имен

Ограничения

  • Нет поддержки операций срезов (как у list/dict)
  • Нельзя удалять атрибуты через del ns.attr
  • Медленнее, чем специализированные структуры (Queue, Pipe)

SyncManager.dict()
SyncManager.dict(mapping)
SyncManager.dict(sequence):

Объект SyncManager.dict() создаёт прокси-объект, ссылающийся на общий словарь dict, находящийся в другом процессе (внутри SyncManager). Этот словарь синхронизирован, то есть его можно безопасно использовать из нескольких процессов.

Пример использования SyncManager.dict()

from multiprocessing import Manager, Process

def worker(d):
    d['count'] += 1

if __name__ == '__main__':
    with Manager() as manager:
        shared_dict = manager.dict({'count': 0})  # Создаём общий словарь

        print("До:", shared_dict)  # До: {'count': 0}

        # Запускаем несколько процессов
        processes = [Process(target=worker, args=(shared_dict,)) for _ in range(5)]
        for p in processes:
            p.start()
        for p in processes:
            p.join()

        print("После:", shared_dict)  # После: {'count': 5}

Характеристики SyncManager.dict()

ПараметрОписание
Созданиеmanager.dict() или manager.dict({'a': 1})
Типmultiprocessing.managers.DictProxy
СинхронизацияАвтоматически между процессами
ИзменяемостьДа, можно добавлять/удалять/изменять
Поддержка методовПолная, включая Python 3.14: fromkeys(), reversed(), |, |=
Передача между процессамиДа, через аргументы
СериализацияПоддерживается через pickle

Вложенные структуры

Можно хранить в словаре и другие прокси-объекты:

with Manager() as manager:
    d = manager.dict()
    d['list'] = manager.list([1, 2, 3])
    d['name'] = 'Alice'

    print(d)  # {'list': [1, 2, 3], 'name': 'Alice'}

    d['list'].append(4)
    print(d['list'])  # [1, 2, 3, 4]

Важно. Изменения вложенных обычных объектов не синхронизируются автоматически. Например, если ты сохранишь обычный словарь в прокси-словарь, изменение его полей не будет отражено в других процессах, пока ты не обновишь его через __setitem__.

SyncManager.list()
SyncManager.list(sequence):

Объект SyncManager.list() создает общий объект списка list и возвращает для него прокси. Это способ создать общий список, который работает между процессами в multiprocessing. Этот список синхронизируется через SyncManager - специальный серверный процесс, который управляет общими данными.

Как создать?

from multiprocessing import Manager

with Manager() as manager:
    # пустой список
    shared_list = manager.list() 
    # или с начальными данными
    shared_list = manager.list([1, 2, 3])

Пример использования

from multiprocessing import Process, Manager

def add_item(lst, value):
    lst.append(value)

if __name__ == '__main__':
    with Manager() as manager:
        data = manager.list()

        # Запускаем несколько процессов
        processes = [Process(target=add_item, args=(data, i)) for i in range(5)]
        for p in processes:
            p.start()
        for p in processes:
            p.join()

        print(data)  # Вывод: [0, 1, 2, 3, 4]

Характеристики

ПараметрОписание
Созданиеmanager.list() или manager.list([1, 2, 3])
Типmultiprocessing.managers.ListProxy
СинхронизацияАвтоматически между процессами
ИзменяемостьДа, можно добавлять/удалять/изменять
Поддержка методовПолная, включая Python 3.14: clear(), copy()
Передача между процессамиДа, через аргументы
СериализацияПоддерживается через pickle

Важно знать

  • Это не обычный список, это прокси-объект.

  • Все изменения синхронизируются между процессами.

  • Можно вкладывать другие proxy-объекты (manager.dict(), manager.list() и т.д.).

  • Нельзя сравнивать напрямую с обычными списками:

    shared_list == [1, 2, 3]  # всегда False
    

    Нужно делать копию:

    list(shared_list) == [1, 2, 3]  # True
    

SyncManager.set()
SyncManager.set(sequence)
SyncManager.set(mapping):

Добавлено в Python 3.14: добавлена поддержка set.

Объект SyncManager.list() создает общий объект множества set и возвращает для него прокси.

SyncManager.set() создает прокси-объект множества, который может безопасно использоваться между процессами. Это аналог обычного set(), но с автоматической синхронизацией.

Создание множества

with Manager() as manager:
    # Пустое множество
    shared_set = manager.set()
    
    # С начальными значениями (Python 3.14+)
    shared_set = manager.set({1, 2, 3})  # или manager.set([1, 2, 3])

Пример использования

def process_task(s, item):
    s.add(item ** 2)  # Добавляем квадрат числа
    if len(s) > 5:
        s.clear()  # Очищаем при превышении размера

if __name__ == '__main__':
    with Manager() as manager:
        numbers = manager.set()
        
        # Запускаем процессы
        procs = [Process(target=process_task, args=(numbers, i)) 
                for i in range(10)]
        
        for p in procs:
            p.start()
        for p in procs:
            p.join()
            
        print(f"Итоговый набор: {numbers}")
        # Возможный вывод: {0, 1, 4, 9, 16, 25, 36, 49, 64, 81}

Таблица характеристик

Типmultiprocessing.managers.SetProxy
СинхронизацияАвтоматическая между процессами
Методыadd(), remove(), pop(), clear() и др.
ИзменяемостьДа, из любого процесса
ВложенностьТолько с другими прокси
СериализацияЧерез pickle
ПроизводительностьМедленнее обычного set
Сравнениеset(proxy_set) == {1,2,3}
ПотокобезопасностьВсе операции атомарны
Python 3.14+Поддержка |, &, -

Важные особенности

  1. Не является обычным множеством:

    type(manager.set())  # <class 'multiprocessing.managers.SetProxy'>
    
  2. Сравнение требует преобразования:

    shared = manager.set({1, 2, 3})
    shared == {1, 2, 3}          # False (сравнивается прокси, а не данные)
    set(shared) == {1, 2, 3}     # True (правильный способ)
    
  3. Потокобезопасные операции:

    # Все операции атомарны:
    shared.add(x)       # ✓ Безопасно
    shared.remove(x)    # ✓ Безопасно
    if x in shared:     # ✓ Безопасно
        shared.discard(x)
    
  4. Ограничения вложенности:

    # Можно:
    shared.add(manager.list([1, 2]))  # ✓
    
    # Нельзя (не будет синхронизироваться):
    shared.add([1, 2])                # ×
    

Советы по использованию

  1. Всегда используйте менеджер как контекст:

    with Manager() as manager:  # Автоматическое закрытие
        shared = manager.set()
        # работа с множеством
    
  2. Для частых операций используйте Queue или Pipe:

    from multiprocessing import Queue
    q = Queue()  # Быстрее для частого обмена данными
    
  3. Альтернативы для сложных случаев:

    from multiprocessing.shared_memory import ShareableList
    # Или внешние решения: Redis, Memcached
    

Новые методы в Python 3.14+

# Поддержка операторов множеств
s1 = manager.set({1, 2})
s2 = manager.set({2, 3})

s1 |= s2       # Объединение (аналог update())
s1 & s2        # Пересечение
s1 - s2        # Разность

Тонкости изменения вложенных списков у прокси-списков.

Если есть объект управляемого списка manager.list(), то любые изменения в этом списке будут распространяться на все остальные процессы. Но если использовать список не управляемый менеджером (например список внутри управляемого списка), то любые его изменения не распространяются на другие процессы, потому что менеджер процессов не может обнаружить это изменение.

Чтобы изменения вступили в силу, необходимо повторно присвоить значение изменяемому объекту прокси контейнера:

import multiprocessing
import time

def worker(ns, lst_mgr, dict_mgr):
    # Изеняем пространство имен, 
    # с ним все нормально
    ns.x += 1
    ns.y[0] += 1
    ns_z = ns.z
    ns_z[0] += 1
    ns.z = ns_z

    # Меняем список
    lst_mgr[0] += 1
    # прямые изменения простого списка 
    # внутри прокси-списка не распространяться 
    # на другие процессы
    lst_mgr[1][0] += 1
    # изменения распространяться на другие
    # потоки только после прямого присвоения 
    # нового списка элементу прокси-списка
    temp_list = lst_mgr[2]
    temp_list[0] += 10
    temp_list.append('100')
    lst_mgr[2] = temp_list
    # а если изменять прокси-список внутри 
    # прокси-списка, то будет все нормально 
    lst_mgr[3][0] += 5
    lst_mgr[3].insert(0, 'вложенный прокси-список')

    # то же самое будет и со словарями
    dict_mgr[0] += 1
    # изменения не увидим
    dict_mgr[1][0] += 1
    # изменяем простой список (значение 2 ключа словаря)
    dict_val_temp = dict_mgr[2]
    dict_val_temp[0] += 10
    dict_val_temp.append('100')
    dict_mgr[2] = dict_val_temp
    dict_mgr[3][0] += 5
    dict_mgr[3].insert(0, 'прокси-список, прокси-словаря')

if __name__ == '__main__':
    manager = multiprocessing.Manager()
    ns_mgr = manager.Namespace()
    ns_mgr.x = 1
    ns_mgr.y = [1]
    ns_mgr.z = [1]
    lst_mgr = manager.list([1, [1], [1], manager.list([1])])
    dict_mgr = manager.dict({0: 1, 1: [1], 2: [1], 3: manager.list([1])})

    print('\nДо запуска процесса:')
    print('-' * 25)
    print('Пространство имен:', ns_mgr)
    print('Прокси-список:', lst_mgr)
    print('lst_mgr[3]:', list(lst_mgr[3]))
    print('Прокси-словарь:', dict_mgr)
    print('dict_mgr[3]:', list(dict_mgr[3]))
    p = multiprocessing.Process(target=worker, args=(ns_mgr, lst_mgr, dict_mgr))
    p.start()
    p.join()
    print('\nПосле запуска процесса:')
    print('-' * 50)
    print('Пространство имен:', ns_mgr)
    print('Прокси-список:', lst_mgr)
    print('lst_mgr[3]:', list(lst_mgr[3]))
    print('Прокси-словарь:', dict_mgr)
    print('dict_mgr[3]:', list(dict_mgr[3]))


# До запуска процесса:
# -------------------------
# Пространство имен: Namespace(x=1, y=[1], z=[1])
# Прокси-список: [1, [1], [1], <ListProxy object, typeid 'list' at 0x7f34cedde040>]
# lst_mgr[3]: [1]
# Прокси-словарь: {0: 1, 1: [1], 2: [1], 3: <ListProxy object, typeid 'list' at 0x7f34cedde3a0>}
# dict_mgr[3]: [1]

# После запуска процесса:
# --------------------------------------------------
# Пространство имен: Namespace(x=2, y=[1], z=[2])
# Прокси-список: [2, [1], [11, '100'], <ListProxy object, typeid 'list' at 0x7f34cedde040>]
# lst_mgr[3]: ['вложенный прокси-список', 6]
# Прокси-словарь: {0: 2, 1: [1], 2: [11, '100'], 3: <ListProxy object, typeid 'list' at 0x7f34cedde3a0>}
# dict_mgr[3]: ['прокси-список, прокси-словаря', 6]

Примеры взаимодействия процессов через менеджер прокси-сервера.

В примере показывается, как создавать и использовать общие ресурсы на основе менеджеров.

Будем осуществлять поочередное (для процессов) добавление элементов в общий ресурс - синхронизируемый список, блокировать доступ к списку из других потоков на время добавления будем объектом блокировки SyncManager.Lock, созданным при помощи менеджера процессов.

import multiprocessing, time, random

def worker(lst, dkt, lock):
    pid_proc = multiprocessing.current_process().pid
    proc_name = multiprocessing.current_process().name
    # блокируем доступ к прокси-списку из других потоков 
    lock.acquire()
    try:
        for _ in range(3):
            # имитируем нагрузку, для того, что бы была 
            # конкуренция доступа к общему ресурсу (списку)
            time.sleep(random.uniform(0.01, 0.1))
            # добавляем PID процесса 3 раза
            lst.append(pid_proc) 
        
        # если в словаре нет записи о 
        # процессе, то добавляем
        if not dkt.get(pid_proc, None):
            dkt[pid_proc] = proc_name

    finally:
        # завершаем процесс и разрешаем 
        # доступ к массиву другим процессам
        lock.release()

if __name__ == '__main__':
    manager = multiprocessing.Manager()
    # создаем прокси список
    lst = manager.list()
    # создаем прокси словарь
    dkt = manager.dict()
    # создаем прокси блокировщик
    lock = manager.Lock()
    
    procs = []
    for _ in range(3):
        proc = multiprocessing.Process(target=worker, args=(lst, dkt, lock))
        procs.append(proc)
        proc.start()

    # Ждем результатов
    [proc.join() for proc in procs]
    print('Вывод результатов:')
    print(lst)
    print(dkt)
    # очищаем используемые ресурсы
    [proc.close() for proc in procs]
    
    
# Вывод результатов:
# [86138, 86138, 86138, 86139, 86139, 86139, 86140, 86140, 86140]
# {86138: 'Process-2', 86139: 'Process-3', 86140: 'Process-4'}