Сообщить об ошибке.

Модуль multiprocessing.shared_memory в Python

Защищенная память для доступа к данным между процессами в Python

Модуль multiprocessing.shared_memory предоставляет класс SharedMemory для выделения и управления общей памятью, к которой обращается один или несколько процессов на многоядерной или симметричной многопроцессорной машине (SMP). Для облегчения управления жизненным циклом разделяемой SHARED памяти, особенно между отдельными процессами, в модуле multiprocessing.managers также предусмотрен подкласс BaseManager, SharedMemoryManager.

В этом модуле разделяемая память относится к блокам в стиле "System V" (хотя это не обязательно реализовано как таковое) и не относится к "распределенной разделяемой памяти". Этот стиль разделяемой памяти позволяет различным процессам потенциально читать и записывать в общую (или совместно используемую) область энергозависимой памяти. Процессы обычно ограничены доступом только к своему собственному пространству памяти процесса, но общая память позволяет обмениваться данными между процессами, избегая необходимости отправлять сообщения между процессами, содержащими эти данные. Совместное использование данных непосредственно через память может обеспечить значительные преимущества в производительности по сравнению с обменом данными через диск или сокет или другими видами связи, требующими сериализации/десериализации и копирования данных.

Содержание:

multiprocessing.shared_memory.SharedMemory(name=None, create=False, size=0):

Класс SharedMemory() создает новый блок общей памяти или присоединяется к существующему блоку общей памяти. Каждому блоку общей памяти присваивается уникальное имя. Таким образом, один процесс может создать блок общей памяти с определенным именем, а другой процесс может присоединиться к тому же блоку общей памяти, используя то же имя.

В качестве ресурса для обмена данными между процессами, блоки общей памяти могут пережить исходный процесс, который их создал. Когда одному процессу больше не нужен доступ к разделяемому блоку памяти, но он все еще может быть нужен другим процессам, то следует вызвать метод SharedMemory.close(). Когда блок разделяемой памяти вообще больше не нужен, и чтобы обеспечить надлежащую очистку, то следует вызвать метод SharedMemory.unlink().

Аргумент name - это уникальное имя запрошенной разделяемой памяти, указанное в виде строки. При создании нового блока общей памяти, если для имени указано None (по умолчанию), то будет сгенерировано новое имя.

Аргумент create управляет созданием нового блока общей памяти (True) или присоединением существующего блока общей памяти (False).

Аргумент size указывает запрошенное количество байтов при создании нового блока разделяемой памяти. Так как некоторые платформы выбирают выделение блоков памяти на основе размера страницы памяти этой платформы, точный размер блока общей памяти может быть больше или равен запрошенному размеру. При присоединении к существующему блоку общей памяти, этот параметр игнорируется.

Атрибуты и методы объекта SharedMemory:

  • SharedMemory.close(): закрывает доступ к общей памяти из этого экземпляра. Чтобы обеспечить надлежащую очистку ресурсов, все экземпляры должны вызывать .close(), как только экземпляр больше не нужен. Обратите внимание, что вызов .close() не приводит к уничтожению самого блока разделяемой памяти.
  • SharedMemory.unlink(): запрашивает уничтожение базового блока разделяемой памяти. Чтобы обеспечить надлежащую очистку ресурсов, .unlink() следует вызывать ТОЛЬКО ОДИН РАЗ для всех процессов, которым требуется блок общей памяти. После запроса на его уничтожение блок общей памяти может быть немедленно уничтожен, а может и нет, и это поведение может различаться на разных платформах. Попытки доступа к данным внутри блока разделяемой памяти после вызова функции .unlink() могут привести к ошибкам доступа к памяти. Примечание: последний процесс, освобождающий блок разделяемой памяти, может вызывать unlink() и close() в любом порядке.
  • SharedMemory.buf: представление (memoryview) содержимого блока совместно используемой памяти.
  • SharedMemory.name: уникальное имя блока общей памяти (только для чтения).
  • SharedMemory.size: размер блока общей памяти (только для чтения).

Низкоуровневое использование экземпляров SharedMemory:

>>> from multiprocessing import shared_memory
>>> shm_a = shared_memory.SharedMemory(create=True, size=10)
>>> type(shm_a.buf)
# <class 'memoryview'>
>>> buffer = shm_a.buf
>>> len(buffer)
# 10

# одновременное изменение нескольких значений 
>>> buffer[:4] = bytearray([22, 33, 44, 55])
# изменение по одному байту за раз
>>> buffer[4] = 100
>>> # Подключение к существующему блоку общей памяти
>>> shm_b = shared_memory.SharedMemory(shm_a.name)
>>> import array
# копирование данных в новый массив array.array
>>> array.array('b', shm_b.buf[:5])
# array('b', [22, 33, 44, 55, 100])
# Изменение через `shm_b` с использованием байтов
>>> shm_b.buf[:5] = b'howdy'  
# Доступ через `shm_a`
>>> bytes(shm_a.buf[:5])
# b'howdy'

# закрываем все экземпляры `SharedMemory`
>>> shm_b.close()
>>> shm_a.close()
# чтобы освободить общую память 
# вызываем `unlink` только один раз 
>>> shm_a.unlink()

В следующем примере демонстрируется практическое использование класса SharedMemory с массивами |NumPy| при доступе к одному и тому же numpy.ndarray из двух разных оболочек Python:

>>> # В первой интерактивной оболочке Python
>>> import numpy as np
>>> a = np.array([1, 1, 2, 3, 5, 8])  # Start with an existing NumPy array
>>> from multiprocessing import shared_memory
>>> shm = shared_memory.SharedMemory(create=True, size=a.nbytes)
# создаем массив NumPy, поддерживаемый общей памятью
>>> b = np.ndarray(a.shape, dtype=a.dtype, buffer=shm.buf)
# копирование исходных данных в общую память
>>> b[:] = a[:]  # Copy the original data into shared memory
>>> b
# array([1, 1, 2, 3, 5, 8])
>>> type(b)
# <class 'numpy.ndarray'>
>>> type(a)
# <class 'numpy.ndarray'>
# при создании SharedMemory не указали аргумент 
# `name`, поэтому было назначено уникальное
>>> shm.name
'psm_21467_46075'

# Во второй интерактивной оболочке Python
>>> import numpy as np
>>> from multiprocessing import shared_memory
# Присоединяемся к существующему блоку общей памяти
>>> existing_shm = shared_memory.SharedMemory(name='psm_21467_46075')
# Обратите внимание, что в этом примере значение 
#  `a.shape` равно `(6,)`, `а.dtype` равно `np.int64`
>>> c = np.ndarray((6,), dtype=np.int64, buffer=existing_shm.buf)
>>> c
# array([1, 1, 2, 3, 5, 8])
>>> c[-1] = 888
>>> c
# array([  1,   1,   2,   3,   5, 888])

# Переходим к первой оболочке Python: `b` отражает это изменение
>>> b
# array([  1,   1,   2,   3,   5, 888])

# Переходим ко второй оболочке: выполняем очистку
>>> del c  # Это излишнее действие 
>>> existing_shm.close()

# Переходим к первой оболочке: выполняем очистку
>>> shm.close()
# в самом конце освобождаем блок общей памяти 
>>> shm.unlink()

multiprocessing.managers.SharedMemoryManager([address[, authkey]]):

Это подкласс BaseManager, который можно использовать для управления блоками общей памяти между процессами.

Вызов метода .start() экземпляра SharedMemoryManager приводит к запуску нового процесса. Единственной целью этого нового процесса является управление жизненным циклом всех блоков общей памяти, созданных с его помощью. Чтобы инициировать освобождение всех блоков разделяемой памяти, управляемых этим процессом, вызовите shutdown() для экземпляра. Это инициирует вызов SharedMemory.unlink() для всех объектов SharedMemory, управляемых этим процессом, а затем останавливает сам процесс. Создавая экземпляры SharedMemory с помощью SharedMemoryManager, мы избегаем необходимости вручную отслеживать и инициировать освобождение ресурсов общей памяти.

Этот класс предоставляет методы для создания и возврата экземпляров SharedMemory, а также для создания спископодобного объекта ShareableList, поддерживаемого общей памятью.

Для описания унаследованных необязательных входных аргументов address и authkey и того, как их можно использовать для подключения к существующей службе SharedMemoryManager из других процессов, смотрите описание класса multiprocessing.managers.BaseManager.

  • SharedMemory(size): создает и возвращает новый объект SharedMemory с указанным размером size в байтах.
  • ShareableList(sequence): создает и возвращает новый объект ShareableList, инициализированный значениями из входной последовательности sequence.

Демонстрация основных механизмов SharedMemoryManager:

>>> from multiprocessing.managers import SharedMemoryManager
>>> smm = SharedMemoryManager()
# Запускаем процесс управления блоками разделяемой памяти
>>> smm.start()
>>> sl = smm.ShareableList(range(4))
>>> sl
# ShareableList([0, 1, 2, 3], name='psm_6572_7512')
>>> raw_shm = smm.SharedMemory(size=128)
>>> another_sl = smm.ShareableList('alpha')
>>> another_sl
# ShareableList(['a', 'l', 'p', 'h', 'a'], name='psm_6572_12221')

# Вызывает метод `.unlink()` для `sl`, `raw_shm` и `other_sl`
>>> smm.shutdown()

В следующем примере показан потенциально более удобный шаблон для использования объектов SharedMemoryManager с помощью инструкции with, чтобы гарантировать освобождение всех блоков общей памяти после того, как они больше не нужны:

>>> with SharedMemoryManager() as smm:
...     sl = smm.ShareableList(range(2000))
...     # разделение работы между двумя процессами, 
...     # сохраняя частичные результаты в `sl`
...     p1 = Process(target=do_work, args=(sl, 0, 1000))
...     p2 = Process(target=do_work, args=(sl, 1000, 2000))
...     p1.start()
...     # `multiprocessing.Pool` может быть более эффективна
...     p2.start()  
...     p1.join()
...     # ждем завершения всей работы в обоих процессах
...     p2.join()
...     # консолидируем частичные результаты в sl
...     total_result = sum(sl)

При использовании SharedMemoryManager в операторе with все блоки разделяемой памяти, созданные с помощью этого диспетчера, освобождаются, когда завершается выполнение блока кода оператора with.

multiprocessing.shared_memory.ShareableList(sequence=None, *, name=None):

Класс ShareableList предоставляет изменяемый объект, похожий на список, в котором все значения хранятся в общем блоке памяти. Этот класс ограничивает сохраняемые значения только встроенными типами данных int, float, bool, str (менее 10Мб каждый), bytes (менее 10Мб каждый) и None. Он также заметно отличается от встроенного списка тем, что ShareableList не может изменять свою общую длину (т. е. не добавлять, не вставлять и т. д.) и не поддерживают динамическое создание новых экземпляров ShareableList посредством срезов.

Аргумент sequence используется для заполнения нового списка ShareableList, полного значений. Установите значение None, подключения к уже существующему ShareableList по его уникальному имени name в общей памяти.

Аргумент name - это уникальное имя запрошенной общей памяти, как описано в определении SharedMemory. При подключении к существующему ShareableList укажите уникальное имя name его блока общей памяти, оставив для sequence значение None.

Атрибуты и методы объекта SharedMemory:

  • count(value): возвращает количество вхождений value.
  • index(value): возвращает первую индексную позицию value. Вызывает ValueError, если значение отсутствует.
  • format: атрибут только для чтения, содержащий формат упаковки структуры, используемый всеми сохраненными в данный момент значениями.
  • shm: экземпляр SharedMemory, в котором хранятся значения.

Базовое использование экземпляра ShareableList:

>>> from multiprocessing import shared_memory
>>> a = shared_memory.ShareableList(['howdy', b'HoWdY', -273.154, 100, None, True, 42])
>>> [ type(entry) for entry in a ]
# [<class 'str'>, <class 'bytes'>, <class 'float'>, <class 'int'>, 
# <class 'NoneType'>, <class 'bool'>, <class 'int'>]
>>> a[2]
# -273.154
>>> a[2] = -78.5
>>> a[2]
# -78.5

# поддерживается изменение типов данных
>>> a[2] = 'dry ice'
>>> a[2]
# 'dry ice'
>>> a[2] = 'larger than previously allocated storage space'
# Traceback (most recent call last):
#  ...
# ValueError: exceeds available storage for existing str
>>> a[2]
# 'dry ice'
>>> len(a)
# 7
>>> a.index(42)
# 6
>>> a.count(b'howdy')
# 0
>>> a.count(b'HoWdY')
# 1
>>> a.shm.close()
>>> a.shm.unlink()
# Использование `ShareableList` после 
# вызова `unlink()` не поддерживается
>>> del a

В следующем примере показано, как один, два или несколько процессов могут получить доступ к одному и тому же ShareableList, указав имя блока разделяемой памяти:

# В первом процессе
>>> b = shared_memory.ShareableList(range(5))
# Во втором процессе
>>> c = shared_memory.ShareableList(name=b.shm.name) 
>>> c
# ShareableList([0, 1, 2, 3, 4], name='...')
>>> c[-1] = -999
>>> b[-1]
# -999
>>> b.shm.close()
>>> c.shm.close()
>>> c.shm.unlink()

В следующих примерах показано, что объекты ShareableList (и лежащие в их основе SharedMemory) при необходимости могут быть выделены и удалены. Обратите внимание, что это будет все тот же общий объект. Такое поведение происходит потому, что десериализованный объект имеет такое же уникальное имя и просто присоединяется к существующему объекту с таким же именем (если объект еще жив):

>>> import pickle
>>> from multiprocessing import shared_memory
>>> sl = shared_memory.ShareableList(range(10))
>>> list(sl)
# [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
>>> deserialized_sl = pickle.loads(pickle.dumps(sl))
>>> list(deserialized_sl)
# [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
>>> sl[0] = -1
>>> deserialized_sl[1] = -2
>>> list(sl)
# [-1, -2, 2, 3, 4, 5, 6, 7, 8, 9]
>>> list(deserialized_sl)
# [-1, -2, 2, 3, 4, 5, 6, 7, 8, 9]
>>> sl.shm.close()
>>> sl.shm.unlink()