本次需求需要同步两库中的company表。
业务代码略过。。。
rabbitmq创建交换机
绑定队列
创建队列
2.发布者代码
@Autowired RabbitTemplate rabbitTemplate; @Override public void sendMessageCompany(JSONObject messageVo, String routingkey) { // 交换机名称 绑定的routingkey 消息 rabbitTemplate.convertAndSend("company", "hello", messageVo); }
3.消费者代码
初始化通道
package com.gold.mtmc.rabbitmq.config; import com.gold.mtmc.common.contants.enums.RabbitMqEnum; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.DirectExchange; import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class HelloExchangeConfig { private String EXCHANGE = "hello"; private String QUEUE = "helloqueue"; private String RoutingKey = "message_hello"; @Bean public DirectExchange helloExchange() { DirectExchange directExchange = new DirectExchange(EXCHANGE, true, false); return directExchange; } @Bean public Queue helloQueueMsg() { Queue queue = new Queue(QUEUE, true, false, false); return queue; } @Bean public Binding helloBindingQueueMsg() { Binding binding = BindingBuilder.bind(helloQueueMsg()).to(helloExchange()).with(RoutingKey); return binding; } }
@Slf4j @Component public class DirectMsgListener { @Autowired private MsgListener msgListener; //指定队列名称 @RabbitListener(queues = "fanout") public void displayMailFanout(JSONObject messageVo, Channel channel, Message message) throws IOException { //回调处理消息 try { log.info("directMsg队列监听器收到消息开始:" + messageVo.toString()); //调用发送消息接口 msgListener.send(messageVo); log.info("directMsg队列监听器收到消息结束:" + messageVo.toString()); } catch (Exception e) { log.info("***********************************************发送消息失败:"+e.getMessage()); e.printStackTrace(); log.info("***********************************************发送消息失败:"+e); } finally { log.info("directMsg队列监听器收到消息结束:" + messageVo.toString() + message.getMessageProperties().getDeliveryTag()); //这段代码表示,这次消息,我已经接受并消费掉了,不会再重复发送消费 channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } } }
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)