方了方便客户端服务端网络部分的开发,使用boost asio和protobuf封装了一个通信库。
特点
- 接口简单
- 高性能
- 自动重连,客户端支持多IP寻址
- 支持心跳
- 包含glog日志库可以直接使用
- 允许设置压缩包
- 库自定义通信协议,用户只需要使用protobuf定义数据结构就可以了
- 服务端多线程处理消息支持有序和无序应答
- 支持请求应答和订阅推送两种模式
- 支持同步请求异步请求
- 线程安全
- 支持跨平台,测试环境编译(windows,linux)
请求应答代码演示
客户端:
#include <iostream>
#include "easyio/proto/pb_simple.pb.h"
#include "easyio/msgclient.h"
#include "easyio/msgbus.h"
#include "easyio/tool/util.h"
#include "easyio/tool/strutil.h"
using namespace easyio;
const std::string ADDRESS = "127.0.0.1:5577";
int main(int argc, char* argv[])
{
srand((unsigned int)time(NULL));
setting::initlog(argv[0], true, 1);
setting::initEnableCompressSize(1024);
LOG(INFO) << "client start...";
std::vector<std::string> addressList{ ADDRESS };
MsgClient client(addressList, 0);
client.start();
client.waitConnected();
auto reqMsg = std::make_shared<ProtoSimple::PingPongReq>();
reqMsg->set_content(strutil::random_string(100));
std::cout << "send:" << reqMsg->content() << std::endl;
client.postMessage(reqMsg, [](int error, const PackagePtr &req, const PackagePtr &rsp) {
if (error == 0) {
auto rspMsg = static_cast<ProtoSimple::PingPongRsp*>(rsp->msg());
const std::string &rspstr = rspMsg->content();
std::cout << "response:" << rspstr << std::endl;
}
});
util::sleep(1000);
std::cout << "exit" << std::endl;
return 0;
}
发送异步请求,这是一个PingPong消息,服务端将请求的数据原样返回。请求和应答在同一个代码块,应答使用lambda表达式来处理。
主要步骤是先要定义ProtoSimple::PingPongReq和ProtoSimple::PingPongRsp这两个protobuf结构,然后将req发送出去,就可以获得rsp了,必须要配对,有请求就应该有应答,即使应答的内容为空。
服务端:
#include <iostream>
#include <easyio/proto/pb_simple.pb.h>
#include <easyio/msgclient.h>
#include <easyio/msgserver.h>
#include <easyio/msgbus.h>
#include <easyio/tool/util.h>
using namespace easyio;
int main(int argc, char* argv[])
{
setting::initlog(argv[0], true);
setting::initEnableCompressSize();
LOG(INFO) << "server start...";
MsgServer server(5577);
server.addHandleMessage(TYPE_NAME(ProtoSimple::PingPongReq), [](const SessionPtr &session, const PackagePtr &request) {
auto req = static_cast<ProtoSimple::PingPongReq*>(request->msgPtr.get());
auto rsp = std::make_shared<ProtoSimple::PingPongRsp>();
rsp->set_content(req->content());
session->replyMessage(request, rsp);
});
server.run();
LOG(INFO) << "server exit!!!";
return 0;
}
服务端看起来更简单,增加PingPongReq的消息处理,获取请求的数据将其赋值给应答,然后通过session调用replyMessage。
推送订阅代码演示
订阅方:
#include <iostream>
#include "easyio/proto/pb_simple.pb.h"
#include "easyio/msgclient.h"
#include "easyio/msgbus.h"
#include "easyio/tool/util.h"
#include "easyio/tool/strutil.h"
using namespace easyio;
const std::string ADDRESS = "127.0.0.1:5577";
int main(int argc, char* argv[])
{
setting::initlog(argv[0], true, 1);
setting::initEnableCompressSize(1024);
LOG(INFO) << "client start...";
std::vector<std::string> addressList{ ADDRESS };
MsgClient client(addressList, 0);
client.addHandlePublish(TYPE_NAME(ProtoSimple::ServerInfoPub), [](int error, const MessagePtr &pubMsgPtr) {
if (error == 0) {
auto info = static_cast<ProtoSimple::ServerInfoPub*>(pubMsgPtr.get());
std::cout << "pub:" << info->hello() << std::endl;
}
});
client.start();
client.waitConnected();
util::sleep(10000);
std::cout << "exit" << std::endl;
return 0;
}
使用addHandlePublish处理订阅的消息,就相当于你订阅了这个消息,在client start之后就会自动向服务端发送订阅的消息。
推送方:
#include <iostream>
#include <easyio/proto/pb_simple.pb.h>
#include <easyio/msgclient.h>
#include <easyio/msgserver.h>
#include <easyio/msgbus.h>
#include <easyio/tool/util.h>
using namespace easyio;
int main(int argc, char* argv[])
{
setting::initlog(argv[0], true);
setting::initEnableCompressSize();
LOG(INFO) << "server start...";
MsgServer server(5577);
std::thread t([&server]() {
while (1) {
auto info = std::make_shared<ProtoSimple::ServerInfoPub>();
std::string strtime = util::getFormatTime(util::currentMillisecond());
std::cout << "pub time:" << strtime << std::endl;
info->set_hello(strtime);
server.publishMessage(info);
util::sleep(1000);
}
});
t.detach();
server.run();
LOG(INFO) << "server exit!!!";
return 0;
}
推送方每秒推送当前时间给订阅方。
注意:server run之后才能进行推送,而run是一个io serivce会阻塞当前线程,所以这里是单独开一个线程来循环推送消息。
项目结构介绍
我给这个库取了个名字:easyio
easyio
internal
// 核心部分封装了通信逻辑,协议,线程池,任务管理
proto
// 内部protobuf协议定义,订阅、心跳协议等
tool
// 工具类
msgclient.h // 客户端调用的外部接口类
msgserver.h // 服务端调用的外部接口类
自定义协议
包有包头和包体两部分组合,如下:
#pragma pack(push,1)
struct PacHeader {
static const char flag0;
static const char flag1;
enum MsgType {
REQREP = 1,
PUBSUB,
DELIVER,
};
char flag[2]{flag0, flag1};
short msgType{ 0 };
short typeNameLen{ 0 };
union {
short extInfo{ 0 };
struct {
unsigned char iszip : 1;
unsigned char isorder : 1;
};
};
int msgId{ 0 };
int msgSize{ 0 };
int pacSize{ 0 };
};
#pragma pack(pop)
const int kPackageHeaderSize = sizeof(PacHeader);
const int kMsTimeout = 5000;
struct Package {
PacHeader header;
std::string typeName;
#ifdef MSGBUS
std::vector<char> body;
#endif
MessagePtr msgPtr;
inline google::protobuf::Message* msg() { return msgPtr.get(); }
};
包头
包头PacHeader固定大小20个字节,字节分别是:
1:flag0,固定标识(用来分隔不同的包)
2: flag1,固定标识
34:msgType,消息类型MsgType6:typeNameLen,protobuf名长度
5
78:extInfo,扩展信息,包括iszip是否压缩,isorder是否是order message12:msgId,消息ID
9
1316:msgSize,消息大小20:pacSize,包的大小
17
包体
包括:typename + msgbody
typename:protobuf的名字
msgbody:protobuf的二进制内容
包的编解码
编码将消包(包头+包体)转换为二进制流存入buffer类。
解码将网络过来的二进制流转换为包结构
需要注意的是,由于包头格式是固定的解码没什么问题,但是包体是各种数据结构,要一个一个的解析太麻烦了,而且不好扩展,这里就体现protobuf的优势来了。
protobuf支持反射可以根据protobuf类名创建出protobuf对象,然后调用protobuf对象的方法名传入二进制流参数进行初始化,方法如下:
google::protobuf::Message* ProtoHelp::createMessage(const std::string & typeName)
{
google::protobuf::Message *message = NULL;
const google::protobuf::Descriptor* desc = google::protobuf::DescriptorPool::generated_pool()->FindMessageTypeByName(typeName);
if (desc) {
const google::protobuf::Message *prototype = google::protobuf::MessageFactory::generated_factory()->GetPrototype(desc);
if (prototype) {
message = prototype->New();
}
}
return message;
}
google::protobuf::Message *msg = createMessage(typeName);
parseResult = msg->ParseFromArray("二进制流");
编码代码:
bool ProtoHelp::encode(const PackagePtr &package, BufferPtr &buffer)
{
if (package->typeName.empty()) {
return false;
}
std::string content;
content.resize(package->msgPtr->ByteSizeLong());
if (package->msgPtr->SerializePartialToArray(&content[0], content.size())) {
// need compress
if (ProtoHelp::enableCompressSize() >= 0 && (int)content.size() > ProtoHelp::enableCompressSize()) {
unsigned long bufferSize = compressBound(content.size());
std::string compressedBuffer(bufferSize, 0);
int errcode = compress((unsigned char*)&compressedBuffer[0], &bufferSize, (unsigned char*)&content[0], content.size());
if (errcode == Z_OK) {
LOG(INFO) << "compress success from:" << content.size() << ", to:" << bufferSize;
package->header.iszip = 1;
content.assign(&compressedBuffer[0], bufferSize);
} else {
LOG(ERROR) << "compress error:" << errcode << ", size:" << content.size();
}
}
package->typeName = package->msgPtr->GetTypeName();
package->header.typeNameLen = (short)package->typeName.size();
package->header.msgSize = package->msgPtr->ByteSizeLong();
package->header.pacSize = kPackageHeaderSize + package->typeName.size() + content.size();
bufferAppendHeader(*buffer.get(), package->header);
buffer->append(&package->typeName[0], package->header.typeNameLen);
buffer->append(content);
return true;
}
return false;
}
解码代码:
PackagePtr ProtoHelp::decode(Buffer &buf)
{
if (!gotoFlag(buf)) {
return PackagePtr();
}
PacHeader header;
bufferPeekHeader(buf, header);
// 数据不够等待下次读取更多的数据
if (buf.readableBytes() < (std::size_t)header.pacSize) {
return PackagePtr();
}
buf.retrieve(kPackageHeaderSize);
if (header.pacSize < 0 || header.pacSize > kMaxPackageLen) {
LOG(INFO) << "pac size error:" << header.pacSize;
return decode(buf);
}
if (header.typeNameLen <= 0 || header.typeNameLen > 1024) {
LOG(INFO) << "type name len error:" << header.typeNameLen;
return decode(buf);
}
std::string typeName = buf.retrieveAsString(header.typeNameLen);
google::protobuf::Message *msg = createMessage(typeName);
if (!msg) {
LOG(ERROR) << "create message failed:" << typeName;
return decode(buf);
}
bool parseResult = false;
int remainSize = header.pacSize - kPackageHeaderSize - header.typeNameLen;
if (header.iszip) {
unsigned long bufferSize = header.msgSize;
std::string unCompressedBuffer(bufferSize, 0);
int errcode = uncompress((unsigned char*)&unCompressedBuffer[0], &bufferSize, (const unsigned char*)buf.peek(), remainSize);
if (errcode == Z_OK) {
LOG(INFO) << "uncompress success, from:" << remainSize << ", to:" << unCompressedBuffer.size();
header.iszip = false;
header.pacSize = kPackageHeaderSize + header.typeNameLen + header.msgSize;
parseResult = msg->ParseFromArray(&unCompressedBuffer[0], bufferSize);
} else {
LOG(ERROR) << "uncompress error:" << errcode << ", size:" << bufferSize;
}
} else {
parseResult = msg->ParseFromArray(buf.peek(), header.msgSize);
}
if (parseResult) {
buf.retrieve(remainSize);
auto result = std::make_shared<Package>();
result->header = header;
result->typeName = typeName;
result->msgPtr = MessagePtr(msg);
return result;
}
LOG(ERROR) << "decode failed:" << typeName;
return PackagePtr();;
}
client封装
客户端只需要提供:
- 发送消息(同步、异步还有一个顺序消息),处理应答接口
- 设置连接成功后的回调
- 设置订阅推送消息的处理
namespace easyio
{
class PublishHandler;
class MsgClient {
public:
explicit MsgClient(const std::vector<std::string> &addressList, int heartbeatSeconds = 5);
~MsgClient();
MsgClient(const MsgClient&) = delete;
MsgClient& operator=(const MsgClient&) = delete;
void setConnectNotify(const std::function<void(bool)> &cb);
void addHandlePublish(const std::string &typeName, const PublishFunc &func);
void start();
void stop();
bool waitConnected(int timeoutSecond = 3);
int sendMessage(const MessagePtr &msgPtr, MessagePtr &rspPtr, int msTimeout = kMsTimeout);
int postMessage(const MessagePtr &msgPtr, const Response &res, int msTimeout = kMsTimeout);
int postOrderMessage(const MessagePtr &msgPtr, const Response &res, int msTimeout = kMsTimeout);
private:
std::shared_ptr<PublishHandler> publishHandler_;
D_PRIVATE(AsioClient);
};
}
服务端封装
服务端需要处理:
- 处理请求应答
- 提供推送消息的接口
class MsgServer {
public:
explicit MsgServer(unsigned short port);
explicit MsgServer(unsigned short port, unsigned int workSize, unsigned int poolSize);
~MsgServer();
MsgServer(const MsgServer&) = delete;
MsgServer& operator=(const MsgServer&) = delete;
void run();
void stop();
void addMethodHanlder(const MsgHandlerPtr &handler);
void addHandleMessage(const std::string &protoName, const Task &task);
void publishMessage(const MessagePtr &msg);
private:
std::vector<MsgHandlerPtr> handlers_;
D_PRIVATE(AsioServer)
};
session管理
每个客户端与服务端建立的连接我们就认为是一个session,用SessionRoom来管理session的新增和删除以及订阅推送消息。
class Session : public Publisher, public std::enable_shared_from_this<Session>
{
public:
explicit Session(tcp::socket socket, SessionRoom &room, TaskManager &taskManager);
Session(const Session&) = delete;
Session& operator=(const Session&) = delete;
~Session();
void start();
void replyMessage(const PackagePtr &req, const MessagePtr &rspMsg);
std::string remoteEndpoint() const;
void publishMessage(const MessagePtr &msg) override;
void deliverPackage(const PackagePtr &pac) override;
int id() const override;
void doRead();
void doWrite(boost::system::error_code ec, std::size_t length);
void writeBuffer(BufferPtr buffer);
void postPackage(const PackagePtr &pack);
private:
std::string endpoint_;
std::unique_ptr<SessionData> d;
SessionRoom &room_;
TaskManager &taskManager_;
};
任务管理
对于服务端而言,它面对的是成千上万的客户端连接以及大量的消息,既然有消息过来那就是要做事情,也就是有任务了,要想高效的处理就需要多线程。
对于底层的连接以及消息收发的性能有boost的asio来保证,这里不用我们来操心。
我们需要关注的业务处理的性能,也就是任务管理了
namespace easyio
{
class ThreadPool;
class Worker;
typedef std::shared_ptr<Worker> WorkerPtr;
typedef std::shared_ptr<ThreadPool> ThreadPoolPtr;
class TaskManager
{
public:
class TaskManagerPrivate {
public:
TaskManagerPrivate(unsigned int workSize, unsigned int poolSize);
void init(unsigned int workSize, unsigned int poolSize);
ThreadPoolPtr pool_;
std::map<std::string, Task> handler_;
std::vector<WorkerPtr> workers_;
};
TaskManager(unsigned int workSize, unsigned int poolSize);
TaskManager(TaskManager&) = delete;
TaskManager& operator=(TaskManager&) = delete;
void addHandleTask(const std::string &protoName, const Task &task);
void setPreHandleTask(const Task &task);
void handleMessage(const PackagePtr &msgPtr, const SessionPtr &sessionPtr);
void destory();
private:
Task preTask_{ nullptr };
D_PRIVATE(TaskManagerPrivate)
};
}
任务管理用到了线程池,多线程来处理任务,虽然很高效但是会带来一个问题,就是处理消息的顺序和应答的顺序并不是顺序的。
比如请求的顺序是:A,B,C
应答的顺序可能是:C,B,A
因为你只有处理完了消息才能应答,任务处理花费的时间越长应答越慢。
当然,这里说的是同一个session,不同session请求消息的顺序这里不考虑。
为了解决这个问题,对于要保证顺序的消息,我们根据session id把它路由到相同的线程处理,缺点是如果某个任务花费的时间较长会影响后续任务的执行,所以对于order message要尽量少用。
Buffer二进制流处理
Buffer类可以很方便的处理二进制流,自动扩容,基本类型的读写,读写指定大小的字节数。
要注意网络字节序与主机字节序的转换
日志
日志打印使用glog库
包压缩
使用zlib库解压缩,当包的大小超过设置的值时。太小的包不值得压缩。
msgbus
项目里有一些msgbus相关的代码,是用来实验消息代理转发的,这里可以不考虑。
源码地址
https://gitee.com/tujiaw/easyio.git
有兴趣的Star一下吧。