Skip to content

boost asio + protobuf封装通信库

Published: at 10:35 AM | 13 min read

方了方便客户端服务端网络部分的开发,使用boost asio和protobuf封装了一个通信库。

特点

请求应答代码演示

客户端:

#include <iostream>
#include "easyio/proto/pb_simple.pb.h"
#include "easyio/msgclient.h"
#include "easyio/msgbus.h"
#include "easyio/tool/util.h"
#include "easyio/tool/strutil.h"

using namespace easyio;
const std::string ADDRESS = "127.0.0.1:5577";
int main(int argc, char* argv[])
{
    srand((unsigned int)time(NULL));
    setting::initlog(argv[0], true, 1);
    setting::initEnableCompressSize(1024);
    LOG(INFO) << "client start...";

    std::vector<std::string> addressList{ ADDRESS };
    MsgClient client(addressList, 0);
    client.start();
    client.waitConnected();

    auto reqMsg = std::make_shared<ProtoSimple::PingPongReq>();
    reqMsg->set_content(strutil::random_string(100));
    std::cout << "send:" << reqMsg->content() << std::endl;
    client.postMessage(reqMsg, [](int error, const PackagePtr &req, const PackagePtr &rsp) {
        if (error == 0) {
            auto rspMsg = static_cast<ProtoSimple::PingPongRsp*>(rsp->msg());
            const std::string &rspstr = rspMsg->content();
            std::cout << "response:" << rspstr << std::endl;
        }
    });

    util::sleep(1000);
    std::cout << "exit" << std::endl;
    return 0;
}

发送异步请求,这是一个PingPong消息,服务端将请求的数据原样返回。请求和应答在同一个代码块,应答使用lambda表达式来处理。

主要步骤是先要定义ProtoSimple::PingPongReq和ProtoSimple::PingPongRsp这两个protobuf结构,然后将req发送出去,就可以获得rsp了,必须要配对,有请求就应该有应答,即使应答的内容为空。

服务端:

#include <iostream>
#include <easyio/proto/pb_simple.pb.h>
#include <easyio/msgclient.h>
#include <easyio/msgserver.h>
#include <easyio/msgbus.h>
#include <easyio/tool/util.h>

using namespace easyio;
int main(int argc, char* argv[])
{
    setting::initlog(argv[0], true);
    setting::initEnableCompressSize();
    LOG(INFO) << "server start...";

    MsgServer server(5577);
    server.addHandleMessage(TYPE_NAME(ProtoSimple::PingPongReq), [](const SessionPtr &session, const PackagePtr &request) {
        auto req = static_cast<ProtoSimple::PingPongReq*>(request->msgPtr.get());
        auto rsp = std::make_shared<ProtoSimple::PingPongRsp>();
        rsp->set_content(req->content());
        session->replyMessage(request, rsp);
    });

    server.run();
    LOG(INFO) << "server exit!!!";

    return 0;
}

服务端看起来更简单,增加PingPongReq的消息处理,获取请求的数据将其赋值给应答,然后通过session调用replyMessage。

推送订阅代码演示

订阅方:

#include <iostream>
#include "easyio/proto/pb_simple.pb.h"
#include "easyio/msgclient.h"
#include "easyio/msgbus.h"
#include "easyio/tool/util.h"
#include "easyio/tool/strutil.h"

using namespace easyio;
const std::string ADDRESS = "127.0.0.1:5577";
int main(int argc, char* argv[])
{
    setting::initlog(argv[0], true, 1);
    setting::initEnableCompressSize(1024);
    LOG(INFO) << "client start...";

    std::vector<std::string> addressList{ ADDRESS };
    MsgClient client(addressList, 0);
    client.addHandlePublish(TYPE_NAME(ProtoSimple::ServerInfoPub), [](int error, const MessagePtr &pubMsgPtr) {
        if (error == 0) {
            auto info = static_cast<ProtoSimple::ServerInfoPub*>(pubMsgPtr.get());
            std::cout << "pub:" << info->hello() << std::endl;
        }
    });

    client.start();
    client.waitConnected();

    util::sleep(10000);
    std::cout << "exit" << std::endl;
    return 0;
}

使用addHandlePublish处理订阅的消息,就相当于你订阅了这个消息,在client start之后就会自动向服务端发送订阅的消息。

推送方:

#include <iostream>
#include <easyio/proto/pb_simple.pb.h>
#include <easyio/msgclient.h>
#include <easyio/msgserver.h>
#include <easyio/msgbus.h>
#include <easyio/tool/util.h>

using namespace easyio;
int main(int argc, char* argv[])
{
    setting::initlog(argv[0], true);
    setting::initEnableCompressSize();
    LOG(INFO) << "server start...";

    MsgServer server(5577);
    std::thread t([&server]() {
        while (1) {
            auto info = std::make_shared<ProtoSimple::ServerInfoPub>();
            std::string strtime = util::getFormatTime(util::currentMillisecond());
            std::cout << "pub time:" << strtime << std::endl;
            info->set_hello(strtime);
            server.publishMessage(info);
            util::sleep(1000);
        }
    });  
    t.detach();  
    server.run();
    LOG(INFO) << "server exit!!!";

    return 0;
}

推送方每秒推送当前时间给订阅方。
注意:server run之后才能进行推送,而run是一个io serivce会阻塞当前线程,所以这里是单独开一个线程来循环推送消息。

项目结构介绍

我给这个库取了个名字:easyio

easyio
  internal
    // 核心部分封装了通信逻辑,协议,线程池,任务管理
  proto
    // 内部protobuf协议定义,订阅、心跳协议等
  tool
    // 工具类
  msgclient.h // 客户端调用的外部接口类
  msgserver.h // 服务端调用的外部接口类

自定义协议

包有包头和包体两部分组合,如下:

#pragma pack(push,1)
    struct PacHeader {
        static const char flag0;
        static const char flag1;

        enum MsgType {
            REQREP = 1,
            PUBSUB,
            DELIVER,
        };

        char flag[2]{flag0, flag1};
        short msgType{ 0 };
        short typeNameLen{ 0 };
        union {
            short extInfo{ 0 };
            struct {
                unsigned char iszip : 1;
                unsigned char isorder : 1;
            };
        };
        int msgId{ 0 };
        int msgSize{ 0 };
        int pacSize{ 0 };
    };
#pragma pack(pop)

    const int kPackageHeaderSize = sizeof(PacHeader);
    const int kMsTimeout = 5000;

    struct Package {
        PacHeader header;
        std::string typeName;
#ifdef MSGBUS
        std::vector<char> body;
#endif
        MessagePtr msgPtr;
        inline google::protobuf::Message* msg() { return msgPtr.get(); }
    };

包头

包头PacHeader固定大小20个字节,字节分别是:
1:flag0,固定标识(用来分隔不同的包)
2: flag1,固定标识
34:msgType,消息类型MsgType
5
6:typeNameLen,protobuf名长度
78:extInfo,扩展信息,包括iszip是否压缩,isorder是否是order message
9
12:msgId,消息ID
1316:msgSize,消息大小
17
20:pacSize,包的大小

包体

包括:typename + msgbody
typename:protobuf的名字
msgbody:protobuf的二进制内容

包的编解码

编码将消包(包头+包体)转换为二进制流存入buffer类。

解码将网络过来的二进制流转换为包结构

需要注意的是,由于包头格式是固定的解码没什么问题,但是包体是各种数据结构,要一个一个的解析太麻烦了,而且不好扩展,这里就体现protobuf的优势来了。

protobuf支持反射可以根据protobuf类名创建出protobuf对象,然后调用protobuf对象的方法名传入二进制流参数进行初始化,方法如下:

    google::protobuf::Message* ProtoHelp::createMessage(const std::string & typeName)
    {
        google::protobuf::Message *message = NULL;
        const google::protobuf::Descriptor* desc = google::protobuf::DescriptorPool::generated_pool()->FindMessageTypeByName(typeName);
        if (desc) {
            const google::protobuf::Message *prototype = google::protobuf::MessageFactory::generated_factory()->GetPrototype(desc);
            if (prototype) {
                message = prototype->New();
            }
        }
        return message;
    }
    
    google::protobuf::Message *msg = createMessage(typeName);
    parseResult = msg->ParseFromArray("二进制流");

编码代码:

bool ProtoHelp::encode(const PackagePtr &package, BufferPtr &buffer)
    {
        if (package->typeName.empty()) {
            return false;
        }

        std::string content;
        content.resize(package->msgPtr->ByteSizeLong());
        if (package->msgPtr->SerializePartialToArray(&content[0], content.size())) {
            // need compress
            if (ProtoHelp::enableCompressSize() >= 0 && (int)content.size() > ProtoHelp::enableCompressSize()) {
                unsigned long bufferSize = compressBound(content.size());
                std::string compressedBuffer(bufferSize, 0);
                int errcode = compress((unsigned char*)&compressedBuffer[0], &bufferSize, (unsigned char*)&content[0], content.size());
                if (errcode == Z_OK) {
                    LOG(INFO) << "compress success from:" << content.size() << ", to:" << bufferSize;
                    package->header.iszip = 1;
                    content.assign(&compressedBuffer[0], bufferSize);
                } else {
                    LOG(ERROR) << "compress error:" << errcode << ", size:" << content.size();
                }
            }

            package->typeName = package->msgPtr->GetTypeName();
            package->header.typeNameLen = (short)package->typeName.size();
            package->header.msgSize = package->msgPtr->ByteSizeLong();
            package->header.pacSize = kPackageHeaderSize + package->typeName.size() + content.size();

            bufferAppendHeader(*buffer.get(), package->header);
            buffer->append(&package->typeName[0], package->header.typeNameLen);
            buffer->append(content);
            return true;
        }
        return false;
    }

解码代码:

PackagePtr ProtoHelp::decode(Buffer &buf)
    {
        if (!gotoFlag(buf)) {
            return PackagePtr();
        }

        PacHeader header;
        bufferPeekHeader(buf, header);

        // 数据不够等待下次读取更多的数据
        if (buf.readableBytes() < (std::size_t)header.pacSize) {
            return PackagePtr();
        }

        buf.retrieve(kPackageHeaderSize);
        if (header.pacSize < 0 || header.pacSize > kMaxPackageLen) {
            LOG(INFO) << "pac size error:" << header.pacSize;
            return decode(buf);
        }

        if (header.typeNameLen <= 0 || header.typeNameLen > 1024) {
            LOG(INFO) << "type name len error:" << header.typeNameLen;
            return decode(buf);
        }

        std::string typeName = buf.retrieveAsString(header.typeNameLen);
        google::protobuf::Message *msg = createMessage(typeName);
        if (!msg) {
            LOG(ERROR) << "create message failed:" << typeName;
            return decode(buf);
        }

        bool parseResult = false;
        int remainSize = header.pacSize - kPackageHeaderSize - header.typeNameLen;
        if (header.iszip) {
            unsigned long bufferSize = header.msgSize;
            std::string unCompressedBuffer(bufferSize, 0);
            int errcode = uncompress((unsigned char*)&unCompressedBuffer[0], &bufferSize, (const unsigned char*)buf.peek(), remainSize);
            if (errcode == Z_OK) {
                LOG(INFO) << "uncompress success, from:" << remainSize << ", to:" << unCompressedBuffer.size();
                header.iszip = false;
                header.pacSize = kPackageHeaderSize + header.typeNameLen + header.msgSize;
                parseResult = msg->ParseFromArray(&unCompressedBuffer[0], bufferSize);
            } else {
                LOG(ERROR) << "uncompress error:" << errcode << ", size:" << bufferSize;
            }
        } else {
            parseResult = msg->ParseFromArray(buf.peek(), header.msgSize);
        }

        if (parseResult) {
            buf.retrieve(remainSize);
            auto result = std::make_shared<Package>();
            result->header = header;
            result->typeName = typeName;
            result->msgPtr = MessagePtr(msg);
            return result;
        }

        LOG(ERROR) << "decode failed:" << typeName;
        return PackagePtr();;
    }

client封装

客户端只需要提供:

namespace easyio
{
    class PublishHandler;
    class MsgClient {
    public:
        explicit MsgClient(const std::vector<std::string> &addressList, int heartbeatSeconds = 5);
        ~MsgClient();
        MsgClient(const MsgClient&) = delete;
        MsgClient& operator=(const MsgClient&) = delete;

        void setConnectNotify(const std::function<void(bool)> &cb);
        void addHandlePublish(const std::string &typeName, const PublishFunc &func);
        void start();
        void stop();
        bool waitConnected(int timeoutSecond = 3);

        int sendMessage(const MessagePtr &msgPtr, MessagePtr &rspPtr, int msTimeout = kMsTimeout);
        int postMessage(const MessagePtr &msgPtr, const Response &res, int msTimeout = kMsTimeout);
        int postOrderMessage(const MessagePtr &msgPtr, const Response &res, int msTimeout = kMsTimeout);

    private:
        std::shared_ptr<PublishHandler> publishHandler_;
        D_PRIVATE(AsioClient);
    };
}

服务端封装

服务端需要处理:

    class MsgServer {
    public:
        explicit MsgServer(unsigned short port);
        explicit MsgServer(unsigned short port, unsigned int workSize, unsigned int poolSize);
        ~MsgServer();
        MsgServer(const MsgServer&) = delete;
        MsgServer& operator=(const MsgServer&) = delete;

        void run();
        void stop();
        void addMethodHanlder(const MsgHandlerPtr &handler);
        void addHandleMessage(const std::string &protoName, const Task &task);
        void publishMessage(const MessagePtr &msg);

    private:
        std::vector<MsgHandlerPtr> handlers_;
        D_PRIVATE(AsioServer)
    };

session管理

每个客户端与服务端建立的连接我们就认为是一个session,用SessionRoom来管理session的新增和删除以及订阅推送消息。

    class Session : public Publisher, public std::enable_shared_from_this<Session>
    {
    public:
        explicit Session(tcp::socket socket, SessionRoom &room, TaskManager &taskManager);
        Session(const Session&) = delete;
        Session& operator=(const Session&) = delete;
        ~Session();

        void start();
        void replyMessage(const PackagePtr &req, const MessagePtr &rspMsg);
        std::string remoteEndpoint() const;

        void publishMessage(const MessagePtr &msg) override;
        void deliverPackage(const PackagePtr &pac) override;
        int id() const override;

        void doRead();
        void doWrite(boost::system::error_code ec, std::size_t length);
        void writeBuffer(BufferPtr buffer);
        void postPackage(const PackagePtr &pack);

    private:
        std::string endpoint_;
        std::unique_ptr<SessionData> d;
        SessionRoom &room_;
        TaskManager &taskManager_;
    };

任务管理

对于服务端而言,它面对的是成千上万的客户端连接以及大量的消息,既然有消息过来那就是要做事情,也就是有任务了,要想高效的处理就需要多线程。

对于底层的连接以及消息收发的性能有boost的asio来保证,这里不用我们来操心。

我们需要关注的业务处理的性能,也就是任务管理了

namespace easyio
{
    class ThreadPool;
    class Worker;
    typedef std::shared_ptr<Worker> WorkerPtr;
    typedef std::shared_ptr<ThreadPool> ThreadPoolPtr;

    class TaskManager
    {
    public:
        class TaskManagerPrivate {
        public:
            TaskManagerPrivate(unsigned int workSize, unsigned int poolSize);
            void init(unsigned int workSize, unsigned int poolSize);
            ThreadPoolPtr pool_;
            std::map<std::string, Task> handler_;
            std::vector<WorkerPtr> workers_;
        };

        TaskManager(unsigned int workSize, unsigned int poolSize);
        TaskManager(TaskManager&) = delete;
        TaskManager& operator=(TaskManager&) = delete;
        void addHandleTask(const std::string &protoName, const Task &task);
        void setPreHandleTask(const Task &task);
        void handleMessage(const PackagePtr &msgPtr, const SessionPtr &sessionPtr);
        void destory();

    private:
        Task preTask_{ nullptr };
        D_PRIVATE(TaskManagerPrivate)
    };
}

任务管理用到了线程池,多线程来处理任务,虽然很高效但是会带来一个问题,就是处理消息的顺序和应答的顺序并不是顺序的。

比如请求的顺序是:A,B,C
应答的顺序可能是:C,B,A

因为你只有处理完了消息才能应答,任务处理花费的时间越长应答越慢。

当然,这里说的是同一个session,不同session请求消息的顺序这里不考虑。

为了解决这个问题,对于要保证顺序的消息,我们根据session id把它路由到相同的线程处理,缺点是如果某个任务花费的时间较长会影响后续任务的执行,所以对于order message要尽量少用。

Buffer二进制流处理

Buffer类可以很方便的处理二进制流,自动扩容,基本类型的读写,读写指定大小的字节数。

要注意网络字节序与主机字节序的转换

日志

日志打印使用glog库

包压缩

使用zlib库解压缩,当包的大小超过设置的值时。太小的包不值得压缩。

msgbus

项目里有一些msgbus相关的代码,是用来实验消息代理转发的,这里可以不考虑。

源码地址

https://gitee.com/tujiaw/easyio.git

有兴趣的Star一下吧。