Redis支持多种数据结构和存储模式,其中包括:
字符串(String):字符串类型是Redis最基本的数据类型,它可以包含任何数据,比如文本、整数或二进制数据滚辩等。
哈希(Hash):哈希类型存储的是键值对集合,这些键值对可以是字符串类型的,也可以是数字类型的。
列表(List):列表类型是一个有序的字符串列表,可以添加、删除和插入元素。
集合(Set):集合类型存搏模储的是一组唯一的无序元素,支持添加、删除和查询 *** 作。
有序集合(Sorted Set):有序集合类型存储的是一组有序的元素,每个元素都有一个分数(score),可以根据分数进行排序。
此外,Redis还支持多种不同的持久化模式,包括:
RDB持久化模式:在指定时间间隔内将内存中的数据保存到磁盘中。
AOF持久化模式:将所有对Redis数据库的写 *** 作记录下来,可以通过回放这些日志文件来恢复数据库。
混合持久化模式:同时使用RDB和AOF两种持久化模式,以保证数大银缺据的可靠性和恢复速度。
这里所说的定时任务可以说是计时器任务,比如说用户触发了某个动作,那么从这个点开始过二十四小时要对这个动作做点什么。那么如果有 1000 个用户触发了这个动作,就会有 1000 个定时任务。于含让喊是这就不是 Cron 范畴里面的内容了。举个最简单的例子,一个用户推荐了另一个用户,定一个二十四小时之后的任务,看看被推荐的用户有没有来注册,如果没注册就给他搞一条短信过去。Σ>―(〃°ω°〃)♡→
最初的设想
一开始是想把这个计时器做在内存里面直接调用的。
考虑到 Node.js 的定时并不是那么准确(无论是setTimeout还是setInterval),所以本来打算自己维护这个定时器队列。
又考滑祥虑到 Node.js 原生对象比较耗内存。之前用JSON对象存了一本字典,约十二万多的词条,原文件大概也就五六兆,用 Node.js 的原生对象一存居然有五六百兆的内存占用——所以打算这个定时器队列用 C++ 来写 addon。
考虑到任何时候插入的任务都有可能在已有的任务之前或者之后,所以本来想用 C++ 来写一个 小根堆 。每次用户来一个任务的时候就将这个任务插入到堆中。
如果按照上述方法的话,再加上对时间要求掐得也不是那么紧,于是就是一个不断的process.nextTick()的过程。
在process.nextTick()当中执行这么一个函数:
从小根堆中不断获取堆顶的任务并处理,一直处理到堆顶任务的执行时间大于当前时间为止。
继续process.nextTick()来让下一个 tick 执行步骤 1 中的流程。
所以最后就是一边往小根堆插入任务,另一边通过不断process.nextTick()消费任务的这么一个过程。
最后,为了考虑到程序重启的时候内存数据会丢失,还应该做一个持久化的事情——在每次插入任务的时候顺便往持久化中间件中插一条副本,比如 MySQL、MongoDB、Redis、Riak 等等任何三方依赖。消费任务的时候顺便把中间件中的这条任务数据给删除。
也就是说中间件中永远存的就是当前尚未完成的任务。每当程序重启的时候都先从中间件中把所有任务读取进来重建一下堆,然后就能继续工作了。
如果当时没有发现 Redis 的这个妙用的话,上述的流程将会是实现定时任务的流程了。
Redis 妙用
在 Redis 的 2.8.0 版本之后,其推出了一个新的特性——键空间消息( Redis Keyspace Notifications ),它配合 2.0.0 版本之后的SUBSCRIBE就能完成这个定时任务的 *** 作了, 不过定时的单位是秒 。
Publish / Subscribe
Redis 在 2.0.0 之后推出了 Pub / Sub 的指令,大致就是说一边给 Redis 的特定频道发送消息,另一边从 Redis 的特定频道取值——形成谈野了一个简易的消息队列
比如可以往foo频道推一个消息bar,那么就可以直接:
PUBLISH foo bar
javascript
var Redis = require("ioredis")
var sub = new Redis(/** 连接信息 */)
sub.once("connect", function() {
// 假设需要选择 redis 的 db,因为实际上们不会去污染默认的 db 0
sub.select(DB_NUMBER, function(err) {
if(err) process.exit(4)
sub.subscribe("foo", function() {
//... 订阅频道成功
})
})
})
// 监听从 `foo` 来的消息
sub.on("message", function(channel, msg) {
console.log(channel, msg)
})
Redis Keyspace Notifications
在 Redis 里面有一些事件,比如键到期、键被删除等。然后可以通过配置一些东西来让 Redis 一旦触发这些事件的时候就往特定的 Channel 推一条消息。
本文所涉及到的需求的话所需要关心的事件是EXPIRE即过期事件。
大致的流程就是给 Redis 的某一个 db 设置过期事件,使其键一旦过期就会往特定频道推消息,在自己的客户端这边就一直消费这个频道就好了。
以后一来一条定时任务,就把这个任务状态压缩成一个键,并且过期时间为距这个任务执行的时间差。那么当键一旦到期,就到了任务该执行的时间,Redis 自然会把过期消息推去,的客户端就能接收到了。这样一来就起到了定时任务的作用。
消息类型
当达到一定条件后,有两种类型的这种消息会被触发,用哪个需要自己选了。举个例子,删除了在 db 0 中一个叫foo的键,那么系统会往两个频道推消息,一个是del事件频道推foo消息,另一个是foo频道推del消息,它们小俩口被系统推送的指令分别等价于:
PUBLISH __keyspace@0__:foo del PUBLISH __keyevent@0__:del foo
其中往foo推送del的频道名为__keyspace@0__:foo,即是"__keyspace@" + DB_NUMBER + "__:" + KEY_NAME;而del的频道名为"__keyevent@" + DB_NUMBER + "__:" + EVENT_NAME。
配置
即使你的 Redis 版本达标了,但是 Redis 默认是关闭这个功能的,你需要修改配置文件来打开它,或者直接在 CLI 里面通过指令修改。这里就说说配置文件的修改吧。。
首先打开 Redis 的配置文件,在不同的系统和安装方式下文件位置可能不同,比如通过brew安装的 MacOS 下可能是在/usr/local/etc/redis.conf下面,通过apt-get安装的 Ubuntu 下可能是在/etc/redis/redis.conf下,总之找到配置文件。或者自己写一个配置文件,启动的时候指定配置文件地址就好。
然后找到一项叫notify-keyspace-events的地方,如果找不到则自行添加,其值可以是Ex、Klg等等。这些字母的具体含义如下所示:
K ,表示keyspace事件,有这个字母表示会往__keyspace@<db>__频道推消息。
E ,表示keyevent事件,有这个字母表示会往__keyevent@<db>__频道推消息。
g ,表示一些通用指令事件支持,如DEL、EXPIRE、RENAME等等。
$ ,表示字符串(String)相关指令的事件支持。
l ,表示列表(List)相关指令事件支持。
s ,表示集合(Set)相关指令事件支持。
h ,哈希(Hash)相关指令事件支持。
z ,有序集(Sorted Set)相关指令事件支持。
x ,过期事件,与 g 中的EXPIRE不同的是, g 的EXPIRE是指执行EXPIRE key ttl这条指令的时候顺便触发的事件,而这里是指那个key刚好过期的这个时间点触发的事件。
e ,驱逐事件,一个key由于内存上限而被驱逐的时候会触发的事件。
A ,g$lshzxe的别名。也就是说AKE的意思就代表了所有的事件。
结合上述列表就能拼凑出自己所需要的事件支持字符串了,在需求中只需要Ex就可以满足了,所以配置项就是这样的:
notify-keyspace-events Ex
然后保存配置文件,启动 Redis 就启用了过期事件的支持了。
实践
先说任务的创造者吧。由于这里 Redis 的事件只会传键名,并不会传键值,而过期事件触发的时候那个键已经没了,你也无法获取键值,加上主系统和任务系统是分布式的,所以就把所有需要的信息往键名塞。
一个最简单的键名设计就是任务类型 + ":" + JSON.stringify 化后的参数数组;更有甚者可以直接把任务类型替换成所需的函数路径,比如需要执行这个任务的函数在task/foo/bar文件下面的baz函数,参数arguments数组为[ 1, 2 ],那么键名的设计可以是task/foo/bar.baz:[1,2],反正只需要触发这个键,用不着去查询这个键。等到真正过期了任务系统接收到这个键名的时候再一一解析,得到需要执行task/foo/bar.baz这个消息,并且网函数里面传入[1,2]这个arguments。
所以当接收到一个定时任务的时候,得到消息、函数名、过期时间参数,这个函数
/** 假设 redis 是一个 ioredis 的对象 */
var sampleTaskMaker = function(message, func, timeout) {
message = JSON.stringify(message)
console.log("Received a new task:", func, message, "after " + timeout + ".")
// 这里的 uuid 是 npm 一个包
// 生成一个唯一 uuid 的目的是为了防止两个任务用了相同的函数和参数,那么
// 键名可能会重复并覆盖的情况
// uuid 的文档为 https://www.npmjs.com/package/node-uuid
//
// 这里的 ❤️ 是一个分隔符,冒号是分割 uuid 和后面内容的,而 ❤️ 是分割函数名
// 和消息的
var key = uuid.v1().replace(/-/g, "") +
":❤️" + func + "❤️" + message
var content = ""
redis.multi()
.set(key, content)
.expire(key, timeout)
.exec(function(err) {
if(err) {
console.error("Failed to publish EXPIRE EVENT for " + content)
console.error(err)
return
}
})
}
// assign 是 sugarjs 里面的函数
// 把 db 塞到字符串里面的 {db} 里去
var subscribeKey = "__keyevent@{db}__:expired".assign({ db: 1 })
// 假设 sub 是 ioredis 的对象
sub.once("connect", function() {
// 假设需要选择 redis 的 db,因为实际上不会去污染默认的 db 0
sub.select(1, function(err) {
if(err) process.exit(4)
sub.subscribe("foo", function() {
//... 订阅频道成功
})
})
})
// 监听从 `foo` 来的消息
sub.on("message", sampleOnExpired)
注意:这里选择 db 1 是因为一旦开启过期事件监听,那么这个 db 的所有过期事件都会被发送。为了不跟正常使用的 redis 过期键混淆,为这个事情专门用一个新的 db。比如在自己正常使用的 db 0 里面监听了,那么不是任务触发的过期事件也会传过来,这个时候解析的键名就不对了。
最后就是sampleOnExpired函数了。
var sampleOnExpired = function(channel, key) {
// UUID:❤️func❤️params
var body = key.split("❤️")
if(body.length <3) return
// 取出 body 第一位为 func
var func = body[1]
// 推出前两位,后面剩下的有可能是参数里面自带 ❤️ 而被分割,所以要拼回去
body.shift()body.shift()
var params = body.join("❤️")
// 然后把 params 传入 func 去执行
// func:
// path1/path2.func
func = func.split(".")
if(func.length !== 2) {
console.error("Bad params for task:", func.join("."), "-", params)
return
}
var path = func[0]
func = func[1]
var mod
try {
mod = require("./tasks/" + path)
} catch(e) {
console.error("Failed to load module", path)
console.error(e.stack)
return
}
process.nextTick(function() {
try {
mod[func].apply(null, JSON.parse(params))
} catch(e) {
console.error("Failed to call function", path, "-", func, "-", params)
console.error(e.stack)
}
})
}
这个简易的架子搭好后,只需要去写一堆任务执行函数,然后在生成任务的时候把相应参数传给sampleTaskMaker就好了。Redis 会自动过期并且触发事件给sampleOnExpired函数,然后就会去执行相应的任务处理函数了
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)