业务背景:
后台定时任务刷新Redis的数据到数据库中,有多台机器开启了此定时同步的任务,但是需要其中一台工作,其他的作为备用,提高可用性。使用Redis分布式锁进行限制,拿到锁的机器去执行具体业务,拿不到锁的继续轮询。
分布式锁原理
分布式锁:当多个进程不在同一个系统中,多个进程共同竞争同一个资源,用分布式锁控制多个进程对资源的互斥访问。采用Redis服务器存储锁信息(即SET一个Key表示已加锁),可以实现多进程的并发读锁的状态,如果没有锁,则只允许一个进程加锁。
Redis分布式锁实现的关键点:
实现方案
方案一:采用Redis的原子性命令“SET key value EX expire-time NX”可以实现分布式锁的基本功能,其中的NX(Not Exist)即判断是否已存在锁,如果不存在key则可进行 *** 作,SET key value 等同于加锁,EX expire-time即设置超时时间,可以避免死锁,但是超时时间的设置需要根据具体业务设置一个合理的经验值,避免锁超时时间到了,业务没执行完的问题。
方案二:采用Lua脚本实现,Redis会将整个脚本作为一个整体执行,因此Lua脚本可以实现原子性 *** 作。相较于方案一,此处增加了心跳线程,不断更新锁超时时间,解决锁超时时间设置不合理的问题;生成UUID(或者是随机数字符串)作为锁的值,用于保证锁与Client的一一对应;采用轮询来实现断线自动重连。
实现方案1:SET EX NX
加锁流程图:
定义锁的变量名为lock,那么对应Redis命令:
判断是否加锁的命令:GET lock
加锁的命令:SET lock
设置超时时间的命令:EXPIRE expire-time
三条命令分开执行是不具有原子性的,比如可能会出现一个进程执行GET lock得到的结果为nil即尚未加锁,在其执行SET lock前另一个进程也执行了SET lock,导致两个进程都认为是可以加锁的,失去互斥性。
因此判断尚未加锁、加锁、设置超时时间必须原子 *** 作,使用Redis的命令“SET key value EX expire-time NX”可以实现该原子 *** 作。
package main import ( "fmt" "time" "github.com/gomodule/redigo/redis" ) func main() { rds, err := redis.Dial("tcp", "127.0.0.1:6379") if err != nil { fmt.Println("Connect to redis error", err) return } defer rds.Close() for true { //检查是否有所与加锁必须是原子性 *** 作 result, err := rds.Do("SET", "lock", 1, "EX", 5, "NX") if err != nil { fmt.Println("redis set error.", err) return } result, err = redis.String(result, err) // 加锁失败,继续轮询 if result != "OK" { fmt.Println("SET lock failed.") time.Sleep(5 * time.Second) continue } // 加锁成功 fmt.Println("work begining...") // 此处处理业务 fmt.Println("work end"); // 业务处理结束后释放锁 result, err := rds.Do("del", "lock") break; } }
此方法弊端是对超时时间的设置有要求,需要根据具体业务设置一个合理的经验值,避免锁超时时间到了,业务没执行完的问题。
实现方案2:Lua脚本
package main import ( "fmt" "time" "github.com/satori/go.uuid" "github.com/gomodule/redigo/redis" ) var uuidClient uuid.UUID const ( script_LOCK = ` local res=redis.call('GET', KEYS[1]) if res then return 0 else redis.call('SET',KEYS[1],ARGV[1]); redis.call('EXPIRE',KEYS[1],ARGV[2]) return 1 end ` script_EXPIRE = ` local res=redis.call('GET', KEYS[1]) if not res then return -1 end if res==ARGV[1] then redis.call('EXPIRE', KEYS[1], ARGV[2]) return 1 else return 0 end ` script_DEL = ` local res=redis.call('GET', KEYS[1]) if not res then return -1 end if res==ARGV[1] then redis.call('DEL', KEYS[1]) else return 0 end ` ) func ResetExpire() { fmt.Println("Reset expire begin...") rds, err := redis.Dial("tcp", "127.0.0.1:6379") if err != nil { fmt.Println("Connect to redis server error.", err) return } for true { luaExpire := redis.Newscript(1, script_EXPIRE) result, err := redis.Int(luaExpire.Do(rds, "lock", uuidClient.String(), 5)) if err != nil { fmt.Println("luaExpire exec error", err) break } if result != 1 { fmt.Println("Reset expire failed.") break } else { fmt.Println("Reset expire succeed.") } time.Sleep(3 * time.Second) } fmt.Println("Reset expire end.") } func main() { for true { rds, err := redis.Dial("tcp", "127.0.0.1:6379") if err != nil { fmt.Println("Connect to redis server error.", err) time.Sleep(5 * time.Second) continue } defer rds.Close() // 生成UUID标识锁与Client的对应关系 uuidClient,err = uuid.NewV4() //也可以生成随机数字符串来代替 if err != nil { fmt.Println("New uuid error.", err) return } luaLock := redis.Newscript(1, script_LOCK) luaDel:= redis.Newscript(1, script_DEL) for true { result, err := redis.Int(luaLock.Do(rds, "lock", uuidClient.String(), 5)) if err != nil { fmt.Println("luaLock exec error.", err) break } if result == 0 { fmt.Println("Set lock failed.") time.Sleep(5 * time.Second) continue } fmt.Println("Set lock succeed.") go ResetExpire() // 加锁成功 fmt.Println("work begining...") // 此处处理业务 fmt.Println("work end"); // 业务处理结束后释放锁 result, err = redis.Int(luaDel.Do(rds, "lock", uuidClient.String())) return } } }
Redis采用Lua脚本可以执行更多的个性化的原子 *** 作,在我项目中就采用这种容错性更高的方式。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)