侧边栏壁纸
  • 累计撰写 218 篇文章
  • 累计创建 59 个标签
  • 累计收到 5 条评论

multiprocessing - 共享内存

barwe
2021-07-02 / 0 评论 / 0 点赞 / 1,832 阅读 / 3,512 字
温馨提示:
本文最后更新于 2022-07-14,若内容或图片失效,请留言反馈。部分素材来自网络,若不小心影响到您的利益,请联系我们删除。

SharedMemory

共享内存块

Python 3.8 新增了 shared_memory 模块,用于多个进程同时对 共享内存 进行访问。

中文文档:https://docs.python.org/zh-cn/3/library/multiprocessing.shared_memory.html

shared_memory 模块定义了一个类,提供了对 共享内存 对象的引用:

from multiprocessing.shared_memory import SharedMemory

这里提到的 共享内存 指的是 System V 类型的共享内存块,而不是 分布式共享内存

System V 共享内存

20180416112848158

在 Linux 中,每个 进程 都有属于自己的 进程控制块地址空间地址空间 对应了一个 页表,负责将进程的 虚拟地址 映射到 物理地址。当两个不同的虚拟地址映射到同一个物理地址时,这个物理地址指向的区域就是一块 共享内存

但是,共享内存 没有提供 同步机制,即一个进程的写入操作尚未完成时保护共享内存区域的数据不被其它进程读取或者改写。共享内存的安全机制需要通过其它方式实现,例如 信号量

使用 ipcs -m 命令可以查看系统中的共享存储段。

参考博客 https://blog.csdn.net/ypt523/article/details/79958188

分布式共享内存

应用于分布式系统上,提供了一个逻辑上统一的地址空间。

进程直接从内存读取数据,相对于在进程间发送消息、直接从磁盘读取数据或者通过其它序列化、反序列化方式复制数据,直接读取共享内存更加高效。

class SharedMemory(name=None, create=False, size=0)
  • name:readonly, 共享内存块的唯一标识
  • size:readonly, 共享内存块的 字节 大小
  • buf:内容,传递给 numpy 可以从中提取出数据
  • close():关闭实例对于共享内存的访问连接
  • unlink():请求销毁底层的共享内存块;只能调用一次。

通过 name 参数和 create 参数组合,可以选择 新建共享内存块 还是 引用存在的共享内存块

共享内存块的寿命可能会超过创建它的原始进程。当子进程不再使用内存块时,应该调用 close() 关闭当前进程对它的引用;当共享内存块不被任何进行使用时,应该调用 unlink() 方法清理掉它。

共享字节数组

新建共享内存块

原生数组可以通过将其 字节数组 赋值给 SharedMemory 实例的 .buf 属性来实现内存共享。

from multiprocessing.shared_memory import SharedMemory

shm = SharedMemory(create=True, size=10)
buffer = shm.buf
buffer[:4] = bytearray([22, 33, 44, 55])
buffer[4] = 100

重引共享内存块

使用 array 模块来获取共享内存块中的数据。

shm = SharedMemory(name=other_shm.name)

import array

## 从共享内存块中引用出数据
arr = array.array('b', shm.buf[:5])

## 引用的数据可以直接修改,赋值字节类型的数据
arr[:5] = b'howdy'

## 直接访问共享内存块中的字节数据
data = bytes(shm.buf[:5]) #=> b'howdy'

## 不要忘了关闭和注销
shm.close()
shm.unlink()

共享 numpy 数组

新建共享内存块

可以从一个已知的 numpy 数组新建一个共享内存块。

# np.ndarray 实例的 nbytes 属性返回该实例占用的字节大小
arr = np.zeros((1000,), dtype='int8')

# 不指定 name 时会分配一个默认的名字
# create 为 True 表示新建共享内存块
# size 指定共享内存块的字节大小
shm = SharedMemory(create=True, size=arr.nbytes)

## 获取 SharedMemory 实例的 Numpy 数组引用
# buffer=shm.buf 指定了数据来源
ref = np.ndarray(arr.shape, dtype=arr.dtype, buffer=shm.buf)

## 给 ref 赋值相当于给共享内存块赋值
## 需要复制数组的元素,而不是数组的引用
ref[:] = arr[:]
# ref = arr # 这是复制引用,ref的含义变了

## 派发任务到每个进程
...

## 等待所有子进程任务结束
...

## 销毁共享内存块
shm.close().
shm.unlink()

重引共享内存块

一般情况下,我们可以在 主进程 中新建一个共享内存块(上面例子中的 shm),并将 shm.name 传递给 子进程。在子进程中,可以通过指定共享内存块的唯一标识符实例化 SharedMemory 获取对共享内存块的引用。

def worker(shm_name: str, *args, **kwargs):
    shm = SharedMemory(name=shm_name)
    ref = np.ndarray(shm.size, dtype='int8', buffer=shm.buf)
    ...
    
    ## 关闭引用
    shm.close()
    
    return 

ShareableList

from multiprocessing.shared_memory import ShareableList

class ShareableList(sequence=None, *, name=None):
    ## value 出现的次数
    def count(self, value): pass
    ## 返回 value 首次出现的位置
    # 如果 value 不存在, 则抛出 ValueError 异常
    def index(self, value): pass
    ## 包含由所有当前存储值所使用的 struct 打包格式的只读属性
    format: readonly struct 
    ## 存储了值的 SharedMemory 实例
    shm: SharedMemory

一种跟 list 相似的数据结构,只是其数据存储在 共享内存块 中:

  • sequence 不为 None 时创建一个 ShareableList 实例
  • sequenceNone 时通过指定 name 来关联已经存在的 ShareableList 实例

然而 ShareableList 与内置的 list 相比存在诸多限制:

  • 只能存储基本的数据类型,例如 int, float, bool, str, bytes, None
  • 长度不能修改,不能使用 appendinsert 等方法动态插入数据,共享的数据在实例化时指定
  • 不能通过对 ShareableList 使用 切片 来创建新的 ShareableList 实例

例如:

smm.ShareableList(range(10))
#> ShareableList([0, 1, 2, 3], name='psm_6572_7512')
smm.ShareableList('abcd')
#> ShareableList(['a', 'b', 'c', 'd'], name='psm_6572_12221')

SharedMemoryManager

SharedMemory 提供了管理共享内存块的 Python API,但是它仍需要用户手动跟踪和释放共享内存资源。此外,还可以通过 multiprocessing.managers.SharedMemoryManager 类来协助管理共享内存的生命周期。SharedMemoryManager 提供了在多进程情景下对共享内存块的自动管理。

SharedMemoryManager 实例主要有两个方法:

  • .start():启动一个新进程。通过此种方式启动的进程可以使得当前的 SharedMemoryManager 实例能够管理由子进程创建的共享内存块的生存周期
  • .shutdown():在停止进程前触发实例管理的所有共享内存块的 .unlink() 方法
from multiprocessing.managers import SharedMemoryManager
with SharedMemoryManager() as smm:
	smm.start()
	...
	smm.shutdown()

上面例子中的 smmSharedMemoryManager 的一个实例。除了 startshutdown 外,它还提供了用于共享数据的两个方法(ShareableListSharedMemory):

  • .SharedMemory(size) 创建一个指定字节大小的 SharedMemory 对象并返回
  • .ShareableList(sequence) 指定输入序列创建并返回一个新的 ShareableList 对象
0

评论区