济南建设网站企业报价,咸阳网站建设工作室,男女做暖暖的试看网站大全,外贸网站建设不可缺少的灵活性目录 1.实现目标
2.HTTP服务器
实现高性能服务器-Reactor模型
模块划分
SERVER模块#xff1a;
HTTP协议模块#xff1a;
3.项目中的子功能
秒级定时任务实现
时间轮实现
正则库的简单使用
通⽤类型any类型的实现
4.SERVER服务器实现
日志宏的封装
缓冲区Buffer…目录 1.实现目标
2.HTTP服务器
实现高性能服务器-Reactor模型
模块划分
SERVER模块
HTTP协议模块
3.项目中的子功能
秒级定时任务实现
时间轮实现
正则库的简单使用
通⽤类型any类型的实现
4.SERVER服务器实现
日志宏的封装
缓冲区Buffer类实现
套接字Socket实现
事件管理Channel模块实现
描述符事件监控Poller模块实现
定时任务管理TimerWheel类实现
EventLoop线程池类实现
编辑
LoopThread模块实现
LoopThreadPool线程池模块实现
通信连接管理Connection模块实现
监听描述符管理Acceptor类实现
TcpServer模块
HTTP协议模块实现
Util⼯具类实现
HttpRequest请求类实现
HttpResponse响应类实现
HttpServer服务器模块
服务器搭建并进行测试
搭建服务器
长连接测试
测试超时连接是否销毁
错误请求处理
服务器性能达到瓶颈的处理
一次发送多个请求测试
测试大文件传输
服务器性能测试
项目源码 1.实现目标 通过实现的⾼并发服务器组件可以简洁快速的完成⼀个⾼性能的服务器搭建。并且通过组件内提供的不同应⽤层协议⽀持也可以快速完成⼀个⾼性能应⽤服务器的搭建 2.HTTP服务器 HTTP概念超⽂本传输协议是应⽤层协议是⼀种简单的请求-响应协议HTTP服务器本质上就是个TCP服务器只不过在应⽤层基于HTTP协议格式进⾏数据的组织和解析来明确客⼾端的请求并完成业务处理 实现高性能服务器-Reactor模型 概念Reactor模式是指通过⼀个或多个输⼊同时传递给服务器进⾏请求处理时的事件驱动处理模式。服务端程序处理传⼊多路请求并将它们同步分派给请求对应的处理线程Reactor模式也叫Dispatcher模式。使⽤ I/O多路复⽤统⼀监听事件收到事件后分发给处理进程或线程 分类1.单Reactor单线程单I/O多路复⽤业务处理 1. 通过IO多路复⽤模型进⾏客⼾端请求监控 2. 触发事件后进⾏事件处理 a. 如果是新建连接请求则获取新建连接并添加⾄多路复⽤模型进⾏事件监控。 b. 如果是数据通信请求则进⾏对应数据处理接收数据处理数据发送响应 优点所有操作均在同⼀线程中完成思想流程较为简单不涉及进程/线程间通信及资源争抢问题。 缺点⽆法有效利⽤CPU多核资源很容易达到性能瓶颈。适⽤场景适⽤于客⼾端数量较少且处理速度较为快速的场景 2.单Reactor多线程单I/O多路复⽤线程池业务处理 1. Reactor线程通过I/O多路复⽤模型进⾏客户端请求监控 2. 触发事件后进⾏事件处理 a. 如果是新建连接请求则获取新建连接并添加⾄多路复⽤模型进⾏事件监控。 b. 如果是数据通信请求则接收数据后分发给Worker线程池进⾏业务处理。 c. ⼯作线程处理完毕后将响应交给Reactor线程进⾏数据响应 优点充分利⽤CPU多核资源 缺点多线程间的数据共享访问控制较为复杂单个Reactor承担所有事件的监听和响应在单线程中运⾏⾼并发场景下容易成为性能瓶颈。 3.多Reactor多线程多I/O多路复⽤线程池业务处理 1. 在主Reactor中处理新连接请求事件有新连接到来则分发到⼦Reactor中监控 2. 在⼦Reactor中进⾏客户端通信监控有事件触发则接收数据分发给Worker线程池 3. Worker线程池分配独⽴的线程进⾏具体的业务处理 a. ⼯作线程处理完毕后将响应交给⼦Reactor线程进⾏数据响应 当前项目采用的方式One Thread One Loop主从Reactor模型⾼并发服务器 1.主Reactor线程仅仅监控监听描述符获取新建连接保证获取新连接的⾼效性提⾼服务器的并发性能。 2.主Reactor获取到新连接后分发给从属Reactor进⾏通信事件监控。而从属Reactor线程监控各⾃的描述符的读写事件进⾏数据读写以及业务处理。 3.One Thread One Loop的思想就是把所有的操作都放到⼀个线程中进⾏⼀个线程对应⼀个事件处理的循环 简单来说就是主属Reactor用于对连接的管理从属Reactor就是把剩下的工作全部做完 组件使用者需要自行决定是否需要线程池自己完成实现。 模块划分 我们要实现的是⼀个带有协议⽀持的Reactor模型⾼性能服务器因此我们可以划分成两个模块 SERVER模块实现Reactor模型的TCP服务器 协议模块对当前的Reactor模型服务器提供应⽤层协议⽀持。 SERVER模块 分成3个方面的管理 1.监听连接管理 2.通信连接管理 3.超时连接管理 根据这3个方面的管理我们可以划分为下面多个子模块Buffer模块Buffer模块是⼀个缓冲区模块⽤于实现通信中用户态的接收缓冲区和发送缓冲区功能 Socket模块Socket模块是对套接字操作封装的⼀个模块主要实现的socket的各项操作 Channel模块结合Poller模块对事件进行监控处理分别监控可读可写错误任意事件的监控根据不同的事件调用不同的回调函数进行处理 Poller模块Poller模块是对epoll进⾏封装的⼀个模块主要实现epoll的IO事件添加修改移除获取活跃连接功能。 Connection模块Connection模块是对Buffer模块Socket模块Channel模块的⼀个整体封装实现了对⼀个通信套接字的整体的管理每⼀个进⾏数据通信的套接字也就是accept获取到的新连接都会使⽤Connection进⾏管理 Acceptor模块Acceptor模块是对Socket模块Channel模块的⼀个整体封装实现了对⼀个监听套接字的整体的管理 TimerQueue模块TimerQueue模块是实现固定时间定时任务的模块可以理解就是要给定时任务管理器向定时任务管理器中添加⼀个任务任务将在固定时间后被执⾏同时也可以通过刷新定时任务来延迟任务的执⾏。 这个模块主要是对Connection对象的⽣命周期管理对⾮活跃连接进⾏超时后的释放功能。 EventLoop模块EventLoop模块是对Poller模块TimerQueue模块Socket模块的⼀个整体封装进⾏所有描述符的事件监控这里为了保证保证整个服务器的线程安全问题因此要求使⽤者对于Connection的所有操作⼀定要在其对应的EventLoop线程内完成也就是One Thread One Loop 的思想 TcpServer模块这个模块是⼀个整体Tcp服务器模块的封装内部封装了Acceptor模块EventLoopThreadPool模块 HTTP协议模块 Util模块这个模块是⼀个⼯具模块主要提供HTTP协议模块所⽤到的⼀些⼯具函数⽐如url编解码⽂件读写等 HttpRequest模块这个模块是HTTP请求数据模块⽤于保存HTTP请求数据被解析后的各项请求元素信息 HttpResponse模块这个模块是HTTP响应数据模块⽤于业务处理后设置并保存HTTP响应数据的的各项元素信息最终会被按照HTTP协议响应格式组织成为响应信息发送给客户端 HttpContext模块这个模块是⼀个HTTP请求接收的上下⽂模块主要是为了防⽌在⼀次接收的数据中不是⼀个完整的HTTP请求则解析过程并未完成⽆法进⾏完整的请求处理需要在下次接收到新数据后继续根据上下⽂进⾏解析最终得到⼀个HttpRequest请求信息对象 HttpServer模块这个模块是最终给组件使⽤者提供的HTTP服务器模块了⽤于以简单的接⼝实现HTTP服务器的搭建。 3.项目中的子功能 秒级定时任务实现 在我们的项目中需要使用到超时销毁的功能 所以我们需要一个时间轮能让一个连接在一定的时间进行销毁这就需要我们写一个简单的秒级定时任务 在Linux中给我们提供了定时器 int timerfd_create(int clockid, int flags); clockid: CLOCK_REALTIME-系统实时时间如果修改了系统时间就会出问题 CLOCK_MONOTONIC-从开机到现在的时间是⼀种相对时间(采用这个时间保证准确性 flags: 0-默认阻塞属性 int timerfd_settime(int fd, int flags, struct itimerspec *new, structitimerspec *old);fd: timerfd_create返回的⽂件描述符 flags: 0-相对时间 1-绝对时间默认设置为0即可. new ⽤于设置定时器的新超时时间 old ⽤于接收原来的超时时间 struct timespec {time_t tv_sec; /* Seconds */long tv_nsec; /* Nanoseconds */};struct itimerspec {struct timespec it_interval; /* 第⼀次之后的超时间隔时间 */struct timespec it_value; /* 第⼀次超时时间 */}; 定时器会在每次超时⾃动给fd中写⼊8字节的数据表⽰在上⼀次读取数据到当前读取数据期间超时了多少次。这样我们每次读取定时器的时候都知道他超时了多少次然后我们就执行多少秒的释放连接即可 定时器的使用用例 #include iostream
#include unistd.h
#include sys/timerfd.hint main()
{//int timerfd_create(int clockid, int flags);int timerfd timerfd_create(CLOCK_MONOTONIC,0);if(timerfd 0){perror(timerfd_create fail);return -1;}//int timerfd_settime(int fd, int flags,const struct itimerspec *new_value,struct itimerspec *old_value);//设置结构体struct itimerspec itim;itim.it_value.tv_sec 1;itim.it_value.tv_nsec 0; //设置第一次超时时间itim.it_interval.tv_sec 1;itim.it_interval.tv_nsec 0; //第一次超时之后每隔1秒超时一次int ret timerfd_settime(timerfd,0,itim,nullptr);if(ret ! 0){perror(timerfd_settime fail);return -1;}while(1){//因为这个也是一个文件描述符所以可以使用read进行系统调用来读取到其中的数据uint64_t times 0;int ret read(timerfd,times,8);if(ret 0){perror(read fail);return -1;}std::couttimesstd::endl;}return 0;
} 时间轮实现 如果每次我们都需要遍历一次连接来进行超时销毁这样效率是非常低的这里采用时间轮的思想来提高效率 我们通过Linux提供的定时器每次将tick指针移动到指定位置时进行销毁即可 如果我们需要使用到分级定时器或者时级定时器那么为了保证不消耗那么多空间这里采用像Linux中页面的设计那么采用多级时间轮。 这样就可以大大的节省空间了3个时间轮就可以把一个的每一个时刻都定位到了如果还想要精确到年月日也是可以通过创建更多的时间轮来实现。 这里主要实现秒级时间轮 #include iostream
#include functional
#include vector
#include unistd.h
#include memory
#include unordered_mapusing OnCloseTime std::functionvoid(); //定时器要执行的任务
using ReleaseTime std::functionvoid(); //删除时间管理对象中weak_ptr的信息//定时器任务对象
class TimeTask
{
private:uint32_t _timeout; //超时时间uint64_t _id; //每个任务的idbool _cancel; //取消定时任务OnCloseTime _close_cb; //销毁定时任务的回调ReleaseTime _release_cb; //因为时间轮中会记录一个weak_ptr对象所以最后需要销毁
public:TimeTask(uint32_t timeout,uint64_t id,const OnCloseTime close_cb):_timeout(timeout),_id(id),_close_cb(close_cb),_cancel(false){}~TimeTask(){ if(_cancel false) _close_cb();_release_cb();}uint64_t id() { return _id; }void SetRelease(const ReleaseTime release_cb) { _release_cb release_cb; }uint32_t Delay() { return _timeout; }void Cancel() { _cancel true; }
};//时间轮管理对象
class TimeWheel
{
private:using WeakTask std::weak_ptrTimeTask; //使用weak_ptr防止在shared_ptr直接对对象的操作using PtrTask std::shared_ptrTimeTask; //使用shared_ptr保证释放时不到0不销毁int _capacity; //记录时间轮的大小int _tick; //记录当前指针指向的时间指到哪里销毁哪里std::vectorstd::vectorPtrTask _wheel; //时间轮std::unordered_mapuint64_t,WeakTask _times; //记录id和weak_ptr之间的映射关系
private:void RemoveTask(uint64_t id){if(_times.find(id) ! _times.end()){_times.erase(id);}}
public:TimeWheel():_tick(0),_capacity(60),_wheel(_capacity){}void AddTask(uint64_t id,uint32_t delay,const OnCloseTime close_cb){PtrTask pt(new TimeTask(delay,id,close_cb));//设置ReleaseTaskpt-SetRelease(std::bind(TimeWheel::RemoveTask,this,id));//把任务添加到数组中int pos (_tick delay) %_capacity;_wheel[pos].push_back(pt);//将id和weakTask映射关联起来_times[id] WeakTask(pt);}void CancelTask(uint64_t id){//通过id找到任务如果没有直接返回有的话将标志置为trueif(_times.find(id) _times.end()) return;PtrTask pt _times[id].lock(); //获得weak_ptr中的shared_ptrpt-Cancel();}void RefreshTask(uint64_t id){//创建一个新的智能指针对象然后添加到数组中//如果在原数组中没有找到那么直接返回if(_times.find(id) _times.end()) return;std::cout找到了定时任务\n;PtrTask pt _times[id].lock(); //获得weak_ptr中的shared_ptrint delay pt-Delay();int pos (_tick delay) %_capacity;_wheel[pos].push_back(pt);}void RunTask(){_tick (_tick1)%_capacity;_wheel[_tick].clear();}
};后序项目需要使用到的时候直接融合进去即可 正则库的简单使用 正则表达式(regular expression)描述了⼀种字符串匹配的模式pattern可以⽤来检查⼀个串是否含有某种⼦串、将匹配的⼦串替换或者从某个串中取出符合某个条件的⼦串等。 正则表达式的使⽤可以使得HTTP请求的解析更加简单不需要更多的操作简化代码编写注释中有简单的使用方法这里不一一赘述 #include iostream
#include string
#include regexint main()
{// 请求 GET /hello/login?userxiaomingpasswd123456 HTTP/1.1\r\nstd::string str get /hello/login?userxiaomingpasswd123456 HTTP/1.1;//提取请求方法std::smatch matches;std::regex e((GET|POST|HEAD|DELETE|PUT) ([^?]*)(?:\\?(.*))? (HTTP/1\\.[01])(?:\r|\r\n)?,std::regex::icase); //匹配请求方法的正则表达式//(GET|POST|HEAD|DELETE|PUT) 匹配其中任意一个请求方法//([^?]*) 匹配非字符 *表示0次或者多次//\\?(.*) 提取?问号之后的字符直到遇到后面的空格//[01] 提起0或者1其中任意一个//(?:\r|\r\n)? ?:表示匹配某个字符串但是不提取 后面的表示匹配0次或者1次bool ret regex_match(str,matches,e);if(ret false) return -1;std::string method matches[1];std::transform(method.begin(),method.end(),method.begin(),::toupper); //转换大小写std::coutmethodstd::endl;for(int i 0;imatches.size();i){std::cout i : ;std::coutmatches[i]std::endl;}return 0;
} 通⽤类型any类型的实现 Connection中需要设置协议处理的上下⽂来控制处理节奏由于应用层协议非常多所以我们需要需要使用通用类型来保存不同的数据结构 实现思想 首先Any类一定不是一个模板类因为模板类的实例化的时候需要传类型但是我们可以通过在Any类中设计一个父类和一个子类其中父类不能是模板类因为Any类访问父类指针的时候就需要父类的类型所以我们可以把子类设计成模板类子类继承父类并通过重写虚函数来实现多态。当需要保存数据时通过new一个带模板参数的子类对象来保存数据然后让Any类中的父类指针指向这个子类对象就可以了。 实现如下 class Any
{
private://父类不是模板类这样可以保证Any类不是模板class holer{public:virtual ~holer() {}virtual const std::type_info type() 0; //设置成纯虚函数那么子类想要实例化就必须要重写虚函数 virtual holer* clone() 0;};template class Tclass placeholer : public holer{public:placeholer(const T val):_val(val){}virtual ~placeholer() {}virtual const std::type_info type() { return typeid(T);}virtual placeholer* clone() { return new placeholer(_val);}public:T _val;};
private:Any swap(Any any){std::swap(_holer,any._holer);return *this;}holer* _holer;
public:Any():_holer(nullptr) {}~Any(){ delete _holer; }Any(const Any any):_holer(any._holer nullptr?nullptr:any._holer-clone()){}template class TAny(const T val):_holer(new placeholerT(val)) {}templateclass TT* get(){assert(typeid(T) _holer-type());return ((placeholerT*)_holer)-_val;}Any operator(Any any){Any(any).swap(*this);return *this;}templateclass TAny operator(const T val){Any(val).swap(*this);return *this;}
}; 4.SERVER服务器实现
日志宏的封装 在项目中为了方便调试我们可以通过日志打印的方式来高效的确定和自己预期不相同的地方关于日志宏在上一个五子棋项目中已经有比较详细的解析主要分成以下几步 1.通过time函数获取时间 2.通过localtime来获取具体的时间即年月日时分秒 3.通过strftime将时间以格式化数据存放到缓冲区中 4.通过打印的方式打印出来同时还要加上文件以及行号线程等 #define INF 0
#define DBG 1
#define ERR 2
#define DEFAULT_LOG_LEVEL DBG
#define LOG(level, format, ...) {\if (level DEFAULT_LOG_LEVEL) {\time_t t time(NULL);\struct tm *m localtime(t);\char ts[32] {0};\strftime(ts, 31, %H:%M:%S, m);\fprintf(stdout, [%p %s %s:%d] format \n, (void*)pthread_self(), ts, __FILE__, __LINE__, ##__VA_ARGS__);\}\
}
#define ILOG(format, ...) LOG(INF, format, ##__VA_ARGS__);
#define DLOG(format, ...) LOG(DBG, format, ##__VA_ARGS__);
#define ELOG(format, ...) LOG(ERR, format, ##__VA_ARGS__); 缓冲区Buffer类实现 Buffer类⽤于实现用户态缓冲区提供数据缓冲取出等功能后序模块中的发送数据其实是像缓冲区中写入而读取数据则是从缓冲区中取走数据。 模块模型 成员设计 std::vectorchar _buffer; // 缓冲区
uint64_t _read_idx; // 读偏移
uint64_t _write_idx; // 写偏移 模块主要功能 1.获取当前写位置地址 char *WritePosition() { return Begin() _write_idx; } 2.确保可写空间足够 uint64_t ReadAbleSize() { return _write_idx - _read_idx; } 如果不够就先移动数据因为前面和后面都是有空余空间的如果还不够那就直接扩容 void EnsureWriteSpace(uint64_t len) 3.获取末尾空闲空间大小 总空间-写偏移 uint64_t TailIdleSpace() { return _buffer.size() - _write_idx; } 4.获取前面的空闲空间大小读偏移 uint64_t HeadIdleSpace() { return _read_idx; } 5.将写位置向后移动指定长度这里就是写入数据之后把可写下标移动 void MoveWriteOffSet(uint64_t len) 6.获取当前读位置地址 char *ReadPosition() { return Begin() _read_idx; } 7.获取可读数据大小可读到可写的区间 uint64_t ReadAbleSize() { return _write_idx - _read_idx; } 8.将读位置向后移动到指定长度处理完数据之后移动 void MoveReadOffSet(uint64_t len) 9.清理缓冲区把读写偏移放到0即可 void Clear() 由于当前模块实现并不难这里只有一个函数需要注意一下 如果确保可写空间足够 扩容策略 这样就可以高效的使用内存了具体代码如下 // 确保可写空间足够头部和尾部的空间够则移动数据不够则扩容void EnsureWriteSpace(uint64_t len){if (len TailIdleSpace())return;else if (len TailIdleSpace() HeadIdleSpace()){// 将数据拷贝到最前面int sz ReadAbleSize();std::copy(ReadPosition(), ReadPosition() sz, Begin());// 设置读写偏移_read_idx 0;_write_idx sz;}else{// 扩容_buffer.resize(_write_idx len);}} 其他部分的缓冲区模块实现起来比较简单后面会有源码 套接字Socket实现 在使用网络通信中我们需要Socket的接口为了方便使用所以统一封装了一个套接字类 成员设计 int _sockfd; // 套接字 主要接口 1.创建套接字 bool Create()
// int socket(int domain, int type, int protocol); 2.绑定地址信息 bool Bind(const std::string ip, uint16_t port)
// int bind(int sockfd, const struct sockaddr *addr,socklen_t addrlen); 3.开始监听 bool Listen(int backlog MAX_LISTEN)
// int listen(int sockfd, int backlog); 4.向服务器发起连接 bool Connect(const std::string ip, uint16_t port)
// int connect(int sockfd, const struct sockaddr *addr,socklen_t addrlen); 5.获取新连接 int Accept()
// int accept(int sockfd, struct sockaddr *addr, socklen_t *addrlen); 6.接收数据 ssize_t Recv(void *buf, size_t len, int flag 0)
// ssize_t recv(int sockfd, void *buf, size_t len, int flags); 7.发送数据 ssize_t Send(const void *buf, size_t len, int flag 0)
// ssize_t send(int sockfd, const void *buf, size_t len, int flags); 8.关闭套接字 void Close() 9.创建一个服务端连接可以通过该接口快速创建一个服务端套接字 bool CreateSerber(uint16_t port, const std::string ip 0.0.0.0, bool flag false)
// 1.创建套接字 2.绑定地址 3.监听 4.设置地址重用 5.设置非阻塞 10.创建一个客户端连接可以通过该接口快速创建一个客户端套接字 bool CreateClient(uint16_t port, const std::string ip)
// 1.创建套接字 2.连接 11.开启地址端口重用 如果是服务器先关闭为了快速重启服务器可以选择 void ReuseAddr()
//通过使用该函数来实现
//int setsockopt(int sockfd, int level, int optname,const void *optval, socklen_t optlen); 12.设置非阻塞 给套接字设置因为我们使用的epoll是需要设置套接字非阻塞的 void SetNonBlock()
// int fcntl(int fd, int cmd, ... /* arg */ ); 事件管理Channel模块实现 结合EventLoop模块通过对描述符的监控如果某一个描述符事件就绪那么我们只需要调用对应的回调函数即可 成员设计 1.需要对应监控的描述符 2.EventLoop模块进行监控 3.用一个变量来记录监控事件本质是位图哪一个事件需要监控哪一个bit就置为1 4.用一个变量记录就绪事件 5.各种回调函数 int _fd; //监控的描述符
EventLoop* _loop;
uint32_t _event; //监控事件
uint32_t _revent; //已经发生的事件
//各种回调函数
using EventCallback std::functionvoid();
EventCallback _read_callback; //读事件回调
EventCallback _write_callback; //写事件回调
EventCallback _error_callback; //错误事件回调
EventCallback _close_callback; //关闭事件回调
EventCallback _event_callback; //任意事件回调 主要接口 1.设置各种回调函数以及返回描述符设置事件等 void SetReadCallback(const EventCallback cb) { _read_callback cb; }
void SetWriteCallback(const EventCallback cb) { _write_callback cb; }
void SetErrorCallback(const EventCallback cb) { _error_callback cb; }
void SetCloseCallback(const EventCallback cb) { _close_callback cb; }
void SetEventCallback(const EventCallback cb) { _event_callback cb; }
int Fd() { return _fd; }
uint32_t Events() { return _event; }
void SetRevent(uint32_t revent) { _revent revent; } 2.当前是否监控了可读 bool ReadAble() { return _event EPOLLIN;} 3.当前是否监控了可写 bool WriteAble() { return _event EPOLLOUT;} 4.启动可读事件监控 void SetRead() { _event | EPOLLIN; Update();} 5.启动可写事件监控 void SetWrite() { _event | EPOLLOUT; Update();} 6.关闭可读事件监控 void CloseRead() { _event ~EPOLLIN ; Update();} 7.关闭可写事件监控 void CloseWrite() { _event ~EPOLLOUT; Update();} 8.关闭全部事件监控 void CloseEvent() { _event 0; Update();} 9.移除监控因为这里的移除是通过EventLoop模块进行移除所以需要在类外实现 10.更新事件和移除监控一样 void Remove();
void Update(); 11.处理任意事件 void HandleEvent() 其他的实现比较简单这里处理任意事件的逻辑应该如下 这里处理任意事件的本质就是刷新活跃度因为后面实现每一个任务的时候是把任务压入到任务池当中所以这里可以直接执行任务而不需要刷新活跃度等到事件处理完成之后再刷新活跃度这样就可以防止一个事件可能会处理的事件很长但是却提前刷新了活跃度导致后序结果有问题。 具体实现如下 void HandleEvent()
{//这里因为是把销毁任务压入到任务池中执行所以这里可以直接执行任务而不需要先刷新活跃度//满足条件都会触发的if((_revent EPOLLIN) || (_revent EPOLLRDHUP) || (_revent EPOLLPRI)){if(_read_callback) _read_callback();}//有可能释放连接的操作一次只能处理一个if(_revent EPOLLOUT){if(_write_callback) _write_callback();}else if(_revent EPOLLERR){if(_error_callback) _error_callback();}else if(_revent EPOLLHUP){if(_close_callback) _close_callback();}//所有事件处理过都需要刷新活跃度if(_event_callback) _event_callback();
} 描述符事件监控Poller模块实现 通过epoll来对描述符的IO事件监控 成员设计 1.需要一个epoll操作句柄 2.拥有一个struct epoll_event 结构数组监控保存所有活跃事件 3.通过hash表管理描述符以及Channel对象的管理 int _epfd;
struct epoll_event _evs[MAX_EVENTSIZE];
std::unordered_mapint,Channel* _channels; //描述符和channel的映射关系 接口设计 1.构造函数创建一个epoll模型 Poller(){//创建一个epoll模型_epfd epoll_create(MAX_EVENTSIZE);if(_epfd 0){ELOG(EPOLL_CREATE FAILED:%s,strerror(errno));abort(); //退出程序}} 2.添加或者修改监控事件在channels中找不到就添加找到就修改 void UpdateEvent(Channel* channel) 3.移除监控事件 删除hash映射关系同时移除epoll监控 void RemoveEvent(Channel* channel) 4.开始监控返回活跃连接 void Poll(std::vectorChannel** active)
//int epoll_wait(int epfd, struct epoll_event *events,int maxevents, int timeout); 5.对epoll的操作 上面函数有对epoll的操作统一使用这个接口设计成私有函数 void Update(Channel* channel,int op)// int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event); 6.判断一个事件是否被监控 设计为私有函数 bool HasChannel(Channel* channel) 开始监控函数需要我们注意应该有以下流程 通过epoll_wait监控得到的事件首先我们需要通过Channel来设置事件就绪然后再返回活跃连接给上层进行处理 void Poll(std::vectorChannel** active){//int epoll_wait(int epfd, struct epoll_event *events,int maxevents, int timeout);int nfds epoll_wait(_epfd,_evs,MAX_EVENTSIZE,-1);if(nfds 0){//这里有可能是信号打断if(errno EINTR){return;}//其他原因ELOG(EPOLL_WAIT FAILED! %s,strerror(errno));abort(); //退出程序}for(int i 0;infds;i){auto it _channels.find(_evs[i].data.fd);assert(it ! _channels.end());//向channel设置就绪事件it-second-SetRevent(_evs[i].events);//向外输出活跃事件的channelactive-push_back(it-second);}} 定时任务管理TimerWheel类实现 在前面我们已经实现了定时任务以及时间轮了这里我们只需要把他们融合到一起即可 这里主要说明两个回调函数的含义 using OnCloseTime std::functionvoid(); //定时器要执行的任务
using ReleaseTime std::functionvoid(); //删除时间管理对象中weak_ptr的信息 其中OnCloseTime的回调就是每一个任务执行的时候销毁定时任务也就是把智能指针中的引用计数--就可如果减到0就销毁这个任务 ReleaseTime回调就是当引用计数减到0的时候需要销毁这个定时任务的时候同时我们需要把hash表中的id和WeakTask去除关联。 这里对TimeWheel模块进行解析 成员设计 1.需要一个数组来充当时间轮 2.我们需要使用到shared_ptr同时我们每次刷新活跃度的时候都需要创建一个shared_ptr所以我们需要一个weak_ptr并保存起来 3.记录id和weak_ptr的映射关闭这样我们可以随时通过id来创建一个shared_ptr 4.因为我们添加取消刷新任务的时候我们需要在线程中执行所以我们需要EventLoop模块指针 5.我们需要对定时器事件进行管理需要管理指针 成员如下 using WeakTask std::weak_ptrTimeTask; //使用weak_ptr防止在shared_ptr直接对对象的操作using PtrTask std::shared_ptrTimeTask; //使用shared_ptr保证释放时不到0不销毁int _capacity; //记录时间轮的大小int _tick; //记录当前指针指向的时间指到哪里销毁哪里std::vectorstd::vectorPtrTask _wheel; //时间轮std::unordered_mapuint64_t,WeakTask _times; //记录id和weak_ptr之间的映射关系int _timerfd; //定时器描述符EventLoop* _loop;std::unique_ptrChannel _timer_channel; //用于定时器事件管理 功能 1.添加任务2.删除任务3.刷新任务 因为这些接口需要在各自的线程中执行所以我们需要添加到线程中这样就不需要考虑线程安全问题了。 //定时器任务对象
class TimeTask
{
private:uint32_t _timeout; //超时时间uint64_t _id; //每个任务的idbool _cancel; //取消定时任务OnCloseTime _close_cb; //销毁定时任务的回调ReleaseTime _release_cb; //因为时间轮中会记录一个weak_ptr对象所以最后需要销毁
public:TimeTask(uint32_t timeout,uint64_t id,const OnCloseTime close_cb):_timeout(timeout),_id(id),_close_cb(close_cb),_cancel(false){}~TimeTask(){ if(_cancel false) _close_cb();_release_cb();}uint64_t id() { return _id; }void SetRelease(const ReleaseTime release_cb) { _release_cb release_cb; }uint32_t Delay() { return _timeout; }void Cancel() { _cancel true; }
};//时间轮管理对象
class TimeWheel
{
private:using WeakTask std::weak_ptrTimeTask; //使用weak_ptr防止在shared_ptr直接对对象的操作using PtrTask std::shared_ptrTimeTask; //使用shared_ptr保证释放时不到0不销毁int _capacity; //记录时间轮的大小int _tick; //记录当前指针指向的时间指到哪里销毁哪里std::vectorstd::vectorPtrTask _wheel; //时间轮std::unordered_mapuint64_t,WeakTask _times; //记录id和weak_ptr之间的映射关系int _timerfd; //定时器描述符EventLoop* _loop;std::unique_ptrChannel _timer_channel; //用于定时器事件管理
private:void RemoveTask(uint64_t id){auto it _times.find(id);if(it ! _times.end()){_times.erase(it);}}static int CreateTimerFd(){//int timerfd_create(int clockid, int flags);int timerfd timerfd_create(CLOCK_MONOTONIC,0);if(timerfd 0){ELOG(timerfd_create fail);abort();}//int timerfd_settime(int fd, int flags,const struct itimerspec *new_value,struct itimerspec *old_value);//设置结构体struct itimerspec itim;itim.it_value.tv_sec 1;itim.it_value.tv_nsec 0; //设置第一次超时时间itim.it_interval.tv_sec 1;itim.it_interval.tv_nsec 0; //第一次超时之后每隔1秒超时一次timerfd_settime(timerfd,0,itim,nullptr);return timerfd;}int ReadTimerFd(){uint64_t times;int ret read(_timerfd,times,8);if(ret 0){ELOG(READTIMERFD FAILED!);abort();}return times;}void RunTask(){_tick (_tick1)%_capacity;_wheel[_tick].clear();}void Ontime(){//读取timerfd中内容根据实时的超时次数执行任务,这里防止服务器因为处理繁忙而导致这里只进行了一次的的刷新必须要刷新够次数int times ReadTimerFd();for(int i 0;itimes;i){RunTask();}}void AddTaskInLoop(uint64_t id,uint32_t delay,const OnCloseTime close_cb){PtrTask pt(new TimeTask(delay,id,close_cb));//设置ReleaseTaskpt-SetRelease(std::bind(TimeWheel::RemoveTask,this,id));//把任务添加到数组中int pos (_tick delay) %_capacity;_wheel[pos].push_back(pt);//将id和weakTask映射关联起来_times[id] WeakTask(pt);}void CancelTaskInLoop(uint64_t id){//通过id找到任务如果没有直接返回有的话将标志置为trueauto it _times.find(id);if(it _times.end()) return;PtrTask pt it-second.lock(); //获得weak_ptr中的shared_ptrif(pt) pt-Cancel();}void RefreshTaskInLoop(uint64_t id){//创建一个新的智能指针对象然后添加到数组中//如果在原数组中没有找到那么直接返回auto it _times.find(id);if(it _times.end()) return;PtrTask pt it-second.lock(); //获得weak_ptr中的shared_ptrint delay pt-Delay();int pos (_tick delay) %_capacity;_wheel[pos].push_back(pt);}
public:TimeWheel(EventLoop* loop):_tick(0),_capacity(60),_wheel(_capacity),_timerfd(CreateTimerFd()),_loop(loop),_timer_channel(new Channel(_loop,_timerfd)){//设置读回调并启动读监控_timer_channel-SetReadCallback(std::bind(TimeWheel::Ontime,this));_timer_channel-SetRead();}//因为当前类中有使用到数据结构为了保证线程安全而又不用加锁的方式来提高效率那么我们让其在一个线程中执行void AddTask(uint64_t id,uint32_t delay,const OnCloseTime close_cb);void CancelTask(uint64_t id);void RefreshTask(uint64_t id);//这个接口存在线程安全问题只能在EventLoop模块中使用bool HasTimer(uint64_t id){auto it _times.find(id);if(it _times.end()){return false;}return true;}
}; EventLoop线程池类实现 这个模块设计思想 1.在线程中对描述符进行事件监控 2.有描述符就绪则对描述符进行事件处理要保证操作都在线程中执行保证线程安全 3.所有事件处理完成之后再讲任务队列中的任务一一执行前面的处理实际上是把任务压入到任务队列然后在线程中执行 通过下图来理解 这样就可以保证线程安全了因为只需要提供一把锁就可以保证就是对task保护线程来取出数据过程中是有可能有线程安全问题的。 类成员设计 1.要进行事件监控 -Poller模块 2.执行任务队列中的任务 - 一个线程安全的任务队列 3.添加任务的时候需要定时器需要一个事件轮 4.需要有一个事件通知的描述符来唤醒事件监控的阻塞 类功能 1.需要创建一个描述符就行事件通知来唤醒事件监控的阻塞 2.该类最主要的函数就是start函数在这个函数中通过Poller监控的就绪事件然后分别调用对应的回调函数对事件进行处理压入到任务队列中然后再将任务队列中的任务放到线程中一一执行 class EventLoop
{
private:using Func std::functionvoid(); std::thread::id _thread_id; //线程IDPoller _poller;int _event_fd; std::unique_ptrChannel _event_channel; //通过channel来管理eventfdstd::vectorFunc _tasks; //任务队列std::mutex _mutex; //保证任务队列的线程安全TimeWheel _time_wheel; //时间轮
private:static int CreateEventfd(){int efd eventfd(0,EFD_CLOEXEC | EFD_NONBLOCK);if(efd 0){ELOG(eventfd failed!);abort(); //退出程序}return efd;}void ReadEventFd(){uint64_t res 0;int ret read(_event_fd,res,sizeof(res));if(ret 0){//信号打断或者读阻塞if(errno EINTR || errno EAGAIN){return;}ELOG(READEVENTFD FAILED!);abort();}}void WeakUpEventFd(){uint64_t val 1;int ret write(_event_fd,val,sizeof(val));if(ret 0){if(errno EINTR){return;}ELOG(WEAKUPEVENTFD FAILED!);abort();}}//执行任务队列中的所有任务void RunAllTask() {std::vectorFunc functor;{std::unique_lockstd::mutex lock(_mutex);_tasks.swap(functor);}//执行任务for(auto f:functor) f();}
public:EventLoop():_thread_id(std::this_thread::get_id()),_event_fd(CreateEventfd()),_event_channel(new Channel(this,_event_fd)),_time_wheel(this){//给eventChannel设置回调函数_event_channel-SetReadCallback(std::bind(EventLoop::ReadEventFd,this));//启动读事件监控_event_channel-SetRead();}//如果要执行的任务在当前线程那就直接执行不在就压入任务队列void RunInLoop(const Func cb){if(IsInloop()) cb();else QueueInLoop(cb);}//断言一个线程是否在当前线程中void AssertInLoop(){assert(_thread_id std::this_thread::get_id());}//压入任务队列void QueueInLoop(const Func cb){{std::unique_lockstd::mutex lock(_mutex);_tasks.push_back(cb);}//唤醒有可能因为事件还没有就绪的阻塞线程WeakUpEventFd();}//判断当前线程是否是EventLoop线程bool IsInloop() { return _thread_id std::this_thread::get_id();}//添加/更新监控事件void UpdateEvent(Channel* channel) { return _poller.UpdateEvent(channel);}//移除监控事件void RemoveEvent(Channel* channel) { return _poller.RemoveEvent(channel);}//事件监控 -就绪事件处理 - 执行任务void Start() {while(1){std::vectorChannel* actives;_poller.Poll(actives);//就绪事件处理for(auto a:actives){a-HandleEvent();}//执行任务RunAllTask();}}void TimerAdd(uint64_t id,uint32_t delay,const OnCloseTime close_cb) { _time_wheel.AddTask(id,delay,close_cb); }void TimerRefresh(uint64_t id) { _time_wheel.RefreshTask(id); }void TimerCancel(uint64_t id) { _time_wheel.CancelTask(id); }bool HasTimer(uint64_t id) { return _time_wheel.HasTimer(id); }
}; 他们之间的模块关系图 LoopThread模块实现 这个模块的意义因为EventLoop模块在实例化对象的时候必须是在线程内部如果我们创建了多个EventLoop对象又同时创建多个线程将各个线程id重新给EventLoop进行设置那么在构造EventLoop对象到设置新的thread_id期间是不可控的。 我们必须先创建线程然后在线程的入口函数中进行EventLoop对象的实例化 类成员设计 1.创建一个线程 2.为了保证先实例了loop之后外界才能获取到所以我们需要使用到条件变量以及互斥锁 类功能 1.我们需要创建一个线程绑定线程入口函数保证了先创建线程再在线程中实例化EventLoop这就是One Thread One Loop思想 2.提供给外部获得到实例化的EventLoop对象需要使用条件变量控制 class LoopThread
{
private://实现获取loop和构造函数的同步关系保证先实例化了loop之后才能获取std::mutex _mutex;std::condition_variable _cond;EventLoop* _loop; std::thread _thread; //一个线程对应一个loop
private:void ThreadEntry(){//1.创建loop 2.通过条件变量来唤醒等待线程 3.运行EventLoop loop; //这里的临时变量生命周期跟随LoopThread{std::unique_lockstd::mutex lock(_mutex);_cond.notify_all();_loop loop;}_loop-Start();}
public:LoopThread():_loop(nullptr),_thread(std::bind(LoopThread::ThreadEntry,this)){}EventLoop* GetLoop(){EventLoop* loop;{std::unique_lockstd::mutex lock(_mutex);//通过条件变量保证同步_cond.wait(lock,[](){ return _loop ! nullptr; });loop _loop;}return loop;}
}; LoopThreadPool线程池模块实现 对所有的LoopThread进行管理分配 类成员设计 1.用户可以根据需求来创建线程的数量 2.我们这里使用RR轮转的方式进行分配 3.根据主从Reactor模型首先我们需要一个主Reactor是一直进行工作的 4.全部的线程都需要进行管理使用数组进行管理 类功能 1.根据用户需要的线程数量来创建线程 2.根据RR轮转来给用户提供从属EventLoop class LoopThreadPool
{
private:int _thread_count; //创建的LoopThread数量int _next_idx; //采用RR轮转的方式进行分配EventLoop* _base_loop; //主EventLoop跟随主线程std::vectorLoopThread* _threads; //管理全部的线程std::vectorEventLoop* _loops; //管理从属EventLoop
public:LoopThreadPool(EventLoop* loop):_base_loop(loop),_thread_count(0),_next_idx(0){}void SetThreadCount(int count) { _thread_count count; }//根据数量来创建出对应的LoopThreadvoid Create(){if(_thread_count0){_threads.resize(_thread_count);_loops.resize(_thread_count);for(int i 0;i_thread_count;i){_threads[i] new LoopThread();_loops[i] _threads[i]-GetLoop();}}}EventLoop* NextLoop(){if(_thread_count 0) return _base_loop;_next_idx (_next_idx 1)% _thread_count;return _loops[_next_idx];}
}; 通信连接管理Connection模块实现 Connection模块是Server模块中最重要的模块存在的意义对连接进行管理所有的操作都是通过这个模块完成的 类成员设计管理方式 1.对套接字的管理能进行套接字的各种操作 2.连接事件的管理如可读可写错误挂断任意事件 3.缓冲区的管理通过socket对数据进行接收发送 4.协议上下文的管理记录数据处理的过程即使用户没有一次性把数据全部发送过来但是 也可以保存当前处理的阶段方便下次处理 5.回调函数的管理各种情况应该如何处理交给用户决定必须有调用业务处理的回调函数 6.连接状态的管理不同的连接状态有不同的限制 7.是否启动非活跃连接超时销毁 类功能设计 1.发送数据提供发送数据接口但是这个并不是真正发送接口而是把数据发送到发送缓冲区然后启动写事件监控等待写事件就绪进行发送 2.关闭连接给用户提供的关闭连接接口但是并不是实际的关闭而是先看看输入输出缓冲区中有没有数据等待处理如果有先处理然后关闭连接 3.启动非活跃连接超时销毁 4.取消非活跃连接超时销毁 5.协议切换一个连接接收数据之后如何进行处理取决于上下文以及业务处理回调函数 6.这里需要注意的是大多说的处理其实都是在线程中执行的所以我们都需要通过bind让其在线程中执行。 class Connection;//DISCONNECTED 断开连接状态
//CONNECTING 连接建立但是未完成全部工作的过渡态
//CONNECTED 连接建立完成可以通信状态
//DISCONNECTING 连接待关闭状态等待处理后序工作之后断开连接
typedef enum {DISCONNECTED,CONNECTING,CONNECTED,DISCONNECTING }ConnectStatus;
using PtrConnection std::shared_ptrConnection;
class Connection : public std::enable_shared_from_thisConnection
{
private:int _conn_id; //连接建立的唯一id//uint64_t _timer_id 这里为了简化使用直接使用connid来作为timeridint _socketfd; //连接对应的文件描述符bool _enable_active_release; //是否启动非活跃连接超时销毁EventLoop* _loop; //连接所关联的线程Socket _socket; //套接字管理Channel _channel; //事件管理ConnectStatus _status; //连接状态Buffer _in_buffer; //输入缓冲区从套接字中读取然后放入到缓冲区中Buffer _out_buffer; //输出缓冲区将待发送数据放到输出缓冲区Any _context; //连接上下文
private://事件处理回调函数using ConnectCallback std::functionvoid(const PtrConnection);using MessageCallback std::functionvoid(const PtrConnection,Buffer*);using ClosedCallback std::functionvoid(const PtrConnection);using AnyEventCallback std::functionvoid(const PtrConnection);ConnectCallback _connect_callback;MessageCallback _msg_callback;ClosedCallback _closed_callback;AnyEventCallback _event_callback;//组件内关闭回调函数组件内使用因为使用智能指针进行Connect的管理一旦关闭就应该在管理的地方进行删除ClosedCallback _server_event_callback;//为了保证线程安全每一个接口函数都应该放入到一个线程中void SendInLoop(Buffer buf){//这里并不是实际的发送接口而是把数据放到发送缓冲区中if(_status DISCONNECTED) return; _out_buffer.WriteBufferAndPush(buf);//启动写事件监控if(_channel.WriteAble() false) _channel.SetWrite();}void ShutDownInLoop(){//这里也不是实际的关闭操作而是将待处理的数据处理待发送的数据发送_status DISCONNECTING; //处于待关闭状态if(_in_buffer.ReadAbleSize() 0){if(_msg_callback) _msg_callback(shared_from_this(),_in_buffer);}if(_out_buffer.ReadAbleSize() 0){//启动写事件监控if(_channel.WriteAble() false) _channel.SetWrite();}//关闭连接不管数据有没有处理完因为这里的数据可能不完整不需要处理了if(_out_buffer.ReadAbleSize() 0) Release();}void ReleaseInLoop(){//1.修改连接状态_status DISCONNECTED;//2.移除事件监控_channel.Remove();//3.关闭描述符_socket.Close();//4.取消定时销毁任务if(_loop-HasTimer(_conn_id)) CancelActiveReleaseInLoop();//5.调用关闭的回调函数if(_closed_callback) _closed_callback(shared_from_this());//组件内调用的关闭函数if(_server_event_callback) _server_event_callback(shared_from_this());}void EstablishedInLoop(){//1.修改状态assert(_status CONNECTING);_status CONNECTED;//2.启动读事件监控_channel.SetRead();//3.调用连接成功回调函数if(_connect_callback) _connect_callback(shared_from_this());}void SetActiveReleaseInloop(int sec){//1.修改判断标志位_enable_active_release true;//2.如果定时任务存在那就延迟一下if(_loop-HasTimer(_conn_id)){return _loop-TimerRefresh(_conn_id);}//3.不存在就添加_loop-TimerAdd(_conn_id,sec,std::bind(Connection::Release,this));}void CancelActiveReleaseInLoop(){_enable_active_release false;if(_loop-HasTimer(_conn_id)) _loop-TimerCancel(_conn_id);}void UpgradeInLoop(const Any context,const ConnectCallback conn,const MessageCallback msg, const ClosedCallback closed,const AnyEventCallback event){//修改各个类成员变量即可_context context;_connect_callback conn;_msg_callback msg;_closed_callback closed;_event_callback event;}//五个Channel事件回调函数//将socket数据放到接收缓冲区中调用message_callback进行消息的读取void HandlerRead(){//1.把数据放入到接收缓冲区char buffer[65536];ssize_t ret _socket.NonBlockRecv(buffer,65535);if(ret 0){//出错了不能直接关闭而是调用ShutDownInLoop()return ShutDownInLoop();}//如果接收到的数据是0就不需要进行消息处理//2.调用message_callback_in_buffer.WriteAndPush(buffer,ret);if(_in_buffer.ReadAbleSize() 0){_msg_callback(shared_from_this(),_in_buffer);}}//将发送缓冲区的数据发送void HandlerWrite(){//将发送缓冲区中的数据发送出去ssize_t ret _socket.NonBlockSend(_out_buffer.ReadPosition(),_out_buffer.ReadAbleSize());if(ret 0){//出错了就看接收缓冲区有没有实际要处理的数据有的话处理完就实际关闭连接if(_in_buffer.ReadAbleSize() 0){_msg_callback(shared_from_this(),_in_buffer);}return Release();}//记得把写偏移移动_out_buffer.MoveReadOffSet(ret);if(_out_buffer.ReadAbleSize() 0){_channel.CloseWrite(); //关闭写事件监控//如果是连接待关闭状态则关闭连接if(_status DISCONNECTING){return Release();}}}//描述符挂断事件处理void HandlerClose(){if(_in_buffer.ReadAbleSize() 0){_msg_callback(shared_from_this(),_in_buffer);}return Release();}//描述符错误事件处理void HandlerError() { HandlerClose(); }//描述符触发任意事件void HandlerEvent() { //1.判断是否需要刷新活跃度if(_enable_active_release true) { _loop-TimerRefresh(_conn_id);}//2.调用组件使用者的任意事件回调if(_event_callback) { return _event_callback(shared_from_this()); }} public:Connection(int connid,int socketfd,EventLoop* loop):_conn_id(connid),_socketfd(socketfd),_enable_active_release(false),_loop(loop),_socket(socketfd),_channel(_loop,_socketfd),_status(CONNECTING){//设置channel回调函数_channel.SetReadCallback(std::bind(Connection::HandlerRead,this));_channel.SetWriteCallback(std::bind(Connection::HandlerWrite,this));_channel.SetErrorCallback(std::bind(Connection::HandlerError,this));_channel.SetCloseCallback(std::bind(Connection::HandlerClose,this));_channel.SetEventCallback(std::bind(Connection::HandlerEvent,this));}~Connection() { DLOG(CONNECTION RELEASE :%p,this); }//成员变量的接口 int Fd() { return _socketfd; }//获取连接Idint Id() { return _conn_id; }//设置上下文void SetContext(const Any context) { _context context; }//获取上下文信息Any* Context() { return _context; }//判断当前是否是连接状态bool IsConnected() { return _status CONNECTED; }//设置各种回调函数void SetConnectCallback(const ConnectCallback cb) { _connect_callback cb; }void SetMessageCallback(const MessageCallback cb) { _msg_callback cb; }void SetClosedCallback(const ClosedCallback cb) { _closed_callback cb; }void SetEventCallback(const AnyEventCallback cb) { _event_callback cb; }void SetSvrCallback(const ClosedCallback cb) { _server_event_callback cb; }//发送数据,将数据放到发送缓冲区启动写事件监控void Send(const char* data,size_t len){//这里外面传过来的是一个临时对象有可能会销毁所以保存一份变量保证安全性Buffer buf;buf.WriteAndPush(data,len);_loop-RunInLoop(std::bind(Connection::SendInLoop,this,std::move(buf)));}//这是提供给组件使用的关闭操作但是并不是实际的关闭需要内部进行处理void ShutDown(){_loop-RunInLoop(std::bind(Connection::ShutDownInLoop,this));}//实际关闭连接的接口销毁时应该放入任务池中在执行完任务之后才销毁否则可能会导致其他任务处理时连接被释放导致内存访问错误void Release(){_loop-QueueInLoop(std::bind(Connection::ReleaseInLoop,this));}//连接建立成功之后设置channel启动读事件监控调用_connect_callbackvoid Established(){_loop-RunInLoop(std::bind(Connection::EstablishedInLoop,this));}//启动非活跃连接销毁void SetActiveRelease(int sec){_loop-RunInLoop(std::bind(Connection::SetActiveReleaseInloop,this,sec));}//取消非活跃连接销毁void CancelActiveRelease(){_loop-RunInLoop(std::bind(Connection::CancelActiveReleaseInLoop,this));}//协议切换void Upgrade(const Any context,const ConnectCallback conn,const MessageCallback msg, const ClosedCallback closed,const AnyEventCallback event){//因为协议切换是需要放在线程中并且应该立即执行否则用户切换协议之前的数据处理就没有意义了_loop-AssertInLoop();_loop-RunInLoop(std::bind(Connection::UpgradeInLoop,this,context,conn,msg,closed,event));}
}; 其中模块之间的关系图帮助理解 监听描述符管理Acceptor类实现 Acceptor模块只进行监听连接的管理有事件新的连接到来就调用对应的回调函数进行处理即可 类成员设计 1.创建一个监听套接字 2.需要一个新连接处理的回调函数 类功能 1.创建一个监听套接字 2.启动读事件监控 3.事件触发获取新连接 4.调用对应的回调函数 对于新连接如何处理应该是服务器模块来进行管理的 class Acceptor
{
private:EventLoop* _loop;Socket _socket;Channel _channel;using Accept_callback std::functionvoid(int);Accept_callback _accept_cb;
private:int CreateSocket(int port){bool ret _socket.CreateSerber(port);assert(ret);return _socket.Fd();}void HandlerRead(){int newfd _socket.Accept();if(newfd 0) return;if(_accept_cb) _accept_cb(newfd);}
public://构造函数不能立刻启用可读事件监控否则这里有可能导致回调函数还没有设置此时如果立刻有连接到来会导致newfd没有得到处理最终资源泄露Acceptor(EventLoop* loop,int port):_loop(loop),_socket(CreateSocket(port)),_channel(_loop,_socket.Fd()){_channel.SetReadCallback(std::bind(Acceptor::HandlerRead,this));}void SetAcceptCallback(const Accept_callback cb) { _accept_cb cb; }//开始监听,启动读事件监控void Listen(){_channel.SetRead();}
}; 模块之前的关系图 TcpServer模块 对所有模块的整合通过该模块实例化对象可以非常简单的完成一个服务器搭建 类成员设计管理 1.Acceptor对象创建一个监听套接字 2.EventLoop对象baseloop对象实现对监听套接字的事件监控 3.通过hash表来实现对新连接的管理 4.LoopThreadPool对象创建loop线程池对新连接进行事件监控以及处理 类功能 1.设置从属线程池数量 2.启动服务器 3.设置各种回调函数连接建立完成消息关闭任意 4.是否启动非活跃连接超时销毁功能 5.添加定时任务功能 class TcpServer
{
private:uint64_t _next_id; int _timeout; //销毁时间bool _enable_active_release; //是否其实非活跃超时销毁EventLoop _base_loop; //主线程Acceptor _acceptor; //监听套接字管理的对象LoopThreadPool _pool; //从属线程std::unordered_mapuint64_t,PtrConnection _conns; //管理连接using ConnectCallback std::functionvoid(const PtrConnection);using MessageCallback std::functionvoid(const PtrConnection,Buffer*);using ClosedCallback std::functionvoid(const PtrConnection);using AnyEventCallback std::functionvoid(const PtrConnection);using Functor std::functionvoid();ConnectCallback _connect_callback;MessageCallback _msg_callback;ClosedCallback _closed_callback;AnyEventCallback _event_callback;
private://新连接构造一个connection管理void NewConnection(int newfd){_next_id;PtrConnection conn(new Connection(_next_id,newfd,_pool.NextLoop()));conn-SetConnectCallback(_connect_callback);conn-SetMessageCallback(_msg_callback);conn-SetClosedCallback(_closed_callback);conn-SetEventCallback(_event_callback);conn-SetSvrCallback(std::bind(TcpServer::RemoveConnection,this,std::placeholders::_1));if(_enable_active_release) conn-SetActiveRelease(_timeout);conn-Established();_conns.insert(std::make_pair(_next_id,conn));}//关闭时调用去除管理Connectionvoid RemoveConnection(const PtrConnection conn){_base_loop.RunInLoop(std::bind(TcpServer::RemoveConnectionInLoop,this,conn));}void RemoveConnectionInLoop(const PtrConnection conn){int id conn-Id();_conns.erase(id);}void RunAfterInLoop(const Functor func,int delay){_next_id;_base_loop.TimerAdd(_next_id,delay,func);}
public:TcpServer(int port):_next_id(0),_enable_active_release(false),_acceptor(_base_loop,port),_pool(_base_loop){_acceptor.SetAcceptCallback(std::bind(TcpServer::NewConnection,this,std::placeholders::_1));_acceptor.Listen(); //将套接字挂到loop中}void SetConnectCallback(const ConnectCallback cb) { _connect_callback cb; }void SetMessageCallback(const MessageCallback cb) { _msg_callback cb; }void SetClosedCallback(const ClosedCallback cb) { _closed_callback cb; }void SetEventCallback(const AnyEventCallback cb) { _event_callback cb; }//设置线程数量void SetThreadCount(int count) { _pool.SetThreadCount(count); }//设置非活跃超时销毁void SetActiveRelease(int timeout) { _timeout timeout; _enable_active_release true; }//添加一个任务void RunAfter(const Functor func,int delay){_base_loop.RunInLoop(std::bind(TcpServer::RunAfterInLoop,this,func,delay));}void Start() { _pool.Create(); _base_loop.Start();}
}; HTTP协议模块实现
Util⼯具类实现 工具类中主要包含我们后面可能使用到的函数所以统一封装成static成员 成员函数如下 1.对字符串进行分割 -对url进行解析的时候需要根据特殊字符进行分割 2.读取文件中的内容 -当用户需要获取静态资源页面的时候 3.向文件中写入数据 -当用户需要传输大文件的时候需要把外面的文件写入到服务器中 4.url编码 - 在网络传输过程中需要对特殊字符进行编码 5.url解码 - 有编码就一定要在特定时候进行解码 6.通过响应状态码获取响应信息 - 在进行响应的时候需要响应状态码已经对应的响应描述 7.通过文件名获取mime -在前端展示的时候需要知道这是一个什么类型的文件响应头部中 需要进行设置 (6.7需要提前准备好hash表把所以的kv都放入到hash中方便查找同时应该在全局初始化这样的话就可以防止每一次都开辟空间造成资源的浪费 8.判断一个文件是否是目录 -在文件上传的时候需要保证是文件而不是目录 9.判断一个文件是否是目录 -通过Linux中提供的stat函数进行判断 10.判断资源路径的合法性 -访问的资源如果在web根目录的上级就会有问题所以必须保证资源合法性 通过访问的层数进行判断如果访问的层数小于0那么就是非法 //为了提高效率把映射关系定义成全局
std::unordered_mapint, std::string _statu_msg {{100, Continue},{101, Switching Protocol},{102, Processing},{103, Early Hints},{200, OK},{201, Created},{202, Accepted},{203, Non-Authoritative Information},{204, No Content},{205, Reset Content},{206, Partial Content},{207, Multi-Status},{208, Already Reported},{226, IM Used},{300, Multiple Choice},{301, Moved Permanently},{302, Found},{303, See Other},{304, Not Modified},{305, Use Proxy},{306, unused},{307, Temporary Redirect},{308, Permanent Redirect},{400, Bad Request},{401, Unauthorized},{402, Payment Required},{403, Forbidden},{404, Not Found},{405, Method Not Allowed},{406, Not Acceptable},{407, Proxy Authentication Required},{408, Request Timeout},{409, Conflict},{410, Gone},{411, Length Required},{412, Precondition Failed},{413, Payload Too Large},{414, URI Too Long},{415, Unsupported Media Type},{416, Range Not Satisfiable},{417, Expectation Failed},{418, Im a teapot},{421, Misdirected Request},{422, Unprocessable Entity},{423, Locked},{424, Failed Dependency},{425, Too Early},{426, Upgrade Required},{428, Precondition Required},{429, Too Many Requests},{431, Request Header Fields Too Large},{451, Unavailable For Legal Reasons},{501, Not Implemented},{502, Bad Gateway},{503, Service Unavailable},{504, Gateway Timeout},{505, HTTP Version Not Supported},{506, Variant Also Negotiates},{507, Insufficient Storage},{508, Loop Detected},{510, Not Extended},{511, Network Authentication Required}
};
std::unordered_mapstd::string, std::string mim_msg {{.aac, audio/aac},{.abw, application/x-abiword},{.arc, application/x-freearc},{.avi, video/x-msvideo},{.azw, application/vnd.amazon.ebook},{.bin, application/octet-stream},{.bmp, image/bmp},{.bz, application/x-bzip},{.bz2, application/x-bzip2},{.csh, application/x-csh},{.css, text/css},{.csv, text/csv},{.doc, application/msword},{.docx, application/vnd.openxmlformats-officedocument.wordprocessingml.document},{.eot, application/vnd.ms-fontobject},{.epub, application/epubzip},{.gif, image/gif},{.htm, text/html},{.html, text/html},{.ico, image/vnd.microsoft.icon},{.ics, text/calendar},{.jar, application/java-archive},{.jpeg, image/jpeg},{.jpg, image/jpeg},{.js, text/javascript},{.json, application/json},{.jsonld, application/ldjson},{.mid, audio/midi},{.midi, audio/x-midi},{.mjs, text/javascript},{.mp3, audio/mpeg},{.mpeg, video/mpeg},{.mpkg, application/vnd.apple.installerxml},{.odp, application/vnd.oasis.opendocument.presentation},{.ods, application/vnd.oasis.opendocument.spreadsheet},{.odt, application/vnd.oasis.opendocument.text},{.oga, audio/ogg},{.ogv, video/ogg},{.ogx, application/ogg},{.otf, font/otf},{.png, image/png},{.pdf, application/pdf},{.ppt, application/vnd.ms-powerpoint},{.pptx, application/vnd.openxmlformats-officedocument.presentationml.presentation},{.rar, application/x-rar-compressed},{.rtf, application/rtf},{.sh, application/x-sh},{.svg, image/svgxml},{.swf, application/x-shockwave-flash},{.tar, application/x-tar},{.tif, image/tiff},{.tiff, image/tiff},{.ttf, font/ttf},{.txt, text/plain},{.vsd, application/vnd.visio},{.wav, audio/wav},{.weba, audio/webm},{.webm, video/webm},{.webp, image/webp},{.woff, font/woff},{.woff2, font/woff2},{.xhtml, application/xhtmlxml},{.xls, application/vnd.ms-excel},{.xlsx, application/vnd.openxmlformats-officedocument.spreadsheetml.sheet},{.xml, application/xml},{.xul, application/vnd.mozilla.xulxml},{.zip, application/zip},{.3gp, video/3gpp},{.3g2, video/3gpp2},{.7z, application/x-7z-compressed}
};class Util
{
public://分割字符串static size_t Spilt(const std::string src,const std::string sep,std::vectorstd::string* arr){//abc,sdf,ijk ,int offset 0; //偏移量while(offset src.size()){int pos src.find(sep,offset);if(pos std::string::npos){//没找到说明后面的offset后面的字符都是结果arr-push_back(src.substr(offset,pos-offset));return arr-size();}//这里可能有多个重复sepif(offset pos){offset pos sep.size();continue;}//收集结果arr-push_back(src.substr(offset,pos-offset));offset pos sep.size();}return arr-size();}//读取文件内容static bool ReadFile(const std::string filename,std::string* buf){std::ifstream ifs(filename,std::ios::binary);if(!ifs.is_open()){DLOG(OPEN %s FAILED!,filename);return false;}int fsize 0;//先把文件指针偏移到末尾获取到文件的大小ifs.seekg(0,ifs.end);fsize ifs.tellg();//回到起始开始读取文件ifs.seekg(0,ifs.beg);buf-resize(fsize);//读取文件ifs.read((*buf)[0],fsize);if(!ifs.good()){DLOG(READ FILE %s FAILED!,filename);ifs.close();return false;}//记得关闭文件防止资源泄露ifs.close();return true;}//写入文件static bool WriteFile(const std::string filename,const std::string buf){std::ofstream ofs(filename,std::ios::binary | std::ios::trunc);if(!ofs.is_open()){DLOG(OPEN %s FAILED!,filename.c_str());return false;}//向文件写入ofs.write(buf.c_str(),buf.size());if(!ofs.good()){DLOG(WRITEFILE %s FAILED!,filename.c_str());ofs.close();return false;}ofs.close();return true;}//url编码static std::string UrlEnCode(const std::string url,bool convert_space_to_plus){//. - _ ~以及数字字母字符采用绝对不编码convert_space_to_plus - 是否启用空格转std::string ret;for(auto ch: url){if(ch . || ch - || ch _ || ch ~ || isalnum(ch)){ret ch;continue;}//空格转if(convert_space_to_plus ch ){ret ;continue;}//其余都是需要转换的字符char tmp[4] {0};snprintf(tmp,4,%%%02X,ch);ret tmp;}return ret;}static char HexToI(char ch){if(ch 0 ch 9){return ch - 0;}else if(ch a ch z){return ch - a 10;}else if(ch A ch Z){return ch - A 10;}return -1; //错误}//url解码static std::string UrlDeCode(const std::string url,bool convert_plus_to_space){std::string ret;for(int i 0;iurl.size();i){//判断convert_plus_to_space条件是否满足if(convert_plus_to_space url[i] ){ret ;continue;}//遇到%后面的数就把第一个数转换成16进制第二个数相加if(url[i] %){char v1 HexToI(url[i1]);char v2 HexToI(url[i2]);char res (v1 4) v2; ret res;i 2;continue;}//其他情况直接放入结果集即可ret url[i];}return ret;}//通过响应状态码获得响应信息static std::string StatuDesc(int statu){auto it _statu_msg.find(statu);if (it ! _statu_msg.end()) {return it-second;}return Unknow;}static std::string GetMime(const std::string filename){//a.txt 找到文件后缀size_t pos filename.find_last_of(.);if(pos std::string::npos){//没找到就是二进制流数据return application/octet-stream;}std::string tmp filename.substr(pos);auto it mim_msg.find(tmp);if(it mim_msg.end()){return application/octet-stream;}return it-second;}//判断一个文件是否是目录static bool IsDir(const std::string filename){struct stat st;int ret stat(filename.c_str(),st);if(ret 0){return false;}return S_ISDIR(st.st_mode);}//判断一个文件是否是普通文件static bool IsRegular(const std::string filename){struct stat st;int ret stat(filename.c_str(),st);if(ret 0){return false;}return S_ISREG(st.st_mode);}//资源路径的有效性判断static bool IsValPath(const std::string path){//根据层数来判断当前路径是否在web根目录下///index.html /../index.htmlstd::vectorstd::string vs;Spilt(path,/,vs);int level 0;for(auto str:vs){if(str ..){--level;if(level 0) return false; //任意一层小于0都是有问题的continue;}level;}return true;}
}; HttpRequest请求类实现 该模块主要存储HTTP请求信息要素提供简单的功能性接口 请求要素如下 HTTP常见header Content-Type数据类型text/html等。 Content-Length正文的长度。 Host客户端告知服务器所请求的资源是在哪个主机的哪个端口上。 User-Agent声明用户的操作系统和浏览器的版本信息。 Referer当前页面是哪个页面跳转过来的。 Location搭配3XX状态码使用告诉客户端接下来要去哪里访问。 Cookie用户在客户端存储少量信息用于实现会话session的功能。 类成员设计 1.请求方法 2.资源路径url 3.协议版本 4.请求正文 5.正则提取字符串 -我们需要使用使用之前提供过的正则库已经正则表达式来对请求信息进行匹配从而简化代码的编写 6.头部字段 -把请求中的头部字段以kv的方式存储在hash表中方便查询 7.查询字符串 -在url中可能有查询字符串同样以kv的方式存储在hash中 类功能性接口 1.将成员变量设置为公有这样方便直接访问 2.提供头部字段以及查询字符串的插入以及查询功能 3.获取正文长度 4.判断是否为长连接如果是短连接就需要处理完之后直接关闭连接Connection头部字段 Connection close/keep-alive class HttpRequest
{
public:std::string _method; //请求方法std::string _path; //资源路径std::string _version;//协议版本std::string _body; //请求正文std::smatch _matches; //正则提取查询字符串std::unordered_mapstd::string,std::string _headers; //头部字段std::unordered_mapstd::string,std::string _parame; //查询字符串
public:HttpRequest():_version(HTTP/1.1){}void ReSet(){_method.clear();_path.clear();_version.clear();_body.clear();std::smatch tmp; //用于交换清除_matches.swap(tmp);_headers.clear();_parame.clear();}//插入头部字段void SetHeader(const std::string key,const std::string val){_headers.insert(std::make_pair(key,val));}//获取指定头部的值std::string GetHeader(const std::string key){auto it _headers.find(key);if(it ! _headers.end()){return it-second;}return ;}//查询指定头部的值是否存在bool HasHeader(const std::string key){auto it _headers.find(key);if(it ! _headers.end()){return true;}return false;}//插入查询字符串void SetParam(const std::string key,const std::string val){_parame.insert(std::make_pair(key,val));}//获取指定查询字符串std::string GetParam(const std::string key){auto it _parame.find(key);if(it ! _parame.end()){return it-second;}return ;}//判断是否存在某个查询字符串bool HasParam(const std::string key){auto it _parame.find(key);if(it ! _parame.end()){return true;}return false;}//获取正文长度size_t Content_Length(){auto it _headers.find(Content-Length);if(it _headers.end()){return 0;}return std::stol(it-second);}//判断是否为短连接bool Close(){if(HasHeader(Connection) GetHeader(Connection) keep-alive){return false;}return true;}
};HttpResponse响应类实现 类功能存储HTTP响应信息要素提供简单的功能性接口 响应信息要素 类成员设计 1.响应状态 -我们需要知道当前处理是否有问题所以需要给用户返回响应状态 2.是否重定向 - 如果该url需要重定向那么我们就需要把重定向的url保存起来 3.响应正文 4.头部字段 -设置头部kv到hash中方便查询 类功能性接口 1.为了方便成员的访问将成员设置成公有 2.头部字段的新增查询获取 3.正文的设置 4.重定向的设置 5.长短连接的判断 class HttpResponse
{
public: int _statu; //响应状态bool _redirect_flag; //是否重定向std::string _body; //响应正文std::string _redirect_url; //重定向urlstd::unordered_mapstd::string,std::string _headers; //头部字段
public:HttpResponse():_statu(200),_redirect_flag(false){}HttpResponse(int statu):_statu(statu),_redirect_flag(false){}void ReSet(){_statu 200;_redirect_flag false;_body.clear();_redirect_url.clear();_headers.clear();}//插入头部字段void SetHeader(const std::string key,const std::string val){_headers.insert(std::make_pair(key,val));}//获取指定头部的值std::string GetHeader(const std::string key){auto it _headers.find(key);if(it ! _headers.end()){return it-second;}return ;}//查询指定头部的值是否存在bool HasHeader(const std::string key){auto it _headers.find(key);if(it ! _headers.end()){return true;}return false;}//设置响应正文void SetContent(const std::string body,const std::string type){_body body;SetHeader(Content-Length,type);}//设置重定向void SetRedirect(const std::string url,int statu 302){_statu statu;_redirect_flag true;_redirect_url url;}//判断是否为短连接bool Close(){if(HasHeader(Connection) GetHeader(Connection) keep-alive){return false;}return true;}
};HttpContent上下文类实现 该类是处理的Http请求响应的上下文模块当一个请求没有一次性发送过来的时候那么处理的时候就不能只处理当次的而是要保证处理一个完整的请求所以需要上下文模块来记录上一次处理以便处理一个完整的请求 类成员设计 1.我们需要一个响应状态码因为每一次处理的结果都可能是不一样的响应状态 2.需要记录当前解析到的状态 方便下一次解析 3.解析得到的信息需要放到HttpRequest中 类功能 这里之后一个函数就是进行解析工作但是其中分成几个子函数 1.接收请求行 从缓冲区中取出数据并更新解析状态 2.解析请求行 通过正则表达式进行处理并把查询字符串放入到Request 3.接收头部 将Buffer中数据进行分割 4.解析头部 以kv的形式存储在Request中 5.接收正文 通过Content-Length获取正文长度并根据需求来获取正文 需要注意的是 任意一个状态有问题都需要立即停止剩下的工作应该那是没有意义的。而且上一个工作没有完成是不能进行下一步进行处理的。 typedef enum {RECV_HTTP_ERROR,RECV_HTTP_LINE,RECV_HTTP_HEAD,RECV_HTTP_BODY,RECV_HTTP_OVER,
}HttpRecvStatu;#define MAX_LINE 8192class HttpContent
{
private:int _resp_statu; //响应状态码HttpRecvStatu _recv_statu; //当前解析阶段HttpRequest _request; //已经解析得到的信息
private:bool ParseLine(const std::string line){std::smatch matches;std::regex e((GET|POST|HEAD|DELETE|PUT) ([^?]*)(?:\\?(.*))? (HTTP/1\\.[01])(?:\r|\r\n)?,std::regex::icase); //匹配请求方法的正则表达式bool ret regex_match(line,matches,e);if(ret false) return false;//结果如下//0 : GET /hello/login?userxiaomingpasswd123456 HTTP/1.1//1 : GET//2 : /hello/login//3 : userxiaomingpasswd123456//4 : HTTP/1.1_request._method matches[1];//这里统一转换为大写兼容性更强std::transform(_request._method.begin(),_request._method.end(),_request._method.begin(),::toupper);//这里资源路径需要解码且不需要转空格_request._path Util::UrlDeCode(matches[2],false);_request._version matches[4];//这个查询字符串要特殊处理一下std::string param matches[3];std::vectorstd::string vs;Util::Spilt(param,,vs);//通过等号进行继续分割得到key valfor(auto str:vs){size_t pos str.find();if(pos std::string::npos){//没找到认为是有问题的返回错误_resp_statu 400; //BAD REQUEST_recv_statu RECV_HTTP_ERROR;return false;}//这里得到的key和val也是需要进行解码的且需要空格转std::string key Util::UrlDeCode(str.substr(0,pos),true);std::string val Util::UrlDeCode(str.substr(pos1),true);_request.SetParam(key,val);}return true;}bool RecvLine(Buffer* buf){if(_recv_statu ! RECV_HTTP_LINE) return false;//接收一行std::string line buf-GetLineAndPop();if(line.size() 0){//没有一行有可能是一行的数据太大了也可能是不够一行的数据if(buf-ReadAbleSize() MAX_LINE){//{414, URI Too Long}_resp_statu 414;_recv_statu RECV_HTTP_ERROR;return false;}return true;}if(line.size() MAX_LINE){_resp_statu 414;_recv_statu RECV_HTTP_ERROR;return false;}bool ret ParseLine(line);if(ret false){return false;}//首行处理完毕进入头部阶段_recv_statu RECV_HTTP_HEAD;return true;}bool RecvHead(Buffer* buf){if(_recv_statu ! RECV_HTTP_HEAD) return false;//这里是有多行数据而且数据都是以key val的形式while(true){std::string line buf-GetLineAndPop();if(line.size() 0){//没有一行有可能是一行的数据太大了也可能是不够一行的数据if(buf-ReadAbleSize() MAX_LINE){//{414, URI Too Long}_resp_statu 414;_recv_statu RECV_HTTP_ERROR;return false;}return true;}if(line.size() MAX_LINE){_resp_statu 414;_recv_statu RECV_HTTP_ERROR;return false;}if(line \n || line \r\n) break; //这里说明头部已经解析完成bool ret ParseHead(line);if(ret false) return false;}//头部解析完成进行正文解析_recv_statu RECV_HTTP_BODY;return true;}bool ParseHead(std::string line){//去掉换行符if(line.back() \n) line.pop_back();if(line.back() \r) line.pop_back();//这里都是key val的形式size_t pos line.find(: );if(pos std::string::npos){//解析失败_resp_statu 400;_recv_statu RECV_HTTP_ERROR;return false;}std::string key line.substr(0,pos);std::string val line.substr(pos2);_request.SetHeader(key,val);return true;}bool RecvBody(Buffer* buf){if(_recv_statu ! RECV_HTTP_BODY) return false;//正文数据大小为0则直接返回即可size_t content_len _request.Content_Length();if(content_len 0){//解析完毕直接返回_recv_statu RECV_HTTP_OVER;return true;}//获得剩下应该获取的正文长度size_t real_len content_len - _request._body.size();//缓冲区的数据大于剩余正文长度取出当前所需if(buf-ReadAbleSize() real_len){_request._body.append(buf-ReadPosition(),real_len);//这里不要忘了向后移动buf-MoveReadOffSet(real_len);_recv_statu RECV_HTTP_OVER;return true;}//这里的缓冲区数据不够则把数据放到正文中然后返回即可_request._body.append(buf-ReadPosition(),buf-ReadAbleSize());//这里不要忘了向后移动buf-MoveReadOffSet(buf-ReadAbleSize());return true;}
public:HttpContent():_resp_statu(200),_recv_statu(RECV_HTTP_LINE){};void Reset(){_resp_statu 200;_recv_statu RECV_HTTP_LINE;_request.ReSet();}int Statu() { return _resp_statu; }HttpRecvStatu RecvStatu() { return _recv_statu; }HttpRequest RecvReq() { return _request; }//接收并解析请求void HttpRecvReqest(Buffer* buffer){//根据不同的状态进入不同的函数这里一个函数处理完成之后会立刻进入下一个函数完成全过程switch(_recv_statu){case RECV_HTTP_LINE: RecvLine(buffer);case RECV_HTTP_HEAD: RecvHead(buffer);case RECV_HTTP_BODY: RecvBody(buffer);}}
}; HttpServer服务器模块 该类主要是实现HTTP服务器的搭建可以让用户快速搭建使用服务器 实现思想 我们可以通过一个请求路由表来做统一的处理这个表记录了那个请求对应哪一个方法通过hash的方式什么方法怎么处理这个有用户自己决定服务器只需要执行对应的函数即可。 优势用户只需要实现业务处理函数处理好请求以及处理函数的映射关系添加到服务器中而服务器只需要接收数据解析数据查找路由表中的映射关系同时执行对应的函数即可 类成员设计 1.各种方法的请求路由表 GET/POST/PUT/DELETE) 2.静态资源根目录 -实现静态资源获取的处理 3.高性能的TCP服务器 -进行连接的IO处理 服务器处理流程 1.从socket接受数据放入到接收缓冲区 2.调用OnMessage回调函数进行处理 3.对请求进行解析得到一个HttpRequest里面填写好所有的请求要素 4.通过请求路由查找对应的方法进行处理 4.1如果是静态资源的请求 - 实体文件资源的请求 将静态资源文件中的数据读取出来填到HttpResponse中 4.2如果是功能性请求 - 查找对应的路由函数执行对应的方法并填写HttpResponse 5.业务处理完成之后已经得到一个HttpResponse结构组织http响应并进行发送 类功能函数设计 1.添加请求 -建立函数映射表GET/POST/PUT/DELETE) 2.设置静态资源根目录 3.设置是否启动超时连接关闭 4.设置线程池中线程的数量 5.启动服务器 调用TcpServer中的start接口 class HttpServer
{
private:using Functor std::functionvoid(const HttpRequest,HttpResponse*);using Handler std::vectorstd::pairstd::regex,Functor;//各种方法的路由表第一个是正则表达式第二个是方法Handler _get_route;Handler _post_route;Handler _put_route;Handler _delete_route;std::string _base_dir; //静态资源根目录TcpServer _server;
private:void ErrorHandler(const HttpRequest req,HttpResponse* resp){//1.组织一个错误的页面std::string body;body html;body head;body meta charsetutf-8;body /head;body body;body h1;body std::to_string(resp-_statu);body ;body Util::StatuDesc(resp-_statu);body /h1;body /body;body /html;//2.讲页面数据放入到resp中resp-SetContent(body,text/html);}//根据http来组织httpResponse各要素void WriteResponse(const PtrConnection conn,HttpRequest req,HttpResponse resp){//1.完善头部字段if(req.Close() true){resp.SetHeader(Connection,Close);}else{resp.SetHeader(Connection,keep-alive);}if(!resp._body.empty() !resp.HasHeader(Content-Length)){resp.SetHeader(Content-Length,std::to_string(resp._body.size()));}if(!resp._body.empty() !resp.HasHeader(Content-Type)){resp.SetHeader(Content-Type,application/octet-stream);}//是否设置重定向if(resp._redirect_flag){resp.SetHeader(Location,resp._redirect_url);}//2.填充resp信息 协议版本响应状态码响应状态码描述std::stringstream str;str req._version std::to_string(resp._statu) Util::StatuDesc(resp._statu) \r\n;//响应头部for(auto head : resp._headers){str head.first : head.second \r\n;}str \r\n;str resp._body;//3.发送数据conn-Send(str.str().c_str(),str.str().size());}//静态资源处理bool IsFilerHandler(const HttpRequest req){//1.必须设置静态资源根目录if(req._path.empty()){return false;}//2.必须是GET或者是HEAD方法if(req._method ! GET req._method ! HEAD){return false;}//3.必须是合法的资源路径if(!Util::IsValPath(req._path)){return false;}//4.资源必须存在//先添加上静态资源根目录防止后面出错用另外一个变量来接收std::string req_path _base_dir req._path;// / 最后一个字符是/那么添加上index.htmlif(req_path.back() /){req_path index.html;}//判断是否是一个文件if(!Util::IsRegular(req_path)){return false;}return true;}bool FilerHandler(HttpRequest req,HttpResponse* resp){//先确定路径std::string req_path _base_dir req._path;if(req_path.back() /){req_path index.html;}//把要获取的资源读取出来放到resp中的body中并设置mimebool ret Util::ReadFile(req_path,(resp-_body));if(ret false){return false;}//设置mimeresp-SetHeader(Content-Type,Util::GetMime(req_path));return true;}//功能性资源处理void Dispatcher(HttpRequest req,HttpResponse* resp,Handler handlers){//通过查看不同的方法的路由表如果找到方法就执行对应的方法如果没有找到就返回404for(auto handler:handlers){const std::regex re handler.first;const Functor func handler.second;bool ret std::regex_match(req._path,req._matches,re);if(ret false){continue; //没有找到继续查找}//找到就执行对应的函数return func(req,resp);}//返回404resp-_statu 404;} void Route(HttpRequest req,HttpResponse* resp){//分成两种 - 如果是静态资源请求则调用静态资源请求处理//如果是功能性资源请求则调用对应的函数处理if(IsFilerHandler(req)){FilerHandler(req,resp);}//功能性请求if(req._method GET || req._method HEAD){return Dispatcher(req,resp,_get_route);}else if(req._method POST){return Dispatcher(req,resp,_post_route);}else if(req._method PUT){return Dispatcher(req,resp,_put_route);}else if(req._method DELETE){return Dispatcher(req,resp,_delete_route);}//不是上述方法则返回405resp-_statu 405; //Method Not Allowed}//设置上下文void Onconnect(const PtrConnection conn){conn-SetContext(HttpContent());DLOG(New Connection %p,conn.get());}//解析处理void OnMessage(const PtrConnection conn,Buffer* buf){while(buf-ReadAbleSize() 0){//1.获取上下文HttpContent* content conn-Context()-getHttpContent();//2.通过解析上下文得到httprequestcontent-HttpRecvReqest(buf);HttpRequest req content-RecvReq();HttpResponse resp(content-Statu());//这里数据解析可能出错if(content-Statu() 400){//进行错误响应关闭连接ErrorHandler(req,resp); //填充一个错误的页面返回WriteResponse(conn,req,resp); //组织响应//关闭连接之前应该重置一下上下文防止状态一直处理错误的状态content-Reset();//这里出错了为了提高服务器的效率直接把缓冲区中的数据清空buf-MoveReadOffSet(buf-ReadAbleSize());conn-ShutDown(); //关闭连接return;}//这里有可能解析还没有完成重新开始if(content-RecvStatu() ! RECV_HTTP_OVER){return;}//3.请求路由Route(req,resp);//4.组织httpresponse并发送WriteResponse(conn,req,resp);//5.重置上下文content-Reset();//6.是否是短连接关闭if(resp.Close()) conn-ShutDown();}}
public:HttpServer(int port,int timeout DEFAULT_TIMEOUT):_server(port){_server.SetActiveRelease(timeout);_server.SetConnectCallback(std::bind(HttpServer::Onconnect,this,std::placeholders::_1));_server.SetMessageCallback(std::bind(HttpServer::OnMessage,this,std::placeholders::_1,std::placeholders::_2));}void SetBaseDir(const std::string path){assert(Util::IsDir(path) true);_base_dir path;}//建立正则表达式和对应处理函数的映射关系void Get(const std::string pattern,const Functor func){_get_route.push_back(std::make_pair(std::regex(pattern),func));}void Post(const std::string pattern,const Functor func){_post_route.push_back(std::make_pair(std::regex(pattern),func));}void Put(const std::string pattern,const Functor func){_put_route.push_back(std::make_pair(std::regex(pattern),func));}void Delete(const std::string pattern,const Functor func){_delete_route.push_back(std::make_pair(std::regex(pattern),func));}void SetThreadCount(int count){_server.SetThreadCount(count);}void Listen(){_server.Start();}
}; 服务器搭建并进行测试 搭建服务器 我们需要编写简单的业务处理函数来填写路由表对应的处理方法然后再调用接口来启动服务器即可 #include http.hpp#define WWWROOT ./wwwroot/std::string RequestStr(const HttpRequest req)
{std::stringstream ss;//请求方法 资源路径 协议版本ss req._method req._path req._version \r\n;//查询字符串for(auto it : req._parame){ss it.first : it.second \r\n;}//请求头部for(auto it : req._headers){ss it.first : it.second \r\n;}ss \r\n;//请求正文ss req._body;return ss.str();
}void GetFile(const HttpRequest req,HttpResponse* resp)
{resp-SetContent(RequestStr(req),text.plain);//sleep(15); //这里模拟服务器处理时间很长可能会导致的其他连接超时销毁
}
void Login(const HttpRequest req,HttpResponse* resp)
{resp-SetContent(RequestStr(req),text.plain);
}
void PutFile(const HttpRequest req,HttpResponse* resp)
{std::string req_path WWWROOT req._path;Util::WriteFile(req_path,req._body);
}
void DeleteFile(const HttpRequest req,HttpResponse* resp)
{resp-SetContent(RequestStr(req),text.plain);
}int main()
{HttpServer server(8080);server.SetBaseDir(WWWROOT);server.SetThreadCount(3);server.Get(/hello,GetFile);server.Post(/login,Login);server.Put(/submit.txt,PutFile);server.Delete(/submit.txt,DeleteFile);server.Listen();return 0;
} 长连接测试 #include ../source/server.hppint main()
{Socket cli_sock;cli_sock.CreateClient(8080, 127.0.0.1);std::string req GET /hello HTTP/1.1\r\nConnection: keep-alive\r\nContent-Length: 0\r\n\r\n;while(1){assert(cli_sock.Send(req.c_str(),req.size()) ! -1);char buffer[1024];assert(cli_sock.Recv(buffer,1023) ! -1);DLOG([%s],buffer);sleep(3);}cli_sock.Close();return 0;
} 测试结果 这里服务器设置了30秒的超时连接销毁通过一定时间的观察我们可以得出长连接测试是没问题的。 测试超时连接是否销毁 #include ../source/server.hppint main()
{Socket cli_sock;cli_sock.CreateClient(8080, 127.0.0.1);std::string req GET /hello HTTP/1.1\r\nConnection: keep-alive\r\nContent-Length: 0\r\n\r\n;while(1){sleep(40);}cli_sock.Close();return 0;
} 测试结果 我们可以看到服务器这里经过了30秒之后就自动关闭了而客户端那里由于设置了死循环所以没有任何反应但是这也足够测试出来结果是没有问题的 错误请求处理 //只发送一次小的请求服务器得不到完整的数据就不会进行业务处理
//给服务器发送多条小的请求服务器会把后面的请求当正文处理但是后面的请求会失败
#include ../source/server.hppint main()
{Socket cli_sock;cli_sock.CreateClient(8080, 127.0.0.1);std::string req GET /hello HTTP/1.1\r\nConnection: keep-alive\r\nContent-Length: 100\r\n\r\nhow are you?;while(1){assert(cli_sock.Send(req.c_str(),req.size()) ! -1);assert(cli_sock.Send(req.c_str(),req.size()) ! -1);assert(cli_sock.Send(req.c_str(),req.size()) ! -1);char buffer[1024];assert(cli_sock.Recv(buffer,1023) ! -1);DLOG([%s],buffer);sleep(3);}cli_sock.Close();return 0;
} 测试结果 我们可以看到确实这里只处理了一次把后面的请求也放入到了第一次请求的请求正文中了同时服务器也没有关闭所以测试还是没有问题的 服务器性能达到瓶颈的处理 理论上如果服务器性能达到瓶颈那么怎么处理应该都不为过这里我们采用这种方式来模拟服务器性能达到瓶颈通过请求GET方法但是让其睡眠来模拟 测试代码 //测试在服务器达到瓶颈的时候对连接的处理其他的连接可能因为这个连接处理而导致超时销毁#include ../source/server.hppint main()
{signal(SIGCHLD,SIG_IGN);for(int i 0;i10;i){pid_t pid fork();if(pid 0){DLOG(FORK ERROR);return -1;}else if(pid 0){//子进程Socket cli_sock;cli_sock.CreateClient(8080, 127.0.0.1);std::string req GET /hello HTTP/1.1\r\nConnection: keep-alive\r\nContent-Length: 0\r\n\r\n;while(1){assert(cli_sock.Send(req.c_str(),req.size()) ! -1);char buffer[1024];assert(cli_sock.Recv(buffer,1023) ! -1);DLOG([%s],buffer);sleep(3);}cli_sock.Close();exit(0);}}while(1) sleep(1);return 0;
} 测试结果 client server 我们可以看到性能瓶颈并没有导致服务器进行关闭这些连接都得到了合理得处理结果 一次发送多个请求测试 //一次性向服务器发送多个请求查看服务器的响应情况
//每一条请求都应该得到正确的处理
#include ../source/server.hppint main()
{Socket cli_sock;cli_sock.CreateClient(8080, 127.0.0.1);std::string req GET /hello HTTP/1.1\r\nConnection: keep-alive\r\nContent-Length: 0\r\n\r\n;req GET /hello HTTP/1.1\r\nConnection: keep-alive\r\nContent-Length: 0\r\n\r\n;req GET /hello HTTP/1.1\r\nConnection: keep-alive\r\nContent-Length: 0\r\n\r\n;while(1){assert(cli_sock.Send(req.c_str(),req.size()) ! -1);char buffer[1024];assert(cli_sock.Recv(buffer,1023) ! -1);DLOG([%s],buffer);sleep(3);}cli_sock.Close();return 0;
} 测试结果 它们都得到了预期的处理结果服务器也一直在处理中知道用户关闭客户端 测试大文件传输 #include ../source/http/http.hppint main()
{Socket cli_sock;cli_sock.CreateClient(8080, 127.0.0.1);std::string req PUT /submit.txt HTTP/1.1\r\nConnection: keep-alive\r\n;std::string body;bool ret Util::ReadFile(./hello.txt,body);req Content-Length: std::to_string(body.size()) \r\n\r\n;assert(cli_sock.Send(req.c_str(),req.size()) ! -1);//传输正文assert(cli_sock.Send(body.c_str(),body.size()) ! -1);char buffer[1024];assert(cli_sock.Recv(buffer,1023) ! -1);DLOG([%s],buffer);sleep(3);cli_sock.Close();return 0;
} 这里传送300M的文件给服务器因为这里使用的是云服务器资源没有那么多就使用300M来测试 通过上面的指令我们就可以申请到一个300M的文件大写这里为了让文件内容不全为0这里追加了字符到文件中 测试结果 我们在服务器这里也得到了一个文件 这是我们只需要通过md5sum来比较以下这两个文件是否相同即可 这两个文件内容也是一样的说明我们的测试是没有问题的 服务器性能测试 通过webbench来对服务器进行测试 测试环境服务器是两核2G带宽4M的云服务器 5000个客户端的情况下 使用webbench以5000并发量对服务器发送请求进行1分钟测试得出的QPS为2080 即每秒处理的包的数量 项目源码 项目源码