suishen-queue是一个基于redis sortedset的事件消费队列。
- 通过注解方便快速的与spring集成;
- 满足at least onece,at most onece消费模式;
- 支持同步消费和异步消费,异步消费支持按事件id分片,同分片下顺序消费,不同分片并发消费;同分片下,同id严格的顺序消费;
- 支持普通的即时消费队列和定时的延迟消费队列;
- 支持单点消费和集群的多节点消费;
- 提供队列消费状态的实时监控,及消费失败和队列积压的钉钉告警。
2、启用队列服务suishen suishen-queue{version}
@EnableSuishenQueue(basePackage = "weli.wormhole", url = "m1.redis.wormhole.wl.com:6379", consume = true, slaveUrl = "s1.redis.wormhole.wl.com:6379", nameSpace="wormhole",consumeType = QueueConsumeType.COLLECTION)
通过spring Registrar机制注册所有相关bean,初始化队列配置信息
3、定义事件绑定处理器@SuishenQueue(delayTime = 1000L, groupCount = 20, handler = TestDelaySourceEventHandler.class) public class PostWashEvent implements SourceEvent { public Long id; @Override public long getId() { return id; } } @Component public class PostWashHandler extends SourceEventHandler三、单点消费{ @Override public boolean handle(PostWashEvent sourceEvent) { return result; } }
- 通过spring SmartLifecycle机制,随容器的启停,manager开启和停止消费;
- 单线程轮询所有队列状态,发现有就绪状态队列,交由线程池开始消费当前队列;
- 节点向redis定时发送心跳,更新活跃时间;
- 从redis获取当前活跃节点列表;
- 更新本地消费节点列表;
- 感知到远程活跃节点与本地缓存活跃节点不一致,触发重分配;
- 通过一致性hash算法,为所有节点分配消费队列;
- 更新当前节点缓存的消费分配队列;
- 停止未再当前节点分配的队列消费,开启新的在当前节点分配的队列消费,无变化的不做作处理;
一致性hash说明:
a、发生队列重分配时,队列消费出现迁移,使用一致性hash可以减少迁移的队列,从而降低重分配发生时产生的消费暂停; b、节点ip+port计算hash值,ip+port相同时会认为是同一节点; c、每个节点会产生4个虚拟节点,保证队列分配的大体均匀。3、队列消费的并发安全问题
当有新节点的上线或下线导致重分配时,不同节点对事件发生的感知有延迟,或导致中间某个时刻,不同节点分配的消费队列不一致,引发队列的并发安全问题。
通过redis的lua脚本实现了分布式环境下的可重入锁:
- 队列开始消费前,需获取锁;
- 队列消费过程中,通过定时任务定期获取锁,完成对队列消费的续期;
- 队列停止消费后,释放锁;
使用分布式锁后,当发生不一致时,新节点的消费开启需等待旧节点更新后,释放队列的消费,最终会保证各个节点的一致性。
lua可重入锁说明:
a、使用scriptLoad预加载lua脚本,避免每次锁执行时lua脚本的重复传递; b、通过对value的值进行比较,相同时,可重新上锁,达到可重入; c、分布式redis执行lua需要保证所有key均在同一分片下,通过对key拼上后缀{queue},可以保证这些key均在一个分片下,从而保证分布式下锁的争取性4、与单点消费对比
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)