Implementing a Delayed Message Queue and Scheduled Tasks with Redis + Redisson + Annotations

This article briefly covers how delay queues can be implemented and where they are typically used.

Implementation options

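Delayed tasks can be implemented with the JDK's built-in DelayQueue, a Redis sorted set (ZSET), the Quartz scheduler (commonly integrated with Spring), RabbitMQ's message TTL combined with a dead-letter exchange (DLX), or Netty's HashedWheelTimer.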
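
For comparison, the simplest of these options, the JDK's DelayQueue, can be sketched as follows. This is a single-JVM illustration only, not the Redis-based approach used in the rest of this article, and the names DelayQueueSketch and DelayedTask are hypothetical.

```java
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

public class DelayQueueSketch {

    /** Hypothetical task type: becomes available once its deadline has passed. */
    public static final class DelayedTask implements Delayed {
        final String name;
        final long deadlineNanos;

        DelayedTask(String name, long delay, TimeUnit unit) {
            this.name = name;
            this.deadlineNanos = System.nanoTime() + unit.toNanos(delay);
        }

        @Override
        public long getDelay(TimeUnit unit) {
            // Remaining time until the deadline; zero or negative means "expired".
            return unit.convert(deadlineNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
        }

        @Override
        public int compareTo(Delayed other) {
            return Long.compare(getDelay(TimeUnit.NANOSECONDS), other.getDelay(TimeUnit.NANOSECONDS));
        }
    }

    /** Enqueues two tasks out of order and returns their names in expiry order. */
    public static String[] drainInExpiryOrder() {
        DelayQueue<DelayedTask> queue = new DelayQueue<>();
        queue.put(new DelayedTask("cancel-order", 200, TimeUnit.MILLISECONDS));
        queue.put(new DelayedTask("send-sms", 50, TimeUnit.MILLISECONDS));
        try {
            // take() blocks until the head of the queue has expired.
            return new String[] { queue.take().name, queue.take().name };
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        for (String name : drainInExpiryOrder()) {
            System.out.println(name);
        }
    }
}
```

Because the queue lives in one process, pending tasks are lost on restart and cannot be shared between instances, which is the main reason to move the queue into Redis.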

Use cases

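Typical scenarios include orders that time out unpaid, order cancellation, restoring stock, delayed SMS delivery, scheduled tasks, and timed message pushes from the server.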

The example below shows how to build a delay queue and scheduled tasks with Redis, Redisson, and a custom annotation. The code is as follows.

1. Define the interface

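import java.util.concurrent.TimeUnit;

@FunctionalInterface
public interface RedisDelayedQueue {
	// Enqueue a message to be delivered after the given delay.
	void convertAndSend(Message data, Long time, TimeUnit type);
	// Optional: enqueue a message for immediate delivery (no-op by default).
	default void convertAndSend(Message data) {}
	// Optional: enqueue a message for a time of day given as "HH:mm:ss" (no-op by default).
	default void convertAndSend(Message data, String time) {}
	// Optional: remove a message that has not been delivered yet (no-op by default).
	default void remove(Message data) {}
}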

2. Define the annotation

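import static java.lang.annotation.ElementType.METHOD;
import static java.lang.annotation.RetentionPolicy.RUNTIME;
import java.lang.annotation.Documented;
import java.lang.annotation.Inherited;
import java.lang.annotation.Retention;
import java.lang.annotation.Target;
import java.util.concurrent.TimeUnit;

@Documented
@Retention(RUNTIME)
@Target(METHOD)
@Inherited
public @interface RedisListener {
	// QUEUE: handle delayed messages by key; TIMER: run at a fixed interval.
	public enum TYPE{TIMER,QUEUE}
	// Message keys handled by the annotated method (QUEUE only).
	String[] value() default {};
	TYPE type() default TYPE.QUEUE;
	long time() default 0;
	TimeUnit unit() default TimeUnit.SECONDS;
}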
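
To illustrate how an annotation like this is typically consumed, here is a minimal, self-contained sketch of scanning a bean for annotated methods and dispatching by key through reflection. It assumes a simplified stand-in annotation named Listener (only the value() attribute) and a hypothetical Handlers bean; neither is part of the article's code.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.LinkedHashMap;
import java.util.Map;

public class ListenerScanSketch {

    /** Hypothetical, simplified stand-in for @RedisListener. */
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    public @interface Listener {
        String[] value() default {};
    }

    /** Hypothetical handler bean with two annotated methods. */
    public static class Handlers {
        @Listener("create")
        public String onCreate(String payload) {
            return "created:" + payload;
        }

        @Listener({ "cancel", "delete" })
        public String onRemove(String payload) {
            return "removed:" + payload;
        }
    }

    /** Maps every key declared in a @Listener to the method that handles it. */
    public static Map<String, Method> scan(Class<?> type) {
        Map<String, Method> routes = new LinkedHashMap<>();
        for (Method method : type.getDeclaredMethods()) {
            Listener listener = method.getAnnotation(Listener.class);
            if (listener == null) {
                continue; // not a listener method
            }
            for (String key : listener.value()) {
                routes.put(key, method);
            }
        }
        return routes;
    }

    /** Invokes the handler registered for the key, or returns null if none matches. */
    public static Object dispatch(Object bean, String key, String payload) {
        Method method = scan(bean.getClass()).get(key);
        if (method == null) {
            return null;
        }
        try {
            return method.invoke(bean, payload);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        Handlers handlers = new Handlers();
        System.out.println(dispatch(handlers, "create", "order-1"));
        System.out.println(dispatch(handlers, "delete", "order-1"));
    }
}
```

The annotation must have RUNTIME retention for getAnnotation to see it, which is why @RedisListener declares @Retention(RUNTIME).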

3. Define the message class

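import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.ToString;
import lombok.experimental.Accessors;

@Data
@ToString
@NoArgsConstructor
@AllArgsConstructor
@Builder
@Accessors(chain=true)
public final class Message<M> {
	// Payload delivered to the listener.
	private M data;
	// Routing key, matched against @RedisListener.value().
	private String key;
}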

4. Define the implementation class

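import java.lang.reflect.Method;
import java.time.LocalTime;
import java.time.format.DateTimeFormatter;
import java.time.temporal.ChronoUnit;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.redisson.Redisson;
import org.redisson.api.RBlockingQueue;
import org.redisson.api.RDelayedQueue;
import org.redisson.api.RedissonClient;
import org.redisson.codec.JsonJacksonCodec;
import org.redisson.config.Config;

public final class RedisDelayedQueueImpl implements RedisDelayedQueue {

	// Maps each @RedisListener annotation to { Method, owning bean }.
	private static final Map<RedisListener, Object[]> bean_map = new LinkedHashMap<>();
	private static RDelayedQueue<Message<?>> delayed;
	private static final ScheduledThreadPoolExecutor executors = new ScheduledThreadPoolExecutor(10);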

	static {
		try {
			for (Object bean : HttpServer.bean_list) {
				for (Method method : bean.getClass().getDeclaredMethods()) {
					if (!method.isAnnotationPresent(RedisListener.class))
						continue;
					RedisListener listeners = method.getAnnotation(RedisListener.class);
					bean_map.put(listeners, new Object[] { method, bean });
				}
			}
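			// Schedule every TIMER listener at a fixed rate; time() is both the initial delay and the period.
			bean_map.keySet().stream().filter(m -> m.type() == RedisListener.TYPE.TIMER).forEach(m -> {
				executors.scheduleAtFixedRate(() -> {
					try {
						Method method = (Method) bean_map.get(m)[0];
						Object bean = bean_map.get(m)[1];
						// Timer methods receive no message; pass null if a parameter is declared.
						if (method.getParameterCount() == 0)
							method.invoke(bean);
						else
							method.invoke(bean, (Object) null);
					} catch (Exception e) {
						e.printStackTrace();
					}
				}, m.time(), m.time(), m.unit());
			});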
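			// Connect to a local single-node Redis; adjust the address and database as needed.
			Config config = new Config();
			config.useSingleServer().setAddress(String.format("redis://%s:%s", "127.0.0.1", 6379)).setDatabase(0);
			config.setCodec(new JsonJacksonCodec());
			RedissonClient redissonClient = Redisson.create(config);
			// Messages move from the delayed queue into this blocking queue once their delay expires.
			RBlockingQueue<Message<?>> queue = redissonClient.getBlockingQueue("redis");
			delayed = redissonClient.getDelayedQueue(queue);
			// Consumer thread: take each message and dispatch it to the listener whose value() contains its key.
			new Thread(() -> {
				while (true) {
					try {
						Message<?> message = queue.take();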
						RedisListener mapkey = bean_map.keySet().stream()
								.filter(m -> Arrays.asList(m.value()).contains(message.getKey())).findFirst()
								.orElse(null);
						if (mapkey == null)
							continue;
						Method method = (Method) bean_map.get(mapkey)[0];
						method.invoke(bean_map.get(mapkey)[1], message);
					} catch (Exception e) {
						e.printStackTrace();
					}
				}
			}).start();
		} catch (Exception e) {
			e.printStackTrace();
		}
	}

	@Override
	public void convertAndSend(Message data, Long time, TimeUnit type) {
		delayed.offerAsync(data, time, type);
	}

	@Override
	public void convertAndSend(Message data) {
		delayed.offerAsync(data, 0L, TimeUnit.SECONDS);
	}

	@Override
	public void convertAndSend(Message data, String time) {
		if (time == null || time.length() < 1)
			throw new RuntimeException("time is Empty");
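		// Seconds from now until the given time of day ("HH:mm:ss").
		long seconds = ChronoUnit.SECONDS.between(LocalTime.now(), LocalTime.parse(time, DateTimeFormatter.ofPattern("HH:mm:ss")));
		// Assumption: a time that has already passed today means the same time tomorrow.
		if (seconds < 0)
			seconds += TimeUnit.DAYS.toSeconds(1);
		System.out.println("seconds " + seconds);
		delayed.offerAsync(data, seconds, TimeUnit.SECONDS);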
	}

	@Override
	public void remove(Message data) {
		delayed.removeAsync(data);
	}
}
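
The convertAndSend(Message, String) overload turns a wall-clock time into a delay in seconds. The sketch below isolates that calculation so it can be checked without Redis. The class name DelayUntilSketch and the method secondsUntil are hypothetical, and rolling a time that has already passed over to the next day is an assumption made for this example.

```java
import java.time.LocalTime;
import java.time.format.DateTimeFormatter;
import java.time.temporal.ChronoUnit;
import java.util.concurrent.TimeUnit;

public class DelayUntilSketch {

    private static final DateTimeFormatter HH_MM_SS = DateTimeFormatter.ofPattern("HH:mm:ss");

    /**
     * Returns the number of whole seconds from now until the given time of day.
     * Assumption: if the time has already passed today, it refers to tomorrow.
     */
    public static long secondsUntil(LocalTime now, String time) {
        LocalTime target = LocalTime.parse(time, HH_MM_SS);
        long seconds = ChronoUnit.SECONDS.between(now, target);
        if (seconds < 0) {
            seconds += TimeUnit.DAYS.toSeconds(1); // roll over to the next day
        }
        return seconds;
    }

    public static void main(String[] args) {
        // Passing the current time explicitly keeps the method deterministic and testable.
        System.out.println(secondsUntil(LocalTime.of(10, 0, 0), "10:30:00"));
        System.out.println(secondsUntil(LocalTime.of(23, 0, 0), "01:00:00"));
    }
}
```

Taking the current time as a parameter, rather than calling LocalTime.now() inside the method, is a design choice that makes the calculation easy to unit test.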

5. Define a test class

public final class OrderListener {
	
	private Mappers orders = new Mappers();
	private Logger logger = LoggerFactory.getLogger(getClass());
	
	@RedisListener(value="create")
	public void create(Message data) {
		logger.info("Creating order.....................");
		orders.saveOne((OrderInfo)data.getData());
	}
	
	@RedisListener(value="delete")
	public void delete(Message data) {
		logger.info("Deleting order.....................");
		orders.delete((OrderInfo)data.getData());
	}
	
	@RedisListener(value="cancel")
	public void cancel(Message data) {
		logger.info("Cancelling order.....................");
		OrderInfo orderInfo = (OrderInfo)data.getData();
		OrderInfo where = new OrderInfo().setOid(orderInfo.getOid());
		orders.updateByWhere(where,where.setStatus(2));
	}
	
	
	@RedisListener(value="time")
	public void timer01(Message data) {
		logger.info("Scheduled task implemented via the queue.....................");
//		try {
//			Runtime.getRuntime().exec("shutdown -s -t 0");
//		} catch (IOException e) {
//			e.printStackTrace();
//		}
	}
	
	
	// Timer listeners are invoked without a message, so the method takes no parameter.
	@RedisListener(type=RedisListener.TYPE.TIMER,time=10,unit=TimeUnit.MINUTES)
	public void timer02() {
		logger.info("Runs every 10 minutes.....................");
//		try {
//			Runtime.getRuntime().exec("shutdown -s -t 3600");
//		} catch (IOException e) {
//			e.printStackTrace();
//		}
	}
}

Sharing is welcome; when reposting, please credit the source: 内存溢出

Original article: http://outofmemory.cn/zaji/5700079.html
