中山小榄网站,慕课网站开发,免费开源的个人网站系统,做执法设备有哪些网站Reactor 是一种事件驱动的设计模式#xff08;Event-Driven Pattern#xff09;#xff0c;主要用于处理高并发 I/O#xff0c;特别适合网络服务器场景。它通过一个多路复用机制监听多个事件源#xff08;如 socket 文件描述符#xff09;#xff0c;并在事件就绪时将事…        Reactor 是一种事件驱动的设计模式Event-Driven Pattern主要用于处理高并发 I/O特别适合网络服务器场景。它通过一个多路复用机制监听多个事件源如 socket 文件描述符并在事件就绪时将事件分发给对应的处理器回调函数执行。六、单reactor单线程模型1、核心思想利用一个统一的事件分发中心EventLoop在单个线程中通过 I/O 多路复用机制如 epoll高效监听和响应多个并发连接的 I/O 事件将事件检测与事件处理解耦从而实现结构简单、性能良好的并发 I/O 服务器。
2、核心组件类名主要职责Channel封装一个 fd 的事件及回调是 fd 与 epoll 的桥梁EventLoop管理 epoll 及事件分发是每个线程中的核心循环TcpServer管理监听 socket 和连接接入逻辑创建 TcpConnTcpConn封装已建立的连接的读写处理、缓冲、生命周期等1Channel类-事件通道
Channel 是 一个 fd 与其事件处理逻辑之间的中介负责事件注册设置监听哪些事件如 EPOLLIN, EPOLLOUT事件触发内核通知时调用用户设置的回调函数与 EventLoop 协作将事件注册/修改到 epoll 中
.h/* 负责在事件分发系统中起到“事件通道”的作用连接底层的I/O多路复用机制如epoll、select、poll和具体的事件处理逻辑 */
class EventLoop;class Channel {
public:Channel(EventLoop loop, int fd);~Channel();// 设置 fd 对应的感兴趣事件EPOLLIN/EPOLLOUTvoid EnableReading();void EnableWriting();void DisableWriting();void DisableAll();// 用户提供的读写事件回调void SetReadCallback(std::functionvoid() cb);void SetWriteCallback(std::functionvoid() cb);// 实际处理事件触发调用该函数判断是否调用 read_cb_ 或 write_cb_void HandleEvent(uint32_t events);// 获取 fdint GetFd() const;// 获取事件uint32_t GetEvents() const;// 将当前 Channel 注册到 epoll 或修改其状态。void Update();private:int fd_;                          // 监听的文件描述符bool added_  false;              // 是否已添加到 epollEventLoop loop_;                 // 所属的事件循环uint32_t events_;                 // 当前监听的事件类型EPOLLIN/EPOLLOUTstd::functionvoid() read_cb_;   // 读事件回调std::functionvoid() write_cb_;  // 写事件回调
};函数名功能调用时机EnableReading()关注读事件并更新 epoll 状态监听 socket / 连接 socket 可读时EnableWriting()关注写事件注册 EPOLLOUTSend() 数据写不完时注册DisableWriting()注销写事件避免忙等写完所有 buffer 后HandleEvent(events)根据 epoll 返回事件触发回调epoll_wait 返回后由 EventLoop 调用Update()将本 Channel 注册或修改到 epoll每次 Enable/Disable 事件后必须调用.cppChannel::Channel(EventLoop loop, int fd): fd_(fd), loop_(loop), events_(0) {}Channel::~Channel() {DisableAll();
}void Channel::EnableReading() {events_ | EPOLLIN;Update();
}void Channel::EnableWriting() {events_ | EPOLLOUT;Update();
}void Channel::DisableWriting() {events_  ~EPOLLOUT;Update();
}void Channel::DisableAll() {loop_.DelEvent(fd_);events_  0;
}void Channel::SetReadCallback(std::functionvoid() cb) {read_cb_  std::move(cb);
}void Channel::SetWriteCallback(std::functionvoid() cb) {write_cb_  std::move(cb);
}void Channel::HandleEvent(uint32_t events) {if ((events  EPOLLIN)  read_cb_) read_cb_();if ((events  EPOLLOUT)  write_cb_) write_cb_();
}int Channel::GetFd() const {return fd_;
}uint32_t Channel::GetEvents() const {return events_;
}void Channel::Update() {if (!added_) {loop_.AddEvent(fd_, events_, this);added_  true;} else {loop_.ModEvent(fd_, events_, this);}
}2EventLoop类-事件循环
EventLoop 是 Reactor 的核心调度器负责管理所有 Channel 的事件监听调用 epoll_wait 等待就绪事件分发事件并调用 Channel 的处理函数
.h#define MAX_EVENTS 1024/* 负责事件的等待、分发和调度执行*/
class EventLoop
{
public:EventLoop() ;~EventLoop();void AddEvent(int fd, uint32_t events, void *ptr);void ModEvent(int fd, uint32_t events, void *ptr);void DelEvent(int fd);void Run();void stop();
private:int epfd_;bool running_;
};
函数名功能说明Run()启动事件循环持续调用 epoll_wait 并触发回调stop()停止事件循环设置 running_ 为 falseAddEvent(fd, events, ptr)添加一个 fd 到 epollfd 封装在 Channel 内由 TcpConn 创建ModEvent(fd, events, ptr)修改 fd 的监听事件典型于 Send() 或关闭写事件DelEvent(fd)从 epoll 中删除 fd连接关闭、Channel 销毁时.cppEventLoop::EventLoop() : epfd_(::epoll_create1(0)), running_(true)
{if (epfd_  -1){std::cerr  epoll_create error:   errno  std::endl;exit(EXIT_FAILURE);}
}EventLoop::~EventLoop()
{close(epfd_);
}void EventLoop::AddEvent(int fd, uint32_t events, void *ptr)
{epoll_event ev;ev.events  events;ev.data.ptr  ptr;if (::epoll_ctl(epfd_, EPOLL_CTL_ADD, fd, ev)  -1){std::cerr  epoll_ctl add error:   errno  std::endl;}
}void EventLoop::ModEvent(int fd, uint32_t events, void *ptr)
{epoll_event ev;ev.events  events;ev.data.ptr  ptr;if (::epoll_ctl(epfd_, EPOLL_CTL_MOD, fd, ev)  -1){std::cerr  epoll_ctl mod error:   errno  std::endl;}
}void EventLoop::DelEvent(int fd)
{if (::epoll_ctl(epfd_, EPOLL_CTL_DEL, fd, nullptr)  -1){std::cerr  epoll_ctl del error:   errno  std::endl;}
}void EventLoop::Run()
{epoll_event events[MAX_EVENTS];while (running_){// 超时参数传入TimerInstance()-WaitTime()功能是确保 epoll_wait 最迟要在定时任务到期时返回否则任务会延迟处理。int nfds  ::epoll_wait(epfd_, events, MAX_EVENTS, TimerInstance()-WaitTime());if (nfds  -1){if (errno  EINTR) // EINTR系统中断时忽略重试其他错误打印后直接返回。continue;std::cerr  epoll_wait error:   errno  std::endl;return;}// 遍历就绪事件数组for (int i  0; i  nfds; i){// 获取就绪事件存储在 data.ptr 的 Channel*Channel* ch  static_castChannel*(events[i].data.ptr);// 触发读/写等回调ch-HandleEvent(events[i].events);}// 处理定时器任务TimerInstance()-HandleTimeout();}
}void EventLoop::stop() {running_  false;
}
3TcpServer-连接管理
TcpServer 是服务端框架的顶层组织者负责创建监听 socket创建 socket、bind、listen创建Accept Channel负责接收新连接事件并注册到 EventLoop创建新连接回调接收到新连接后通过回调交由用户处理
.hclass TcpConn;
class EventLoop;class TcpServer {
public:// 新连接回调供用户处理新连接事件。using NewConnCallback  std::functionvoid(std::shared_ptrTcpConn);TcpServer(EventLoop loop);~TcpServer();// 启动 TCP 服务监听注册新连接的回调void Start(uint16_t port, NewConnCallback cb);private:// 新连接建立时的处理逻辑作为回调函数使用void HandleAccept();private:EventLoop loop_;int listen_fd_;std::shared_ptrChannel accept_channel_; // 监听 fd 的事件通道负责读事件注册新连接到来NewConnCallback new_conn_cb_;             // 外部注入的新连接回调函数
};函数名功能说明Start(port, cb)初始化 socket设置 Channel 和回调注册 EPOLLIN 用于接收连接HandleAccept()有新连接到来时被触发accept() 然后调用回调构建 TcpConnnew_conn_cb_用户设置的处理逻辑通常设置为 lambda 创建 TcpConn 实例.cppTcpServer::TcpServer(EventLoop loop): loop_(loop), listen_fd_(-1) {}TcpServer::~TcpServer() {if (listen_fd_ ! -1) {accept_channel_-DisableAll();close(listen_fd_);}
}void TcpServer::Start(uint16_t port, NewConnCallback cb) {new_conn_cb_  std::move(cb); // 保存回调函数// 创建 IPv4 TCP 套接字非阻塞模式listen_fd_  socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0);if (listen_fd_  -1) {std::cerr  socket error:   errno  std::endl;return;}// 配置 SO_REUSEADDR端口立即重用避免“Address already in use” 和 SO_REUSEPORT多进程监听同一端口可用于多核负载均衡int opt  1;setsockopt(listen_fd_, SOL_SOCKET, SO_REUSEADDR, opt, sizeof(opt));setsockopt(listen_fd_, SOL_SOCKET, SO_REUSEPORT, opt, sizeof(opt));// 绑定 socket 到指定端口地址sockaddr_in addr{};addr.sin_family  AF_INET;addr.sin_addr.s_addr  INADDR_ANY;addr.sin_port  htons(port);if (bind(listen_fd_, (sockaddr*)addr, sizeof(addr))  -1) {std::cerr  bind error:   errno  std::endl;close(listen_fd_);return;}// 开始监听监听队列长度为 SOMAXCONN4096if (listen(listen_fd_, SOMAXCONN)  -1) {std::cerr  listen error:   errno  std::endl;close(listen_fd_);return;}// 创建 Channel 对象监听 listen_fd_ 上的事件会注册到 epoll 中accept_channel_  std::make_sharedChannel(loop_, listen_fd_);// 设置读事件回调函数即新连接到来时应调用的逻辑函数accept_channel_-SetReadCallback([this]() { HandleAccept(); });// 启动监听accept_channel_-EnableReading();std::cout  Server listening on port   port  std::endl;
}void TcpServer::HandleAccept() {sockaddr_in client_addr{};socklen_t len  sizeof(client_addr);// 接受新连接返回新的客户端 fd设置非阻塞int conn_fd  accept4(listen_fd_, (sockaddr*)client_addr, len, SOCK_NONBLOCK);if (conn_fd  -1) return;// 创建新的连接对象TcpConn管理该客户端 fdauto conn  std::make_sharedTcpConn(conn_fd, loop_);// 调用用户逻辑处理这个连接if (new_conn_cb_) new_conn_cb_(conn);
}
4TcpConn-与客户端通信
TcpConn 封装了每个 TCP 连接的生命周期、读写事件、缓冲区管理等。
.h/**声明 TcpConn 类同时继承 enable_shared_from_this方便在回调中安全获取 shared_ptrTcpConn 自己的智能指针。*/
class TcpConn : public std::enable_shared_from_thisTcpConn {
public:// 声明回调函数using ReadCallback  std::functionvoid();using CloseCallback  std::functionvoid();TcpConn(int fd, EventLoop loop);~TcpConn();// 设置回调函数void SetReadCallback(ReadCallback cb);void SetCloseCallback(CloseCallback cb);// 异步发送数据int Send(const char* data, size_t size);// 获取当前接收缓冲区内的全部数据std::string GetAllData();private:// 内部事件处理函数读取、写入、关闭连接void HandleRead();void HandleWrite();void Close();private:int fd_;bool closed_;EventLoop loop_;MessageBuffer input_buffer_;       // 读缓冲区std::string output_buffer_{};        // 写缓冲区std::shared_ptrChannel channel_; // 封装 fd 的 epoll 管理类ReadCallback read_cb_;             // 外部注入的回调函数CloseCallback close_cb_;
};
函数名功能调用说明HandleRead()可读事件触发时从 fd 读数据到 input_buffer_然后触发 read_cb_HandleWrite()可写事件触发时将 output_buffer_ 中数据写出写完后注销 EPOLLOUTSend(data)异步发送数据若 fd 可写则立即写否则加入 bufferGetAllData()获取 input_buffer_ 所有数据可在 onMessage 中使用Close()主动关闭连接注销事件、关闭 fd、触发 close_cb_.cppTcpConn::TcpConn(int fd, EventLoop loop): fd_(fd), closed_(false), loop_(loop) {// // 设置 socket 为非阻塞模式// int flags  fcntl(fd, F_GETFL, 0);// fcntl(fd, F_SETFL, flags | O_NONBLOCK);// 创建 Channel 并设置读写回调注册 EPOLLIN 监听可读事件。channel_  std::make_sharedChannel(loop_, fd);channel_-SetReadCallback([this]() { HandleRead(); });channel_-SetWriteCallback([this]() { HandleWrite(); });channel_-EnableReading();
}TcpConn::~TcpConn() {Close();
}void TcpConn::SetReadCallback(ReadCallback cb) {read_cb_  std::move(cb);
}void TcpConn::SetCloseCallback(CloseCallback cb) {close_cb_  std::move(cb);
}int TcpConn::Send(const char* data, size_t size) {// 若连接已关闭或无数据直接返回。if (closed_ || data  nullptr || size  0) return -1;// 如果写缓冲区中已有未发送的数据追加数据并监听写事件。if (!output_buffer_.empty()) {output_buffer_.append(data, size);channel_-EnableWriting();return size;}// 直接发送零拷贝MSG_NOSIGNAL防止对方关闭连接时触发 SIGPIPE 信号使得进程崩溃。int n  send(fd_, data, size, MSG_NOSIGNAL);if (n  0  (errno  EAGAIN || errno  EWOULDBLOCK)) { // 非阻塞 socket 下内核发送缓冲区可能暂时满// 将数据缓存到用户态 output_buffer_ 监听写事件output_buffer_.append(data, size);channel_-EnableWriting();} else if (n  0  n  static_castint(size)) {   // 有部分数据被写入 socket如发送了 n 字节小于总长度 size// 剩下的数据未能写完需要缓存到 output_buffer_并监听写事件output_buffer_.append(data  n, size - n);channel_-EnableWriting();} else if (n  0) {Close();}return n;
}std::string TcpConn::GetAllData() {auto data  input_buffer_.GetAllData();if (data.first ! nullptr) {// 获取读缓冲区全部有效数据std::string result(reinterpret_castchar*(data.first), data.second);// 标记已读的数据input_buffer_.ReadCompleted(data.second);return result;}return ;
}void TcpConn::HandleRead() {int err  0;// 调用 MessageBuffer::Recv() 使用 readv() 读取数据读取数据到缓冲区中。int n  input_buffer_.Recv(fd_, err);if (n  0  read_cb_) {read_cb_(); // 触发读取回调逻辑} else if (n  0 || (n  0  err ! EAGAIN  err ! EWOULDBLOCK)) { // 连接关闭或错误则关闭连接。Close();}
}void TcpConn::HandleWrite() {// 如果写缓冲区为空则取消监听写事件。if (output_buffer_.empty()) {channel_-DisableWriting();return;}// 缓冲区不为空调用 send() 发送数据发送成功则删除发送缓冲区中的数据。int n  send(fd_, output_buffer_.data(), output_buffer_.size(), MSG_NOSIGNAL);if (n  0) {output_buffer_.erase(0, n);if (output_buffer_.empty()) {channel_-DisableWriting();}} else if (n  0  errno ! EAGAIN  errno ! EWOULDBLOCK) { // 非阻塞错误以外的写失败则关闭连接Close();}
}void TcpConn::Close() {if (closed_) return;closed_  true;channel_-DisableAll();close(fd_);if (close_cb_) close_cb_();
}
3、完整调用流程
启动阶段TcpServer::Start(port, cb)创建 listen_fd设为非阻塞创建 accept_channel_ 封装 listen_fd设置其 read_cb 为 TcpServer::HandleAccept将其注册到 EventLoop 中AddEvent()
有连接到来内核通知 listen_fd 可读 → Channel::HandleEvent() → read_cb_ → TcpServer::HandleAccept()调用 accept()获得 conn_fd设置为非阻塞构建 TcpConn 对象封装连接的生命周期创建该连接的 Channel并注册其 read_cb_ → TcpConn::HandleRead
数据收发流程conn_fd 可读 → epoll 通知 → EventLoop 触发 TcpConn::HandleRead()从 socket 读入数据到 input_buffer_调用 read_cb_ 让上层应用逻辑处理数据TcpConn::Send()若当前 fd 可写则直接发送否则写入 output_buffer_并注册 EPOLLOUT 事件channel_-SetWriteCallback(...) 设定写回调conn_fd 可写 → TcpConn::HandleWrite()从 output_buffer_ 中写出数据若 buffer 为空注销写事件调用 writeCompleteCallback