Linux多线程Web服务器(C++实现)-创新互联
本文实现的是基于Reactor模式+epoll(边缘触发)+非阻塞socket+非阻塞IO+线程池的Web服务器,可以处理GET、POST两种请求,完成展示主页、获取图片、获取视频、登录及注册共五种功能。
创新互联公司作为成都网站建设公司,专注网站建设、网站设计,有关企业网站设计方案、改版、费用等问题,行业涉及阳光房等多个领域,已为上千家企业服务,得到了客户的尊重与认可。原理图:
上图为本文实现的服务器的原理图,采用了单Reactor多线程的模式,在主线程中用epoll监听一个listenFd与多个connFd。
若发生建立连接的事件,则交给accept单元处理,再把生成的connFd传给epoll管理;
若发生可读\可写事件,则添加到线程池的任务队列中,由池中空闲的子线程拿取任务并处理;
此外,若是客户端请求涉及数据库文件,则还需要从数据库连接池中拿出一个空闲的数据库连接,通过这个连接进行数据库文件的增删查改操作。
本文实现的服务器还采用了epoll的边缘触发模式,相比于水平触发减少了更多的epoll系统调用次数,在高并发的情况下效率更高。下面就ET(边缘触发)和LT(水平)触发进行介绍,并给出基于此实现的读写函数。
关于上图的详解:彻底学会使用epoll(一)——ET模式实现分析
因为在ET模式下,如果read一次没有读尽buffer中的数据,那么下次将得不到读就绪的通知,造成buffer中已有的数据无机会读出,除非有新的数据再次到达,write也同理。所以为了解决这一问题,就需要在读的时候尽力去读,写的时候尽力去写,即使用while循环去读写,直至出错或读写完毕。
使用while循环读写见: HTTP连接(读取请求+解析请求+生成响应+回送响应)
循环读:ssize_t HttpConn::read(int* errno_)
循环写:ssize_t HttpConn::write(int* errno_)
使用while循环去读写时,如果是阻塞IO就会因为 无数据可读/没空间可写 而一直阻塞在那里,所以采用非阻塞IO,一旦 无数据可读/没空间可写 就立刻返回-1,errno=EAGAIN(设置socket为非阻塞socke)。
bool WebServer::setFdNonblock(int fd)
{
int flags;
if((flags=fcntl(fd,F_GETFL,0))<0)
return false;
flags |= O_NONBLOCK;
if(fcntl(fd,F_SETFL,flags)<0)
return false;
return true;
}
因为ET模式下被唤醒(返回就绪)的条件为:
对于读取操作:
(1) 当buffer由不可读状态变为可读的时候,即由空变为不空的时候。
(2) 当有新数据到达时,即buffer中的待读内容变多的时候。
(3) 当buffer中有数据可读(即buffer不空)且用户对相应fd进行epoll_mod IN事件时。
对于写操作:
(1) 当buffer由不可写变为可写的时候,即由满状态变为不满状态的时候。
(2) 当有旧数据被发送走时,即buffer中待写的内容变少得时候。
(3) 当buffer中有可写空间(即buffer不满)且用户对相应fd进行epoll_mod OUT事件时
所以下面实现的逻辑为:
readFrom调用client->read(&readErrno)循环读取,直至读取完成/读取失败,读取完成也就是read返回0,代表服务器端read到了客户端发来的FIN请求,所以关闭连接;而读取失败又分为“暂时无数据可读”或”其他原因“,如果是其他原因则直接关闭连接,如果是"暂时无数据可读"(即没有新的HTTP请求发来)则进入process函数,判断读缓冲区内是否存在收集到的请求,有则解析请求+完成响应报文,然后更换监听的事件为EPOLLOUT,没有则继续监听EPOLLIN事件,继续读取客户端请求。
writeTo调用client->write(&writeErrno)循环写入,直至全部写完/写入失败,写入失败又分为”暂时无空间可写“或”其他原因“,如果是其他原因则直接关闭连接,如果是”暂时无空间可写“,则继续监听EPOLLOUT事件,继续写入响应。如果全部写完了,需要判断是否建立了HTTP长连接,如果建立了,那么就要尽量在一个TCP连接内完成多条HTTP报文的传送,所以进入process函数,判断读缓冲区内是否存在收集到的请求,有则继续监听EPOLLOUT,让本条请求的响应在下一轮epoll_wait中就绪;无则改为监听EPOLLIN,继续读取客户端请求;如果不建立HTTP长连接,则发送完毕HTTP响应后就应直接关闭连接。
void WebServer::readFrom(HttpConn* client)
{
int readErrno=0;
int ret=client->read(&readErrno);//client->raed()用while循环去尽力读
if(ret<=0&&readErrno!=EAGAIN)//读出错,关闭连接
{
closeConn(client);
return ;
}
process(client);
}
void WebServer::writeTo(HttpConn* client)
{
int writeErrno=0;
int ret=client->write(&writeErrno);//client->write()用while循环去尽力写
if(client->isWriteOver())//数据已全部写完
{
if(client->isKeepAlive()) //建立的是Http长连接
{
process(client);
return;
}
}
if(ret<0&&writeErrno==EAGAIN) //数据未读完,加入rdlist中下次继续读
{
epoller_p->modFd(client->getFd(), connEvent | EPOLLOUT);
return ;
}
closeConn(client);
}
void WebServer::process(HttpConn* client)
{
if(client->process()) //处理客户端请求成功
{
epoller_p->modFd(client->getFd(), connEvent | EPOLLOUT);//监听事件改为EPOLLOUT
}
else
{
epoller_p->modFd(client->getFd(), connEvent | EPOLLIN);//监听事件仍为EPOLLIN,继续读
}
}
本文实现的Web服务器还设置了优雅关闭的选项:
struct linger optLinger;
optLinger.l_onoff=1;
optLinger.l_linger=1;
if(setsockopt(listenFd,SOL_SOCKET,SO_LINGER,&optLinger,sizeof(optLinger))<0)
{
LOG_ERROR("%s","Set SO_LINGER Option Error!");
return false;
}
开启此套接字选项后,close()不会在调用后立刻返回,而是在延滞optLinger.l_linger 时间后才返回,在这一段时间内,close()函数并未关闭读写端,所以可以获取到客户端对于发往它的数据和FIN的ACK确认,故称优雅关闭。
当然,如果optLinger.l_linger设置不合理,在延滞时间内并未收到对端的确认,那么close返回-1,errno=EWOULDBLOCK,服务器端关闭,此后如果客户端的确认姗姗来迟,面对已经关闭的服务器端,只会收到RST,这是我们不想看到的。
所以,最好的办法其实是服务器端采用shutdown半关闭(关闭写端),这样读端read函数就会一直阻塞,可以读取到客户端发来的对于数据和FIN的确认,也可以在之后读取到客户端在处理完发来数据后调用close发出的FIN,服务器的读端read读到FIN后,直接返回,读取结束。
所以read的成功返回表明了:服务器端既获得了客户端对于发往它的数据和FIN的确认(TCP),又获得了客户端正确读取发来数据的确认(客户端的用户进程)。相比于close()+SO_LINGER,得到的消息更多。
此外,本文的Web服务器还设置了地址复用的选项:
int optReuseaddr=1;
if(setsockopt(listenFd,SOL_SOCKET,SO_REUSEADDR,&optReuseaddr,sizeof(optReuseaddr))<0)
{
LOG_ERROR("%s","Set SO_REUSEADDR Option Error!");
return false;
}
它的功能是,允许启动一个监听服务器并捆绑其众所周知的端口,即使以前建立的将该端口用作它们的本地端口的连接仍然存在。
这种情况常见于:
先启动一个监听服务器,连接请求到达,派生一个子进程来处理该客户,之后监听服务器终止,但是子进程仍然继续为该连接上的客户提供服务,重启监听服务器时就会失败。
这里重启失败是失败在bind函数绑定端口时,它试图绑定一个已有连接上的端口,所以失败。
当然,本文实现的基于多线程的Web服务器,一旦主线程的监听服务器终止,派生的子线程也会随之终止,大家都关闭了,也就不存在上述的重启失败问题。
以上就是本文实现的Web服务器的重点内容,下面是全部代码:
WebServer类结构 webserver.h
#ifndef WEBSERVER_H
#define WEBSERVER_H
#include#include#include
#include#include#include#include
#include "heaptimer.h"
#include "epoller.h"
#include "../pool/threadpool.h"
#include "../pool/sqlconnpool.h"
#include "../http/httpconn.h"
#include "../log/log.h"
class WebServer
{
public:
WebServer();
~WebServer();
bool loadConfigFile();
void start();
private:
void dealListen();
void dealRead(HttpConn* client);
void dealWrite(HttpConn* client);
void closeConn(HttpConn* client);
void flushTime(HttpConn* client);
void addClient(int fd,const sockaddr_in& addr);
void readFrom(HttpConn* client);
void writeTo(HttpConn* client);
void process(HttpConn* client);
bool setFdNonblock(int fd);
bool initSocket();
int port;
int timeOutMs;
bool isClose;
int listenFd;
char* srcDir;
uint32_t listenEvent;
uint32_t connEvent;
std::unique_ptrtimer_p;
std::unique_ptrthreadPool_p;
std::unique_ptrepoller_p;
std::unordered_mapusers;
const int maxUserNum = 65536;
};
#endif // !WEBSERVER_H
WebServer类实现 webserver.cpp
#include "webserver.h"
bool WebServer::loadConfigFile()//加载配置文件
{
FILE* fp = fopen("./webserver.ini", "r");
if(fp==nullptr)
{
return false;
}
while(!feof(fp))
{
char line[1024] = {0};
fgets(line, 1024, fp);
string str = line;
int idx = str.find('=', 0);
if (idx == -1)
continue;
int endidx = str.find('\n', idx);
string key = str.substr(0, idx);
string value = str.substr(idx+1,endidx-idx-1);
if (key == "port")
port = stoi(value);
else if (key == "timeOutMs")
timeOutMs = stoi(value);
else if (key == "sqlConnMaxNum")
SqlConnPool::getInstance(stoi(value));
else if (key == "threadNum")
threadPool_p=std::unique_ptr(new ThreadPool(stoi(value)));
else if (key == "logQueSize")
Log::getInstance()->init(stoi(value));
}
fclose(fp);
return true;
}
WebServer::WebServer():isClose(false),timer_p(new HeapTimer()),epoller_p(new Epoller())
{
if (!loadConfigFile())//配置失败
{
isClose=true;
LOG_ERROR("%s","Load Config File Fail!");
return ;
}
srcDir=getcwd(nullptr,256);
strncat(srcDir,"/resources/",15);
HttpConn::userCount=0;
HttpConn::srcDir=srcDir;
listenEvent=EPOLLRDHUP|EPOLLIN|EPOLLET;
connEvent=EPOLLONESHOT|EPOLLRDHUP|EPOLLET;
if(!initSocket())
isClose=true;
if(isClose)
LOG_ERROR("%s","========== Server init error!==========");
else
LOG_INFO("%s", "========== Server init success!========");
}
WebServer::~WebServer()
{
close(listenFd);
isClose=true;
free(srcDir);
}
void WebServer::start()
{
int timeMs=-1;
while(!isClose)
{
if(timeOutMs>0)
{
timeMs=timer_p->getNextTick();
}
int eventCnt=epoller_p->wait(timeMs);
for(int i=0;igetEventFd(i);
uint32_t events=epoller_p->getEvents(i);
if(fd==listenFd)
{
dealListen();
}
else if(events&EPOLLIN)
{
dealRead(&users[fd]);
}
else if(events&EPOLLOUT)
{
dealWrite(&users[fd]);
}
else if(events&(EPOLLRDHUP|EPOLLHUP|EPOLLERR))
{
closeConn(&users[fd]);
}
else
{
LOG_ERROR("%s","Unexpected Event Happen!");
}
}
}
}
void WebServer::dealListen()
{
struct sockaddr_in addr;
socklen_t len=sizeof(addr);
while(true)
{
int connfd=accept(listenFd,(struct sockaddr*)&addr,&len);
if(HttpConn::userCount>=maxUserNum)
{
close(connfd);
LOG_ERROR("%s","Server Users Full!");
return ;
}
if(connfd<=0)
{
return ;
}
else
{
addClient(connfd,addr);
}
}
}
void WebServer::dealRead(HttpConn* client)
{
flushTime(client);
threadPool_p->addTask(std::bind(&WebServer::readFrom,this,client));
}
void WebServer::dealWrite(HttpConn* client)
{
flushTime(client);
threadPool_p->addTask(std::bind(&WebServer::writeTo,this,client));
}
void WebServer::closeConn(HttpConn* client)
{
epoller_p->delFd(client->getFd());
client->closeConn();
}
void WebServer::flushTime(HttpConn* client)
{
if(timeOutMs>0)
{
timer_p->adjust(client->getFd(),timeOutMs);
}
}
void WebServer::addClient(int connfd,const sockaddr_in& addr)
{
users[connfd].init(connfd,addr);
if(timeOutMs>0)
{
timer_p->add(connfd,timeOutMs,std::bind(&WebServer::closeConn,this,&users[connfd]));
}
epoller_p->addFd(connfd,connEvent|EPOLLIN);
setFdNonblock(connfd);
}
void WebServer::readFrom(HttpConn* client)
{
int readErrno=0;
int ret=client->read(&readErrno);
if(ret<=0&&readErrno!=EAGAIN)
{
closeConn(client);
return ;
}
process(client);
}
void WebServer::writeTo(HttpConn* client)
{
int writeErrno=0;
int ret=client->write(&writeErrno);
if(client->isWriteOver())
{
if(client->isKeepAlive())
{
process(client);
return;
}
}
if(ret<0&&writeErrno==EAGAIN)
{
epoller_p->modFd(client->getFd(), connEvent | EPOLLOUT);
return ;
}
closeConn(client);
}
void WebServer::process(HttpConn* client)
{
if(client->process())
{
epoller_p->modFd(client->getFd(), connEvent | EPOLLOUT);
}
else
{
epoller_p->modFd(client->getFd(), connEvent | EPOLLIN);
}
}
bool WebServer::setFdNonblock(int fd)
{
int flags;
if((flags=fcntl(fd,F_GETFL,0))<0)
return false;
flags |= O_NONBLOCK;
if(fcntl(fd,F_SETFL,flags)<0)
return false;
return true;
}
bool WebServer::initSocket()
{
if(port>65535||port<1024)
{
LOG_ERROR("Select Port:%d Error!",port);
return false;
}
listenFd=socket(AF_INET,SOCK_STREAM,0);
if(listenFd<0)
{
LOG_ERROR("%s","Create Socket Error!");
return false;
}
struct linger optLinger;
optLinger.l_onoff=1;
optLinger.l_linger=1;
if(setsockopt(listenFd,SOL_SOCKET,SO_LINGER,&optLinger,sizeof(optLinger))<0)
{
LOG_ERROR("%s","Set SO_LINGER Option Error!");
return false;
}
int optReuseaddr=1;
if(setsockopt(listenFd,SOL_SOCKET,SO_REUSEADDR,&optReuseaddr,sizeof(optReuseaddr))<0)
{
LOG_ERROR("%s","Set SO_REUSEADDR Option Error!");
return false;
}
struct sockaddr_in addr;
addr.sin_family=AF_INET;
addr.sin_addr.s_addr=htonl(INADDR_ANY);
addr.sin_port=htons(port);
if(bind(listenFd,(struct sockaddr*)&addr,sizeof(addr))<0)
{
LOG_ERROR("%s","Bind Port:%d Error!",port);
return false;
}
if(listen(listenFd,5)<0)
{
LOG_ERROR("%s","Listen Port:%d Error!",port);
return false;
}
if(!epoller_p->addFd(listenFd,listenEvent))
{
LOG_ERROR("%s","Add ListenFd:%d in Epoll Error!",listenFd);
return false;
}
if(!setFdNonblock(listenFd))
{
LOG_ERROR("%s","Set ListenFd:%d Nonblock Error!",listenFd);
return false;
}
return true;
}
webserver.ini
# WEB服务器配置文件
#端口号
port=8888
#超时时间
timeOutMs=500
#大sql连接数
sqlConnMaxNum=1000
#线程数
threadNum=8
#日志队列大容量
logQueSize=1024
本项目已在github开源,全部代码见:
1410138/MyWebServer: C++ Linux Web服务器 (github.com)
其余部分的介绍及部分代码见本专栏:
webServer_{(sunburst)}的博客-博客
你是否还在寻找稳定的海外服务器提供商?创新互联www.cdcxhl.cn海外机房具备T级流量清洗系统配攻击溯源,准确流量调度确保服务器高可用性,企业级服务器适合批量采购,新人活动首月15元起,快前往官网查看详情吧
网站名称:Linux多线程Web服务器(C++实现)-创新互联
文章网址:http://myzitong.com/article/egish.html