zookeeper c client封装

Table of Contents

    我们只实现两个需求

    • master选举,当master切换后需要收到通知
    • 注册服务信息

    注册到zookeeper上的目录结构如下:

    /myproject                # 项目顶层节点
    /myproject/master         # 临时节点,用来实现master选举
    /myproject/nodes          # node父节点
    /myproject/nodes/node     # 序列节点,所有节点都在这里
    

    代码:

    // ZKNode.h
    #pragma once
    #include <string>
    #include <memory>
    #include <zookeeper/zookeeper.h>
    #include <boost/function.hpp>
    #include <boost/thread.hpp>
    #include <boost/algorithm/string.hpp>
    
    class ZKData {
    public:
        ZKData& set(const std::string &key, const std::string &value);
        std::string get(const std::string &key) const;
        std::string encode() const;
        void decode(const std::string &str);
        void decode(const char *data, int len);
    
    private:
        std::map<std::string, std::string> data_;
    };
    
    class ZKNode
    {
    public:
        typedef boost::function<void(const ZKData &data)> OnHandle;
    
        ZKNode(const std::string &host, ZooLogLevel level = ZOO_LOG_LEVEL_WARN);
        ~ZKNode();
    
        void setOnMaster(const ZKData &masterData, const OnHandle &handle);
        void setOnSlave(const ZKData &slaveData, const OnHandle &handle);
        bool run();
    
        void close();
        bool isConnected() const;
        void setConnected(bool yes);
        void setExpired(bool yes);
        const std::string& path() const;
    
    private:
        ZKNode(const ZKNode&);
        ZKNode& operator=(const ZKNode&);
        bool bootstrap();
        void runForMaster();
        void checkMaster();
        int getMasterData(ZKData &data);
        void takeMaster();
        void takeSlave();
        void masterExists();
        static void masterCreateCompletion(int rc, const char *value, const void *data);
        static void masterCheckCompletion(int rc, const char *value, int value_len, const struct Stat *stat, const void *data);
        static void masterExistsWatcher(zhandle_t *zh, int type, int state, const char *path, void *watcherCtx);
        static void masterExistsCompletion(int rc, const struct Stat *stat, const void *data);
        static void mainWatcher(zhandle_t *zh, int type, int state, const char *path, void *watcherCtx);
    
    private:
        zhandle_t *handle_;
        std::string host_;
        int timeout_;
        std::string path_;
        bool connected_;
        bool expired_;
        ZKData masterData_;
        ZKData slaveData_;
        OnHandle onMaster_;
        OnHandle onSlave_;
    };
    
    
    // ZKNode.cpp
    #include "ZKNode.h"
    #include <assert.h>
    #include <time.h>
    #include <sstream>
    #include <malloc.h>
    #include "include/logger.h"
    
    const char* PATH_NEW_BOND =          "/myproject";            // 顶层节点
    const char* PATH_MASTER =            "/myproject/master";     // 临时节点
    const char* PATH_NODES_PARENT =      "/myproject/nodes";      // node父节点
    const char* PATH_NODES_CHILDREN =    "/myproject/nodes/node"; // 序列节点
    
    const int MAX_BUFFER_LEN = 1024;
    
    ZKData& ZKData::set(const std::string &key, const std::string &value)
    {
        data_[key] = value;
        return *this;
    }
    
    std::string ZKData::get(const std::string &key) const
    {
        std::map<std::string, std::string>::const_iterator iter = data_.find(key);
        if (iter != data_.cend()) {
            return iter->second;
        }
        return "";
    }
    
    std::string ZKData::encode() const
    {
        std::string result;
        std::map<std::string, std::string>::const_iterator iter = data_.cbegin();
        while (iter != data_.cend()) {
            result += iter->first + ":" + iter->second + ";";
            ++iter;
        }
        return result;
    }
    
    void ZKData::decode(const std::string &str)
    {
        data_.clear();
        std::vector<std::string> values;
        boost::split(values, str, boost::is_any_of(";"));
        for (std::vector<std::string>::iterator iter = values.begin(); iter != values.end(); ++iter) {
            std::vector<std::string> temp;
            boost::split(temp, *iter, boost::is_any_of(":"));
            if (temp.size() == 2) {
                data_[temp[0]] = temp[1];
            }
        }
    }
    
    void ZKData::decode(const char *data, int len) {
        std::string str;
        if (data && len) {
            str.resize(len + 1);
            memcpy(&str[0], data, len);
        }
        decode(str);
    }
    
    //////////////////////////////////////////////////////////////////////////
    ZKNode::ZKNode(const std::string &host, ZooLogLevel level)
        : handle_(NULL), host_(host), timeout_(3000), connected_(false), expired_(false)
    {
        zoo_set_debug_level(level);
    }
    
    ZKNode::~ZKNode()
    {
    }
    
    void ZKNode::setOnMaster(const ZKData &masterData, const OnHandle &handle)
    {
        masterData_ = masterData;
        onMaster_ = handle;
    }
    
    void ZKNode::setOnSlave(const ZKData &slaveData, const OnHandle &handle)
    {
        slaveData_ = slaveData;
        onSlave_ = handle;
    }
    
    bool ZKNode::run()
    {
        LOGGER_INFO("zknode run...");
        handle_ = zookeeper_init(host_.c_str(), ZKNode::mainWatcher, timeout_, 0, this, 0);
        if (!handle_) {
            LOGGER_INFO("zookeeper_init error");
            return false;
        }
    
        int elapsed = 0;
        const int MS_INTERVAL = 100;
        while (!isConnected() && elapsed < timeout_) {
            boost::this_thread::sleep_for(boost::chrono::milliseconds(MS_INTERVAL));
            elapsed += MS_INTERVAL;
        }
    
        if (!isConnected()) {
            LOGGER_INFO("zookeeper connect timeout");
            return false;
        }
    
        if (!bootstrap()) {
            return false;
        }
    
        runForMaster();
        return true;
    }
    
    void ZKNode::close()
    {
        LOGGER_INFO("ZKNode::close");
        if (handle_) {
            zookeeper_close(handle_);
            handle_ = NULL;
        }
    }
    
    bool ZKNode::isConnected() const
    {
        return connected_;
    }
    
    void ZKNode::setConnected(bool yes)
    {
        connected_ = yes;
    }
    
    void ZKNode::setExpired(bool yes)
    {
        expired_ = yes;
    }
    
    const std::string& ZKNode::path() const
    {
        return path_;
    }
    
    bool ZKNode::bootstrap()
    {
        LOGGER_INFO("bootstrap start");
        int ret = zoo_create(handle_, PATH_NEW_BOND, 0, 0, &ZOO_OPEN_ACL_UNSAFE, 0, 0, 0);
        if (ret != ZOK && ret != ZNODEEXISTS) {
            LOGGER_ERROR("zoo create error, path:" << PATH_NEW_BOND);
            return false;
        }
        
        ret = zoo_create(handle_, PATH_NODES_PARENT, 0, 0, &ZOO_OPEN_ACL_UNSAFE, 0, 0, 0);
        if (ret != ZOK && ret != ZNODEEXISTS) {
            LOGGER_ERROR("zoo create error, path:" << PATH_NODES_PARENT);
            return false;
        }
        
        char pathBuffer[MAX_BUFFER_LEN] = { 0 };
        int pathBufferLen = sizeof pathBuffer;
        ret = zoo_create(handle_, PATH_NODES_CHILDREN, 0, 0, &ZOO_OPEN_ACL_UNSAFE, ZOO_SEQUENCE | ZOO_EPHEMERAL, pathBuffer, pathBufferLen);
        if (ret != ZOK) {
            LOGGER_ERROR("zoo create error, path:" << PATH_NODES_CHILDREN);
            return false;
        }
        
        path_ = pathBuffer;
        slaveData_.set("path", path_);
        std::string data = slaveData_.encode();
        ret = zoo_set(handle_, path_.c_str(), data.c_str(), data.size(), -1);
        if (ret != ZOK) {
            LOGGER_ERROR("zoo set nodes child data error, data:" << data);
            return false;
        }
        LOGGER_INFO("bootstrap end path:" << path_);
        return true;
    }
    
    void ZKNode::runForMaster()
    {
        LOGGER_INFO("runForMaster");
        if (!connected_) {
            LOGGER_ERROR("runForMaster error is not connected");
            return;
        }
    
        masterData_.set("path", path_);
        std::string data = masterData_.encode();
        zoo_acreate(handle_, PATH_MASTER, data.c_str(), data.size(), &ZOO_OPEN_ACL_UNSAFE, ZOO_EPHEMERAL, ZKNode::masterCreateCompletion, this);
    }
    
    void ZKNode::checkMaster()
    {
        LOGGER_INFO("checkMaster");
        zoo_aget(handle_, PATH_MASTER, 0, ZKNode::masterCheckCompletion, this);
    }
    
    int ZKNode::getMasterData(ZKData &data)
    {
        LOGGER_INFO("getMasterData");
        char buffer[MAX_BUFFER_LEN] = { 0 };
        int bufferLen = sizeof buffer;
        struct Stat stat;
        int ret = zoo_get(handle_, PATH_MASTER, 0, buffer, &bufferLen, &stat);
        data.decode(buffer, bufferLen);
        return ret;
    }
    
    void ZKNode::takeMaster()
    {
        LOGGER_INFO("==========================this is master====================");
        ZKData data;
        getMasterData(data);
        if (onMaster_) {
            onMaster_(data);
        }
    }
    
    void ZKNode::takeSlave()
    {
        LOGGER_INFO("==========================this is slave====================");
        ZKData data;
        getMasterData(data);
        if (onSlave_) {
            onSlave_(data);
        }
    }
    
    void ZKNode::masterExists()
    {
        zoo_awexists(handle_, PATH_MASTER, ZKNode::masterExistsWatcher, this, ZKNode::masterExistsCompletion, this);
    }
    
    void ZKNode::masterCreateCompletion(int rc, const char *value, const void *data) {
        LOGGER_INFO("masterCreateCompletion rc:" << rc);
        ZKNode *zk = static_cast<ZKNode*>(const_cast<void*>(data));
        switch (rc) {
        case ZCONNECTIONLOSS:
        case ZOPERATIONTIMEOUT:
            zk->checkMaster();
            break;
        case ZOK:
            zk->takeMaster();
            zk->masterExists();
            break;
        case ZNODEEXISTS:
            zk->takeSlave();
            zk->masterExists();
            break;
        default:
            LOGGER_ERROR("Something went wrong when running for master");
            break;
        }
    }
    
    void ZKNode::masterCheckCompletion(int rc, const char *value, int value_len, const struct Stat *stat, const void *data)
    {
        std::string strvalue = value ? value : "";
        LOGGER_INFO("masterCheckCompletion rc:" << rc << ", value:" << strvalue);
        ZKNode *zk = static_cast<ZKNode*>(const_cast<void*>(data));
        switch (rc) {
        case ZCONNECTIONLOSS:
        case ZOPERATIONTIMEOUT:
            zk->checkMaster();
            break;
        case ZOK:
            if (!strvalue.empty() && strvalue == zk->path_) {
                zk->takeMaster();
            } else {
                zk->masterExists();
            }
            break;
        case ZNONODE:
            zk->runForMaster();
            break;
        default:
            LOGGER_ERROR("Something went wrong when checking the master lock, rc:" << rc);
            break;
        }
    }
    
    void ZKNode::masterExistsWatcher(zhandle_t *zh, int type, int state, const char *path, void *watcherCtx)
    {
        LOGGER_INFO("masterExistsWatcher type:" << type);
        ZKNode *zk = static_cast<ZKNode*>(watcherCtx);
        if (type == ZOO_DELETED_EVENT) {
            assert(!strcmp(path, PATH_MASTER));
            zk->runForMaster();
        }
    }
    
    void ZKNode::masterExistsCompletion(int rc, const struct Stat *stat, const void *data)
    {
        ZKNode *zk = static_cast<ZKNode*>(const_cast<void*>(data));
        switch (rc) {
        case ZCONNECTIONLOSS:
        case ZOPERATIONTIMEOUT:
            zk->masterExists();
            break;
        case ZOK:
            break;
        case ZNONODE:
            LOGGER_INFO("masterExistsCompletion previous master is gone, running for master");
            zk->runForMaster();
            break;
        default:
            LOGGER_ERROR("Something went wrong when executing exists, rc:" << rc);
    
            break;
        }
    }
    
    void ZKNode::mainWatcher(zhandle_t *zh, int type, int state, const char *path, void *watcherCtx)
    {
        LOGGER_INFO("ZKNode::mainWatcher type:" << type << ", state:" << state);
        ZKNode *zk = static_cast<ZKNode*>(watcherCtx);
        assert(zk != NULL);
        if (type == ZOO_SESSION_EVENT) {
            if (state == ZOO_CONNECTED_STATE) {
                zk->setConnected(true);
            } else if (state == ZOO_CONNECTING_STATE) {
                if (zk->isConnected()) {
                    LOGGER_WARN("ZKNode::mainWatcher Disconnected.");
                }
                zk->setConnected(false);
            } else if (state == ZOO_EXPIRED_SESSION_STATE) {
                LOGGER_WARN("ZKNode::mainWatcher session expired will close");
                zk->setExpired(true);
                zk->setConnected(false);
                zk->close();
            }
        }
    }