- redis集群概述
- 客户端重定向
- 槽位迁移
- redis cluster 的运行流程
哨兵、主从、集群,一串下来。
redis cluster 主要作用如下(虽然是显而易见了):
数据分片,流量分发。
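cluster 将不同的数据分发给不同的节点。它没有使用一致性hash算法,而是引入了哈希槽位(hash slot)的概念:cluster 共有 16384 个槽位,每个键按 CRC16(key) mod 16384 映射到其中一个槽位(若键中含有 {hash tag},则只对花括号内的部分计算)。每个槽位只能指派给一个节点,所以一个键由哪个节点存储,取决于它所在的槽位指派给了谁。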
所以本文的重心也就很明确了:redis集群是如何实现通信及数据分片、流量分发的
关于集群的实际搭建,参见:【redis】闲得无聊,来聊聊当下爆火的 redis集群,顺便搭一个玩玩呗
客户端重定向
如果cluster中的某个节点收到客户端请求,但请求中查询的键不是当前节点负责的,则它将通知客户端进行重定向,客户端重新发送请求给真正的数据存储节点。
那是怎么实现的?请求包发过来,数据不在我这儿,我告诉她在他那儿,让她重发给他?
还是请求包发过来,数据不在我这儿,我只告诉她不在我这儿(因为我也不知道在谁那儿),让她自己一个一个去试?
还是什么其他的方式?
我猜是第一种,但实际是怎样的,还得看源码。
/* Excerpt of processCommand(): the cluster-redirection check done before a
 * command is executed. Parts of the function omitted by the article are
 * marked with "......" (so this excerpt is not compilable as shown). */
int processCommand(client *c) {
    ......
    /* Only when cluster mode is enabled. Skip the check when the command
     * comes from our master, or from Lua invoked by our master, or when the
     * command has no getkeys_proc and firstkey == 0 (presumably a keyless
     * command) -- EXEC is always checked. */
    if (server.cluster_enabled &&
        !(c->flags & CLIENT_MASTER) &&
        !(c->flags & CLIENT_LUA &&
          server.lua_caller->flags & CLIENT_MASTER) &&
        !(c->cmd->getkeys_proc == NULL && c->cmd->firstkey == 0 &&
          c->cmd->proc != execCommand))
    {
        int hashslot;
        int error_code;
        /* Look up the node that actually stores the command's keys. */
        clusterNode *n = getNodeByQuery(c,c->cmd,c->argv,c->argc,
                                        &hashslot,&error_code);
        /* Not servable by this node (another node, or NULL on error). */
        if (n == NULL || n != server.cluster->myself) {
            /* NOTE(review): presumably EXEC discards the queued transaction
             * while other commands flag it so a later EXEC fails -- confirm. */
            if (c->cmd->proc == execCommand) {
                discardTransaction(c);
            } else {
                flagTransaction(c);
            }
            /* Tell the client to redirect (ASK or MOVED plus the target
             * node), or report the error code chosen by getNodeByQuery(),
             * then stop processing this command. */
            clusterRedirectClient(c,n,hashslot,error_code);
            return C_OK;
        }
    }
    ......
}
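getNodeByQuery 函数负责查找数据的存储节点。它先用 keyHashSlot 算出键所在的槽位,再查槽位表 server.cluster->slots,得到负责该槽位的节点。如果这个节点不是自己,processCommand 就把它连同重定向类型(MOVED 或 ASK)交给 clusterRedirectClient 回复给客户端,客户端据此直接重发给目标节点。也就是说,实际采用的正是前面猜测的第一种方式: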
/* Find the cluster node that must serve `cmd` (or, when `cmd` is EXEC,
 * every command queued in the client's MULTI state).
 *
 * Returns:
 *   - myself when this node can serve the request;
 *   - another node when the client must be redirected, with *error_code
 *     set to CLUSTER_REDIR_MOVED or CLUSTER_REDIR_ASK;
 *   - NULL when the request cannot be served, with *error_code set to
 *     CLUSTER_REDIR_DOWN_UNBOUND, CLUSTER_REDIR_CROSS_SLOT,
 *     CLUSTER_REDIR_DOWN_STATE, CLUSTER_REDIR_DOWN_RO_STATE or
 *     CLUSTER_REDIR_UNSTABLE.
 *
 * `hashslot` and `error_code` may be NULL. *hashslot receives the keys' slot
 * only once the cluster-state checks have passed. */
clusterNode *getNodeByQuery(client *c, struct redisCommand *cmd, robj **argv, int argc, int *hashslot, int *error_code) {
    clusterNode *n = NULL;
    robj *firstkey = NULL;
    int multiple_keys = 0;
    multiState *ms, _ms;
    multiCmd mc;
    int i, slot = 0, migrating_slot = 0, importing_slot = 0, missing_keys = 0;

    /* A module disabled redirections: always serve locally. */
    if (server.cluster_module_flags & CLUSTER_MODULE_FLAG_NO_REDIRECTION)
        return myself;

    if (error_code) *error_code = CLUSTER_REDIR_NONE;

    /* EXEC inspects every queued command; any other command is wrapped in a
     * one-element fake multi state so that both share the loop below. */
    if (cmd->proc == execCommand) {
        /* Not inside MULTI: nothing queued to inspect. */
        if (!(c->flags & CLIENT_MULTI)) return myself;
        ms = &c->mstate;
    } else {
        ms = &_ms;
        _ms.commands = &mc;
        _ms.count = 1;
        mc.argv = argv;
        mc.argc = argc;
        mc.cmd = cmd;
    }

    for (i = 0; i < ms->count; i++) {
        struct redisCommand *mcmd;
        robj **margv;
        int margc, *keyindex, numkeys, j;

        mcmd = ms->commands[i].cmd;
        margc = ms->commands[i].argc;
        margv = ms->commands[i].argv;

        keyindex = getKeysFromCommand(mcmd,margv,margc,&numkeys);
        for (j = 0; j < numkeys; j++) {
            robj *thiskey = margv[keyindex[j]];
            int thisslot = keyHashSlot((char*)thiskey->ptr,
                                       sdslen(thiskey->ptr));

            if (firstkey == NULL) {
                /* The first key's slot decides the target node. */
                firstkey = thiskey;
                slot = thisslot;
                n = server.cluster->slots[slot];

                /* Slot not assigned to any node. */
                if (n == NULL) {
                    getKeysFreeResult(keyindex);
                    if (error_code)
                        *error_code = CLUSTER_REDIR_DOWN_UNBOUND;
                    return NULL;
                }

                /* Track whether the slot is being moved out of this node
                 * (we own it) or into this node (we are importing it). */
                if (n == myself &&
                    server.cluster->migrating_slots_to[slot] != NULL)
                {
                    migrating_slot = 1;
                } else if (server.cluster->importing_slots_from[slot] != NULL) {
                    importing_slot = 1;
                }
            } else {
                /* Every further distinct key must hash to the same slot. */
                if (!equalStringObjects(firstkey,thiskey)) {
                    if (slot != thisslot) {
                        getKeysFreeResult(keyindex);
                        if (error_code)
                            *error_code = CLUSTER_REDIR_CROSS_SLOT;
                        return NULL;
                    } else {
                        multiple_keys = 1;
                    }
                }
            }

            /* While the slot is migrating/importing, count the keys that
             * are not present locally (only db 0 is consulted). */
            if ((migrating_slot || importing_slot) &&
                lookupKeyRead(&server.db[0],thiskey) == NULL)
            {
                missing_keys++;
            }
        }
        getKeysFreeResult(keyindex);
    }

    /* No keys at all: serve the request here. */
    if (n == NULL) return myself;

    /* Cluster is down: only read-only commands (or EVAL/EVALSHA) may run,
     * and only when cluster_allow_reads_when_down is enabled. */
    if (server.cluster->state != CLUSTER_OK) {
        if (!server.cluster_allow_reads_when_down) {
            if (error_code) *error_code = CLUSTER_REDIR_DOWN_STATE;
            return NULL;
        } else if (!(cmd->flags & CMD_READONLY) &&
                   !(cmd->proc == evalCommand) &&
                   !(cmd->proc == evalShaCommand))
        {
            if (error_code) *error_code = CLUSTER_REDIR_DOWN_RO_STATE;
            return NULL;
        }
        /* Otherwise fall through: an allowed read while the cluster is down. */
    }

    if (hashslot) *hashslot = slot;

    /* Let MIGRATE run locally while the slot is migrating or importing. */
    if ((migrating_slot || importing_slot) && cmd->proc == migrateCommand)
        return myself;

    /* Slot is migrating away and some keys are already gone: ASK redirect
     * to the migration target. */
    if (migrating_slot && missing_keys) {
        if (error_code) *error_code = CLUSTER_REDIR_ASK;
        return server.cluster->migrating_slots_to[slot];
    }

    /* Slot is being imported and the client sent ASKING (or the command has
     * CMD_ASKING): serve it here, unless a multi-key request finds only part
     * of its keys, in which case the slot is not stable yet. */
    if (importing_slot &&
        (c->flags & CLIENT_ASKING || cmd->flags & CMD_ASKING))
    {
        if (multiple_keys && missing_keys) {
            if (error_code) *error_code = CLUSTER_REDIR_UNSTABLE;
            return NULL;
        } else {
            return myself;
        }
    }

    /* A READONLY client talking to a replica of the slot's master may read
     * locally. (The flag names were case-mangled as READonLY in the article,
     * which does not compile.) */
    if (c->flags & CLIENT_READONLY &&
        (cmd->flags & CMD_READONLY || cmd->proc == evalCommand ||
         cmd->proc == evalShaCommand) &&
        nodeIsSlave(myself) &&
        myself->slaveof == n)
    {
        return myself;
    }

    /* The key belongs to another node: MOVED redirect. */
    if (n != myself && error_code) *error_code = CLUSTER_REDIR_MOVED;
    return n;
}
槽位迁移
什么是槽位迁移?就是把某个槽位(连同其中的所有键)从一个节点搬到另一个节点,可以粗略地类比为一次 rehash。
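什么时候发生?比如集群扩容(加入新节点来分摊压力)、缩容(下线某个节点),或者需要重新平衡槽位分布的时候。注意,节点宕机由故障转移处理:其从节点升级为主节点并接管原来的槽位,并不需要迁移数据。具体的键由 MIGRATE 命令迁移;在集群模式下,它向目标节点发送 RESTORE-ASKING 命令: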
/* MIGRATE host port key dbid timeout [COPY] [REPLACE] [AUTH password]
 *         [AUTH2 username password] [KEYS key1 ... keyN]
 *
 * Serializes the requested keys that exist locally and sends them to the
 * target instance (argv[1]/argv[2]) as RESTORE commands, or RESTORE-ASKING
 * when cluster mode is enabled. AUTH and SELECT are sent first when needed.
 * Unless COPY is given, every key the target acknowledged is deleted locally
 * and the client command is rewritten as DEL <keys>.
 *
 * Replies: "+NOKEY" when none of the keys exist, +OK on success, the
 * target's error when it replied with one, or "-IOERR ..." after a socket
 * error or timeout (one retry is attempted for non-timeout errors). */
void migrateCommand(client *c) {
    migrateCachedSocket *cs;
    int copy = 0, replace = 0, j;
    char *username = NULL;
    char *password = NULL;
    long timeout;
    long dbid;
    robj **ov = NULL;      /* Values of the keys to migrate. */
    robj **kv = NULL;      /* Names of the keys to migrate. */
    robj **newargv = NULL; /* Argument vector for the rewritten DEL. */
    rio cmd, payload;
    int may_retry = 1;
    int write_error = 0;
    int argv_rewritten = 0;

    /* State for the KEYS option. */
    int first_key = 3; /* Argument index of the first key. */
    int num_keys = 1;  /* By default only the 'key' argument is migrated. */

    /* Parse the optional arguments. */
    for (j = 6; j < c->argc; j++) {
        int moreargs = (c->argc-1) - j;
        if (!strcasecmp(c->argv[j]->ptr,"copy")) {
            copy = 1;
        } else if (!strcasecmp(c->argv[j]->ptr,"replace")) {
            replace = 1;
        } else if (!strcasecmp(c->argv[j]->ptr,"auth")) {
            if (!moreargs) {
                addReply(c,shared.syntaxerr);
                return;
            }
            j++;
            password = c->argv[j]->ptr;
        } else if (!strcasecmp(c->argv[j]->ptr,"auth2")) {
            if (moreargs < 2) {
                addReply(c,shared.syntaxerr);
                return;
            }
            username = c->argv[++j]->ptr;
            password = c->argv[++j]->ptr;
        } else if (!strcasecmp(c->argv[j]->ptr,"keys")) {
            if (sdslen(c->argv[3]->ptr) != 0) {
                addReplyError(c,
                    "When using MIGRATE KEYS option, the key argument"
                    " must be set to the empty string");
                return;
            }
            first_key = j+1;
            num_keys = c->argc - j - 1;
            break; /* Every remaining argument is a key. */
        } else {
            addReply(c,shared.syntaxerr);
            return;
        }
    }

    if (getLongFromObjectOrReply(c,c->argv[5],&timeout,NULL) != C_OK ||
        getLongFromObjectOrReply(c,c->argv[4],&dbid,NULL) != C_OK)
    {
        return;
    }
    if (timeout <= 0) timeout = 1000;

    /* Keep only the keys that exist locally. */
    ov = zrealloc(ov,sizeof(robj*)*num_keys);
    kv = zrealloc(kv,sizeof(robj*)*num_keys);
    int oi = 0;

    for (j = 0; j < num_keys; j++) {
        if ((ov[oi] = lookupKeyRead(c->db,c->argv[first_key+j])) != NULL) {
            kv[oi] = c->argv[first_key+j];
            oi++;
        }
    }
    num_keys = oi;
    if (num_keys == 0) {
        zfree(ov); zfree(kv);
        /* Raw RESP status line: must be CRLF-terminated. */
        addReplySds(c,sdsnew("+NOKEY\r\n"));
        return;
    }

try_again:
    write_error = 0;

    cs = migrateGetSocket(c,c->argv[1],c->argv[2],timeout);
    if (cs == NULL) {
        /* NOTE(review): presumably migrateGetSocket() already replied to
         * the client on failure -- confirm. */
        zfree(ov); zfree(kv);
        return;
    }

    rioInitWithBuffer(&cmd,sdsempty());

    /* AUTH [username] password, if requested. */
    if (password) {
        int arity = username ? 3 : 2;
        serverAssertWithInfo(c,NULL,rioWriteBulkCount(&cmd,'*',arity));
        serverAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,"AUTH",4));
        if (username) {
            serverAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,username,
                                 sdslen(username)));
        }
        serverAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,password,
            sdslen(password)));
    }

    /* SELECT only when the cached connection is on a different DB. */
    int select = cs->last_dbid != dbid;
    if (select) {
        serverAssertWithInfo(c,NULL,rioWriteBulkCount(&cmd,'*',2));
        serverAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,"SELECT",6));
        serverAssertWithInfo(c,NULL,rioWriteBulkLongLong(&cmd,dbid));
    }

    /* Build one RESTORE per key that has not expired since the lookup. */
    int non_expired = 0;
    for (j = 0; j < num_keys; j++) {
        long long ttl = 0;
        long long expireat = getExpire(c->db,kv[j]);

        if (expireat != -1) {
            ttl = expireat-mstime();
            if (ttl < 0) {
                continue; /* Expired meanwhile: skip it. */
            }
            if (ttl < 1) ttl = 1;
        }

        /* Compact the still-valid keys to the front of kv so that the reply
         * loop below can address them as 0..non_expired-1. */
        kv[non_expired++] = kv[j];

        serverAssertWithInfo(c,NULL,
            rioWriteBulkCount(&cmd,'*',replace ? 5 : 4));

        if (server.cluster_enabled)
            serverAssertWithInfo(c,NULL,
                rioWriteBulkString(&cmd,"RESTORE-ASKING",14));
        else
            serverAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,"RESTORE",7));
        serverAssertWithInfo(c,NULL,sdsEncodedObject(kv[j]));
        serverAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,kv[j]->ptr,
                sdslen(kv[j]->ptr)));
        serverAssertWithInfo(c,NULL,rioWriteBulkLongLong(&cmd,ttl));

        /* Serialized value payload. */
        createDumpPayload(&payload,ov[j],kv[j]);
        serverAssertWithInfo(c,NULL,
            rioWriteBulkString(&cmd,payload.io.buffer.ptr,
                               sdslen(payload.io.buffer.ptr)));
        sdsfree(payload.io.buffer.ptr);

        if (replace)
            serverAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,"REPLACE",7));
    }

    num_keys = non_expired;

    /* Send the whole buffer in chunks of at most 64 KiB. */
    errno = 0;
    {
        sds buf = cmd.io.buffer.ptr;
        size_t pos = 0, towrite;
        int nwritten = 0;

        while ((towrite = sdslen(buf)-pos) > 0) {
            towrite = (towrite > (64*1024) ? (64*1024) : towrite);
            nwritten = connSyncWrite(cs->conn,buf+pos,towrite,timeout);
            if (nwritten != (signed)towrite) {
                write_error = 1;
                goto socket_err;
            }
            pos += nwritten;
        }
    }

    char buf0[1024]; /* AUTH reply. */
    char buf1[1024]; /* SELECT reply. */
    char buf2[1024]; /* RESTORE reply. */

    if (password && connSyncReadLine(cs->conn, buf0, sizeof(buf0), timeout) <= 0)
        goto socket_err;

    if (select && connSyncReadLine(cs->conn, buf1, sizeof(buf1), timeout) <= 0)
        goto socket_err;

    int error_from_target = 0;
    int socket_error = 0;
    int del_idx = 1; /* Next free slot in newargv; slot 0 is "DEL". */

    if (!copy) newargv = zmalloc(sizeof(robj*)*(num_keys+1));

    /* Read one reply per RESTORE sent. */
    for (j = 0; j < num_keys; j++) {
        if (connSyncReadLine(cs->conn, buf2, sizeof(buf2), timeout) <= 0) {
            socket_error = 1;
            break;
        }
        if ((password && buf0[0] == '-') ||
            (select && buf1[0] == '-') ||
            buf2[0] == '-')
        {
            /* Report only the first error; forget the cached DB. */
            if (!error_from_target) {
                cs->last_dbid = -1;
                char *errbuf;
                if (password && buf0[0] == '-') errbuf = buf0;
                else if (select && buf1[0] == '-') errbuf = buf1;
                else errbuf = buf2;

                error_from_target = 1;
                addReplyErrorFormat(c,"Target instance replied with error: %s",
                    errbuf+1);
            }
        } else {
            if (!copy) {
                /* Acknowledged by the target: delete the local copy. */
                dbDelete(c->db,kv[j]);
                signalModifiedKey(c,c->db,kv[j]);
                notifyKeyspaceEvent(NOTIFY_GENERIC,"del",kv[j],c->db->id);
                server.dirty++;

                newargv[del_idx++] = kv[j];
                incrRefCount(kv[j]);
            }
        }
    }

    /* Retry only when nothing was processed (no reply read at all). */
    if (!error_from_target && socket_error && j == 0 && may_retry &&
        errno != ETIMEDOUT)
    {
        goto socket_err;
    }

    /* Close the socket while argv[1]/argv[2] still hold host/port; the
     * command vector may be rewritten as DEL just below. */
    if (socket_error) migrateCloseSocket(c->argv[1],c->argv[2]);

    if (!copy) {
        /* Rewrite the command as DEL for the acknowledged keys only. */
        if (del_idx > 1) {
            newargv[0] = createStringObject("DEL",3);
            /* newargv is not freed after this call; presumably
             * replaceClientCommandVector() takes ownership of it. */
            replaceClientCommandVector(c,del_idx,newargv);
            argv_rewritten = 1;
        } else {
            zfree(newargv);
        }
        newargv = NULL;
    }

    /* Socket error after partial progress: no retry, report IOERR (unless
     * the target's own error was already sent). */
    if (!error_from_target && socket_error) {
        may_retry = 0;
        goto socket_err;
    }

    if (!error_from_target) {
        /* Remember the selected DB so the next MIGRATE can skip SELECT. */
        cs->last_dbid = dbid;
        addReply(c,shared.ok);
    }
    /* Otherwise the target's error has already been queued above. */

    sdsfree(cmd.io.buffer.ptr);
    zfree(ov); zfree(kv); zfree(newargv);
    return;

socket_err:
    sdsfree(cmd.io.buffer.ptr);

    /* After a DEL rewrite argv[1]/argv[2] are keys, and the socket was
     * already closed above. */
    if (!argv_rewritten) migrateCloseSocket(c->argv[1],c->argv[2]);
    zfree(newargv);
    newargv = NULL; /* Reallocated after try_again if we retry. */

    if (errno != ETIMEDOUT && may_retry) {
        may_retry = 0;
        goto try_again;
    }

    zfree(ov); zfree(kv);
    /* Raw RESP error line: must be CRLF-terminated. */
    addReplySds(c,
        sdscatprintf(sdsempty(),
            "-IOERR error or timeout %s to target instance\r\n",
            write_error ? "writing" : "reading"));
    return;
}
redis cluster 的运行流程
这个启动过程我就不放代码了,和前面的主从、哨兵有很多相似之处。
1、节点启动
2、节点握手:通过 CLUSTER MEET 让节点互相认识,握手过程可参考主从握手
3、指派槽位
4、建立主从关系
5、节点通信:Gossip算法
6、故障转移,故障转移机制与哨兵基本一致
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)