639 字
3 分钟
AsyncServer模型流程梳理

AsyncServer模型流程梳理#

模型#

model

UML图#

UML

Client && CSession/CServer#

sequenceDiagram
participant Client
participant CServer
participant CSession
participant io_context
CServer ->> CServer: 初始化acceptor, 开始监听CServer.start_accept()
io_context ->> io_context:ioc.run()
Client->>CServer: 发起连接请求
CServer->>io_context: 注册async_accept()
io_context-->>CServer: 连接到达,触发回调handle_accept()
CServer->>CSession: 创建新会话,CSession.Start()
CSession->>io_context: 注册async_read
Client->>CSession: 发送数据
io_context-->>CSession: 数据可读,触发回调handle_read()
CSession->>CSession: 处理业务逻辑
CSession->>io_context: 注册async_write
CSession-->>Client: 发送响应数据
io_context-->>CSession: 数据发送完成,触发回调handle_write()
CSession->>io_context: 再次注册async_read

CSession & LogicSystem#

sequenceDiagram
participant CSession
participant LogicSystem
participant LogicNode_Queue
CSession->>LogicSystem: 单例模式调用
LogicSystem->>LogicSystem: 初始化,注册CallBack事件,启动工作线程循环处理队列数据。
CSession->> LogicNode_Queue: 将消息数据添加到队列
LogicNode_Queue->>LogicSystem: 调用事件对应的CallBack
LogicSystem->>CSession: 执行CallBack,处理数据Send()

AsioIOServicePool与AsioIOThreadPool的对比#

二者地模型都是由主线程执行CServer,单线程地监听主线程io_context上的连接请求。构

AsioIOServicePool(并发)#

AsioIOServicePool中,创建了多个io_context,这些io_context运行在多个CPU核心上,并行地执行不同的CSession上的异步IO与异步HandleIO()函数。

LogicSystem负责单线程地从队列中读取数据,并执行对应的回调函数。随后调用CSessionSend()函数,并发地执行异步Write()

因此:在监听连接事件和异步IO的过程,利用了io_context多线程的并行。在LogicSystem执行数据处理的过程为单线程执行。

AsioIOThreadPool(并行)#

AsioIOThreadPool与ServicePool的不同主要在于,它只存在一个io_context,这个io_context由多个CSession在多个线程并行地调用。

隐患#

由于socket的访问不是线程安全的。因此当多个线程以很小的时间间隔,执行同一个socketasync_read()时,就会导致重复读取_data数据,导致第2个线程的操作覆盖前一次的操作。

解决方法:Strand序列化同一个CSession::socket上的操作#

boost::asio::strand将单个Session::socket上的IO变为单线程的串行执行。而多个Session上的执行仍然是多线程并发执行的。避免了单一socket上的数据竞争问题。

  1. CSession中添加成员变量,并且绑定io_context的执行器get_executor()
// 成员变量
using boost::asio::io_context;
boost::asio::strand<io_context::executor_type> _strand;
// 获取io_context的执行器,并绑定。
CSession::CSession(boost::asio::io_context& io_context, CServer* server):
_socket(io_context), _server(server), _b_close(false),
_b_head_parse(false), _strand(io_context.get_executor()){
boost::uuids::uuid a_uuid = boost::uuids::random_generator()();
_uuid = boost::uuids::to_string(a_uuid);
_recv_head_node = make_shared<MsgNode>(HEAD_TOTAL_LEN);
}
  1. 在执行IO的过程中,使用boost::asio::bind_executor()函数,将IO的回调函数绑定到指定的strand上。
_socket.async_read_some(
boost::asio::buffer(_data, MAX_LENGTH),
boost::asio::bind_executor(
_strand,
std::bind(
&CSession::HandleRead, this,
std::placeholders::_1, std::placeholders::_2, SharedSelf()
)
)
);
boost::asio::async_write(
_socket,
boost::asio::buffer(msgnode->_data, msgnode->_total_len),
boost::asio::bind_executor(
_strand,
std::bind(
&CSession::HandleWrite, this, std::placeholders::_1, SharedSelf()
)
)
);
AsyncServer模型流程梳理
https://chrisnake11.github.io/blog/posts/coding/chatroom/asioioservicepoolserver模型流程梳理/
作者
Zheyv
发布于
2025-05-11
许可协议
CC BY-NC-SA 4.0