Общими ресурсами создаваемых процессов модулем multiprocessing
могут быть общие объекты, используя общую память, которая может быть унаследована дочерними процессами.
Такие объекты могут создавать классы multiprocessing.Value()
и multiprocessing.Array()
.
multiprocessing.Value()
общий синхронизированный объект для процессов,multiprocessing.Array()
общий синхронизированный массив для процессов,ctypes
в процессах.multiprocessing.Value(typecode_or_type, *args, lock=True)
:Класс multiprocessing.Value()
возвращает ctypes
объект с атрибутом записываемого значения, выделенный из общей памяти. По умолчанию возвращаемое значение фактически является уже синхронизированной оболочкой для объекта. К самому объекту можно получить доступ через атрибут value
объекта Value
.
>>> import multiprocessing
>>> val = multiprocessing.Value('i', 10)
>>> val.value
# 10
>>> val.value = val.value - 1
>>> val.value
# 9
Аргумент typecode_or_type
определяет тип возвращаемого объекта: это либо тип ctypes
, либо односимвольный код типа, используемого модулем array
.
Аргумент *args
, это значение, которое передается конструктору для данного типа.
lock=True
(по умолчанию), то создается новый объект рекурсивной блокировки для синхронизации доступа к значению. lock
- это объект Lock
или RLock
, то он будет использоваться для синхронизации доступа к значению. lock=False
, то доступ к возвращаемому объекту не будет автоматически защищен блокировкой, поэтому он не обязательно будет "безопасным для процесса".Обратите внимание, что lock
- это ключевой аргумент.
Такие операции, как +=
, которые включают чтение и запись, не являются атомарными. Если необходимо таким образом безопасно (атомарно) увеличить общее значение, недостаточно просто сделать:
>>> import multiprocessing
>>> val = multiprocessing.Value('i', 10)
>>> val.value += 5
Предполагая, что связанная блокировка является рекурсивной (по умолчанию), можно сделать это так:
with val.get_lock():
val.value += 5
multiprocessing.Array(typecode_or_type, size_or_initializer, *, lock=True)
:Класс multiprocessing.Array()
возвращает массив ctypes
, выделенный из общей памяти. По умолчанию возвращаемое значение фактически является синхронизированной оболочкой для массива.
Размер multiprocessing.Array()
можно установить только один раз, в момент создания экземпляра. Это не список Python и не имеет методов добавления/удаления элементов!
>>> import multiprocessing
>>> arr = multiprocessing.Array('i', range(10))
>>> arr[1]
# 1
>>> arr[5]
# 5
>>> arr[5] = arr[5] + 5
>>> arr[5]
# 10
Для использования списка list
Python или словаря dict
в качестве общего ресурса для создаваемых процессов, используйте менеджер процессов multiprocessing.Manager()
.
Аргумент typecode_or_type
определяет тип возвращаемого объекта: это либо тип ctypes
, либо односимвольный код типа, используемого модулем array
.
Если аргумент size_or_initializer
является целым числом, то тогда он определяет длину массива и массив будет изначально обнулен. В противном случае size_or_initializer
- это последовательность, которая используется для инициализации массива, длина которой - определяет длину массива.
lock=True
(по умолчанию), то создается новый объект блокировки для синхронизации доступа к значению. lock
- это объект Lock
или RLock
, то он будет использоваться для синхронизации доступа к значению. lock=False
, то доступ к возвращаемому объекту не будет автоматически защищен блокировкой, поэтому он не обязательно будет "безопасным для процесса".Обратите внимание, что lock
- это ключевой аргумент.
Обратите внимание, что массив types.c_char
имеет атрибуты value
и raw
, которые позволяют использовать его для хранения и извлечения строк.
ctypes
в процессах.Пример показывает, как осуществлять последовательный доступ на изменение элементов общего для процессов массива multiprocessing.Array()
, при помощи объекта multiprocessing.Value()
.
import multiprocessing, time, random
def worker(num, arr):
pid_proc = multiprocessing.current_process().pid
# блокируем доступ к массиву из других потоков
arr.acquire()
try:
for _ in range(3):
# имитируем нагрузку, для того, что бы была
# конкуренция доступа к общему ресурсу (очереди)
time.sleep(random.uniform(0.01, 0.1))
# последовательно изменяем элемент
# массива на PID процесса 3 раза
arr[num.value] = pid_proc
# счетчик элементов массива
num.value = num.value + 1
finally:
# завершаем процесс и разрешаем
# доступ к массиву другим процессам
arr.release()
if __name__ == '__main__':
# создаем общий объект для процессов
num = multiprocessing.Value('i', 0)
# создаем общий массив для процессов
arr = multiprocessing.Array('i', 10)
procs = []
for _ in range(3):
proc = multiprocessing.Process(target=worker, args=(num, arr))
procs.append(proc)
proc.start()
# Ждем результатов
[proc.join() for proc in procs]
print('Вывод результатов:')
print([i for i in arr if i != 0])
# очищаем используемые ресурсы
[proc.close() for proc in procs]
# Вывод результатов:
# [28690, 28690, 28690, 28691, 28691, 28691, 28692, 28692, 28692]