redis的zset是有序集合,默认根据score升序排序。并且可以根据scope范围查询,因此可以启动一个线程循环执行范围查询,获取当前时间之前的数据,即要执行任务,(因为不是严格按照时间匹配的,因此可能会有一点时间偏差,但一般情况下不会有影响),处理完后删除缓存。考虑到线程有可能会异常退出(比如redis连接异常等),因此使用监听者模式设计了线程重启方案,监听者会监听线程,当线程出现异常时监听者会重启线程。下面是具体代码。
@Configuration
public class RedisTemplateConfig {
@Bean(name = "myredis")
public RedisTemplate redisTemplate(RedisConnectionFactory factory){
RedisTemplate<String, Object> redisTemplate = new RedisTemplate();
redisTemplate.setConnectionFactory(factory);
// 使用StringRedisSerializer来序列化和反序列化redis的key值
StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();
redisTemplate.setKeySerializer(stringRedisSerializer);
redisTemplate.setHashKeySerializer(stringRedisSerializer);
// 使用GenericFastJsonRedisSerializer 来序列化和反序列化redis的value值
GenericFastJsonRedisSerializer genericFastJsonRedisSerializer = new GenericFastJsonRedisSerializer();
redisTemplate.setValueSerializer(genericFastJsonRedisSerializer);
redisTemplate.setHashValueSerializer(genericFastJsonRedisSerializer);
redisTemplate.afterPropertiesSet();
return redisTemplate;
}
}
public class DelayTaskConsumerThread extends Observable implements Runnable {
@Override
public void run() {
System.out.println("线程启动:"+Thread.currentThread().getId());
while (true){
try {
long time = System.currentTimeMillis();
consumeMsg("testDelayTask",time);
} catch (Exception e) {
e.printStackTrace();
System.out.println("线程异常:"+Thread.currentThread().getId());
doBusiness();
break;
}
}
}
private void consumeMsg(String key,long time){
RedisTemplate redisTemplate = (RedisTemplate) SpringContextUtil.getBean("myredis");
Set set = redisTemplate.opsForZSet().rangeByScore(key,0l, time);
if (set.size() > 0){
System.out.println("线程:"+Thread.currentThread().getId()+","+"获取到["+set.size()+"]条数据,[time="+time+"]");
Iterator iterator = set.iterator();
String next="";
while (iterator.hasNext()){
try {
next = (String) iterator.next();
System.out.println("线程:"+Thread.currentThread().getId()+","+"消费消息:[value="+ next +"]");
}catch (Exception e){
//这里可以对消费异常的数据做进一步处理
throw e;
}finally {
redisTemplate.opsForZSet().remove(key,next);
}
}
}else {
System.out.println("线程:"+Thread.currentThread().getId()+","+"没有消息[time="+time+"]");
}
}
public void doBusiness(){
if(true){
super.setChanged();
}
notifyObservers();
}
}
public class DelayTaskConsumerListener implements Observer {
@Override
public void update(Observable o, Object arg) {
System.out.println("DelayTaskConsumerThread线程异常退出,重新启动");
DelayTaskConsumerThread run = new DelayTaskConsumerThread();
run.addObserver(this);
ThreadPoolUtil.execute(run);
System.out.println("DelayTaskConsumerThread线程已重启");
}
}
@Component
public class DelayTaskConsumerRunner implements ApplicationRunner {
@Autowired
@Qualifier(value = "myredis")
private RedisTemplate redisTemplate;
@Override
public void run(ApplicationArguments args) {
redisTemplate.delete("testDelayTask");
DelayTaskConsumerThread taskConsumerThread = new DelayTaskConsumerThread();
DelayTaskConsumerListener taskConsumerListener = new DelayTaskConsumerListener();
taskConsumerThread.addObserver(taskConsumerListener);
ThreadPoolUtil.execute(taskConsumerThread);
}
}
@Component
public class SpringContextUtil implements ApplicationContextAware {
/**
* 上下文对象实例
*/
private static ApplicationContext applicationContext;
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
SpringContextUtil.applicationContext = applicationContext;
}
/**
* 获取applicationContext
*
* @return
*/
public static ApplicationContext getApplicationContext() {
return applicationContext;
}
/**
* 通过name获取 Bean.
*
* @param name
* @return
*/
public static Object getBean(String name) {
return getApplicationContext().getBean(name);
}
/**
* 通过class获取Bean.
*
* @param clazz
* @param
* @return
*/
public static <T> T getBean(Class<T> clazz) {
return getApplicationContext().getBean(clazz);
}
/**
* 通过name,以及Clazz返回指定的Bean
*
* @param name
* @param clazz
* @param
* @return
*/
public static <T> T getBean(String name, Class<T> clazz) {
return getApplicationContext().getBean(name, clazz);
}
}
@GetMapping(value = "/test", produces = MediaType.APPLICATION_JSON_UTF8_VALUE)
public CommonResponse test() {
produceMsg("testDelayTask", UUID.randomUUID().toString(),System.currentTimeMillis());//这里的时间戳实际是需要从页面选择的任务执行时间,这里为了方便直接使用了系统当前时间戳
return new CommonResponse();
}
/**
*
* @param key
* @param value
* @param score 设置的任务执行时间
*/
private void produceMsg(String key,String value,long score){
Boolean add = redisTemplate.opsForZSet().add(key, value, score);
if (add){
System.out.println("发送消息:[value="+value+",score="+score+"]");
}else {
System.out.println("消息发送失败:[value="+value+",score="+score+"]");
}
}
缺点:
1、需要一直频繁的访问redis,就算redis中没有要执行的任务了也会一直访问,造成资源浪费
2、时间控制不精准
优化思路:
可以借鉴zookeeper中的节点监听机制,当某个数据节点被删除时,由服务端主动通知监听此节点的客户端,这样就不需要客户端一直轮询访问了。但zookeeper没有redis中的超时删除功能,因此不合适。幸运的是,etcd相比于zookeeper,增加了超时删除功能,另外相比于redis,增加了回调功能,因此可以考虑采用etcd或相关思想进行优化。
知识点:
1、redis的zset数据结构
2、java监听者模式
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)