redis的发布订阅模式,使发布者和订阅者完全解耦
1.pom.xml and application.propertiesorg.springframework.boot spring-boot-starter-data-redisio.lettuce lettuce-coreredis.clients jedis2.9.0
spring: redis: #数据库索引 database: 5 host: 127.0.0.1 port: 6379 password: 123456 jedis: pool: #最大连接数 max-active: 8 #最大阻塞等待时间(负数表示没限制) #最大空闲 max-idle: 8 #最小空闲 min-idle: 02.消息发布者、消息处理者POJO、redis消息监听器容器以及redis监听器注入IOC容器
@Configuration //相当于xml中的beans public class RedisConfig { @Bean //相当于xml中的bean RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory, MessageListenerAdapter listenerAdapter, MessageListenerAdapter listenerAdapter2) { RedisMessageListenerContainer container = new RedisMessageListenerContainer(); container.setConnectionFactory(connectionFactory); //订阅了一个叫chat 的通道 container.addMessageListener(listenerAdapter, new PatternTopic("chat")); container.addMessageListener(listenerAdapter2, new PatternTopic("chat2")); //这个container 可以添加多个 messageListener return container; } @Bean MessageListenerAdapter listenerAdapter(MessageReceiver receiver) { //这个地方 是给messageListenerAdapter 传入一个消息接受的处理器,利用反射的方法调用“receiveMessage” //也有好几个重载方法,这边默认调用处理器的方法 叫handleMessage 可以自己到源码里面看 return new MessageListenerAdapter(receiver, "receiveMessage"); } @Bean MessageListenerAdapter listenerAdapter2(MessageReceiver receiver) { //这个地方 是给messageListenerAdapter 传入一个消息接受的处理器,利用反射的方法调用“receiveMessage2” //也有好几个重载方法,这边默认调用处理器的方法 叫handleMessage 可以自己到源码里面看 return new MessageListenerAdapter(receiver, "receiveMessage2"); } @Bean StringRedisTemplate template(RedisConnectionFactory connectionFactory) { return new StringRedisTemplate(connectionFactory); } @Bean RedisTemplate redisTemplate(RedisConnectionFactory connectionFactory) { Jackson2JsonRedisSerializer
MessageListenerAdapter通过反射使普通的POJO就可以处理消息。具体情况见MessageListenerAdapter的onMessage方法。
3.消息发布者@EnableScheduling //开启定时器功能 @Component public class MessageSender { @Autowired private StringRedisTemplate stringRedisTemplate; @Scheduled(fixedRate = 2000) //间隔2s 通过StringRedisTemplate对象向redis消息队列chat频道发布消息 public void sendMessage(){ stringRedisTemplate.convertAndSend("chat",String.valueOf(Math.random())); stringRedisTemplate.convertAndSend("chat2",String.valueOf(Math.random())); } }4.普通的消息处理器POJO
@Component public class MessageReceiver { public void receiveMessage(String message){ System.out.println("收到一条chat的消息:"+message); } public void receiveMessage2(String message){ System.out.println("收到一条chat2的消息:"+message); } }
MessageListenerAdapter通过反射调用receiveMessage方法处理消息
5.其他方式(参考) 配置
package com.example.day3_30.redisConfiBack; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.data.redis.connection.RedisConnectionFactory; import org.springframework.data.redis.listener.PatternTopic; import org.springframework.data.redis.listener.RedisMessageListenerContainer; import org.springframework.data.redis.listener.adapter.MessageListenerAdapter; import java.util.concurrent.CountDownLatch; @Configuration public class RedisMessageListener { @Bean public RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory, MessageListenerAdapter listenerAdapter, MessageListenerAdapter listenerAdapterTest2){ RedisMessageListenerContainer container = new RedisMessageListenerContainer(); container.setConnectionFactory(connectionFactory); //接受消息的key container.addMessageListener(listenerAdapter,new PatternTopic("phone")); container.addMessageListener(listenerAdapterTest2,new PatternTopic("phoneTest2")); return container; } @Bean public MessageListenerAdapter listenerAdapter(ReceiverRedisMessage receiver){ return new MessageListenerAdapter(receiver,"receiveMessage"); } @Bean public MessageListenerAdapter listenerAdapterTest2(ReceiverRedisMessage receiver){ return new MessageListenerAdapter(receiver,"receiveMessage2"); } @Bean ReceiverRedisMessage receiver(CountDownLatch latch) { return new ReceiverRedisMessage(latch); } @Bean public CountDownLatch latch(){ return new CountDownLatch(1);//指定了计数的次数 1 } }消息处理
package com.example.day3_30.redisConfiBack; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import java.util.concurrent.CountDownLatch; public class ReceiverRedisMessage { private static final Logger log = LoggerFactory.getLogger(ReceiverRedisMessage.class); private CountDownLatch latch; @Autowired public ReceiverRedisMessage(CountDownLatch latch) { this.latch = latch; } public void receiveMessage(String jsonMsg) { log.info("[开始消费REDIS消息队列phone数据...]"); try { System.out.println(jsonMsg); log.info("[消费REDIS消息队列phone数据成功.]"); } catch (Exception e) { log.error("[消费REDIS消息队列phone数据失败,失败信息:{}]", e.getMessage()); } latch.countDown(); } public void receiveMessage2(String jsonMsg) { log.info("[开始消费REDIS消息队列phoneTest2数据...]"); try { System.out.println(jsonMsg); log.info("[消费REDIS消息队列phoneTest2数据成功.]"); } catch (Exception e) { log.error("[消费REDIS消息队列phoneTest2数据失败,失败信息:{}]", e.getMessage()); } latch.countDown(); } }测试
package com.example.day3_30.redisConfiBack; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; @RestController @RequestMapping public class PublisherController { private static final Logger log = LoggerFactory.getLogger(PublisherController.class); @Autowired private RedisTemplate redisTemplate; @GetMapping(value = "pub/{id}") public String pubMsg(@PathVariable String id){ redisTemplate.convertAndSend("phone","223333"); redisTemplate.convertAndSend("phoneTest2","34555665"); log.info("Publisher sendes Topic... "); return "success"; } }
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)