Сообщить об ошибке.

Объекты Value() и Array() модуля multiprocessing в Python

Синхронизированные объекты ctypes в общей памяти процессов

Общими ресурсами создаваемых процессов модулем multiprocessing могут быть общие объекты, используя общую память, которая может быть унаследована дочерними процессами.

Такие объекты могут создавать классы multiprocessing.Value() и multiprocessing.Array().

Содержание:


multiprocessing.Value(typecode_or_type, *args, lock=True):

Класс multiprocessing.Value() возвращает ctypes объект с атрибутом записываемого значения, выделенный из общей памяти. По умолчанию возвращаемое значение фактически является уже синхронизированной оболочкой для объекта. К самому объекту можно получить доступ через атрибут value объекта Value.

>>> import multiprocessing
>>> val = multiprocessing.Value('i', 10)
>>> val.value
# 10
>>> val.value = val.value - 1
>>> val.value
# 9

Аргумент typecode_or_type определяет тип возвращаемого объекта: это либо тип ctypes, либо односимвольный код типа, используемого модулем array.

Аргумент *args, это значение, которое передается конструктору для данного типа.

  • Если значение аргумента lock=True (по умолчанию), то создается новый объект рекурсивной блокировки для синхронизации доступа к значению.
  • Если lock - это объект Lock или RLock, то он будет использоваться для синхронизации доступа к значению.
  • Если значение lock=False, то доступ к возвращаемому объекту не будет автоматически защищен блокировкой, поэтому он не обязательно будет "безопасным для процесса".

Обратите внимание, что lock - это ключевой аргумент.

Такие операции, как +=, которые включают чтение и запись, не являются атомарными. Если необходимо таким образом безопасно (атомарно) увеличить общее значение, недостаточно просто сделать:

>>> import multiprocessing
>>> val = multiprocessing.Value('i', 10)
>>> val.value += 5

Предполагая, что связанная блокировка является рекурсивной (по умолчанию), можно сделать это так:

with val.get_lock():
    val.value += 5

multiprocessing.Array(typecode_or_type, size_or_initializer, *, lock=True):

Класс multiprocessing.Array() возвращает массив ctypes, выделенный из общей памяти. По умолчанию возвращаемое значение фактически является синхронизированной оболочкой для массива.

Размер multiprocessing.Array() можно установить только один раз, в момент создания экземпляра. Это не список Python и не имеет методов добавления/удаления элементов!

>>> import multiprocessing
>>> arr = multiprocessing.Array('i', range(10))
>>> arr[1]
# 1
>>> arr[5]
# 5
>>> arr[5] = arr[5] + 5
>>> arr[5]
# 10

Для использования списка list Python или словаря dict в качестве общего ресурса для создаваемых процессов, используйте менеджер процессов multiprocessing.Manager().

Аргумент typecode_or_type определяет тип возвращаемого объекта: это либо тип ctypes, либо односимвольный код типа, используемого модулем array.

Если аргумент size_or_initializer является целым числом, то тогда он определяет длину массива и массив будет изначально обнулен. В противном случае size_or_initializer - это последовательность, которая используется для инициализации массива, длина которой - определяет длину массива.

  • Если значение аргумента lock=True (по умолчанию), то создается новый объект блокировки для синхронизации доступа к значению.
  • Если значение lock - это объект Lock или RLock, то он будет использоваться для синхронизации доступа к значению.
  • Если значение lock=False, то доступ к возвращаемому объекту не будет автоматически защищен блокировкой, поэтому он не обязательно будет "безопасным для процесса".

Обратите внимание, что lock - это ключевой аргумент.

Обратите внимание, что массив types.c_char имеет атрибуты value и raw, которые позволяют использовать его для хранения и извлечения строк.

Пример использования синхронизированных объектов модуля ctypes в процессах.

Пример показывает, как осуществлять последовательный доступ на изменение элементов общего для процессов массива multiprocessing.Array(), при помощи объекта multiprocessing.Value().

import multiprocessing, time, random

def worker(num, arr):
    pid_proc = multiprocessing.current_process().pid
    # блокируем доступ к массиву из других потоков
    arr.acquire()
    try:
        for _ in range(3):
            # имитируем нагрузку, для того, что бы была 
            # конкуренция доступа к общему ресурсу (очереди)
            time.sleep(random.uniform(0.01, 0.1))
            # последовательно изменяем элемент 
            # массива на PID процесса 3 раза
            arr[num.value] = pid_proc
            # счетчик элементов массива
            num.value = num.value + 1
    finally:
        # завершаем процесс и разрешаем 
        # доступ к массиву другим процессам
        arr.release()

        
if __name__ == '__main__':
    # создаем общий объект для процессов
    num = multiprocessing.Value('i', 0)
    # создаем общий массив для процессов
    arr = multiprocessing.Array('i', 10)
    
    procs = []
    for _ in range(3):
        proc = multiprocessing.Process(target=worker, args=(num, arr))
        procs.append(proc)
        proc.start()

    # Ждем результатов
    [proc.join() for proc in procs]
    print('Вывод результатов:')
    print([i for i in arr if i != 0])
    # очищаем используемые ресурсы
    [proc.close() for proc in procs]

# Вывод результатов:
# [28690, 28690, 28690, 28691, 28691, 28691, 28692, 28692, 28692]