假设你已经完成了复杂的Zookeeper Server和C client安装。
参考资料:http://zookeeper.apache.org/doc/r3.5.9/zookeeperProgrammers.html
https://www.cnblogs.com/haippy/archive/2013/02/23/2921100.html
背景因为要做分布式框架,看大家说ZooKeeper适合做服务注册与发现模块,所以考察了一下这个Zoo Keeper,发现它的设计非常精简,使用节点路径的模型提供了大多数分布式服务的解决方案。
核心设计——node可以把Zookeeper看做一颗树,那么每个组成这棵树的节点叫做znode,每个znode都有关联的数据(字符串形式)、acl、版本号等信息。
每个znode都通过绝对路径来访问(ZK中没有相对路径),例如/a/b、/c等。
znode可以有子节点,在子节点被删除前不能删除父节点。
客户端可以一次性地监视某个znode(即使它还不存在),当该znode发生变化(数据修改、添加、删除)时服务器就会发起一个事件通知客户端。这也是服务发现的原理
节点主要有两种类型:
-
Ephemeral Nodes,临时节点,生命期和连接会话一样长,连接关闭时即被自动删除,它也不能创建子节点。
-
Persistent Nodes,会话关闭时仍保留。
除此之外,还有Sequence Nodes,创建顺序节点时,服务器会在用户指定的节点名后附加%010d格式的序号,每个用户都得到不同的序号,序号的大小关系构成了这些节点的全序关系。
举个例子,如果两个客户端同时创建顺序节点/xyz,那么他们实际上创建的节点名可能是/xyz0000000001,/xyz0000000002。这个特殊节点可以实现很多功能,例如分布式锁、分布式队列、两阶段提交、主节点选举等等。
API简单起见,这里只介绍同步API,记得#define THREADED来打开它们。
这里是下文会涉及到的类型定义:
typedef void (*watcher_fn) (zhandle_t *zh, int type, int state, const char *path,void *watcherCtx); struct Stat { int64_t czxid; int64_t mzxid; int64_t ctime; int64_t mtime; int32_t version; int32_t cversion; int32_t aversion; int64_t ephemeralOwner; int32_t dataLength; int32_t numChildren; int64_t pzxid; }; struct String_vector { int32_t count; char * *data; };
使用zookeeper_init连接ZK服务器:
zhandle_t *zookeeper_init(const char *host, watcher_fn fn, int recv_timeout, const clientid_t *clientid, void *context, int flags);
- host参数是以逗号分隔的ip:port列表,例如"127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002"。
- fn,全局的watcher回调函数,任何监听的事件被触发时,它就会被调用。
- 如果这是一条新连接,那么clientid参数应该被设置为0。
- context指向用户数据,用户可以自定义zhandle_t的附属数据,它可以通过zoo_get_context被获取。
- flags参数暂时无用,设置为0。
对一个节点,ZK提供了create、get、exists、delete、get_children等 *** 作。
int zoo_create(zhandle_t *zh, const char *path, const char *value, int valuelen, const struct ACL_vector *acl, int mode, char *path_buffer, int path_buffer_len);
-
path,节点路径名。
-
value,节点相关的数据。
-
valuelen,节点名的长度。
当path == NULL且value == -1时,节点数据被设为NULL
-
acl,节点的访问权限。如果信任所有客户端,那么此值可以设置为ZOO_OPEN_ACL_UNSAFE。
-
mode,节点类型,例如ZOO_EPHEMERAL、ZOO_PERSISTENT等。
-
path_buffer,保存被创建的节点名的缓冲区,因为顺序节点的名字会有所变化。
-
path_buffer_len,缓冲区大小。
int zoo_get(zhandle_t *zh, const char *path, int watch, char *buffer, int* buffer_len, struct Stat *stat);
- zh参数是zookeeper_init返回的句柄。
- path,节点路径名。
- watch,0或1,表示是否要监视此节点。
- buffer和buffer_len,保存返回的节点名。
- stat,不为NULL时,它将保存该节点的状态信息。
int zoo_exists(zhandle_t *zh, const char *path, int watch, struct Stat *stat);
-
zh参数是zookeeper_init返回的句柄
-
path,节点路径名。
-
watch,0或1,表示是否要监视此节点。
-
stat,不为NULL时,它将保存该节点的状态信息。
zoo_exists()返回ZOK表示节点存在,返回ZNONODE表示节点不存在。
int zoo_delete(zhandle_t *zh, const char *path, int version);
- zh参数是zookeeper_init返回的句柄
- path,要删除的节点路径名。
- version,zoo_get或zoo_exists时获得的stat.version,只有当版本号匹配时才能成功删除。
int zoo_get_children(zhandle_t *zh, const char *path, int watch, struct String_vector *strings);
- zh参数是zookeeper_init返回的句柄
- path,节点路径名。
- watch,0或1,表示是否要监视此节点。
- strings,返回的孩子列表。
使用zookeeper_close()来主动关闭ZK连接:
int zookeeper_close(zhandle_t *zh);
- zh参数是zookeeper_init返回的句柄
#define THREADED #include#include #include #include #include using namespace std; static zhandle_t *zh; void watcher(zhandle_t *zzh, int type, int state, const char *path, void *watcherCtx) { cout << "in watcher..." << endl; if (type == ZOO_SESSION_EVENT) { // state refers to states of zookeeper connection. // To keep it simple, we would demonstrate these 3: ZOO_EXPIRED_SESSION_STATE, ZOO_CONNECTED_STATE, ZOO_NOTCONNECTED_STATE if (state == ZOO_CONNECTED_STATE) { cout << "connected!" << endl; } else if (state == ZOO_NOTCONNECTED_STATE) { cout << "unconnected!" << endl; } else if (state == ZOO_EXPIRED_SESSION_STATE) { cout << "session expired!" << endl; zookeeper_close(zzh); } } } #define CHECK() do {if (rc) fprintf(stderr, "Error %d for %dn", rc, __LINE__);} while(0) int main(int argc, char **argv) { char buffer[512]; zoo_set_debug_level(ZOO_LOG_LEVEL_ERROR); zh = zookeeper_init("localhost:2181", watcher, 10000, 0, 0, 0); if (!zh) { return errno; } // 节点可以是: 持久的ZOO_PERSISTENT 或者 临时的ZOO_EPHEMERAL,后者不能有children int rc = zoo_create(zh, "/xyz", "value", 5, &ZOO_OPEN_ACL_UNSAFE, ZOO_EPHEMERAL, buffer, sizeof(buffer) - 1); CHECK(); struct Stat stat; int buflen = sizeof(buffer); rc = zoo_get(zh, "/xyz", 0, buffer, &buflen, &stat); CHECK(); fprintf(stdout, "get from /xyz: %sn", buffer); rc = zoo_get(zh, "/zk_test", 1, buffer, &buflen, &stat); CHECK(); fprintf(stdout, "get from /xyz: %sn", buffer); //fprintf(stderr, "%d %d %d %d %d %dn", stat.czxid, stat.ctime, stat.version, stat.ephemeralOwner, stat.dataLength, stat.numChildren); rc = zoo_create(zh, "/zk_test/child", "Iamchlid", 8, &ZOO_OPEN_ACL_UNSAFE, ZOO_EPHEMERAL_SEQUENTIAL, buffer, sizeof(buffer) - 1); CHECK(); rc = zoo_create(zh, "/zk_test/child", "Iamchlid", 8, &ZOO_OPEN_ACL_UNSAFE, ZOO_EPHEMERAL_SEQUENTIAL, buffer, sizeof(buffer) - 1); CHECK(); char buf_name[512]; fprintf(stderr, "new child:%sn", buffer); rc = zoo_get(zh, buffer, 0, buf_name, &buflen, &stat); CHECK(); rc = zoo_delete(zh, buffer, stat.version); CHECK(); rc = zoo_exists(zh, "/xyz", 0, nullptr); printf("is exists?%dn", rc == ZOK); struct String_vector ss; rc = zoo_get_children(zh, "/zk_test", 0, &ss); CHECK(); cout << "count: " << ss.count << endl; for (int i = 0; i < ss.count; i++) cout << ss.data[i] << endl; sleep(120); zookeeper_close(zh); // 会话结束时,ephemeral节点被删除 return 0; }
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)