TrioSim 模拟器 (五):消息的发送和接收

消息发送/接收不是靠 InferenceTracePlayer::Handle() 完成的。Handle() 只处理事件,比如 PlayNextEventLayerCompletionEventPlayNextReduceEvent

真正的消息路径是:

TracePlayer 创建 TensorMsg
-> src Port 发送
-> NetworkModel 计算传输时间并调度 transferUpdateEvent
-> Engine 到时间后调用 NetworkModel::Handle()
-> NetworkModel 把消息投递给 dst Port
-> dst Port 调用 TracePlayer::NotifyRecv()
-> TracePlayer 把 tensor 落到目标 MemoryRegion

本文重点讨论电气网络的消息发送和接收机制。在光学网络中,消息的传输过程与此类似。


发送消息

发送入口是 inference.cpp:

bool InferenceTracePlayer::MsgPkgToSend(...)

它会先找到源端口和目标端口:

auto* src = GetPortByName(srcRegion);
auto* dst = GetPortByName(dstRegion);

然后计算本次消息的数据量:

int64_t totalBytes = 0;
for (const auto& tensor : tensors) {
    totalBytes += static_cast<int64_t>(tensor.Bytes());
}

接着创建 TensorMsg

auto* msg = new TensorMsg();
msg->tensor_pkg = tensors;
msg->dst_region_name = dstRegion;
msg->gpu_id = gpu_id;
msg->purpose = purpose;
msg->meta.id = akita::sim::GetIDGenerator()->Generate();
msg->meta.src = src;
msg->meta.dst = dst;
msg->meta.send_time = CurrentTime();
msg->meta.traffic_bytes = totalBytes;

TensorMsg 定义在 trace.hpp。

它里面最重要的是:

tensor_pkg       // 要传输的张量
src / dst        // 源端口和目标端口
traffic_bytes    // 数据量
purpose          // fetch / scatter / gather / hop
gpu_id           // 目标 GPU 或相关 GPU ID

最后发送:

auto* sendError = src->Send(msg);

这里的 srcLimitNumMsgPort


Port::Send 做了什么

代码在 port.cpp:

SendError* LimitNumMsgPort::Send(Msg* msg) {
    SendError* err = conn_->Send(msg);
    ...
    return err;
}

Port 自己不计算网络延迟。它只是把消息交给连接对象 conn_

这个 conn_ 是谁?

在构建网络时设置的:

port->SetConnection(this);

所以如果当前是电气网络:

conn_ = PacketSwitchingNetworkModel

如果当前是光学网络:

conn_ = OpticalNetworkModel

网络模型负责消息传输

以电气网络为例,进入 packetswitching.cpp:

PacketSwitchingNetworkModel::Send(Msg* msg)

它不会立刻把消息交给目标端口,而是:

Route* route = findRoute(msg);
UpdateProgressNextHappenEvent(route);
scheduleNextHappenEvent();

含义是:

找路由
计算链路带宽/剩余传输时间
调度一个 transferUpdateEvent

也就是说发送是异步的。消息发出后,并不会马上到达,而是等模拟时间推进到传输完成事件。

光学网络类似,在 optical.cpp:

OpticalNetworkModel::Send(Msg* msg)

它会计算:

evt_time = now + latency + transfer_time;

然后调度:

transferUpdateEvent* evt = new transferUpdateEvent(evt_time, this, msg);
event_scheduler_->Schedule(evt);

传输完成事件如何触发接收

当模拟时间到达 transferUpdateEventSerialEngine 会调用网络模型的 Handle()

电气网络的处理在 packetswitching.cpp:

int PacketSwitchingNetworkModel::handleTransferUpdateEvent(...)

关键代码:

msg->Meta()->recv_time = time_teller_->CurrentTime();
akita::sim::SendError* err = msg->Meta()->dst->Recv(msg);

这一步才是真正“消息到达目标端口”。

也就是说:

NetworkModel::Send() 只是发起传输
NetworkModel::handleTransferUpdateEvent() 才把消息投递到 dst Port

Port::Recv 做了什么

代码在 port.cpp:

SendError* LimitNumMsgPort::Recv(Msg* msg)

它会先检查端口 buffer 是否有空间:

if (!buf_->CanPush()) {
    port_busy_ = true;
    return NewSendError();
}

如果目标端口 buffer 满了,接收失败,网络模型会把消息放进 pending_delivery_,以后端口空了再投递。

如果 buffer 有空间:

buf_->Push(msg);

然后通知端口所属组件:

comp_->NotifyRecv(msg->Meta()->recv_time, this);

这里的 comp_ 就是 InferenceTracePlayer

所以接收回调进入:

InferenceTracePlayer::NotifyRecv()

TracePlayer 接收消息

代码在 inference.cpp:

void InferenceTracePlayer::NotifyRecv(...)

它先从端口 buffer 取出消息:

akita::sim::Msg* msg = port->Retrieve(now);

然后转成 TensorMsg

auto* tensorMsg = dynamic_cast<TensorMsg*>(msg);

接着真正落地张量:

RecvTensorPkg(tensorMsg);

RecvTensorPkg() 在 inference.cpp:

RemoveInflightTransfer(msg);
AddTensorsToMemRegion(msg->tensor_pkg, msg->gpu_id, msg->purpose);

含义是:

从 inflight_transfer 中移除这条在途消息
把 tensor_pkg 加入目标 GPU 的 MemoryRegion

如果是 Ring AllReduce 的 scatter/gather,还会减少未完成发送计数:

if (send_to_finish_ > 0) {
    send_to_finish_--;
}

最后 NotifyRecv() 根据消息用途继续推进状态机:

if (purpose == "scatter" || purpose == "gather" || purpose == "hop") {
    ScheduleEvent(MakePlayNextReduceEvent(CurrentTime(), this, gpuID));
} else {
    ScheduleEvent(MakePlayNextEvent(CurrentTime(), this, gpuID));
}

评论