Класс multiprocessing.managers.BaseManager()
позволяет создать и запустить сервер диспетчера на одной машине и заставить клиентов использовать его с других машин, при условии, что задействованные брандмауэры позволяют это.
import multiprocessing.managers multiprocessing.managers.BaseManager([address[, authkey]])
address
- адрес, на котором прослушиваются соединения,authkey
- ключ аутентификации, для проверки соединений.Класс BaseManager()
модуля multiprocessing
создает объект BaseManager
.
После создания необходимо вызвать методы BaseManager.start()
или BaseManager.get_server()
. чтобы гарантировать, что объект-менеджер ссылается на запущенный процесс-менеджер, необходимо вызвать метод Server.serve_forever()
.
Аргумент address
- это адрес, на котором процесс-менеджер прослушивает новые соединения. Если адрес имеет значение None
, то выбирается произвольный.
Аргумент authkey
- это ключ аутентификации, который будет использоваться для проверки действительности входящих соединений с серверным процессом. Если authkey=None
, то используется Process.аuthkey
, в противном случае используется переданная строка байтов authkey
.
BaseManager
, атрибуты и методы.BaseManager.start()
запускает менеджер,BaseManager.get_server()
возвращает объект Server
,BaseManager.connect()
подключает локальный менеджер к удаленному менеджеру,BaseManager.shutdown()
останавливает менеджер,BaseManager.register()
регистрирует вызываемый объект в менеджере,BaseManager.address
адрес, менеджера в сети,BaseManager.start([initializer[, initargs]])
:Метод BaseManager.start()
запускает дочерний процесс, для запуска менеджера.
Если аргумент initializer
не равен None
, то при запуске дочерний процесс вызовет initializer(*initargs)
.
BaseManager.get_server()
:Метод BaseManager.get_server()
возвращает объект Server
, который представляет фактический сервер под управлением Manager
. Объект Server
поддерживает метод Server.serve_forever()
:
>>> from multiprocessing.managers import BaseManager >>> manager = BaseManager(address=('', 50000), authkey=b'abc') >>> server = manager.get_server() >>> server.address # ('0.0.0.0', 50000) >>> server.serve_forever()
Объект Server
дополнительно имеет атрибут address
.
BaseManager.connect()
:Метод BaseManager.connect()
подключает объект локального менеджера к процессу удаленного менеджера:
>>> from multiprocessing.managers import BaseManager >>> m = BaseManager(address=('127.0.0.1', 50000), authkey=b'abc') >>> m.connect()
BaseManager.shutdown()
:Метод BaseManager.shutdown()
останавливает процесс, используемый менеджером. Метод доступен только в том случае, если для запуска серверного процесса использовался метод BaseManager.start()
.
Метод BaseManager.shutdown()
можно вызывать несколько раз.
BaseManager.register(typeid[, callable[, proxytype[, exposed[, method_to_typeid[, create_method]]]]])
:Метод BaseManager.register()
метод класса, который может быть использован для регистрации типа или вызываемого объекта с помощью класса менеджера.
Аргумент typeid
это "идентификатор типа", который используется для идентификации определенного типа общего объекта. Значение должно быть строкой.
Аргумент callable
- вызываемый объект, используемый для создания объектов для этого идентификатора типа. Если экземпляр менеджера будет подключен к серверу с помощью метода BaseManager.connect()
или если аргумент create_method=False
, то тогда можно оставить значение None
.
Аргумент proxytype
является подклассом BaseProxy
, который используется для создания прокси для общих объектов с идентификатор типа typeid
. Если proxytype=None
, то прокси-класс создается автоматически.
Аргумент exposed
используется для указания последовательности имен методов, к которым прокси-серверы для типа typeid
должны иметь доступ с помощью BaseProxy._callmethod()
.
None
, то тогда вместо него используется proxytype._exposed_
, если он существует. __call__()
и имя которого не начинается с '_'
.Аргумент method_to_typeid
- это сопоставление (словарь), используемое для указания типа возвращаемого значения тех открытых методов, которые должны возвращать прокси. Он сопоставляет имена методов с типизированными строками.
Если method_to_typeid=None
, тогда вместо него используется proxytype._method_to_typeid_
, если он существует.Если имя метода не является ключом этого сопоставления или если сопоставление None
, то объект, возвращаемый методом, будет скопирован по значению.
Аргумент create_method
определяет, следует ли создавать метод с именем typeid
, который можно использовать для сообщения серверному процессу о необходимости создания нового общего объекта и возврата для него прокси. По умолчанию это True
.
BaseManager.address
:Атрибут BaseManager.address
возвращает адрес, используемый менеджером для прослушивания соединений.
Чтобы создать своего собственного менеджера, нужно создать подкласс BaseManager
и использовать метод класса .register()
для регистрации новых типов или вызываемых объектов в классе менеджера.
from multiprocessing.managers import BaseManager class MathsClass: def add(self, x, y): return x + y def mul(self, x, y): return x * y class MyManager(BaseManager): pass MyManager.register('Maths', MathsClass) if __name__ == '__main__': with MyManager() as manager: maths = manager.Maths() print(maths.add(4, 3)) print(maths.mul(7, 8)) # 7 # 56
from multiprocessing import freeze_support from multiprocessing.managers import BaseManager, BaseProxy import operator # Обычный класс class Foo: def fff(self): print('Вызван метод: Foo.fff()') def ggg(self): print('Вызван метод: Foo.ggg()') def _hhh(self): """Закрытый метод""" print('Вызван метод: Foo._hhh()') # Простая функция генератора def baz(): for i in range(10): yield i*i # Тип прокси для объектов генератора class GeneratorProxy(BaseProxy): _exposed_ = ['__next__'] def __iter__(self): return self def __next__(self): return self._callmethod('__next__') # Функция возвращает модуль `operator` def get_operator_module(): return operator class MyManager(BaseManager): pass # регистрация класса Foo как Foo1, тем самым делаем # общедоступные методы доступными через прокси MyManager.register('Foo1', Foo) # регистрация класса Foo как Foo2 и # методов _hhh()` и `ggg()` для работы через прокси MyManager.register('Foo2', Foo, exposed=('ggg', '_hhh')) # регистрация функции генератора baz. # Используем `GeneratorProxy` для создания прокси MyManager.register('baz', baz, proxytype=GeneratorProxy) # регистрация get_operator_module(), тем самым делаем # общедоступные функции доступными через прокси MyManager.register('operator', get_operator_module) # ФУНКЦИЯ ЗАПУСКА def test(): manager = MyManager() manager.start() print('-' * 20) print('Тест прокси класса Foo1 и его методов:') f1 = manager.Foo1() f1.fff() f1.ggg() print('Доступность закрытого метода Foo._hhh() =>', hasattr(f1, '_hhh')) assert sorted(f1._exposed_) == sorted(['fff', 'ggg']) print('-' * 20) print('Тест прокси класса Foo2 и его методов:') f2 = manager.Foo2() f2.ggg() f2._hhh() print('Доступность метода Foo.fff() =>', hasattr(f2, 'fff')) assert sorted(f2._exposed_) == sorted(['ggg', '_hhh']) print('-' * 20) print('Тест работы генератора baz через прокси:') it = manager.baz() for i in it: print(f'<{i}>', end=' ') print() print('-' * 20) print('Тест доступа к функциям модуля `operator` через прокси:') op = manager.operator() print('op.add(23, 45) =', op.add(23, 45)) print('op.pow(2, 94) =', op.pow(2, 94)) # print('op._exposed_ =', op._exposed_) if __name__ == '__main__': freeze_support() test() # -------------------- # Тест прокси класса Foo1 и его методов: # Вызван метод: Foo.fff() # Вызван метод: Foo.ggg() # Доступность закрытого метода Foo._hhh() => False # -------------------- # Тест прокси класса Foo2 и его методов: # Вызван метод: Foo.ggg() # Вызван метод: Foo._hhh() # Доступность метода Foo.fff() => False # -------------------- # Тест работы генератора baz через прокси: # <0> <1> <4> <9> <16> <25> <36> <49> <64> <81> # -------------------- # Тест доступа к функциям модуля `operator` через прокси: # op.add(23, 45) = 68 # op.pow(2, 94) = 19807040628566084398385987584
Можно запустить сервер диспетчера на одной машине и заставить клиентов использовать его с других машин, при условии, что задействованные брандмауэры позволяют это.
Выполнение следующих команд создает сервер для одной общей очереди, к которой могут получить доступ удаленные клиенты:
>>> from multiprocessing.managers import BaseManager >>> from queue import Queue >>> queue = Queue() >>> class QueueManager(BaseManager): pass >>> QueueManager.register('get_queue', callable=lambda:queue) >>> m = QueueManager(address=('', 50000), authkey=b'abracadabra') >>> s = m.get_server() >>> s.serve_forever()
Один клиент может получить доступ к серверу следующим образом:
>>> from multiprocessing.managers import BaseManager >>> class QueueManager(BaseManager): pass >>> QueueManager.register('get_queue') >>> m = QueueManager(address=('foo.bar.org', 50000), authkey=b'abracadabra') >>> m.connect() >>> queue = m.get_queue() >>> queue.put('hello')
Другой клиент также может использовать его:
>>> from multiprocessing.managers import BaseManager >>> class QueueManager(BaseManager): pass >>> QueueManager.register('get_queue') >>> m = QueueManager(address=('foo.bar.org', 50000), authkey=b'abracadabra') >>> m.connect() >>> queue = m.get_queue() >>> queue.get() # 'hello'
Локальные процессы также могут получить доступ к этой очереди, используя код сверху на клиенте для удаленного доступа к ней:
>>> from multiprocessing import Process, Queue >>> from multiprocessing.managers import BaseManager >>> class Worker(Process): ... def __init__(self, q): ... self.q = q ... super(Worker, self).__init__() ... def run(self): ... self.q.put('local hello') ... >>> queue = Queue() >>> w = Worker(queue) >>> w.start() >>> class QueueManager(BaseManager): pass ... >>> QueueManager.register('get_queue', callable=lambda: queue) >>> m = QueueManager(address=('', 50000), authkey=b'abracadabra') >>> s = m.get_server() >>> s.serve_forever()