Общими ресурсами создаваемых процессов модулем multiprocessing могут быть общие объекты, используя общую память, которая может быть унаследована дочерними процессами.
Такие объекты могут создавать классы multiprocessing.Value() и multiprocessing.Array().
multiprocessing.Value() общий синхронизированный объект для процессов,multiprocessing.Array() общий синхронизированный массив для процессов,ctypes в процессах.multiprocessing.Value(typecode_or_type, *args, lock=True):Класс multiprocessing.Value() возвращает ctypes объект с атрибутом записываемого значения, выделенный из общей памяти. По умолчанию возвращаемое значение фактически является уже синхронизированной оболочкой для объекта. К самому объекту можно получить доступ через атрибут value объекта Value.
>>> import multiprocessing >>> val = multiprocessing.Value('i', 10) >>> val.value # 10 >>> val.value = val.value - 1 >>> val.value # 9
Аргумент typecode_or_type определяет тип возвращаемого объекта: это либо тип ctypes, либо односимвольный код типа, используемого модулем array.
Аргумент *args, это значение, которое передается конструктору для данного типа.
lock=True (по умолчанию), то создается новый объект рекурсивной блокировки для синхронизации доступа к значению.lock - это объект Lock или RLock, то он будет использоваться для синхронизации доступа к значению.lock=False, то доступ к возвращаемому объекту не будет автоматически защищен блокировкой, поэтому он не обязательно будет "безопасным для процесса".Обратите внимание, что lock - это ключевой аргумент.
Такие операции, как +=, которые включают чтение и запись, не являются атомарными. Если необходимо таким образом безопасно (атомарно) увеличить общее значение, недостаточно просто сделать:
>>> import multiprocessing >>> val = multiprocessing.Value('i', 10) >>> val.value += 5
Предполагая, что связанная блокировка является рекурсивной (по умолчанию), можно сделать это так:
with val.get_lock(): val.value += 5
multiprocessing.Array(typecode_or_type, size_or_initializer, *, lock=True):Класс multiprocessing.Array() возвращает массив ctypes, выделенный из общей памяти. По умолчанию возвращаемое значение фактически является синхронизированной оболочкой для массива.
Размер multiprocessing.Array() можно установить только один раз, в момент создания экземпляра. Это не список Python и не имеет методов добавления/удаления элементов!
>>> import multiprocessing >>> arr = multiprocessing.Array('i', range(10)) >>> arr[1] # 1 >>> arr[5] # 5 >>> arr[5] = arr[5] + 5 >>> arr[5] # 10
Для использования списка list Python или словаря dict в качестве общего ресурса для создаваемых процессов, используйте менеджер процессов multiprocessing.Manager().
Аргумент typecode_or_type определяет тип возвращаемого объекта: это либо тип ctypes, либо односимвольный код типа, используемого модулем array.
Если аргумент size_or_initializer является целым числом, то тогда он определяет длину массива и массив будет изначально обнулен. В противном случае size_or_initializer - это последовательность, которая используется для инициализации массива, длина которой - определяет длину массива.
lock=True (по умолчанию), то создается новый объект блокировки для синхронизации доступа к значению.lock - это объект Lock или RLock, то он будет использоваться для синхронизации доступа к значению.lock=False, то доступ к возвращаемому объекту не будет автоматически защищен блокировкой, поэтому он не обязательно будет "безопасным для процесса".Обратите внимание, что lock - это ключевой аргумент.
Обратите внимание, что массив types.c_char имеет атрибуты value и raw, которые позволяют использовать его для хранения и извлечения строк.
ctypes в процессах.Пример показывает, как осуществлять последовательный доступ на изменение элементов общего для процессов массива multiprocessing.Array(), при помощи объекта multiprocessing.Value().
import multiprocessing, time, random def worker(num, arr): pid_proc = multiprocessing.current_process().pid # блокируем доступ к массиву из других потоков arr.acquire() try: for _ in range(3): # имитируем нагрузку, для того, что бы была # конкуренция доступа к общему ресурсу (очереди) time.sleep(random.uniform(0.01, 0.1)) # последовательно изменяем элемент # массива на PID процесса 3 раза arr[num.value] = pid_proc # счетчик элементов массива num.value = num.value + 1 finally: # завершаем процесс и разрешаем # доступ к массиву другим процессам arr.release() if __name__ == '__main__': # создаем общий объект для процессов num = multiprocessing.Value('i', 0) # создаем общий массив для процессов arr = multiprocessing.Array('i', 10) procs = [] for _ in range(3): proc = multiprocessing.Process(target=worker, args=(num, arr)) procs.append(proc) proc.start() # Ждем результатов [proc.join() for proc in procs] print('Вывод результатов:') print([i for i in arr if i != 0]) # очищаем используемые ресурсы [proc.close() for proc in procs] # Вывод результатов: # [28690, 28690, 28690, 28691, 28691, 28691, 28692, 28692, 28692]