当前位置:C++技术网 > 精选软件 > libuv服务器端包装类源代码分享(修正Linux服务器连续死循环版)

libuv服务器端包装类源代码分享(修正Linux服务器连续死循环版)

更新时间:2017-06-05 16:49:57浏览次数:1+次

        在文章《Linux下libuv服务器端包装类源代码分享》中分享了第一版的Linux系统下对Libuv库的包装类,让TCP网络编程变得更加简单。但是在服务器就同一个请求连续回发数据的时候,服务器陷入了死循环,CPU使用率达到了99%,无法再处理其他请求了。效果如下图所示:

    libuv服务器端包装类源代码分享(修正Linux服务器连续死循环版)

        Libuv相关的源码下载、编译以及其他的源代码分享,见文章开头的链接文章,就不在这里重复了。

        下面给出修正后的代码,解决了Linux下Libuv服务器连续回复客户端两次及以上时服务器出现了死循环的问题:

    头文件tcpServer.h:

#pragma once
/*---------------------------------------
- 文件 tcpServer.h
- 简介 基于libuv封装的tcp服务端的类
- 来源 C++技术网 http://www.cjjjs.com
- 作者 codexia (精简+修复Bug)
- 封装 phata, wqvbjhc@gmail.com (原始封装作者)
- 日期 2017-6-5
- 说明 在phata封装libuv库的基础上精简了不必要的代码,主要是日志,然后整理了排版,更符合C++排版风格,并将服务器端类和客户端分离为两个类
- lib VS2010版的lib静态库的使用,项目中要忽略LIBCMT.lib,否则出现导出的符号重复
- 修复 Linux环境下,服务器在回复客户端请求时,如果连续发两次数据,就出现死循环。
-----------------------------------------*/
#include "uv.h"
#include <string>
#include <cstring>
#include <map>
#include <stdio.h>
#include <cstdlib>

typedef void(*newconnect)(int clientid);
typedef void(*server_recvcb)(int cliendid, const char* buf, int bufsize);
#define BUFFERSIZE (1024*1024)
class clientdata;
class CTcpServer
{
public:
    CTcpServer(uv_loop_t* loop = uv_default_loop());
    virtual ~CTcpServer();

    static std::string GetUVError(int retcode)
    {
        std::string err;
        err = uv_err_name(retcode);
        err += ":";
        err += uv_strerror(retcode);
        return std::move(err);
    }

public:
    //基本函数
    bool Start(const char *ip, int port);//启动服务器,地址为IP4
    bool Start6(const char *ip, int port);//启动服务器,地址为IP6
    void close();

    bool setNoDelay(bool enable);
    bool setKeepAlive(int enable, unsigned int delay);

    const char* GetLastErrMsg() const {    return errmsg_.c_str();    };

    int send(int clientid, const char* data, std::size_t len, bool is_send = true);//默认直接发送
    void setnewconnectcb(newconnect cb);
    void setrecvcb(int clientid, server_recvcb cb);//设置接收回调函数,每个客户端各有一个
protected:
    int GetAvailaClientID()const;//获取可用的client id
    bool DeleteClient(int clientid);//删除链表中的客户端
                                    //静态回调函数
    static void AfterServerRecv(uv_stream_t *client, ssize_t nread, const uv_buf_t* buf);
    static void AfterSend(uv_write_t *req, int status);
    static void onAllocBuffer(uv_handle_t *handle, size_t suggested_size, uv_buf_t *buf);
    static void AfterServerClose(uv_handle_t *handle);
    static void AfterClientClose(uv_handle_t *handle);
    static void acceptConnection(uv_stream_t *server, int status);

private:
    bool init();
    bool run(int status = UV_RUN_DEFAULT);
    bool bind(const char* ip, int port);
    bool bind6(const char* ip, int port);
    bool listen(int backlog = 1024);

    uv_tcp_t server_;//服务器链接
    std::map<int, clientdata*> clients_list_;//子客户端链接
    uv_mutex_t mutex_handle_;//保护clients_list_
    uv_loop_t *loop_;
    std::string errmsg_;
    newconnect newconcb_;
    bool isinit_;//是否已初始化,用于close函数中判断
};

class clientdata
{
public:
    clientdata(int clientid) :client_id(clientid), recvcb_(nullptr) 
 {
        client_handle = (uv_tcp_t*)malloc(sizeof(*client_handle));
        client_handle->data = this;
        readbuffer = uv_buf_init((char*)malloc(BUFFERSIZE), BUFFERSIZE);
        writebuffer = uv_buf_init((char*)malloc(BUFFERSIZE), BUFFERSIZE);
    }
    virtual ~clientdata() 
 {
        free(readbuffer.base);
        readbuffer.base = nullptr;
        readbuffer.len = 0;

        free(writebuffer.base);
        writebuffer.base = nullptr;
        writebuffer.len = 0;

        free(client_handle);
        client_handle = nullptr;
    }
    int client_id;//客户端id,惟一
    uv_tcp_t* client_handle;//客户端句柄
    CTcpServer* tcp_server;//服务器句柄(保存是因为某些回调函数需要到)
    uv_buf_t readbuffer;//接受数据的buf
    uv_buf_t writebuffer;//写数据的buf
    server_recvcb recvcb_;//接收数据回调给用户的函数
 //uv_write_t *write_req;[修复点:删除]
 };

源文件tcpServer.cpp:

    

    

#include "tcpServer.h"
#include "dxLog_Base.h"
#include <signal.h>
CTcpServer::CTcpServer(uv_loop_t* loop) :newconcb_(nullptr), isinit_(false)
{
 loop_ = loop;
 m_pbuf_send = 0;
 m_buf_send_size = 0;
}
CTcpServer::~CTcpServer()
{
 close();
}

bool CTcpServer::init()
{
 //调用uv初始化TCP服务
 //下面的错误代码可调用GetUVError(iret)返回错误信息

 if (isinit_)return true;
 //loop循环为空
 if (!loop_)return false;

 int iret = uv_mutex_init(&mutex_handle_);
 if (iret) return false;
 
 iret = uv_tcp_init(loop_, &server_);
 if (iret) return false;

 isinit_ = true;
 server_.data = this;

 //iret = setNoDelay(true);//启动后导致绑定ip失败。
 //if (iret) return false;

 //调用uv_tcp_keepalive,后续函数会调用出错
 //iret = uv_tcp_keepalive(&server_, 1, 60);
 //if (iret) return false;

 return true;
}
void CTcpServer::close()
{
 //关闭服务器
 for (auto it = clients_list_.begin(); it != clients_list_.end(); ++it) 
 {
 auto data = it->second;
 uv_close((uv_handle_t*)data->client_handle, AfterClientClose);
 }
 clients_list_.clear();
 if (isinit_)uv_close((uv_handle_t*)&server_, AfterServerClose);
 isinit_ = false;
 uv_mutex_destroy(&mutex_handle_);
}
bool CTcpServer::run(int status)
{
 //开始运行
 int iret = uv_run(loop_, (uv_run_mode)status);
 if (iret)return false;
 return true;
}
bool CTcpServer::setNoDelay(bool enable)
{
 //属性设置--服务器与客户端一致
 int iret = uv_tcp_nodelay(&server_, enable ? 1 : 0);
 if (iret)return false;
 return true;
}
bool CTcpServer::setKeepAlive(int enable, unsigned int delay)
{
 int iret = uv_tcp_keepalive(&server_, enable, delay);
 if (iret)return false;
 return true;
}

bool CTcpServer::bind(const char* ip, int port)
{
 //服务器绑定端口ipv4
 struct sockaddr_in bind_addr;
 int iret = uv_ip4_addr(ip, port, &bind_addr);
 if (iret)return false;
 iret = uv_tcp_bind(&server_, (const struct sockaddr*)&bind_addr, 0);
 if (iret)return false;
 return true;
}
bool CTcpServer::bind6(const char* ip, int port)
{
 //服务器绑定端口ipv6
 struct sockaddr_in6 bind_addr;
 int iret = uv_ip6_addr(ip, port, &bind_addr);
 if (iret)return false;
 iret = uv_tcp_bind(&server_, (const struct sockaddr*)&bind_addr, 0);
 if (iret)return false;
 return true;
}
bool CTcpServer::listen(int backlog)
{
 //监听TCP
 int iret = uv_listen((uv_stream_t*)&server_, backlog, acceptConnection);
 if (iret)return false;
 return true;
}
string itoa(int num)
{
 char buf[100]={0};
 sprintf(buf,"%d",num);
 return string(buf);
}

bool CTcpServer::Start(const char *ip, int port)
{
 //开始运行TCP服务,ipv4版
 
 close();
 if (!init())return false;
 if (!bind(ip, port))return false;
 if (!listen(SOMAXCONN))return false;
 if (!run())return false;
 return true;
}
bool CTcpServer::Start6(const char *ip, int port)
{
 //开始运行TCP服务,ipv6版
 close();
 if (!init())return false;
 if (!bind6(ip, port))return false;
 if (!listen(SOMAXCONN))return false;
 if (!run())return false;
 return true;
}

int CTcpServer::send(int clientid, const char* data, std::size_t len,bool is_send)
{
 //服务器发送数据给客户端函数
 auto itfind = clients_list_.find(clientid);
 //找不到指定ID的客户端
 if (itfind == clients_list_.end()) return -1;
 //自己控制data的生命周期直到write结束
 if (itfind->second->writebuffer.len < len) 
 {
 itfind->second->writebuffer.base = (char*)realloc(itfind->second->writebuffer.base, len);
 itfind->second->writebuffer.len = len;
 }
 memcpy(itfind->second->writebuffer.base, data, len);
 uv_buf_t buf = uv_buf_init((char*)itfind->second->writebuffer.base, len);
 uv_write_t *write_req1 = new uv_write_t;//在回调函数AfterSend中会释放[修复点:新增]
 int iret = uv_write(write_req1, (uv_stream_t*)itfind->second->client_handle, &buf, 1, AfterSend);[修复点:修改]

 if (iret)return 1;//失败
 return 0;
}

void CTcpServer::AfterSend(uv_write_t *req, int status)
{
 if (req)delete req;//不管是否发送成功,都要delete,以免内存泄漏[修复点:新增]
 if (status < 0) {
 //运行到这里,说明发送出现问题
 exit(-1);
 }
}
void CTcpServer::acceptConnection(uv_stream_t *server, int status)
{
 //服务器接收客户端连接
 if (!server->data)return;
 CTcpServer *tcpsock = (CTcpServer *)server->data;
 int clientid = tcpsock->GetAvailaClientID();
 clientdata* cdata = new clientdata(clientid);//uv_close回调函数中释放
 cdata->tcp_server = tcpsock;//保存服务器的信息
 int iret = uv_tcp_init(tcpsock->loop_, cdata->client_handle);//析构函数释放
 if (iret) 
 {
 delete cdata;
 return;
 }
 iret = uv_accept((uv_stream_t*)&tcpsock->server_, (uv_stream_t*)cdata->client_handle);
 if (iret) 
 {
 uv_close((uv_handle_t*)cdata->client_handle, NULL);
 delete cdata;
 return;
 }
 //加入到链接队列
 tcpsock->clients_list_.insert(std::make_pair(clientid, cdata));
 if (tcpsock->newconcb_)tcpsock->newconcb_(clientid);//调用我们自己提供的回调函数
 //服务器开始接收客户端的数据
 iret = uv_read_start((uv_stream_t*)cdata->client_handle, onAllocBuffer, AfterServerRecv);
 return;
}

void CTcpServer::setrecvcb(int clientid, server_recvcb cb)
{
 //设置服务器端接收客户端发过来的数据的回调函数
 auto itfind = clients_list_.find(clientid);
 if (itfind != clients_list_.end())itfind->second->recvcb_ = cb;
}
void CTcpServer::setnewconnectcb(newconnect cb)
{
 //设置服务器处理客户端连接的回调函数
 newconcb_ = cb;
}

void CTcpServer::onAllocBuffer(uv_handle_t *handle, size_t suggested_size, uv_buf_t *buf)
{
 //服务器分析空间函数
 if (!handle->data)return;
 clientdata *client = (clientdata*)handle->data;
 *buf = client->readbuffer;
}

void CTcpServer::AfterServerRecv(uv_stream_t *handle, ssize_t nread, const uv_buf_t* buf)
{
 if (!handle->data)return;
 clientdata *client = (clientdata*)handle->data;//服务器的recv带的是clientdata
 if (nread < 0) 
 {
 /* 错误 或 EOF结尾 */
 CTcpServer *server = (CTcpServer *)client->tcp_server;
 string log_str;
 //char tmp[512] = { 0 };
 if (nread == UV_EOF) 
 {
 log_str = "client("+itoa(client->client_id)+") is disconnected,close this client.";

 //sprintf(tmp, "client(%d)连接断开,关闭此客户端", client->client_id);
 }
 else if (nread == UV_ECONNRESET)
 {
 log_str = "client("+itoa(client->client_id)+") abortive disconnect.";
 //sprintf(tmp, "客户端(%d)异常断开", client->client_id);
 }
 else 
 {
 log_str = "client("+itoa(client->client_id)+"):"+GetUVError(nread).c_str();
 //sprintf(tmp, "客户端(%d):%s", client->client_id, GetUVError(nread).c_str());
 }
 dxLog_Base::WriteLineStr(log_str);
 //连接断开,关闭客户端
 server->DeleteClient(client->client_id);
 return;
 }
 else if (0 == nread) 
 {
 /* 一切正常,只是没有读取任何数据 */
 }
 else if (client->recvcb_) 
 {
 client->recvcb_(client->client_id, buf->base, nread);
 }
}

void CTcpServer::AfterServerClose(uv_handle_t *handle)
{
 //服务器关闭
 //新增2017-3-22
 clientdata *cdata = (clientdata*)handle->data;
 if(cdata)delete cdata;
}
void CTcpServer::AfterClientClose(uv_handle_t *handle)
{
 clientdata *cdata = (clientdata*)handle->data;
 delete cdata;
}

int CTcpServer::GetAvailaClientID() const
{
 static int s_id = 0;
 return ++s_id;
}
bool CTcpServer::DeleteClient(int clientid)
{
 uv_mutex_lock(&mutex_handle_);
 auto itfind = clients_list_.find(clientid);
 if (itfind == clients_list_.end()) 
 {
 errmsg_ = "can't find client ";
 errmsg_ += std::to_string((long long)clientid);
 uv_mutex_unlock(&mutex_handle_);
 return false;
 }
 if (uv_is_active((uv_handle_t*)itfind->second->client_handle)) 
 uv_read_stop((uv_stream_t*)itfind->second->client_handle);
 uv_close((uv_handle_t*)itfind->second->client_handle, AfterClientClose);

 clients_list_.erase(itfind);
 uv_mutex_unlock(&mutex_handle_);
 return true;
}

CTcpServer g_srv;

void recv_cb(int client_id, const char* buf, int buf_size)
{
 //处理接受消息
 char* tmp = new char[buf_size*2];
 memcpy(tmp, buf, buf_size*2);
 g_srv.send(client_id, (const char*)tmp, buf_size*2);
 
 char* tmp2 = new char[buf_size];
 memcpy(tmp2, buf, buf_size);
 g_srv.send(client_id, (const char*)tmp2, buf_size);
 delete[] tmp;
 tmp = 0;
 delete[] tmp2;
 tmp2 = 0;
 return;
}
void new_conn_cb(int client_id)
{
 //设置新连接接收数据回调函数
 g_srv.setrecvcb(client_id, recv_cb);
}
int main()
{
 g_srv.setnewconnectcb(new_conn_cb);
 g_srv.Start("0.0.0.0", 6111);
 return 0;
}

    

        修复点:分别包括头文件中删除了uv_write_t变量,在send函数中使用new来创建变量,在AfterSend中删除变量。就是因为send时使用成员变量无法在Linux中实现连续回复数据,而在Windows中是正常的。修复之后,在Windows和Linux都可以连续回复数据了。修复点后面的描述是改动操作,如[修复点:删除],即删掉了的代码,不要再用了。修改和新增就是修改了原有的代码,以及补充必要的新增代码。其他代码没有改动,和之前分享的版本一样。

    说明:在cpp文件中我顺便提供了main函数来示范使用包装类。0.0.0.0表示服务器的任意IP地址,端口号是6102。
    编译tcpServer.cpp文件即可,编译命令为:
g++ -std=c++0x -I/usr/include tcpServer.cpp -L/usr/lib/ -luv -o tcpsrv
    在移植到linux的过程中,主要涉及到nullptr和auto等C++11标准的关键字不被支持。如果你使用的gcc是4.8.1或更高版本,那么就可以直接支持。-std=c++0x可以支持auto关键字,但是不支持nullptr,所以用0代替了nullptr。
    相对于Windows版的类,增加了cstdlib和cstring头文件。移除了lib库的导入指令。在linux中在g++命令后面指定动态库路径和名称,也指定头文件的路径。