Сообщить об ошибке.

Класс BaseManager() модуля multiprocessing.managers в Python

Позволяет создать и запустить удаленный менеджер процессов

Класс multiprocessing.managers.BaseManager() позволяет создать и запустить сервер диспетчера на одной машине и заставить клиентов использовать его с других машин, при условии, что задействованные брандмауэры позволяют это.

Синтаксис:

import multiprocessing.managers

multiprocessing.managers.BaseManager([address[, authkey]])

Параметры:

  • address - адрес, на котором прослушиваются соединения,
  • authkey - ключ аутентификации, для проверки соединений.

Возвращаемое значение:

Описание:

Класс BaseManager() модуля multiprocessing создает объект BaseManager.

После создания необходимо вызвать методы BaseManager.start() или BaseManager.get_server(). чтобы гарантировать, что объект-менеджер ссылается на запущенный процесс-менеджер, необходимо вызвать метод Server.serve_forever().

Аргумент address - это адрес, на котором процесс-менеджер прослушивает новые соединения. Если адрес имеет значение None, то выбирается произвольный.

Аргумент authkey - это ключ аутентификации, который будет использоваться для проверки действительности входящих соединений с серверным процессом. Если authkey=None, то используется Process.аuthkey, в противном случае используется переданная строка байтов authkey.

Объект BaseManager, атрибуты и методы.


BaseManager.start([initializer[, initargs]]):

Метод BaseManager.start() запускает дочерний процесс, для запуска менеджера.

Если аргумент initializer не равен None, то при запуске дочерний процесс вызовет initializer(*initargs).

BaseManager.get_server():

Метод BaseManager.get_server() возвращает объект Server, который представляет фактический сервер под управлением Manager. Объект Server поддерживает метод Server.serve_forever():

>>> from multiprocessing.managers import BaseManager
>>> manager = BaseManager(address=('', 50000), authkey=b'abc')
>>> server = manager.get_server()
>>> server.address
# ('0.0.0.0', 50000)
>>> server.serve_forever()

Объект Server дополнительно имеет атрибут address.

BaseManager.connect():

Метод BaseManager.connect() подключает объект локального менеджера к процессу удаленного менеджера:

>>> from multiprocessing.managers import BaseManager
>>> m = BaseManager(address=('127.0.0.1', 50000), authkey=b'abc')
>>> m.connect()

BaseManager.shutdown():

Метод BaseManager.shutdown() останавливает процесс, используемый менеджером. Метод доступен только в том случае, если для запуска серверного процесса использовался метод BaseManager.start().

Метод BaseManager.shutdown() можно вызывать несколько раз.

BaseManager.register(typeid[, callable[, proxytype[, exposed[, method_to_typeid[, create_method]]]]]):

Метод BaseManager.register() метод класса, который может быть использован для регистрации типа или вызываемого объекта с помощью класса менеджера.

Аргумент typeid это "идентификатор типа", который используется для идентификации определенного типа общего объекта. Значение должно быть строкой.

Аргумент callable - вызываемый объект, используемый для создания объектов для этого идентификатора типа. Если экземпляр менеджера будет подключен к серверу с помощью метода BaseManager.connect() или если аргумент create_method=False, то тогда можно оставить значение None.

Аргумент proxytype является подклассом BaseProxy, который используется для создания прокси для общих объектов с идентификатор типа typeid. Если proxytype=None, то прокси-класс создается автоматически.

Аргумент exposed используется для указания последовательности имен методов, к которым прокси-серверы для типа typeid должны иметь доступ с помощью BaseProxy._callmethod().

  • Если отображается значение None, то тогда вместо него используется proxytype._exposed_, если он существует.
  • В случае, если не указан открытый список, все "общедоступные методы" общего объекта будут доступны. Здесь "общедоступный метод" означает любой атрибут, который имеет метод __call__() и имя которого не начинается с '_'.

Аргумент method_to_typeid - это сопоставление (словарь), используемое для указания типа возвращаемого значения тех открытых методов, которые должны возвращать прокси. Он сопоставляет имена методов с типизированными строками.

Если method_to_typeid=None, тогда вместо него используется proxytype._method_to_typeid_, если он существует.Если имя метода не является ключом этого сопоставления или если сопоставление None, то объект, возвращаемый методом, будет скопирован по значению.

Аргумент create_method определяет, следует ли создавать метод с именем typeid, который можно использовать для сообщения серверному процессу о необходимости создания нового общего объекта и возврата для него прокси. По умолчанию это True.

BaseManager.address:

Атрибут BaseManager.address возвращает адрес, используемый менеджером для прослушивания соединений.


Пример создания пользовательского менеджера.

Чтобы создать своего собственного менеджера, нужно создать подкласс BaseManager и использовать метод класса .register() для регистрации новых типов или вызываемых объектов в классе менеджера.

from multiprocessing.managers import BaseManager

class MathsClass:
    def add(self, x, y):
        return x + y
    def mul(self, x, y):
        return x * y

class MyManager(BaseManager):
    pass

MyManager.register('Maths', MathsClass)

if __name__ == '__main__':
    with MyManager() as manager:
        maths = manager.Maths()
        print(maths.add(4, 3))         
        print(maths.mul(7, 8))    

# 7
# 56

Примеры регистрация общих функций и классов в менеджере.

from multiprocessing import freeze_support
from multiprocessing.managers import BaseManager, BaseProxy
import operator

# Обычный класс
class Foo:
    def fff(self):
        print('Вызван метод: Foo.fff()')
    def ggg(self):
        print('Вызван метод: Foo.ggg()')
    def _hhh(self):
        """Закрытый метод"""
        print('Вызван метод: Foo._hhh()')

# Простая функция генератора
def baz():
    for i in range(10):
        yield i*i

# Тип прокси для объектов генератора
class GeneratorProxy(BaseProxy):
    _exposed_ = ['__next__']
    def __iter__(self):
        return self
    def __next__(self):
        return self._callmethod('__next__')

# Функция возвращает модуль `operator`
def get_operator_module():
    return operator


class MyManager(BaseManager):
    pass

# регистрация класса Foo как Foo1, тем самым делаем 
# общедоступные методы доступными через прокси
MyManager.register('Foo1', Foo)

# регистрация класса Foo как Foo2 и 
# методов _hhh()` и `ggg()` для работы через прокси
MyManager.register('Foo2', Foo, exposed=('ggg', '_hhh'))

# регистрация функции генератора baz. 
# Используем `GeneratorProxy` для создания прокси
MyManager.register('baz', baz, proxytype=GeneratorProxy)

# регистрация get_operator_module(), тем самым делаем 
# общедоступные функции доступными через прокси
MyManager.register('operator', get_operator_module)


# ФУНКЦИЯ ЗАПУСКА
def test():
    manager = MyManager()
    manager.start()

    print('-' * 20)
    print('Тест прокси класса Foo1 и его методов:')
    f1 = manager.Foo1()
    f1.fff()
    f1.ggg()
    print('Доступность закрытого метода Foo._hhh() =>', hasattr(f1, '_hhh'))
    assert sorted(f1._exposed_) == sorted(['fff', 'ggg'])

    print('-' * 20)
    print('Тест прокси класса Foo2 и его методов:')
    f2 = manager.Foo2()
    f2.ggg()
    f2._hhh()
    print('Доступность метода Foo.fff() =>', hasattr(f2, 'fff'))
    assert sorted(f2._exposed_) == sorted(['ggg', '_hhh'])

    print('-' * 20)
    print('Тест работы генератора baz через прокси:')
    it = manager.baz()
    for i in it:
        print(f'<{i}>', end=' ')
    print()

    print('-' * 20)
    print('Тест доступа к функциям модуля `operator` через прокси:')
    op = manager.operator()
    print('op.add(23, 45) =', op.add(23, 45))
    print('op.pow(2, 94) =', op.pow(2, 94))
    # print('op._exposed_ =', op._exposed_)


if __name__ == '__main__':
    freeze_support()
    test()
    
# --------------------
# Тест прокси класса Foo1 и его методов:
# Вызван метод: Foo.fff()
# Вызван метод: Foo.ggg()
# Доступность закрытого метода Foo._hhh() => False
# --------------------
# Тест прокси класса Foo2 и его методов:
# Вызван метод: Foo.ggg()
# Вызван метод: Foo._hhh()
# Доступность метода Foo.fff() => False
# --------------------
# Тест работы генератора baz через прокси:
# <0> <1> <4> <9> <16> <25> <36> <49> <64> <81> 
# --------------------
# Тест доступа к функциям модуля `operator` через прокси:
# op.add(23, 45) = 68
# op.pow(2, 94) = 19807040628566084398385987584

Примеры использования удаленного менеджера.

Можно запустить сервер диспетчера на одной машине и заставить клиентов использовать его с других машин, при условии, что задействованные брандмауэры позволяют это.

Выполнение следующих команд создает сервер для одной общей очереди, к которой могут получить доступ удаленные клиенты:

>>> from multiprocessing.managers import BaseManager
>>> from queue import Queue
>>> queue = Queue()
>>> class QueueManager(BaseManager): pass
>>> QueueManager.register('get_queue', callable=lambda:queue)
>>> m = QueueManager(address=('', 50000), authkey=b'abracadabra')
>>> s = m.get_server()
>>> s.serve_forever()

Один клиент может получить доступ к серверу следующим образом:

>>> from multiprocessing.managers import BaseManager
>>> class QueueManager(BaseManager): pass
>>> QueueManager.register('get_queue')
>>> m = QueueManager(address=('foo.bar.org', 50000), authkey=b'abracadabra')
>>> m.connect()
>>> queue = m.get_queue()
>>> queue.put('hello')

Другой клиент также может использовать его:

>>> from multiprocessing.managers import BaseManager
>>> class QueueManager(BaseManager): pass
>>> QueueManager.register('get_queue')
>>> m = QueueManager(address=('foo.bar.org', 50000), authkey=b'abracadabra')
>>> m.connect()
>>> queue = m.get_queue()
>>> queue.get()
# 'hello'

Локальные процессы также могут получить доступ к этой очереди, используя код сверху на клиенте для удаленного доступа к ней:

>>> from multiprocessing import Process, Queue
>>> from multiprocessing.managers import BaseManager
>>> class Worker(Process):
...     def __init__(self, q):
...         self.q = q
...         super(Worker, self).__init__()
...     def run(self):
...         self.q.put('local hello')
...
>>> queue = Queue()
>>> w = Worker(queue)
>>> w.start()
>>> class QueueManager(BaseManager): pass
...
>>> QueueManager.register('get_queue', callable=lambda: queue)
>>> m = QueueManager(address=('', 50000), authkey=b'abracadabra')
>>> s = m.get_server()
>>> s.serve_forever()