我们只实现两个需求
- master选举,当master切换后需要收到通知
- 注册服务信息
注册到zookeeper上的目录结构如下:
/myproject # 项目顶层节点
/myproject/master # 临时节点,用来实现master选举
/myproject/nodes # node父节点
/myproject/nodes/node # 临时序列节点的路径前缀,每个进程启动时在这里创建一个属于自己的节点(ZooKeeper 会在 node 后追加递增序号)
代码:
// ZKNode.h
#pragma once
#include <atomic>
#include <map>
#include <memory>
#include <string>
#include <vector>
#include <zookeeper/zookeeper.h>
#include <boost/function.hpp>
#include <boost/thread.hpp>
#include <boost/algorithm/string.hpp>
/**
 * Flat string-to-string property bag used as the payload stored in znodes.
 *
 * encode() emits "key:value;" once per entry. The separators are not
 * escaped, so keys or values that themselves contain ':' or ';' may not
 * survive an encode()/decode() round trip.
 */
class ZKData {
public:
    /// Stores `value` under `key` (replacing any previous value) and returns *this for chaining.
    ZKData& set(const std::string& key, const std::string& value);

    /// Returns the value stored under `key`, or an empty string when the key is absent.
    std::string get(const std::string& key) const;

    /// Serializes every entry as "key:value;", in the map's key order.
    std::string encode() const;

    /// Discards the current entries and re-populates them from an encoded string.
    void decode(const std::string& str);

    /// Overload of decode() for a raw buffer of `len` bytes.
    void decode(const char* data, int len);

private:
    std::map<std::string, std::string> data_;
};
/**
 * One member of a ZooKeeper-coordinated cluster.
 *
 * run() connects to the ensemble, registers this process as an ephemeral
 * sequential node under /myproject/nodes and then competes for the ephemeral
 * /myproject/master node. The callbacks installed with setOnMaster() /
 * setOnSlave() are invoked from inside ZooKeeper completion callbacks, so
 * they do not run on the thread that called run().
 *
 * The class is non-copyable.
 */
class ZKNode
{
public:
    /// Callback type; receives the data read from the master znode.
    typedef boost::function<void(const ZKData &data)> OnHandle;

    /// @param host   connection string handed to zookeeper_init().
    /// @param level  log level applied to the ZooKeeper client library.
    ZKNode(const std::string &host, ZooLogLevel level = ZOO_LOG_LEVEL_WARN);
    ~ZKNode();

    /// Sets the payload written to the master znode and the callback fired when this node wins.
    void setOnMaster(const ZKData &masterData, const OnHandle &handle);
    /// Sets the payload written to this node's own znode and the callback fired when another node is master.
    void setOnSlave(const ZKData &slaveData, const OnHandle &handle);

    /// Connects, registers this node and starts the election; returns false on failure.
    bool run();
    /// Closes the ZooKeeper handle if one is open.
    void close();

    bool isConnected() const;
    /// Used by the session watcher to publish the connection state.
    void setConnected(bool yes);
    /// Used by the session watcher to record session expiry.
    void setExpired(bool yes);
    /// Path of this node's own znode; empty until bootstrap has succeeded.
    const std::string& path() const;

private:
    ZKNode(const ZKNode&) = delete;
    ZKNode& operator=(const ZKNode&) = delete;

    bool bootstrap();
    void runForMaster();
    void checkMaster();
    int getMasterData(ZKData &data);
    void takeMaster();
    void takeSlave();
    void masterExists();

    // C-style trampolines registered with the ZooKeeper client; the context
    // pointer they receive is the owning ZKNode.
    static void masterCreateCompletion(int rc, const char *value, const void *data);
    static void masterCheckCompletion(int rc, const char *value, int value_len, const struct Stat *stat, const void *data);
    static void masterExistsWatcher(zhandle_t *zh, int type, int state, const char *path, void *watcherCtx);
    static void masterExistsCompletion(int rc, const struct Stat *stat, const void *data);
    static void mainWatcher(zhandle_t *zh, int type, int state, const char *path, void *watcherCtx);

private:
    zhandle_t *handle_;
    std::string host_;
    int timeout_;
    std::string path_;
    // Written by mainWatcher() while run() polls isConnected() from the
    // caller's thread, so these flags must be atomic to avoid a data race.
    std::atomic<bool> connected_;
    std::atomic<bool> expired_;
    ZKData masterData_;
    ZKData slaveData_;
    OnHandle onMaster_;
    OnHandle onSlave_;
};
// ZKNode.cpp
#include "ZKNode.h"
#include <assert.h>
#include <time.h>
#include <sstream>
#include <malloc.h>
#include "include/logger.h"
// Znode layout shared by every ZKNode instance.
const char* PATH_NEW_BOND = "/myproject"; // top-level project node
const char* PATH_MASTER = "/myproject/master"; // ephemeral node used for master election
const char* PATH_NODES_PARENT = "/myproject/nodes"; // parent of all registered nodes
const char* PATH_NODES_CHILDREN = "/myproject/nodes/node"; // path prefix of the per-process sequential nodes
// Size in bytes of the stack buffers used for created paths and znode data.
const int MAX_BUFFER_LEN = 1024;
/**
 * Stores `value` under `key`, overwriting any existing entry.
 * @return *this, so several set() calls can be chained.
 */
ZKData& ZKData::set(const std::string &key, const std::string &value)
{
    std::map<std::string, std::string>::iterator slot = data_.find(key);
    if (slot == data_.end()) {
        data_.insert(std::make_pair(key, value));
    } else {
        slot->second = value;
    }
    return *this;
}
/**
 * Looks up `key`.
 * @return the stored value, or an empty string when the key is not present.
 */
std::string ZKData::get(const std::string &key) const
{
    const std::map<std::string, std::string>::const_iterator found = data_.find(key);
    return found == data_.end() ? std::string() : found->second;
}
/**
 * Serializes all entries as a concatenation of "key:value;" records, visited
 * in the map's key order. An empty map encodes to an empty string.
 */
std::string ZKData::encode() const
{
    std::string encoded;
    for (std::map<std::string, std::string>::const_iterator entry = data_.begin();
         entry != data_.end(); ++entry) {
        encoded.append(entry->first);
        encoded.append(":");
        encoded.append(entry->second);
        encoded.append(";");
    }
    return encoded;
}
/**
 * Replaces the current entries with those parsed from `str`.
 *
 * `str` is a sequence of ';'-separated records, each "key:value". A record
 * is split at its FIRST ':' only, so values that contain ':' (for example
 * "host:port") round-trip through encode()/decode(). Records without any
 * ':' (including empty ones produced by a trailing ';') are ignored. When a
 * key occurs more than once the last occurrence wins.
 */
void ZKData::decode(const std::string &str)
{
    data_.clear();
    std::string::size_type begin = 0;
    while (begin <= str.size()) {
        // [begin, end) is the current record, without its ';' terminator.
        std::string::size_type end = str.find(';', begin);
        if (end == std::string::npos) {
            end = str.size();
        }
        const std::string::size_type colon = str.find(':', begin);
        if (colon != std::string::npos && colon < end) {
            data_[str.substr(begin, colon - begin)] = str.substr(colon + 1, end - colon - 1);
        }
        begin = end + 1;
    }
}
/**
 * Decodes a raw, not necessarily NUL-terminated buffer.
 *
 * @param data  start of the encoded bytes; may be NULL.
 * @param len   number of valid bytes in `data`. Zero or negative values
 *              (ZooKeeper reports -1 for a znode without data) are treated
 *              as "no data" and simply clear the current entries.
 */
void ZKData::decode(const char *data, int len) {
    std::string str;
    if (data && len > 0) {
        // Copy exactly `len` bytes; an extra trailing NUL would become part
        // of the last value when the input lacks a final ';'.
        str.assign(data, static_cast<std::string::size_type>(len));
    }
    decode(str);
}
//////////////////////////////////////////////////////////////////////////
/**
 * Remembers the connection string and applies `level` to the ZooKeeper
 * client library's logging. No connection is attempted here; that happens
 * in run(). The timeout is fixed at 3000 (compared against elapsed
 * milliseconds in run()).
 */
ZKNode::ZKNode(const std::string &host, ZooLogLevel level)
    : handle_(NULL)
    , host_(host)
    , timeout_(3000)
    , connected_(false)
    , expired_(false)
{
    zoo_set_debug_level(level);
}
/**
 * Closes the ZooKeeper handle if it is still open.
 *
 * `this` is registered as the context pointer of the watcher and of every
 * asynchronous request, so the handle must not outlive the object; leaving
 * it open would also leak the session.
 */
ZKNode::~ZKNode()
{
    close();
}
/**
 * Installs the callback fired when this node becomes master, together with
 * the payload written to the master znode. The "path" key of the payload is
 * overwritten with this node's own path when running for master.
 */
void ZKNode::setOnMaster(const ZKData &masterData, const OnHandle &handle)
{
    onMaster_ = handle;
    masterData_ = masterData;
}
/**
 * Installs the callback fired when another node holds the master znode,
 * together with the payload written to this node's own znode. The "path"
 * key of the payload is overwritten during bootstrap.
 */
void ZKNode::setOnSlave(const ZKData &slaveData, const OnHandle &handle)
{
    onSlave_ = handle;
    slaveData_ = slaveData;
}
bool ZKNode::run()
{
LOGGER_INFO("zknode run...");
handle_ = zookeeper_init(host_.c_str(), ZKNode::mainWatcher, timeout_, 0, this, 0);
if (!handle_) {
LOGGER_INFO("zookeeper_init error");
return false;
}
int elapsed = 0;
const int MS_INTERVAL = 100;
while (!isConnected() && elapsed < timeout_) {
boost::this_thread::sleep_for(boost::chrono::milliseconds(MS_INTERVAL));
elapsed += MS_INTERVAL;
}
if (!isConnected()) {
LOGGER_INFO("zookeeper connect timeout");
return false;
}
if (!bootstrap()) {
return false;
}
runForMaster();
return true;
}
void ZKNode::close()
{
LOGGER_INFO("ZKNode::close");
if (handle_) {
zookeeper_close(handle_);
handle_ = NULL;
}
}
bool ZKNode::isConnected() const
{
return connected_;
}
void ZKNode::setConnected(bool yes)
{
connected_ = yes;
}
void ZKNode::setExpired(bool yes)
{
expired_ = yes;
}
const std::string& ZKNode::path() const
{
return path_;
}
bool ZKNode::bootstrap()
{
LOGGER_INFO("bootstrap start");
int ret = zoo_create(handle_, PATH_NEW_BOND, 0, 0, &ZOO_OPEN_ACL_UNSAFE, 0, 0, 0);
if (ret != ZOK && ret != ZNODEEXISTS) {
LOGGER_ERROR("zoo create error, path:" << PATH_NEW_BOND);
return false;
}
ret = zoo_create(handle_, PATH_NODES_PARENT, 0, 0, &ZOO_OPEN_ACL_UNSAFE, 0, 0, 0);
if (ret != ZOK && ret != ZNODEEXISTS) {
LOGGER_ERROR("zoo create error, path:" << PATH_NODES_PARENT);
return false;
}
char pathBuffer[MAX_BUFFER_LEN] = { 0 };
int pathBufferLen = sizeof pathBuffer;
ret = zoo_create(handle_, PATH_NODES_CHILDREN, 0, 0, &ZOO_OPEN_ACL_UNSAFE, ZOO_SEQUENCE | ZOO_EPHEMERAL, pathBuffer, pathBufferLen);
if (ret != ZOK) {
LOGGER_ERROR("zoo create error, path:" << PATH_NODES_CHILDREN);
return false;
}
path_ = pathBuffer;
slaveData_.set("path", path_);
std::string data = slaveData_.encode();
ret = zoo_set(handle_, path_.c_str(), data.c_str(), data.size(), -1);
if (ret != ZOK) {
LOGGER_ERROR("zoo set nodes child data error, data:" << data);
return false;
}
LOGGER_INFO("bootstrap end path:" << path_);
return true;
}
void ZKNode::runForMaster()
{
LOGGER_INFO("runForMaster");
if (!connected_) {
LOGGER_ERROR("runForMaster error is not connected");
return;
}
masterData_.set("path", path_);
std::string data = masterData_.encode();
zoo_acreate(handle_, PATH_MASTER, data.c_str(), data.size(), &ZOO_OPEN_ACL_UNSAFE, ZOO_EPHEMERAL, ZKNode::masterCreateCompletion, this);
}
void ZKNode::checkMaster()
{
LOGGER_INFO("checkMaster");
zoo_aget(handle_, PATH_MASTER, 0, ZKNode::masterCheckCompletion, this);
}
/**
 * Synchronously reads and decodes the master znode's data.
 *
 * @param data  receives the decoded entries; it is cleared when the read
 *              fails or the znode carries no data.
 * @return the zoo_get() result code (ZOK on success).
 *
 * Only the first MAX_BUFFER_LEN bytes of the znode's data are read.
 */
int ZKNode::getMasterData(ZKData &data)
{
    LOGGER_INFO("getMasterData");
    char buffer[MAX_BUFFER_LEN] = { 0 };
    int bufferLen = sizeof buffer;
    struct Stat stat;
    const int ret = zoo_get(handle_, PATH_MASTER, 0, buffer, &bufferLen, &stat);
    if (ret == ZOK && bufferLen > 0) {
        data.decode(buffer, bufferLen);
    } else {
        // On failure bufferLen is not a valid byte count, and a znode
        // without data reports -1; never hand such a length to decode().
        data.decode(std::string());
    }
    return ret;
}
void ZKNode::takeMaster()
{
LOGGER_INFO("==========================this is master====================");
ZKData data;
getMasterData(data);
if (onMaster_) {
onMaster_(data);
}
}
void ZKNode::takeSlave()
{
LOGGER_INFO("==========================this is slave====================");
ZKData data;
getMasterData(data);
if (onSlave_) {
onSlave_(data);
}
}
void ZKNode::masterExists()
{
zoo_awexists(handle_, PATH_MASTER, ZKNode::masterExistsWatcher, this, ZKNode::masterExistsCompletion, this);
}
void ZKNode::masterCreateCompletion(int rc, const char *value, const void *data) {
LOGGER_INFO("masterCreateCompletion rc:" << rc);
ZKNode *zk = static_cast<ZKNode*>(const_cast<void*>(data));
switch (rc) {
case ZCONNECTIONLOSS:
case ZOPERATIONTIMEOUT:
zk->checkMaster();
break;
case ZOK:
zk->takeMaster();
zk->masterExists();
break;
case ZNODEEXISTS:
zk->takeSlave();
zk->masterExists();
break;
default:
LOGGER_ERROR("Something went wrong when running for master");
break;
}
}
void ZKNode::masterCheckCompletion(int rc, const char *value, int value_len, const struct Stat *stat, const void *data)
{
std::string strvalue = value ? value : "";
LOGGER_INFO("masterCheckCompletion rc:" << rc << ", value:" << strvalue);
ZKNode *zk = static_cast<ZKNode*>(const_cast<void*>(data));
switch (rc) {
case ZCONNECTIONLOSS:
case ZOPERATIONTIMEOUT:
zk->checkMaster();
break;
case ZOK:
if (!strvalue.empty() && strvalue == zk->path_) {
zk->takeMaster();
} else {
zk->masterExists();
}
break;
case ZNONODE:
zk->runForMaster();
break;
default:
LOGGER_ERROR("Something went wrong when checking the master lock, rc:" << rc);
break;
}
}
/**
 * Watch callback for the master znode. When the znode is deleted the node
 * runs for master again; every other event type is ignored.
 *
 * @param watcherCtx  the ZKNode that registered the watch.
 */
void ZKNode::masterExistsWatcher(zhandle_t *zh, int type, int state, const char *path, void *watcherCtx)
{
    LOGGER_INFO("masterExistsWatcher type:" << type);
    ZKNode *self = static_cast<ZKNode*>(watcherCtx);
    if (type != ZOO_DELETED_EVENT) {
        return;
    }
    assert(!strcmp(path, PATH_MASTER));
    self->runForMaster();
}
void ZKNode::masterExistsCompletion(int rc, const struct Stat *stat, const void *data)
{
ZKNode *zk = static_cast<ZKNode*>(const_cast<void*>(data));
switch (rc) {
case ZCONNECTIONLOSS:
case ZOPERATIONTIMEOUT:
zk->masterExists();
break;
case ZOK:
break;
case ZNONODE:
LOGGER_INFO("masterExistsCompletion previous master is gone, running for master");
zk->runForMaster();
break;
default:
LOGGER_ERROR("Something went wrong when executing exists, rc:" << rc);
break;
}
}
/**
 * Session watcher passed to zookeeper_init(). Only session events are
 * handled:
 * - connected: mark the node connected.
 * - connecting: mark it disconnected (warning if it was connected before).
 * - session expired: mark it expired and disconnected, then close the handle.
 *
 * @param watcherCtx  the owning ZKNode; must not be NULL.
 */
void ZKNode::mainWatcher(zhandle_t *zh, int type, int state, const char *path, void *watcherCtx)
{
    LOGGER_INFO("ZKNode::mainWatcher type:" << type << ", state:" << state);
    ZKNode *self = static_cast<ZKNode*>(watcherCtx);
    assert(self != NULL);

    if (type != ZOO_SESSION_EVENT) {
        return;
    }
    if (state == ZOO_CONNECTED_STATE) {
        self->setConnected(true);
        return;
    }
    if (state == ZOO_CONNECTING_STATE) {
        if (self->isConnected()) {
            LOGGER_WARN("ZKNode::mainWatcher Disconnected.");
        }
        self->setConnected(false);
        return;
    }
    if (state == ZOO_EXPIRED_SESSION_STATE) {
        LOGGER_WARN("ZKNode::mainWatcher session expired will close");
        self->setExpired(true);
        self->setConnected(false);
        self->close();
    }
}