说道消息队列,你肯定会想到Kafka、Rabbitmq等消息中间件,这些专业的消息中间件提供了很多功能特性,当然他的部署使用维护都是比较麻烦的。如果你对消息队列没那么高要求,想要轻量级的,使用Redis就没错啦。
Redis通过list数据结构来实现消息队列.主要使用到如下命令:
-
lpush和rpush入队列
-
lpop和rpop出队列
-
blpop和brpop阻塞式出队列
废话补不多说上代码:
connect('127.0.0.1', 6379); //消费消息 while (true) { try { $msg = $redis->rPop($list); if (!$msg) { sleep(1); } //业务处理 } catch (Exception $e) { echo $e->getMessage(); } }
以上代码会有个问题,如果队列长时间是空的,pop就不会不断的循环,会导致redis的QPS升高,影响性能。所以我们使用sleep来解决,当没有消息的时候阻塞一段时间。但其实这样还会带来另一个问题,就是sleep会导致消息的处理延迟增加的机率。这个问题我们可以通过blpop/brpop来阻塞读取队列。blpop/brpop在队列没有数据的时候,会立即进入休眠状态,一旦数据到来,则立刻醒过来。消息的延迟几乎为零。用blpop/brpop替代前面的lpop/rpop,就完美解决了上面的问题。还有一个需要注意的点是我们需要是用try/catch来进行异常捕获,如果一直阻塞在那里,Redis服务器一般会主动断开掉空链接,来减少闲置资源的占用 可以使用 ping检查redis心跳
延时队列定义:
延迟队列就是个带延迟功能的消息队列,相对于普通队列,它可以在指定时间消费掉消息。
应用场景:
- 1、新用户注册,10分钟后发送邮件或站内信。
- 2、用户下单后,30分钟未支付,订单自动作废。
- 3、用户下单后,在抢单大厅订单进行补贴 10s 30s 90s 不同下单时长的订单进行不同的补贴策略。[我们公司目前遇到的场景]
实现方案
方式一:数据库实现
最简单的方式,定时扫表。例如对于订单支付失效要求比较高的,每2S扫表一次检查过期的订单进行主动关单 *** 作。优点是简单,缺点是每分钟全局扫表,浪费资源,如果遇到表数据订单量即将过期的订单量很大,会造成关单延迟。。
方式二:redis的有序集合sort set
步骤:
- 1.产生消息
- 用时间戳作为score,使用zadd key score1 value1 命令生产消息
- 2.读取消息
- withscores limit 0 1消费消息最早的一条消息。
- 3.消费消息并删除
- 实现简单的延迟队列,将消息数据序列化,作为zset的value,把消息处理时间作为score,每次通过zRangeByScore获取一条消息进行处理。
redis 客户端命令
ZADD key score1 member1 [score2 member2]
ZRANGEBYSCORE key min max [WITHSCORES] [LIMIT]
ZREM key member [member ...]
rabbitMQ的死信队列
消息在以下情况下会变成死信息,会被DXL(Dead-Letter-Exchane)死信交换机投递到死信队列:
- 1.消息被拒绝。
- 2.消息未被及时消费或者消费了不ack,直接过期。
- 3.队列达到最大长度。
- 死信队列的实现:
- 消息(设置ttl)--->交换机-->队列(消息过期)-->死信交换机-->死信队列-->消费
整体思路:通过redis的有序集合zset来实现简单的延迟队列,将消息数据序列化,作为zset的value,把消息处理时间作为score,每次通过zRangeByScore获取一条消息进行处理。
key = $this->prefix . $queue; $this->redis = new Redis(); $this->redis->connect($config['host'], $config['port'], $config['timeout']); $this->redis->auth($config['auth']); } public function getRedis() { return $this->redis; } public static function getInstance($queue, $config = []) { // 检查对象是否已经存在,不存在则实例化后保存到$instance属性 if(!(self::$_instance instanceof self)){ //内部实例化对象 self::$_instance = new self($queue,$config); } return self::$_instance; } private function __clone(){} private function __wakeup(){} public function delTask($value) { return $this->redis->zRem($this->key, $value); } public function getTask() { //获取任务,以0和当前时间为区间,返回一条记录 return $this->redis->zRangeByScore($this->key, 0, time(), ['limit' => [0, 1]]); } public function addTask($name, $time, $data) { //添加任务,以时间作为score,对任务队列按时间从小到大排序 return $this->redis->zAdd( $this->key, $time, json_encode([ 'task_name' => $name, 'task_time' => $time, 'task_params' => $data, ], JSON_UNESCAPED_UNICODE) ); } public function run() { //每次只取一条任务 $task = $this->getTask(); if (empty($task)) { return false; } $task = isset($task[0]) ? $task[0] : []; //有并发的可能,这里通过zrem返回值判断谁抢到该任务 if ($task && $this->delTask($task)) { $task = json_decode($task, true); //处理任务 echo '任务:' . $task['task_name'] . ' 运行时间:' . date('Y-m-d H:i:s') . PHP_EOL; return true; } return false; } } //生产使用 $Queue = DelayQueue::getInstance('payment_order',[ 'host' => '127.0.0.1', 'port' => 6379, 'auth' => '', 'timeout' => 60, ]); $Queue->addTask('payment_order_1', time() + 30, ['order_id' => '1']); $Queue->addTask('payment_order_2', time() + 60, ['order_id' => '2']); $Queue->addTask('payment_order_3', time() + 90, ['order_id' => '3']);
写一个php脚本,用来处理队列中的任务。
'127.0.0.1', 'port' => 6379, 'auth' => '', 'timeout' => 60, ]); //处理任务 while (true) { $Queue1->run(); usleep(100000); }
这里又产生了一个问题,同一个任务可能会被多个进程取到之后再使用 zrem 进行争抢,那些没抢到的进程都是白取了一次任务,这是浪费。解决办法:将 zrangebyscore 和 zrem 使用 lua 脚本进行原子化 *** 作,这样多个进程之间争抢任务时就不会出现这种浪费了。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)