Класс multiprocessing.managers.BaseManager() позволяет создать и запустить сервер диспетчера на одной машине и заставить клиентов использовать его с других машин, при условии, что задействованные брандмауэры позволяют это.
import multiprocessing.managers multiprocessing.managers.BaseManager([address[, authkey]])
address - адрес, на котором прослушиваются соединения,authkey - ключ аутентификации, для проверки соединений.Класс BaseManager() модуля multiprocessing создает объект BaseManager.
После создания необходимо вызвать методы BaseManager.start() или BaseManager.get_server(). чтобы гарантировать, что объект-менеджер ссылается на запущенный процесс-менеджер, необходимо вызвать метод Server.serve_forever().
Аргумент address - это адрес, на котором процесс-менеджер прослушивает новые соединения. Если адрес имеет значение None, то выбирается произвольный.
Аргумент authkey - это ключ аутентификации, который будет использоваться для проверки действительности входящих соединений с серверным процессом. Если authkey=None, то используется Process.аuthkey, в противном случае используется переданная строка байтов authkey.
BaseManager, атрибуты и методы.BaseManager.start() запускает менеджер,BaseManager.get_server() возвращает объект Server,BaseManager.connect() подключает локальный менеджер к удаленному менеджеру,BaseManager.shutdown() останавливает менеджер,BaseManager.register() регистрирует вызываемый объект в менеджере,BaseManager.address адрес, менеджера в сети,BaseManager.start([initializer[, initargs]]):Метод BaseManager.start() запускает дочерний процесс, для запуска менеджера.
Если аргумент initializer не равен None, то при запуске дочерний процесс вызовет initializer(*initargs).
BaseManager.get_server():Метод BaseManager.get_server() возвращает объект Server, который представляет фактический сервер под управлением Manager. Объект Server поддерживает метод Server.serve_forever():
>>> from multiprocessing.managers import BaseManager >>> manager = BaseManager(address=('', 50000), authkey=b'abc') >>> server = manager.get_server() >>> server.address # ('0.0.0.0', 50000) >>> server.serve_forever()
Объект Server дополнительно имеет атрибут address.
BaseManager.connect():Метод BaseManager.connect() подключает объект локального менеджера к процессу удаленного менеджера:
>>> from multiprocessing.managers import BaseManager >>> m = BaseManager(address=('127.0.0.1', 50000), authkey=b'abc') >>> m.connect()
BaseManager.shutdown():Метод BaseManager.shutdown() останавливает процесс, используемый менеджером. Метод доступен только в том случае, если для запуска серверного процесса использовался метод BaseManager.start().
Метод BaseManager.shutdown() можно вызывать несколько раз.
BaseManager.register(typeid[, callable[, proxytype[, exposed[, method_to_typeid[, create_method]]]]]):Метод BaseManager.register() метод класса, который может быть использован для регистрации типа или вызываемого объекта с помощью класса менеджера.
Аргумент typeid это "идентификатор типа", который используется для идентификации определенного типа общего объекта. Значение должно быть строкой.
Аргумент callable - вызываемый объект, используемый для создания объектов для этого идентификатора типа. Если экземпляр менеджера будет подключен к серверу с помощью метода BaseManager.connect() или если аргумент create_method=False, то тогда можно оставить значение None.
Аргумент proxytype является подклассом BaseProxy, который используется для создания прокси для общих объектов с идентификатор типа typeid. Если proxytype=None, то прокси-класс создается автоматически.
Аргумент exposed используется для указания последовательности имен методов, к которым прокси-серверы для типа typeid должны иметь доступ с помощью BaseProxy._callmethod().
None, то тогда вместо него используется proxytype._exposed_, если он существует.__call__() и имя которого не начинается с '_'.Аргумент method_to_typeid - это сопоставление (словарь), используемое для указания типа возвращаемого значения тех открытых методов, которые должны возвращать прокси. Он сопоставляет имена методов с типизированными строками.
Если method_to_typeid=None, тогда вместо него используется proxytype._method_to_typeid_, если он существует.Если имя метода не является ключом этого сопоставления или если сопоставление None, то объект, возвращаемый методом, будет скопирован по значению.
Аргумент create_method определяет, следует ли создавать метод с именем typeid, который можно использовать для сообщения серверному процессу о необходимости создания нового общего объекта и возврата для него прокси. По умолчанию это True.
BaseManager.address:Атрибут BaseManager.address возвращает адрес, используемый менеджером для прослушивания соединений.
Чтобы создать своего собственного менеджера, нужно создать подкласс BaseManager и использовать метод класса .register() для регистрации новых типов или вызываемых объектов в классе менеджера.
from multiprocessing.managers import BaseManager class MathsClass: def add(self, x, y): return x + y def mul(self, x, y): return x * y class MyManager(BaseManager): pass MyManager.register('Maths', MathsClass) if __name__ == '__main__': with MyManager() as manager: maths = manager.Maths() print(maths.add(4, 3)) print(maths.mul(7, 8)) # 7 # 56
from multiprocessing import freeze_support from multiprocessing.managers import BaseManager, BaseProxy import operator # Обычный класс class Foo: def fff(self): print('Вызван метод: Foo.fff()') def ggg(self): print('Вызван метод: Foo.ggg()') def _hhh(self): """Закрытый метод""" print('Вызван метод: Foo._hhh()') # Простая функция генератора def baz(): for i in range(10): yield i*i # Тип прокси для объектов генератора class GeneratorProxy(BaseProxy): _exposed_ = ['__next__'] def __iter__(self): return self def __next__(self): return self._callmethod('__next__') # Функция возвращает модуль `operator` def get_operator_module(): return operator class MyManager(BaseManager): pass # регистрация класса Foo как Foo1, тем самым делаем # общедоступные методы доступными через прокси MyManager.register('Foo1', Foo) # регистрация класса Foo как Foo2 и # методов _hhh()` и `ggg()` для работы через прокси MyManager.register('Foo2', Foo, exposed=('ggg', '_hhh')) # регистрация функции генератора baz. # Используем `GeneratorProxy` для создания прокси MyManager.register('baz', baz, proxytype=GeneratorProxy) # регистрация get_operator_module(), тем самым делаем # общедоступные функции доступными через прокси MyManager.register('operator', get_operator_module) # ФУНКЦИЯ ЗАПУСКА def test(): manager = MyManager() manager.start() print('-' * 20) print('Тест прокси класса Foo1 и его методов:') f1 = manager.Foo1() f1.fff() f1.ggg() print('Доступность закрытого метода Foo._hhh() =>', hasattr(f1, '_hhh')) assert sorted(f1._exposed_) == sorted(['fff', 'ggg']) print('-' * 20) print('Тест прокси класса Foo2 и его методов:') f2 = manager.Foo2() f2.ggg() f2._hhh() print('Доступность метода Foo.fff() =>', hasattr(f2, 'fff')) assert sorted(f2._exposed_) == sorted(['ggg', '_hhh']) print('-' * 20) print('Тест работы генератора baz через прокси:') it = manager.baz() for i in it: print(f'<{i}>', end=' ') print() print('-' * 20) print('Тест доступа к функциям модуля `operator` через прокси:') op = manager.operator() print('op.add(23, 45) =', op.add(23, 45)) print('op.pow(2, 94) =', op.pow(2, 94)) # print('op._exposed_ =', op._exposed_) if __name__ == '__main__': freeze_support() test() # -------------------- # Тест прокси класса Foo1 и его методов: # Вызван метод: Foo.fff() # Вызван метод: Foo.ggg() # Доступность закрытого метода Foo._hhh() => False # -------------------- # Тест прокси класса Foo2 и его методов: # Вызван метод: Foo.ggg() # Вызван метод: Foo._hhh() # Доступность метода Foo.fff() => False # -------------------- # Тест работы генератора baz через прокси: # <0> <1> <4> <9> <16> <25> <36> <49> <64> <81> # -------------------- # Тест доступа к функциям модуля `operator` через прокси: # op.add(23, 45) = 68 # op.pow(2, 94) = 19807040628566084398385987584
Можно запустить сервер диспетчера на одной машине и заставить клиентов использовать его с других машин, при условии, что задействованные брандмауэры позволяют это.
Выполнение следующих команд создает сервер для одной общей очереди, к которой могут получить доступ удаленные клиенты:
>>> from multiprocessing.managers import BaseManager >>> from queue import Queue >>> queue = Queue() >>> class QueueManager(BaseManager): pass >>> QueueManager.register('get_queue', callable=lambda:queue) >>> m = QueueManager(address=('', 50000), authkey=b'abracadabra') >>> s = m.get_server() >>> s.serve_forever()
Один клиент может получить доступ к серверу следующим образом:
>>> from multiprocessing.managers import BaseManager >>> class QueueManager(BaseManager): pass >>> QueueManager.register('get_queue') >>> m = QueueManager(address=('foo.bar.org', 50000), authkey=b'abracadabra') >>> m.connect() >>> queue = m.get_queue() >>> queue.put('hello')
Другой клиент также может использовать его:
>>> from multiprocessing.managers import BaseManager >>> class QueueManager(BaseManager): pass >>> QueueManager.register('get_queue') >>> m = QueueManager(address=('foo.bar.org', 50000), authkey=b'abracadabra') >>> m.connect() >>> queue = m.get_queue() >>> queue.get() # 'hello'
Локальные процессы также могут получить доступ к этой очереди, используя код сверху на клиенте для удаленного доступа к ней:
>>> from multiprocessing import Process, Queue >>> from multiprocessing.managers import BaseManager >>> class Worker(Process): ... def __init__(self, q): ... self.q = q ... super(Worker, self).__init__() ... def run(self): ... self.q.put('local hello') ... >>> queue = Queue() >>> w = Worker(queue) >>> w.start() >>> class QueueManager(BaseManager): pass ... >>> QueueManager.register('get_queue', callable=lambda: queue) >>> m = QueueManager(address=('', 50000), authkey=b'abracadabra') >>> s = m.get_server() >>> s.serve_forever()