Сообщить об ошибке.

Класс Manager() модуля multiprocessing в Python

Создание данных, которые могут использовать разные процессы

Менеджеры предоставляют способ создания данных, которые могут совместно использоваться разными процессами, включая обмен по сети между процессами, запущенными на разных машинах. Объект-менеджер управляет серверным процессом, который управляет общими объектами. Другие процессы могут получить доступ к общим объектам с помощью прокси.

Синтаксис:

import multiprocessing

proxy = multiprocessing.Manager()

Параметры:

  • нет.

Возвращаемое значение:

Описание:

Класс Manager() модуля multiprocessing возвращает запущенный объект SyncManager, который можно использовать для совместного использования объектов между процессами.

Возвращенный объект-менеджер SyncManager соответствует порожденному дочернему процессу и имеет методы, которые будут создавать общие объекты и возвращать соответствующие прокси объекты.

Процессы диспетчера будут завершены, как только они будут собраны сборщиком мусора или их родительский процесс завершится.

Объект-менеджер SyncManager.

Объект-менеджер SyncManager представляет собой подкласс BaseManager, который может использоваться для синхронизации процессов. Объекты этого типа возвращаются классом multiprocessing.Manager().

Методы объект-менеджера SyncManager создают и возвращают прокси-объекты для ряда часто используемых типов данных, которые должны быть синхронизированы между процессами. Эти типы, в частности, включают в себя обычные списки и словари Python.

Изменено в Python 3.6: общие объекты могут быть вложенными. Например, общий контейнерный объект, такой как общий список, может содержать другие общие объекты, которые будут управляться и синхронизироваться SyncManager.

Прокси-объекты, возвращаемые объект-менеджером SyncManager.


SyncManager.Barrier(parties[, action[, timeout]]):

Объект SyncManager.Barrier() создает общий объект синхронизации threading.Barrier и возвращает для него прокси.

SyncManager.BoundedSemaphore([value]):

Объект SyncManager.BoundedSemaphore() создает общий объект синхронизации threading.BoundedSemaphore и возвращает для него прокси.

SyncManager.Condition([lock]):

Объект SyncManager.Condition() создает общий объект синхронизации threading.Condition и возвращает для него прокси.

Если предоставляется аргумент lock, то это должен быть прокси-объект для объекта threading.Lock или threading.RLock.

SyncManager.Event():

Объект SyncManager.Event() создает общий объект синхронизации threading.Event и возвращает для него прокси.

SyncManager.Lock():

Объект SyncManager.Lock() создает общий объект синхронизации threading.Lock и возвращает для него прокси.

SyncManager.RLock():

Объект SyncManager.RLock() создает общий объект синхронизации threading.RLock и возвращает для него прокси.

SyncManager.Semaphore([value]):

Объект SyncManager.Semaphore() создает общий объект синхронизации threading.Semaphore и возвращает для него прокси.

SyncManager.Queue([maxsize]):

Объект SyncManager.Queue() создает общий объект очереди queue.Queue и возвращает для него прокси.

SyncManager.Array(typecode, sequence):

Объект SyncManager.Array() создает синхронизированный массив multiprocessing.Array и возвращает для него прокси.

SyncManager.Value(typecode, value):

Объект SyncManager.Value() создает синхронизированный объект с атрибутом записываемого значения multiprocessing.Value и возвращает для него прокси.

SyncManager.Namespace():

Объект SyncManager.Namespace() создает общий объект пространства имен Namespace и возвращает для него прокси.

Объект пространства имен Namespace - это тип, который можно зарегистрировать в SyncManager.

Объект Namespace не имеет общедоступных методов, но имеет атрибуты с возможностью записи в них. Его представление показывает значения его атрибутов.

Однако при использовании прокси для объекта пространства имен атрибут, начинающийся с '_', будет атрибутом прокси, а не атрибутом референта:

>>> manager = multiprocessing.Manager()
>>> Global = manager.Namespace()
>>> Global.x = 10
>>> Global.y = 'hello'
# это атрибут прокси-сервера
>>> Global._z = 12.3
>>> print(Global)
Namespace(x=10, y='hello')

SyncManager.dict()
SyncManager.dict(mapping)
SyncManager.dict(sequence):

Объект SyncManager.dict() создает общий объект словаря dict и возвращает для него прокси.

SyncManager.list()
SyncManager.list(sequence):

Объект SyncManager.list() создает общий объект списка list и возвращает для него прокси.

Примечание. Изменения значений или элементов прокси-объектов dict и list не будут распространяться через диспетчер на другие потоки, поскольку прокси-сервер не имеет возможности узнать, когда его значения или элементы были изменены. Чтобы изменения вступили в силу, необходимо повторно присвоить значение изменяемому объекту прокси контейнера:

# создаем прокси-объект списка и добавляем
# изменяемый объект (словарь)
lproxy = manager.list()
lproxy.append({})
# теперь изменяем словарь
d = lproxy[0]
d['a'] = 1
d['b'] = 2
# на данный момент изменения в словаре `d` еще не
# синхронизированы, но при переназначении словаря 
# прокси-сервер получает уведомление об этом изменении
lproxy[0] = d

Тонкости изменения вложенных списков у прокси-списков.

Если есть объект управляемого списка manager.list(), то любые изменения в этом списке будут распространяться на все остальные процессы. Но если использовать список не управляемый менеджером (например список внутри управляемого списка), то любые его изменения не распространяются на другие процессы, потому что менеджер процессов не может обнаружить это изменение.

Чтобы изменения вступили в силу, необходимо повторно присвоить значение изменяемому объекту прокси контейнера:

import multiprocessing
import time

def worker(ns, lst_mgr, dict_mgr):
    # Изеняем пространство имен, 
    # с ним все нормально
    ns.x += 1
    ns.y[0] += 1
    ns_z = ns.z
    ns_z[0] += 1
    ns.z = ns_z

    # Меняем список
    lst_mgr[0] += 1
    # прямые изменения простого списка 
    # внутри прокси-списка не распространяться 
    # на другие процессы
    lst_mgr[1][0] += 1
    # изменения распространяться на другие
    # потоки только после прямого присвоения 
    # нового списка элементу прокси-списка
    temp_list = lst_mgr[2]
    temp_list[0] += 10
    temp_list.append('100')
    lst_mgr[2] = temp_list
    # а если изменять прокси-список внутри 
    # прокси-списка, то будет все нормально 
    lst_mgr[3][0] += 5
    lst_mgr[3].insert(0, 'вложенный прокси-список')

    # то же самое будет и со словарями
    dict_mgr[0] += 1
    # изменения не увидим
    dict_mgr[1][0] += 1
    # изменяем простой список (значение 2 ключа словаря)
    dict_val_temp = dict_mgr[2]
    dict_val_temp[0] += 10
    dict_val_temp.append('100')
    dict_mgr[2] = dict_val_temp
    dict_mgr[3][0] += 5
    dict_mgr[3].insert(0, 'прокси-список, прокси-словаря')

if __name__ == '__main__':
    manager = multiprocessing.Manager()
    ns_mgr = manager.Namespace()
    ns_mgr.x = 1
    ns_mgr.y = [1]
    ns_mgr.z = [1]
    lst_mgr = manager.list([1, [1], [1], manager.list([1])])
    dict_mgr = manager.dict({0: 1, 1: [1], 2: [1], 3: manager.list([1])})

    print('\nДо запуска процесса:')
    print('-' * 25)
    print('Пространство имен:', ns_mgr)
    print('Прокси-список:', lst_mgr)
    print('lst_mgr[3]:', list(lst_mgr[3]))
    print('Прокси-словарь:', dict_mgr)
    print('dict_mgr[3]:', list(dict_mgr[3]))
    p = multiprocessing.Process(target=worker, args=(ns_mgr, lst_mgr, dict_mgr))
    p.start()
    p.join()
    print('\nПосле запуска процесса:')
    print('-' * 50)
    print('Пространство имен:', ns_mgr)
    print('Прокси-список:', lst_mgr)
    print('lst_mgr[3]:', list(lst_mgr[3]))
    print('Прокси-словарь:', dict_mgr)
    print('dict_mgr[3]:', list(dict_mgr[3]))


# До запуска процесса:
# -------------------------
# Пространство имен: Namespace(x=1, y=[1], z=[1])
# Прокси-список: [1, [1], [1], <ListProxy object, typeid 'list' at 0x7f34cedde040>]
# lst_mgr[3]: [1]
# Прокси-словарь: {0: 1, 1: [1], 2: [1], 3: <ListProxy object, typeid 'list' at 0x7f34cedde3a0>}
# dict_mgr[3]: [1]

# После запуска процесса:
# --------------------------------------------------
# Пространство имен: Namespace(x=2, y=[1], z=[2])
# Прокси-список: [2, [1], [11, '100'], <ListProxy object, typeid 'list' at 0x7f34cedde040>]
# lst_mgr[3]: ['вложенный прокси-список', 6]
# Прокси-словарь: {0: 2, 1: [1], 2: [11, '100'], 3: <ListProxy object, typeid 'list' at 0x7f34cedde3a0>}
# dict_mgr[3]: ['прокси-список, прокси-словаря', 6]

Примеры взаимодействия процессов через менеджер прокси-сервера.

В примере показывается, как создавать и использовать общие ресурсы на основе менеджеров.

Будем осуществлять поочередное (для процессов) добавление элементов в общий ресурс - синхронизируемый список, блокировать доступ к списку из других потоков на время добавления будем объектом блокировки SyncManager.Lock, созданным при помощи менеджера процессов.

import multiprocessing, time, random

def worker(lst, dkt, lock):
    pid_proc = multiprocessing.current_process().pid
    proc_name = multiprocessing.current_process().name
    # блокируем доступ к прокси-списку из других потоков 
    lock.acquire()
    try:
        for _ in range(3):
            # имитируем нагрузку, для того, что бы была 
            # конкуренция доступа к общему ресурсу (списку)
            time.sleep(random.uniform(0.01, 0.1))
            # добавляем PID процесса 3 раза
            lst.append(pid_proc) 
        
        # если в словаре нет записи о 
        # процессе, то добавляем
        if not dkt.get(pid_proc, None):
            dkt[pid_proc] = proc_name

    finally:
        # завершаем процесс и разрешаем 
        # доступ к массиву другим процессам
        lock.release()

if __name__ == '__main__':
    manager = multiprocessing.Manager()
    # создаем прокси список
    lst = manager.list()
    # создаем прокси словарь
    dkt = manager.dict()
    # создаем прокси блокировщик
    lock = manager.Lock()
    
    procs = []
    for _ in range(3):
        proc = multiprocessing.Process(target=worker, args=(lst, dkt, lock))
        procs.append(proc)
        proc.start()

    # Ждем результатов
    [proc.join() for proc in procs]
    print('Вывод результатов:')
    print(lst)
    print(dkt)
    # очищаем используемые ресурсы
    [proc.close() for proc in procs]
    
    
# Вывод результатов:
# [86138, 86138, 86138, 86139, 86139, 86139, 86140, 86140, 86140]
# {86138: 'Process-2', 86139: 'Process-3', 86140: 'Process-4'}