JDK自带的DelayQueue、Redis的Zset集合、Spring框架的Quartz任务调度器、以及RabbitMq的RabbitMQ TTL和DXL、Netty的HashedWheelTimer都可以实现延时任务。
应用场景订单超时未支付、取消订单、恢复库存、短信延时发送、定时任务以及服务端定时消息推送等
下面通过一个案例介绍如何使用Redis+Redisson+注解的方式来实现延时队列和定时任务,具体代码如下。1.定义一个接口类
@FunctionalInterface public interface RedisDelayedQueue{ void convertAndSend(Message data,Long time,TimeUnit type); default void convertAndSend(Message data){} default void convertAndSend(Message data,String time){} default void remove(Message data){} }
2.定义注解
@documented @Retention(RUNTIME) @Target(METHOD) @Inherited public @interface RedisListener { public enum TYPE{TIMER,QUEUE} String[] value() default {}; TYPE type() default TYPE.QUEUE; long time() default 0; TimeUnit unit() default TimeUnit.SECONDS; }
3.定义消息类
@Data @ToString @NoArgsConstructor @AllArgsConstructor @Builder @Accessors(chain=true) public final class Message{ private M data; private String key; }
4.定义接口实现类
public final class RedisDelayedQueueImplimplements RedisDelayedQueue { private static Map bean_map = new linkedHashMap (); private static RDelayedQueue > delayed; private static ScheduledThreadPoolExecutor executors = new ScheduledThreadPoolExecutor(10); static { try { for (Object bean : HttpServer.bean_list) { for (Method method : bean.getClass().getDeclaredMethods()) { if (!method.isAnnotationPresent(RedisListener.class)) continue; RedisListener listeners = method.getAnnotation(RedisListener.class); bean_map.put(listeners, new Object[] { method, bean }); } } bean_map.keySet().stream().filter(m -> m.type() == TYPE.TIMER).forEach(m -> { executors.scheduleAtFixedRate(() -> { try { Method method = (Method) bean_map.get(m)[0]; method.invoke(bean_map.get(m)[1]); } catch (Exception e) { e.printStackTrace(); } }, m.time(), m.time(), m.unit()); }); Config config = new Config(); config.useSingleServer().setAddress(String.format("redis://%s:%s", "127.0.0.1", 6379)).setDatabase(0); config.setCodec(new JsonJacksonCodec()); RedissonClient redissonClient = Redisson.create(config); RBlockingQueue > queue = redissonClient.getBlockingQueue("redis"); delayed = redissonClient.getDelayedQueue(queue); new Thread(() -> { while (true) { try { Message> message = queue.take(); RedisListener mapkey = bean_map.keySet().stream() .filter(m -> Arrays.asList(m.value()).contains(message.getKey())).findFirst() .orElse(null); if (mapkey == null) continue; Method method = (Method) bean_map.get(mapkey)[0]; method.invoke(bean_map.get(mapkey)[1], message); } catch (Exception e) { e.printStackTrace(); } } }).start(); } catch (Exception e) { e.printStackTrace(); } } @Override public void convertAndSend(Message data, Long time, TimeUnit type) { delayed.offerAsync(data, time, type); } @Override public void convertAndSend(Message data) { delayed.offerAsync(data, 0L, TimeUnit.SECONDS); } @Override public void convertAndSend(Message data, String time) { if (time == null || time.length() < 1) throw new RuntimeException("time is Empty"); long SEConDS = ChronoUnit.SECONDS.between(LocalTime.now(),LocalTime.parse(time,DateTimeFormatter.ofPattern("HH:mm:ss"))); System.out.println("SEConDS " + SECONDS); delayed.offerAsync(data, SECONDS, TimeUnit.SECONDS); } @Override public void remove(Message data) { delayed.removeAsync(data); } }
5.定义测试类
public final class OrderListener { private Mappersorders = new Mappers (); private Logger logger = LoggerFactory.getLogger(getClass()); @RedisListener(value="create") public void create(Message> data) { logger.info("创建订单....................."); orders.saveOne((OrderInfo)data.getData()); } @RedisListener(value="delete") public void delete(Message> data) { logger.info("删除订单....................."); orders.delete((OrderInfo)data.getData()); } @RedisListener(value="cancel") public void cancel(Message> data) { logger.info("取消订单....................."); OrderInfo orderInfo = (OrderInfo)data.getData(); OrderInfo where = new OrderInfo().setOid(orderInfo.getOid()); orders.updateByWhere(where,where.setStatus(2)); } @RedisListener(value="time") public void timer01(Message> data) { logger.info("通过队列实现定时任务....................."); // try { // Runtime.getRuntime().exec("shutdown -s -t 0"); // } catch (IOException e) { // e.printStackTrace(); // } } @RedisListener(type=TYPE.TIMER,time=10,unit=TimeUnit.MINUTES) public void timer02(Message> data) { logger.info("每10分钟执行一次....................."); // try { // Runtime.getRuntime().exec("shutdown -s -t 3600"); // } catch (IOException e) { // e.printStackTrace(); // } } }
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)