Skip to content

zookeeper c client封装

Published: at 07:20 AM | 6 min read

我们只实现两个需求:master选举,以及把所有节点注册到zookeeper上。

注册到zookeeper上的目录结构如下:

/myproject                # 项目顶层节点
/myproject/master         # 临时节点,用来实现master选举
/myproject/nodes          # node父节点
/myproject/nodes/node     # 临时序列节点,所有节点都注册在这里

代码:

// ZKNode.h
#pragma once
#include <atomic>
#include <map>
#include <memory>
#include <string>
#include <vector>
#include <zookeeper/zookeeper.h>
#include <boost/algorithm/string.hpp>
#include <boost/function.hpp>
#include <boost/thread.hpp>

// String key/value bag that is stored as the payload of a znode.
// The serialized form produced by encode() is "key:value;" repeated once per
// entry; decode() parses that same form back into the map.
class ZKData {
public:
    // Stores value under key (overwriting any previous value) and returns
    // *this so calls can be chained.
    ZKData& set(const std::string &key, const std::string &value);
    // Returns the value stored under key, or an empty string when absent.
    std::string get(const std::string &key) const;
    // Serializes all entries as "key:value;" pairs.
    std::string encode() const;
    // Replaces the current entries with those parsed from str.
    void decode(const std::string &str);
    // Same as decode(const std::string&), for a raw buffer of len bytes.
    void decode(const char *data, int len);

private:
    std::map<std::string, std::string> data_;
};

// Wrapper around a ZooKeeper C client handle. run() connects, registers this
// process as an ephemeral sequential node and then competes for the ephemeral
// master node; the onMaster/onSlave callbacks report the outcome.
class ZKNode
{
public:
    // Callback type; it receives the data read from the master node.
    typedef boost::function<void(const ZKData &data)> OnHandle;

    // host is passed to zookeeper_init(); level sets the client log level.
    ZKNode(const std::string &host, ZooLogLevel level = ZOO_LOG_LEVEL_WARN);
    ~ZKNode();

    // Sets the data written to the master node and the callback invoked when
    // this node becomes master.
    void setOnMaster(const ZKData &masterData, const OnHandle &handle);
    // Sets the data written to this node's own znode and the callback invoked
    // when another node is master.
    void setOnSlave(const ZKData &slaveData, const OnHandle &handle);
    // Connects, bootstraps the znode tree and starts the master election.
    // Returns false when any of the synchronous steps fails.
    bool run();

    // Closes the ZooKeeper handle if one is open.
    void close();
    bool isConnected() const;
    void setConnected(bool yes);
    void setExpired(bool yes);
    // Path of the sequential znode created for this process by bootstrap().
    const std::string& path() const;

private:
    // Copying is disabled: declared private, no definition is provided here.
    ZKNode(const ZKNode&);
    ZKNode& operator=(const ZKNode&);
    bool bootstrap();
    void runForMaster();
    void checkMaster();
    int getMasterData(ZKData &data);
    void takeMaster();
    void takeSlave();
    void masterExists();
    // Static trampolines handed to the ZooKeeper C API; the ZKNode instance
    // travels through the data / watcherCtx pointer.
    static void masterCreateCompletion(int rc, const char *value, const void *data);
    static void masterCheckCompletion(int rc, const char *value, int value_len, const struct Stat *stat, const void *data);
    static void masterExistsWatcher(zhandle_t *zh, int type, int state, const char *path, void *watcherCtx);
    static void masterExistsCompletion(int rc, const struct Stat *stat, const void *data);
    static void mainWatcher(zhandle_t *zh, int type, int state, const char *path, void *watcherCtx);

private:
    zhandle_t *handle_;
    std::string host_;
    int timeout_;
    std::string path_;
    // Both flags are written from the session watcher callback while run()
    // polls connected_ from the calling thread. The multi-threaded ZooKeeper
    // client delivers callbacks on its own thread, so the flags are atomic to
    // keep that cross-thread access free of data races.
    std::atomic<bool> connected_;
    std::atomic<bool> expired_;
    ZKData masterData_;
    ZKData slaveData_;
    OnHandle onMaster_;
    OnHandle onSlave_;
};

// ZKNode.cpp
#include "ZKNode.h"
#include <assert.h>
#include <time.h>
#include <sstream>
#include <malloc.h>
#include "include/logger.h"

// Znode layout used by ZKNode.
const char* PATH_NEW_BOND =          "/myproject";            // top-level project node
const char* PATH_MASTER =            "/myproject/master";     // ephemeral node used for master election
const char* PATH_NODES_PARENT =      "/myproject/nodes";      // parent of all per-process nodes
const char* PATH_NODES_CHILDREN =    "/myproject/nodes/node"; // prefix of the sequential per-process nodes

// Size in bytes of the stack buffers used for znode paths and znode data.
const int MAX_BUFFER_LEN = 1024;

// Stores value under key, replacing any existing entry, and returns *this so
// that several set() calls can be chained.
ZKData& ZKData::set(const std::string &key, const std::string &value)
{
    std::string &slot = data_[key];
    slot = value;
    return *this;
}

// Looks up key and returns its value; an unknown key yields an empty string.
std::string ZKData::get(const std::string &key) const
{
    const std::map<std::string, std::string>::const_iterator found = data_.find(key);
    return found == data_.cend() ? std::string("") : found->second;
}

// Serializes every entry as "key:value;" in map (key-sorted) order and
// returns the concatenation. An empty map yields an empty string.
std::string ZKData::encode() const
{
    std::string encoded;
    for (std::map<std::string, std::string>::const_iterator entry = data_.cbegin();
         entry != data_.cend(); ++entry) {
        encoded += entry->first;
        encoded += ":";
        encoded += entry->second;
        encoded += ";";
    }
    return encoded;
}

// Replaces the current entries with those parsed from str, which is expected
// to be in the "key:value;" form produced by encode().
//
// Each ';'-separated entry is split at its FIRST ':' only, so a value that
// itself contains ':' (for example "host:port") survives an encode()/decode()
// round trip instead of being silently dropped. Entries without any ':' are
// ignored. Values containing ';' still cannot be represented by this format.
void ZKData::decode(const std::string &str)
{
    data_.clear();
    std::vector<std::string> entries;
    boost::split(entries, str, boost::is_any_of(";"));
    for (std::vector<std::string>::const_iterator entry = entries.begin(); entry != entries.end(); ++entry) {
        const std::string::size_type sep = entry->find(':');
        if (sep == std::string::npos) {
            continue;
        }
        data_[entry->substr(0, sep)] = entry->substr(sep + 1);
    }
}

// Decodes a raw, length-delimited buffer (not necessarily NUL-terminated).
//
// data: pointer to the bytes, may be NULL.
// len:  number of valid bytes; zero or negative means "no data" (the
//       ZooKeeper C API reports -1 for a node whose data is NULL).
//
// When there is no data the current entries are simply cleared.
void ZKData::decode(const char *data, int len)
{
    std::string str;
    if (data && len > 0) {
        // Copy exactly len bytes; no extra terminator is appended so the last
        // value does not pick up an embedded '\0'.
        str.assign(data, static_cast<std::string::size_type>(len));
    }
    decode(str);
}

//////////////////////////////////////////////////////////////////////////
// Stores the connection string and sets the ZooKeeper client log level.
// No connection is made here; that happens in run(). timeout_ (3000) is
// handed to zookeeper_init() and is also the connect-wait budget in run().
ZKNode::ZKNode(const std::string &host, ZooLogLevel level)
    : handle_(NULL)
    , host_(host)
    , timeout_(3000)
    , connected_(false)
    , expired_(false)
{
    zoo_set_debug_level(level);
}

// Releases the ZooKeeper handle if the owner never called close(), so the
// session is not leaked. close() is a no-op on the handle when it is already
// closed.
ZKNode::~ZKNode()
{
    close();
}

void ZKNode::setOnMaster(const ZKData &masterData, const OnHandle &handle)
{
    masterData_ = masterData;
    onMaster_ = handle;
}

void ZKNode::setOnSlave(const ZKData &slaveData, const OnHandle &handle)
{
    slaveData_ = slaveData;
    onSlave_ = handle;
}

bool ZKNode::run()
{
    LOGGER_INFO("zknode run...");
    handle_ = zookeeper_init(host_.c_str(), ZKNode::mainWatcher, timeout_, 0, this, 0);
    if (!handle_) {
        LOGGER_INFO("zookeeper_init error");
        return false;
    }

    int elapsed = 0;
    const int MS_INTERVAL = 100;
    while (!isConnected() && elapsed < timeout_) {
        boost::this_thread::sleep_for(boost::chrono::milliseconds(MS_INTERVAL));
        elapsed += MS_INTERVAL;
    }

    if (!isConnected()) {
        LOGGER_INFO("zookeeper connect timeout");
        return false;
    }

    if (!bootstrap()) {
        return false;
    }

    runForMaster();
    return true;
}

void ZKNode::close()
{
    LOGGER_INFO("ZKNode::close");
    if (handle_) {
        zookeeper_close(handle_);
        handle_ = NULL;
    }
}

bool ZKNode::isConnected() const
{
    return connected_;
}

void ZKNode::setConnected(bool yes)
{
    connected_ = yes;
}

void ZKNode::setExpired(bool yes)
{
    expired_ = yes;
}

const std::string& ZKNode::path() const
{
    return path_;
}

bool ZKNode::bootstrap()
{
    LOGGER_INFO("bootstrap start");
    int ret = zoo_create(handle_, PATH_NEW_BOND, 0, 0, &ZOO_OPEN_ACL_UNSAFE, 0, 0, 0);
    if (ret != ZOK && ret != ZNODEEXISTS) {
        LOGGER_ERROR("zoo create error, path:" << PATH_NEW_BOND);
        return false;
    }
    
    ret = zoo_create(handle_, PATH_NODES_PARENT, 0, 0, &ZOO_OPEN_ACL_UNSAFE, 0, 0, 0);
    if (ret != ZOK && ret != ZNODEEXISTS) {
        LOGGER_ERROR("zoo create error, path:" << PATH_NODES_PARENT);
        return false;
    }
    
    char pathBuffer[MAX_BUFFER_LEN] = { 0 };
    int pathBufferLen = sizeof pathBuffer;
    ret = zoo_create(handle_, PATH_NODES_CHILDREN, 0, 0, &ZOO_OPEN_ACL_UNSAFE, ZOO_SEQUENCE | ZOO_EPHEMERAL, pathBuffer, pathBufferLen);
    if (ret != ZOK) {
        LOGGER_ERROR("zoo create error, path:" << PATH_NODES_CHILDREN);
        return false;
    }
    
    path_ = pathBuffer;
    slaveData_.set("path", path_);
    std::string data = slaveData_.encode();
    ret = zoo_set(handle_, path_.c_str(), data.c_str(), data.size(), -1);
    if (ret != ZOK) {
        LOGGER_ERROR("zoo set nodes child data error, data:" << data);
        return false;
    }
    LOGGER_INFO("bootstrap end path:" << path_);
    return true;
}

void ZKNode::runForMaster()
{
    LOGGER_INFO("runForMaster");
    if (!connected_) {
        LOGGER_ERROR("runForMaster error is not connected");
        return;
    }

    masterData_.set("path", path_);
    std::string data = masterData_.encode();
    zoo_acreate(handle_, PATH_MASTER, data.c_str(), data.size(), &ZOO_OPEN_ACL_UNSAFE, ZOO_EPHEMERAL, ZKNode::masterCreateCompletion, this);
}

void ZKNode::checkMaster()
{
    LOGGER_INFO("checkMaster");
    zoo_aget(handle_, PATH_MASTER, 0, ZKNode::masterCheckCompletion, this);
}

// Synchronously reads the master node and decodes its payload into data.
//
// Returns the zoo_get() result code. When the read fails, or the node holds
// no data (zoo_get reports a length of -1 for NULL data), data is cleared
// instead of decoding an unrelated or negatively-sized buffer.
int ZKNode::getMasterData(ZKData &data)
{
    LOGGER_INFO("getMasterData");
    char buffer[MAX_BUFFER_LEN] = { 0 };
    int bufferLen = sizeof buffer;
    struct Stat stat;
    const int ret = zoo_get(handle_, PATH_MASTER, 0, buffer, &bufferLen, &stat);
    if (ret == ZOK && bufferLen > 0) {
        data.decode(buffer, bufferLen);
    } else {
        if (ret != ZOK) {
            LOGGER_ERROR("getMasterData zoo_get error, rc:" << ret);
        }
        data.decode(std::string());
    }
    return ret;
}

void ZKNode::takeMaster()
{
    LOGGER_INFO("==========================this is master====================");
    ZKData data;
    getMasterData(data);
    if (onMaster_) {
        onMaster_(data);
    }
}

void ZKNode::takeSlave()
{
    LOGGER_INFO("==========================this is slave====================");
    ZKData data;
    getMasterData(data);
    if (onSlave_) {
        onSlave_(data);
    }
}

void ZKNode::masterExists()
{
    zoo_awexists(handle_, PATH_MASTER, ZKNode::masterExistsWatcher, this, ZKNode::masterExistsCompletion, this);
}

void ZKNode::masterCreateCompletion(int rc, const char *value, const void *data) {
    LOGGER_INFO("masterCreateCompletion rc:" << rc);
    ZKNode *zk = static_cast<ZKNode*>(const_cast<void*>(data));
    switch (rc) {
    case ZCONNECTIONLOSS:
    case ZOPERATIONTIMEOUT:
        zk->checkMaster();
        break;
    case ZOK:
        zk->takeMaster();
        zk->masterExists();
        break;
    case ZNODEEXISTS:
        zk->takeSlave();
        zk->masterExists();
        break;
    default:
        LOGGER_ERROR("Something went wrong when running for master");
        break;
    }
}

void ZKNode::masterCheckCompletion(int rc, const char *value, int value_len, const struct Stat *stat, const void *data)
{
    std::string strvalue = value ? value : "";
    LOGGER_INFO("masterCheckCompletion rc:" << rc << ", value:" << strvalue);
    ZKNode *zk = static_cast<ZKNode*>(const_cast<void*>(data));
    switch (rc) {
    case ZCONNECTIONLOSS:
    case ZOPERATIONTIMEOUT:
        zk->checkMaster();
        break;
    case ZOK:
        if (!strvalue.empty() && strvalue == zk->path_) {
            zk->takeMaster();
        } else {
            zk->masterExists();
        }
        break;
    case ZNONODE:
        zk->runForMaster();
        break;
    default:
        LOGGER_ERROR("Something went wrong when checking the master lock, rc:" << rc);
        break;
    }
}

// Watch callback registered by masterExists(). When the master node is
// deleted this node runs for master again; every other event is ignored.
// watcherCtx is the ZKNode that set the watch; zh and state are unused.
void ZKNode::masterExistsWatcher(zhandle_t *zh, int type, int state, const char *path, void *watcherCtx)
{
    LOGGER_INFO("masterExistsWatcher type:" << type);
    ZKNode *self = static_cast<ZKNode*>(watcherCtx);
    if (type != ZOO_DELETED_EVENT) {
        return;
    }
    // The watch was set on the master node only.
    assert(!strcmp(path, PATH_MASTER));
    self->runForMaster();
}

void ZKNode::masterExistsCompletion(int rc, const struct Stat *stat, const void *data)
{
    ZKNode *zk = static_cast<ZKNode*>(const_cast<void*>(data));
    switch (rc) {
    case ZCONNECTIONLOSS:
    case ZOPERATIONTIMEOUT:
        zk->masterExists();
        break;
    case ZOK:
        break;
    case ZNONODE:
        LOGGER_INFO("masterExistsCompletion previous master is gone, running for master");
        zk->runForMaster();
        break;
    default:
        LOGGER_ERROR("Something went wrong when executing exists, rc:" << rc);

        break;
    }
}

// Session watcher passed to zookeeper_init(). Only session events are acted
// on; they keep the connected / expired flags of the ZKNode in watcherCtx up
// to date. On session expiry the handle is closed. zh and path are unused.
void ZKNode::mainWatcher(zhandle_t *zh, int type, int state, const char *path, void *watcherCtx)
{
    LOGGER_INFO("ZKNode::mainWatcher type:" << type << ", state:" << state);
    ZKNode *self = static_cast<ZKNode*>(watcherCtx);
    assert(self != NULL);

    if (type != ZOO_SESSION_EVENT) {
        return;
    }

    if (state == ZOO_CONNECTED_STATE) {
        self->setConnected(true);
        return;
    }

    if (state == ZOO_CONNECTING_STATE) {
        // Only warn on a real connected -> connecting transition.
        if (self->isConnected()) {
            LOGGER_WARN("ZKNode::mainWatcher Disconnected.");
        }
        self->setConnected(false);
        return;
    }

    if (state == ZOO_EXPIRED_SESSION_STATE) {
        LOGGER_WARN("ZKNode::mainWatcher session expired will close");
        self->setExpired(true);
        self->setConnected(false);
        self->close();
    }
}