Общими ресурсами создаваемых процессов модулем multiprocessing
могут быть общие объекты, используя общую память, которая может быть унаследована дочерними процессами.
Такие объекты могут создавать классы multiprocessing.Value()
и multiprocessing.Array()
.
multiprocessing.Value()
общий синхронизированный объект для процессов,multiprocessing.Array()
общий синхронизированный массив для процессов,ctypes
в процессах.multiprocessing.Value(typecode_or_type, *args, lock=True)
:Класс multiprocessing.Value()
возвращает ctypes
объект с атрибутом записываемого значения, выделенный из общей памяти. По умолчанию возвращаемое значение фактически является уже синхронизированной оболочкой для объекта. К самому объекту можно получить доступ через атрибут value
объекта Value
.
>>> import multiprocessing >>> val = multiprocessing.Value('i', 10) >>> val.value # 10 >>> val.value = val.value - 1 >>> val.value # 9
Аргумент typecode_or_type
определяет тип возвращаемого объекта: это либо тип ctypes
, либо односимвольный код типа, используемого модулем array
.
Аргумент *args
, это значение, которое передается конструктору для данного типа.
lock=True
(по умолчанию), то создается новый объект рекурсивной блокировки для синхронизации доступа к значению. lock
- это объект Lock
или RLock
, то он будет использоваться для синхронизации доступа к значению. lock=False
, то доступ к возвращаемому объекту не будет автоматически защищен блокировкой, поэтому он не обязательно будет "безопасным для процесса".Обратите внимание, что lock
- это ключевой аргумент.
Такие операции, как +=
, которые включают чтение и запись, не являются атомарными. Если необходимо таким образом безопасно (атомарно) увеличить общее значение, недостаточно просто сделать:
>>> import multiprocessing >>> val = multiprocessing.Value('i', 10) >>> val.value += 5
Предполагая, что связанная блокировка является рекурсивной (по умолчанию), можно сделать это так:
with val.get_lock(): val.value += 5
multiprocessing.Array(typecode_or_type, size_or_initializer, *, lock=True)
:Класс multiprocessing.Array()
возвращает массив ctypes
, выделенный из общей памяти. По умолчанию возвращаемое значение фактически является синхронизированной оболочкой для массива.
Размер multiprocessing.Array()
можно установить только один раз, в момент создания экземпляра. Это не список Python и не имеет методов добавления/удаления элементов!
>>> import multiprocessing >>> arr = multiprocessing.Array('i', range(10)) >>> arr[1] # 1 >>> arr[5] # 5 >>> arr[5] = arr[5] + 5 >>> arr[5] # 10
Для использования списка list
Python или словаря dict
в качестве общего ресурса для создаваемых процессов, используйте менеджер процессов multiprocessing.Manager()
.
Аргумент typecode_or_type
определяет тип возвращаемого объекта: это либо тип ctypes
, либо односимвольный код типа, используемого модулем array
.
Если аргумент size_or_initializer
является целым числом, то тогда он определяет длину массива и массив будет изначально обнулен. В противном случае size_or_initializer
- это последовательность, которая используется для инициализации массива, длина которой - определяет длину массива.
lock=True
(по умолчанию), то создается новый объект блокировки для синхронизации доступа к значению. lock
- это объект Lock
или RLock
, то он будет использоваться для синхронизации доступа к значению. lock=False
, то доступ к возвращаемому объекту не будет автоматически защищен блокировкой, поэтому он не обязательно будет "безопасным для процесса".Обратите внимание, что lock
- это ключевой аргумент.
Обратите внимание, что массив types.c_char
имеет атрибуты value
и raw
, которые позволяют использовать его для хранения и извлечения строк.
ctypes
в процессах.Пример показывает, как осуществлять последовательный доступ на изменение элементов общего для процессов массива multiprocessing.Array()
, при помощи объекта multiprocessing.Value()
.
import multiprocessing, time, random def worker(num, arr): pid_proc = multiprocessing.current_process().pid # блокируем доступ к массиву из других потоков arr.acquire() try: for _ in range(3): # имитируем нагрузку, для того, что бы была # конкуренция доступа к общему ресурсу (очереди) time.sleep(random.uniform(0.01, 0.1)) # последовательно изменяем элемент # массива на PID процесса 3 раза arr[num.value] = pid_proc # счетчик элементов массива num.value = num.value + 1 finally: # завершаем процесс и разрешаем # доступ к массиву другим процессам arr.release() if __name__ == '__main__': # создаем общий объект для процессов num = multiprocessing.Value('i', 0) # создаем общий массив для процессов arr = multiprocessing.Array('i', 10) procs = [] for _ in range(3): proc = multiprocessing.Process(target=worker, args=(num, arr)) procs.append(proc) proc.start() # Ждем результатов [proc.join() for proc in procs] print('Вывод результатов:') print([i for i in arr if i != 0]) # очищаем используемые ресурсы [proc.close() for proc in procs] # Вывод результатов: # [28690, 28690, 28690, 28691, 28691, 28691, 28692, 28692, 28692]