CANN/cann-learning-hub:torch_npu IPC特性详解
1 背景介绍【免费下载链接】cann-learning-hubCANN 学习中心仓支持在线互动运行、边学边练提供教程、示例与优化方案一站式助力昇腾开发者快速上手。项目地址: https://gitcode.com/cann/cann-learning-hubIPCInter-Process Communication进程间通信允许不同进程之间直接访问共享的设备内存而无需进行显式的内存拷贝操作从而显著提升通信效率。昇腾当前已基于Ascend Extension for PyTorch昇腾NPU适配PyTorch框架的插件也称为torch_npu提供了IPC特性的原子能力使开发者在分布式训练、强化学习等需要多进程大规模数据通信场景可以自主开发优化提升数据传输性能并节省设备内存消耗。目前已经在强化学习实践中验证通过使能IPC特性可以省去训练进程向推理进程拷贝权重数据的过程推理进程通过共享内存直接获取权重数据大幅降低了推理进程的内存消耗。本文将详细介绍IPC的原理以及使用方法。2 torch_npu的IPC特性解析2.1 torch_npu的IPC设计原理torch_npu的IPC设计整体逻辑主要包括以下两个步骤发送方操作将张量Tensor和存储Storage的内存信息封装为一个句柄Handle并结合存储大小Storage_Size、存储偏移量Storage_Offset等关键信息以及用于重构的函数一并打包返回供接收方使用接收方操作接收到这些信息后利用重构函数和Handle等参数恢复到原来的Tensor和Storage对象实现跨进程的数据共享。图1. torch_npu IPC通信流程图以reduce_tensor为例其具体流程如下:图2. reduce_tensor流程示例发送方流程统一的reduce化入口无论采用multiprocessing传参、还是直接调用reduce_tensor进行reduce化操作最终都会通过storage._share_npu_接口对内存进行reduce化处理;内存Handle生成storage._share_npu_的C实现会判断虚拟内存是否启用并调用相应的acl接口。这一过程生成一个字符串形式的内存句柄Handle并与storage_offset等辅助参数打包后返回参数传递到用户层上层Python获取到打包后的参数后结合rebuild函数返回完整的数据供用户进一步使用并发送给接收方。接收方流程参数获取与rebuild启动接收方接收发送方传来的全部参数并调用rebuild函数将参数传入从而触发NPU内存的重建通过storage._new_shared_npu接口实现。建内存解析storage._new_shared_npu的C实现会根据虚拟内存配置的不同调用相应acl接口。此步骤会解析出内存Handle并获取到实际的内存指针。在启用虚拟内存的场景中还需额外进行从物理内存到虚拟内存的映射操作。返回完整storage对象接口内部将内存指针与storage_offset等参数封装为完整的storage对象最终交由上层Python返回完成共享内存的传递过程。2.2 torch_npu IPC版本约束1. HDK多进程支持限制由于HCCP的多进程功能在HDK 25.0.RC1版本中才首次实现因此若要使用该功能必须升级至HDK 25.0.RC1或更高版本。2. HDK多次Import支持限制HDK 25.3.RC1版本修复了多次调用aclrtMemImportFromShareableHandle接口时无法成功导入的问题。为适配这一改进torch_npu 7.2.0版本移除了与多次导入不兼容的内存映射机制。因此若需使用多次导入功能需使用HDK 25.3.RC1或更高版本。3. CANN 白名单与 P2P 支持限制CANN 8.3.0版本开始支持去除白名单限制和EnableP2P功能。torch_npu 7.2.0对此进行了相应适配不再需要手动控制白名单。因此若想使用这些功能需使用CANN 8.3.RC1或更高版本。4. torch_npu功能适配限制torch_npu对HDK与CANN的相关改进进行了适配。为确保功能正常运行建议使用torch_npu 7.2.0或更高版本。5. 节点内共享限制当前仅支持在同一节点内部进行内存共享跨节点共享暂不支持。6. 内存格式限制torch_npu目前仅支持ND格式的内存共享。若通过IPC进程间通信共享Storage必须使用ND格式不支持私有内存格式的共享。2.3 IPC接口列表2.4 跨卡内存映射在处理reduce_tensor操作时接收方可以通过将接收到的Tensor直接映射到其他NPU设备上实现跨设备共享。例如# 接收方获取参数 func, list_args list(args) # 修改目标设备号示例映射到卡1 list_args[6] 1 # 重建tensor到目标设备 tensor2 func(*list_args)2.5 rebuild_npu_tensor参数详解rebuild_npu_tensor的作用是重新构建一个指向共享内存存储设备的Tensor。该函数支持跨设备Tensor映射和恢复其详细定义如下def rebuild_npu_tensor( tensor_cls, # tensor类型 (通过type(tensor)获取) tensor_size, # tensor尺寸 tensor_stride, # tensor步长 tensor_offset, # tensor偏移量 storage_cls, # storage类型 (通过type(storage)获取) dtype, # 数据类型 storage_device, # storage设备 (修改此参数实现跨卡映射) storage_handle, # 内存handle (接收方解析获取) storage_size_bytes, # storage字节大小 storage_offset_bytes, # storage字节偏移 requires_grad, # 是否需要梯度 ref_counter_handle, # 引用计数管理handle (禁止修改) ref_counter_offset, # 引用计数偏移量 (禁止修改) event_handle, # 事件handle (当前版本不支持) event_sync_required # 事件同步标记 (当前版本恒为False) )【注意】事件同步机制说明1PTA当前不支持Event序列化传输创建Event并序列化将触发错误。2发送方在创建IPC共享内存之前会执行一次流同步。2.6 底层共享接口说明2.6.1 核心接口详解# 发送方创建共享内存handle storage_handle tensor.storage()._share_npu_() # 接收方从handle重建storage storage torch.UntypedStorage._new_shared_npu(*handle_args)2.6.2 _new_shared_npu参数详解def _new_shared_npu( storage_device, # storage设备 (修改此参数实现跨卡映射) storage_handle, # 内存handle (接收方解析获取) storage_size_bytes, # storage字节大小 storage_offset_bytes, # storage字节偏移 ref_counter_handle, # 引用计数管理handle (禁止修改) ref_counter_offset, # 引用计数偏移量 (禁止修改) event_handle, # 事件handle (当前版本不支持) event_sync_required # 事件同步标记 (当前版本恒为False) ) - torch.UntypedStorage3 完整示例代码3.1 多进程之间直接传递Tensor以下示例展示了如何在多进程中直接传递Tensorimport torch import torch_npu import torch.multiprocessing as mp from multiprocessing import Queue def worker(tensor): print(f接收方收到 tensor {tensor}) if __name__ __main__: # 必须使用spawn方法 mp.set_start_method(spawn) tensor torch.full((5,), float(1.0), devicenpu:0) print(f发送方发送 tensor {tensor}) # torch.npu.synchronize() p mp.Process(targetworker, args(tensor,)) p.start() p.join()执行结果3.2 通过multiprocessing.Queue传递Tensor以下示例展示了如何利用multiprocessing.Queue进行Tensor的传输import torch import torch_npu import torch.multiprocessing as mp from multiprocessing import Queue def worker(tensor_queue): tensor tensor_queue.get() print(f接收方收到 tensor {tensor}) if __name__ __main__: # 必须使用spawn方法 mp.set_start_method(spawn) tensor_queue Queue() p mp.Process(targetworker, args(tensor_queue,)) p.start() tensor torch.full((5,), float(1.0), devicenpu:0) tensor_queue.put(tensor) print(f发送方发送 tensor {tensor}) p.join()执行结果3.3 通过reduce_tensor传递Tensor接收方可以通过修改Tensor的Device信息实现跨设备映射import torch import torch_npu import torch.multiprocessing as mp from torch.multiprocessing.reductions import reduce_tensor from multiprocessing import Queue def worker(tensor_queue): shared_handles tensor_queue.get() func, args shared_handles print(func) list_args list(args) # 修改目标设备为卡1 list_args[6] 1 tensor func(*list_args) print(f接收方收到 shared_handles {shared_handles}, tensor {tensor}) if __name__ __main__: mp.set_start_method(spawn) tensor_queue Queue() p mp.Process(targetworker, args(tensor_queue,)) p.start() tensor torch.full((5, ), float(3.14), devicefnpu:0) shared_handle reduce_tensor(tensor) tensor_queue.put(shared_handle) print(f发送方发送 shared_handle {shared_handle}, tensor {tensor}) p.join()执行结果3.4 通过_share_npu_传递Tensor (集合通信方式)该方法适合在分布式框架中处理跨进程内存共享示例如下import import torch import torch_npu import torch.distributed as dist import torch.multiprocessing as mp def worker(rank, world_size): 分布式工作进程 # 初始化分布式环境 os.environ[MASTER_ADDR] localhost os.environ[MASTER_PORT] 12355 torch.npu.set_device(rank) dist.init_process_group(backendhccl, rankrank, world_sizeworld_size) pid os.getpid() # 进程0创建共享内存 if rank 0: tensor torch.full((5,), float(1.0), devicenpu:0) # 创建共享内存handle local_storage {rank: rank, storage: tensor.storage()._share_npu_()} print(f发送方 pid {pid} 发送 tensor {tensor}, handle {local_storage[storage][1]}) else: local_storage {rank: rank, storage: None} # 广播共享内存handle gather_storage_list [None] * world_size dist.all_gather_object(gather_storage_list, local_storage) # 进程1接收并重建tensor if rank 1: tensor torch.zeros((5,), devicenpu:0) # 获取进程0发送的handle send_storage gather_storage_list[0][storage] # 重建storage对象 storage torch.UntypedStorage._new_shared_npu(*send_storage) # 设置tensor存储 tensor.set_(storage) print(f接收方 pid {pid} 收到 tensor {tensor}, handle {send_storage[1]}) if __name__ __main__: mp.set_start_method(spawn) world_size 2 processes [] # 启动工作进程 for rank in range(world_size): p mp.Process(targetworker, args(rank, world_size)) p.start() processes.append(p) # 等待所有进程结束 for p in processes: p.join()执行结果相关链接[1] Ascend Extension for PyTorch/PyTorch框架特性指南/内存资源优化/内存共享IPChttps://www.hiascend.com/document/detail/zh/Pytorch/720/ptmoddevg/Frameworkfeatures/featuresguide_00031.html[2] Ascend/pytorch代码仓https://gitcode.com/ascend/pytorch/tree/master/torch_npu/csrc/ipc【免费下载链接】cann-learning-hubCANN 学习中心仓支持在线互动运行、边学边练提供教程、示例与优化方案一站式助力昇腾开发者快速上手。项目地址: https://gitcode.com/cann/cann-learning-hub创作声明:本文部分内容由AI辅助生成(AIGC),仅供参考