文章标题 原创 翻译 转载 文章内容

我们只实现两个需求

* master选举,当master切换后需要收到通知
* 注册服务信息

注册到zookeeper上的目录结构如下:

```
/myproject # 项目顶层节点
/myproject/master # 临时节点,用来实现master选举
/myproject/nodes # node父节点
/myproject/nodes/node # 序列节点,所有节点都在这里
```

代码:

```
// ZKNode.h
#pragma once
#include <atomic>
#include <map>
#include <string>
#include <memory>
#include <zookeeper/zookeeper.h>
#include <boost/function.hpp>
#include <boost/thread.hpp>
#include <boost/algorithm/string.hpp>

// Small string-to-string dictionary that is stored as the payload of a znode.
// Wire format: "key:value;" repeated for every entry, in key order.
// Keys must not contain ':' or ';' and values must not contain ';'
// (values MAY contain ':', e.g. "host:port").
class ZKData
{
public:
    // Inserts or overwrites `key`; returns *this so calls can be chained.
    ZKData& set(const std::string &key, const std::string &value);
    // Returns the value stored under `key`, or "" when the key is absent.
    std::string get(const std::string &key) const;
    // Serializes all entries to the "key:value;" wire format.
    std::string encode() const;
    // Replaces the current content with the entries parsed from `str`.
    void decode(const std::string &str);
    // Same as above for a raw (not NUL-terminated) buffer of `len` bytes.
    // A null pointer or a non-positive length yields an empty dictionary.
    void decode(const char *data, int len);

private:
    std::map<std::string, std::string> data_;
};

// One participant of the cluster: registers itself under /myproject/nodes
// and competes for the ephemeral /myproject/master node.
class ZKNode
{
public:
    // Callback invoked with the current content of the master node.
    typedef boost::function<void(const ZKData &data)> OnHandle;

    ZKNode(const std::string &host, ZooLogLevel level = ZOO_LOG_LEVEL_WARN);
    // Closes the ZooKeeper handle if it is still open.
    ~ZKNode();

    // Data to publish when this node becomes master + callback fired then.
    void setOnMaster(const ZKData &masterData, const OnHandle &handle);
    // Data to publish for this node's own entry + callback fired as slave.
    void setOnSlave(const ZKData &slaveData, const OnHandle &handle);

    // Connects, registers the node and starts the master election.
    // Returns false (with the handle closed) when any of these steps fails.
    bool run();
    void close();

    bool isConnected() const;
    void setConnected(bool yes);
    void setExpired(bool yes);

    // Path of this node's sequence znode (empty until bootstrap succeeded).
    const std::string& path() const;

private:
    // Non-copyable: the object owns the ZooKeeper handle.
    ZKNode(const ZKNode&);
    ZKNode& operator=(const ZKNode&);

    bool bootstrap();
    void runForMaster();
    void checkMaster();
    int getMasterData(ZKData &data);
    void takeMaster();
    void takeSlave();
    void masterExists();

    static void masterCreateCompletion(int rc, const char *value, const void *data);
    static void masterCheckCompletion(int rc, const char *value, int value_len, const struct Stat *stat, const void *data);
    static void masterExistsWatcher(zhandle_t *zh, int type, int state, const char *path, void *watcherCtx);
    static void masterExistsCompletion(int rc, const struct Stat *stat, const void *data);
    static void mainWatcher(zhandle_t *zh, int type, int state, const char *path, void *watcherCtx);

private:
    zhandle_t *handle_;
    std::string host_;
    int timeout_;
    std::string path_;
    // Written from mainWatcher() and polled by run(), hence atomic.
    std::atomic<bool> connected_;
    std::atomic<bool> expired_;
    ZKData masterData_;
    ZKData slaveData_;
    OnHandle onMaster_;
    OnHandle onSlave_;
};
```

```
// ZKNode.cpp
#include "ZKNode.h"
#include <assert.h>
#include <string.h>
#include <time.h>
#include <sstream>
#include <vector>
#include <malloc.h>
#include "include/logger.h"

const char* PATH_NEW_BOND = "/myproject";                  // top-level project node
const char* PATH_MASTER = "/myproject/master";             // ephemeral node used for the election
const char* PATH_NODES_PARENT = "/myproject/nodes";        // parent of all node entries
const char* PATH_NODES_CHILDREN = "/myproject/nodes/node"; // prefix of the sequence nodes
const int MAX_BUFFER_LEN = 1024;

ZKData& ZKData::set(const std::string &key, const std::string &value)
{
    data_[key] = value;
    return *this;
}

std::string ZKData::get(const std::string &key) const
{
    std::map<std::string, std::string>::const_iterator iter = data_.find(key);
    if (iter != data_.cend()) {
        return iter->second;
    }
    return "";
}

std::string ZKData::encode() const
{
    std::string result;
    std::map<std::string, std::string>::const_iterator iter = data_.cbegin();
    while (iter != data_.cend()) {
        result += iter->first + ":" + iter->second + ";";
        ++iter;
    }
    return result;
}

void ZKData::decode(const std::string &str)
{
    data_.clear();
    std::vector<std::string> entries;
    boost::split(entries, str, boost::is_any_of(";"));
    for (std::vector<std::string>::const_iterator iter = entries.begin(); iter != entries.end(); ++iter) {
        // Split on the FIRST ':' only, so that values such as "10.0.0.1:8080"
        // survive a round trip instead of being silently dropped.
        const std::string::size_type sep = iter->find(':');
        if (sep == std::string::npos) {
            continue; // not a key:value entry (e.g. the empty tail after the last ';')
        }
        data_[iter->substr(0, sep)] = iter->substr(sep + 1);
    }
}

void ZKData::decode(const char *data, int len)
{
    std::string str;
    // ZooKeeper reports a length of -1 for a node without data, so only a
    // strictly positive length is copied; exactly `len` bytes are taken (no
    // trailing NUL that would end up inside the last value).
    if (data && len > 0) {
        str.assign(data, static_cast<std::string::size_type>(len));
    }
    decode(str);
}

//////////////////////////////////////////////////////////////////////////

ZKNode::ZKNode(const std::string &host, ZooLogLevel level)
    : handle_(NULL), host_(host), timeout_(3000), connected_(false), expired_(false)
{
    zoo_set_debug_level(level);
}

ZKNode::~ZKNode()
{
    // The callbacks registered with ZooKeeper carry `this` as their context;
    // closing the handle here prevents them from firing on a destroyed object
    // and releases the session.
    close();
}

void ZKNode::setOnMaster(const ZKData &masterData, const OnHandle &handle)
{
    masterData_ = masterData;
    onMaster_ = handle;
}

void ZKNode::setOnSlave(const ZKData &slaveData, const OnHandle &handle)
{
    slaveData_ = slaveData;
    onSlave_ = handle;
}

bool ZKNode::run()
{
    LOGGER_INFO("zknode run...");
    handle_ = zookeeper_init(host_.c_str(), ZKNode::mainWatcher, timeout_, 0, this, 0);
    if (!handle_) {
        LOGGER_INFO("zookeeper_init error");
        return false;
    }

    // Wait (at most timeout_ ms) for mainWatcher() to report the connection.
    int elapsed = 0;
    const int MS_INTERVAL = 100;
    while (!isConnected() && elapsed < timeout_) {
        boost::this_thread::sleep_for(boost::chrono::milliseconds(MS_INTERVAL));
        elapsed += MS_INTERVAL;
    }

    if (!isConnected()) {
        LOGGER_INFO("zookeeper connect timeout");
        close(); // do not leave a half-initialized session behind
        return false;
    }

    if (!bootstrap()) {
        close();
        return false;
    }

    runForMaster();
    return true;
}

void ZKNode::close()
{
    LOGGER_INFO("ZKNode::close");
    if (handle_) {
        zookeeper_close(handle_);
        handle_ = NULL;
    }
}

bool ZKNode::isConnected() const
{
    return connected_;
}

void ZKNode::setConnected(bool yes)
{
    connected_ = yes;
}

void ZKNode::setExpired(bool yes)
{
    expired_ = yes;
}

const std::string& ZKNode::path() const
{
    return path_;
}

// Creates the persistent parents (if missing), then this node's
// ephemeral sequence entry, and stores slaveData_ (plus "path") in it.
bool ZKNode::bootstrap()
{
    LOGGER_INFO("bootstrap start");
    int ret = zoo_create(handle_, PATH_NEW_BOND, 0, 0, &ZOO_OPEN_ACL_UNSAFE, 0, 0, 0);
    if (ret != ZOK && ret != ZNODEEXISTS) {
        LOGGER_ERROR("zoo create error, path:" << PATH_NEW_BOND);
        return false;
    }

    ret = zoo_create(handle_, PATH_NODES_PARENT, 0, 0, &ZOO_OPEN_ACL_UNSAFE, 0, 0, 0);
    if (ret != ZOK && ret != ZNODEEXISTS) {
        LOGGER_ERROR("zoo create error, path:" << PATH_NODES_PARENT);
        return false;
    }

    char pathBuffer[MAX_BUFFER_LEN] = { 0 };
    int pathBufferLen = sizeof pathBuffer;
    ret = zoo_create(handle_, PATH_NODES_CHILDREN, 0, 0, &ZOO_OPEN_ACL_UNSAFE, ZOO_SEQUENCE | ZOO_EPHEMERAL, pathBuffer, pathBufferLen);
    if (ret != ZOK) {
        LOGGER_ERROR("zoo create error, path:" << PATH_NODES_CHILDREN);
        return false;
    }

    // The real path (with the sequence suffix) is only known after creation,
    // so the payload is written in a second step.
    path_ = pathBuffer;
    slaveData_.set("path", path_);
    std::string data = slaveData_.encode();
    ret = zoo_set(handle_, path_.c_str(), data.c_str(), data.size(), -1);
    if (ret != ZOK) {
        LOGGER_ERROR("zoo set nodes child data error, data:" << data);
        return false;
    }

    LOGGER_INFO("bootstrap end path:" << path_);
    return true;
}

// Tries to create the ephemeral master node carrying masterData_ + "path".
void ZKNode::runForMaster()
{
    LOGGER_INFO("runForMaster");
    if (!connected_) {
        LOGGER_ERROR("runForMaster error is not connected");
        return;
    }

    masterData_.set("path", path_);
    std::string data = masterData_.encode();
    int ret = zoo_acreate(handle_, PATH_MASTER, data.c_str(), data.size(), &ZOO_OPEN_ACL_UNSAFE, ZOO_EPHEMERAL, ZKNode::masterCreateCompletion, this);
    if (ret != ZOK) {
        LOGGER_ERROR("zoo_acreate error, rc:" << ret);
    }
}

void ZKNode::checkMaster()
{
    LOGGER_INFO("checkMaster");
    int ret = zoo_aget(handle_, PATH_MASTER, 0, ZKNode::masterCheckCompletion, this);
    if (ret != ZOK) {
        LOGGER_ERROR("zoo_aget error, rc:" << ret);
    }
}

// Reads and decodes the master node. `data` is left empty when the read
// fails or the node carries no payload. Returns the zoo_get result code.
int ZKNode::getMasterData(ZKData &data)
{
    LOGGER_INFO("getMasterData");
    char buffer[MAX_BUFFER_LEN] = { 0 };
    int bufferLen = sizeof buffer;
    struct Stat stat;
    int ret = zoo_get(handle_, PATH_MASTER, 0, buffer, &bufferLen, &stat);
    if (ret != ZOK || bufferLen <= 0) {
        // bufferLen is -1 for a node without data; never decode garbage.
        data.decode(std::string());
        return ret;
    }
    data.decode(buffer, bufferLen);
    return ret;
}

void ZKNode::takeMaster()
{
    LOGGER_INFO("==========================this is master====================");
    ZKData data;
    getMasterData(data);
    if (onMaster_) {
        onMaster_(data);
    }
}

void ZKNode::takeSlave()
{
    LOGGER_INFO("==========================this is slave====================");
    ZKData data;
    getMasterData(data);
    if (onSlave_) {
        onSlave_(data);
    }
}

// Sets a watch on the master node so that its deletion triggers a new election.
void ZKNode::masterExists()
{
    int ret = zoo_awexists(handle_, PATH_MASTER, ZKNode::masterExistsWatcher, this, ZKNode::masterExistsCompletion, this);
    if (ret != ZOK) {
        LOGGER_ERROR("zoo_awexists error, rc:" << ret);
    }
}

void ZKNode::masterCreateCompletion(int rc, const char *value, const void *data)
{
    LOGGER_INFO("masterCreateCompletion rc:" << rc);
    ZKNode *zk = static_cast<ZKNode*>(const_cast<void*>(data));
    switch (rc) {
    case ZCONNECTIONLOSS:
    case ZOPERATIONTIMEOUT:
        // Outcome unknown: the node may or may not have been created.
        zk->checkMaster();
        break;
    case ZOK:
        zk->takeMaster();
        zk->masterExists();
        break;
    case ZNODEEXISTS:
        zk->takeSlave();
        zk->masterExists();
        break;
    default:
        LOGGER_ERROR("Something went wrong when running for master");
        break;
    }
}

void ZKNode::masterCheckCompletion(int rc, const char *value, int value_len, const struct Stat *stat, const void *data)
{
    // `value` is a raw buffer of value_len bytes, NOT a C string.
    std::string strvalue;
    if (value && value_len > 0) {
        strvalue.assign(value, static_cast<std::string::size_type>(value_len));
    }
    LOGGER_INFO("masterCheckCompletion rc:" << rc << ", value:" << strvalue);

    ZKNode *zk = static_cast<ZKNode*>(const_cast<void*>(data));
    switch (rc) {
    case ZCONNECTIONLOSS:
    case ZOPERATIONTIMEOUT:
        zk->checkMaster();
        break;
    case ZOK: {
        // The master node stores an encoded ZKData; we own it when its
        // "path" entry is our own sequence-node path.
        ZKData masterData;
        masterData.decode(strvalue);
        const std::string masterPath = masterData.get("path");
        if (!masterPath.empty() && masterPath == zk->path_) {
            zk->takeMaster();
        } else {
            zk->masterExists();
        }
        break;
    }
    case ZNONODE:
        zk->runForMaster();
        break;
    default:
        LOGGER_ERROR("Something went wrong when checking the master lock, rc:" << rc);
        break;
    }
}

void ZKNode::masterExistsWatcher(zhandle_t *zh, int type, int state, const char *path, void *watcherCtx)
{
    LOGGER_INFO("masterExistsWatcher type:" << type);
    ZKNode *zk = static_cast<ZKNode*>(watcherCtx);
    if (type == ZOO_DELETED_EVENT) {
        assert(!strcmp(path, PATH_MASTER));
        zk->runForMaster();
    }
}

void ZKNode::masterExistsCompletion(int rc, const struct Stat *stat, const void *data)
{
    ZKNode *zk = static_cast<ZKNode*>(const_cast<void*>(data));
    switch (rc) {
    case ZCONNECTIONLOSS:
    case ZOPERATIONTIMEOUT:
        zk->masterExists();
        break;
    case ZOK:
        break;
    case ZNONODE:
        LOGGER_INFO("masterExistsCompletion previous master is gone, running for master");
        zk->runForMaster();
        break;
    default:
        LOGGER_ERROR("Something went wrong when executing exists, rc:" << rc);
        break;
    }
}

// Session watcher: tracks connected / expired state of the handle.
void ZKNode::mainWatcher(zhandle_t *zh, int type, int state, const char *path, void *watcherCtx)
{
    LOGGER_INFO("ZKNode::mainWatcher type:" << type << ", state:" << state);
    ZKNode *zk = static_cast<ZKNode*>(watcherCtx);
    assert(zk != NULL);
    if (type == ZOO_SESSION_EVENT) {
        if (state == ZOO_CONNECTED_STATE) {
            zk->setConnected(true);
        } else if (state == ZOO_CONNECTING_STATE) {
            if (zk->isConnected()) {
                LOGGER_WARN("ZKNode::mainWatcher Disconnected.");
            }
            zk->setConnected(false);
        } else if (state == ZOO_EXPIRED_SESSION_STATE) {
            LOGGER_WARN("ZKNode::mainWatcher session expired will close");
            zk->setExpired(true);
            zk->setConnected(false);
            zk->close();
        }
    }
}
```

文章类别 Python Mobile Android Java Shell Life Database Bug Windows IOS Tools Boost Node.js Mac Product Tips C/C++ Golang Javascript React Qt MQ MongoDB Design Web Linux LLM ChatGPT RAG AI 提交