yml配置:
server:
  port: 8088
spring:
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: guest
    password: guest
    publisher-confirm-type: correlated #消息确认方式,通过 correlated 来确认(将来的消息中才会带 correlation_id,只有通过 correlation_id 我们才能将发送的消息和返回值之间关联起来)
    publisher-returns: true #开启发送失败退回
    #1.开启 confirm 确认机制
maven依赖:
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
1.配置类:
import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.Queue; import org.springframework.amqp.core.TopicExchange; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class RabbitConfig { public static final String RPC_QUEUE1 = "queue_1"; //通道1 public static final String RPC_QUEUE2 = "queue_2"; //通道2 public static final String RPC_EXCHANGE = "rpc_exchange"; // 交换机 @Bean Queue msgQueue() { return new Queue(RPC_QUEUE1); } @Bean Queue replyQueue() { return new Queue(RPC_QUEUE2); } @Bean TopicExchange exchange() { return new TopicExchange(RPC_EXCHANGE); } @Bean Binding msgBinding() { return BindingBuilder.bind(msgQueue()).to(exchange()).with(RPC_QUEUE1); } @Bean Binding replyBinding() { return BindingBuilder.bind(replyQueue()).to(exchange()).with(RPC_QUEUE2); } @Bean RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) { RabbitTemplate template = new RabbitTemplate(connectionFactory); template.setReplyAddress(RPC_QUEUE2); template.setReplyTimeout(6000); return template; } @Bean SimpleMessageListenerContainer replyContainer(ConnectionFactory connectionFactory) { SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(); container.setConnectionFactory(connectionFactory); container.setQueueNames(RPC_QUEUE2); container.setMessageListener(rabbitTemplate(connectionFactory)); return container; } }2.生产者
import com.yl.config.RabbitConfig; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessageBuilder; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RestController; import java.util.HashMap; @Slf4j @RestController public class RpcClientController { @Autowired private RabbitTemplate rabbitTemplate; @GetMapping("/send") public String send(String message) { // 创建消息对象 Message newMessage = MessageBuilder.withBody(message.getBytes()).build(); log.info("生产者发送消息----->>>>>", newMessage); //客户端发送消息 Message result = rabbitTemplate.sendAndReceive(RabbitConfig.RPC_EXCHANGE, RabbitConfig.RPC_QUEUE1, newMessage); String response = ""; if (result != null) { // 获取已发送的消息的 correlationId String correlationId = newMessage.getMessageProperties().getCorrelationId(); log.info("生产者----->>>>>{}", correlationId); // 获取响应头信息 HashMap3.消费者headers = (HashMap ) result.getMessageProperties().getHeaders(); // 获取 server 返回的消息 id String msgId = (String) headers.get("spring_returned_message_correlation"); if (msgId.equals(correlationId)) { response = new String(result.getBody()); log.info("生产者发送消息----->>>>>:{}", response); } } return response; } }
import com.yl.config.RabbitConfig; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessageBuilder; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import java.io.UnsupportedEncodingException; @Slf4j @Component public class RpcServerController { @Autowired private RabbitTemplate rabbitTemplate; @RabbitListener(queues = RabbitConfig.RPC_QUEUE1) public void process(Message msg) throws UnsupportedEncodingException { String message=new String(msg.getBody(),"UTF-8"); log.info("消费者消费消息的消息体:{}----->>>>>"+message); Message response = MessageBuilder.withBody(("i'm receive:"+new String(msg.getBody())).getBytes()).build(); CorrelationData correlationData = new CorrelationData(msg.getMessageProperties().getCorrelationId()); rabbitTemplate.sendAndReceive(RabbitConfig.RPC_EXCHANGE, RabbitConfig.RPC_QUEUE2, response, correlationData); } }
Postman:http://localhost:8088/send?message=听闻广陵不知寒,大雪龙骑下江南
结果:
二.路由模式(路由--->交换机) yml配置:
server:
  port: 8084
spring:
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: guest
    password: guest
    publisher-confirms: true #1.开启 confirm 确认机制
    publisher-returns: true #2.开启 return 确认机制
    #3.设置开启Mandatory,才能触发回调函数,无论消息推送结果怎么样都强制调用回调函数
    template:
      mandatory: true
maven配置:
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
1.RabbitMQ配置
import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.DirectExchange; import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class RabbitQueueConfig { public static final String QUEUE_NAME="rollback-queue"; //通道 public static final String EXCHANGE_NAME="rollback-exchange"; //交换机 public static final String ROUTINGKEY_NAME="rollback-routingkey"; //路由 @Bean public Queue queue(){ return new Queue(QUEUE_NAME,true); } @Bean public DirectExchange directExchange(){ return new DirectExchange(EXCHANGE_NAME); } @Bean public Binding binding(){ return BindingBuilder.bind(queue()).to(directExchange()) .with(ROUTINGKEY_NAME); } }2.重写RabbitTemplate,创建RabbitTemplateConfig配置类(手动确认消费)
import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration @Slf4j public class RabbitTemplateConfig { //第二种方式 final RabbitTemplate.ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback() { @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { log.info("/confirm/iCallback,相关数据:{}", correlationData); log.info("/confirm/iCallback,确认消息:{}", ack); log.info("/confirm/iCallback,原因:{}", cause); } }; @Bean public RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory){ RabbitTemplate rabbitTemplate=new RabbitTemplate(); //设置连接工厂Bean rabbitTemplate.setConnectionFactory(connectionFactory); //手动开启 rabbitTemplate.setMandatory(true); //设置传输数据是json格式 rabbitTemplate.setMessageConverter(jsonMessageConverter()); //流程:生产者-->交换机-->路由键-->队列 //ConfirmCallback //流程:生产者-->交换机 //1)成功 触发回调 //2)失败 触发回调 rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() { @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { log.info("/confirm/iCallback,相关数据:{}", correlationData); log.info("/confirm/iCallback,确认消息:{}", ack); log.info("/confirm/iCallback,原因:{}", cause); } }); //第二种方式 //rabbitTemplate.setConfirmCallback(/confirm/iCallback); //ReturnCallback:该回调函数的触发器与mandatory: true参数有必要关系 //流程:交换机-->队列 //成功 不触发回调 //失败 触发回调 rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() { @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { log.info("ReturnCallback,消息:{}", message); 
log.info("ReturnCallback,回应码:{}", replyCode); log.info("ReturnCallback,回应信息:{}", replyText); log.info("ReturnCallback,交换机:{}", exchange); log.info("ReturnCallback,路由键:{}", routingKey); } }); return rabbitTemplate; } @Bean public Jackson2JsonMessageConverter jsonMessageConverter(){ return new Jackson2JsonMessageConverter(); } }3.消息监听指定具体的队列
import com.yl.controller.RabbitQueueReceiver; import org.springframework.amqp.core.AcknowledgeMode; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer; import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class MessageListenerConfig { @Bean public SimpleMessageListenerContainer simpleMessageListenerContainer( ConnectionFactory connectionFactory, RabbitQueueConfig rabbitQueueConfig, RabbitQueueReceiver rabbitQueuReceiver){ SimpleMessageListenerContainer container=new SimpleMessageListenerContainer(); container.setConnectionFactory(connectionFactory);//设置mq连接工厂对象 container.setConcurrentConsumers(1);//设置并发消费者 container.setMaxConcurrentConsumers(1);//设置最多的并发消费者 container.setAcknowledgeMode(AcknowledgeMode.MANUAL); // RabbitMQ默认是自动确认,这里改为手动确认消息 // container.setMessageConverter(jackson2JsonMessageConverter()); //注意:此处不能使用Autowired根据类型自动注入队列,必须调用rabbitmqDirectConfig.firstQueue()获得,why? // 因为项目中可能存在多个队列,它们的类型都是Queue,自动注入会报错 container.setQueues(rabbitQueueConfig.queue()); container.setMessageListener(rabbitQueuReceiver); return container; } @Bean public Jackson2JsonMessageConverter jackson2JsonMessageConverter(){ return new Jackson2JsonMessageConverter(); } }4.消息生产者
import com.yl.common.Result; import com.yl.common.SnowflakeIdWorker; import com.yl.entity.Journal; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import java.time.LocalDateTime; @RestController @Slf4j public class SendController { final static SnowflakeIdWorker idWorker = new SnowflakeIdWorker(0, 0); @Autowired private RabbitTemplate rabbitTemplate; @RequestMapping("send") public Result send(){ String exchange="rollback-exchange"; String routingkey="rollback-routingkey"; log.info("生产者开始发送消息"); Journal journal = new Journal(); journal.setId(idWorker.nextId()); journal.setTitle("听闻广陵不知寒,大雪龙骑下江南"); // journal.setCreateTime(LocalDateTime.now()); journal.setTitleDesc("怒发冲冠⑵,凭阑处⑶、潇潇雨歇⑷。抬望眼,仰天长啸⑸,壮怀激烈⑹。三十功名尘与土⑺,八千里路云和月⑻。莫等闲⑼、白了少年头,空悲切⑽。 靖康耻⑾,犹未雪。臣子恨,何时灭。驾长车,踏破贺兰山缺⑿。壮志饥餐胡虏肉⒀,笑谈渴饮匈奴血⒁。待从头、收拾旧山河,朝天阙⒂。"); //注意:将消息推送到正常的交换机中 //参数一:交换机名称 //参数二:路由键 //参数三:传递参数 //流程:生产者-->交换机-->路由键-->队列 rabbitTemplate.convertAndSend(exchange,routingkey,journal); log.info("生产者发送消息完成"); return Result.succ(" *** 作成功"); } }5.消息的消费者
import com.alibaba.fastjson.JSONObject; import com.fasterxml.jackson.databind.util.JSONPObject; import com.rabbitmq.client.Channel; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.amqp.rabbit.listener.adapter.AbstractAdaptableMessageListener; import org.springframework.stereotype.Component; @Slf4j @Component @RabbitListener(queues = {"rollback_queue"}) public class RabbitQueueReceiver extends AbstractAdaptableMessageListener { @Override public void onMessage(Message message, Channel channel) throws Exception { //消息的唯一ID,单调递增正整数,从1开始,当multiple=trues,一次性处理<=deliveryTag的所有 long deliveryTag = message.getMessageProperties().getDeliveryTag(); boolean multiple=false; //false单条 true 批量 // channel.basicAck(deliveryTag, multiple); //正确消费消息 // channel.basicReject(deliveryTag,true); //为true会重新放回队列 // channel.basicNack(deliveryTag, multiple,true) try { String msg=new String(message.getBody(),"UTF-8"); JSonObject json = JSONObject.parseObject(msg); Long id = json.getLong("id"); log.info("消费的消息id"+id+"-------->>>>>>>"+"消费者消费消息的消息体:{}----->>>>>"+message); //睡眠四秒 for(int i=0;i<4;i++){ Thread.sleep(1000); System.out.println("..."); } // if(deliveryTag%2==0){ // throw new RuntimeException("偶数必须为0"); // } log.info("消息已被正确消费--->>>>>>>>"+deliveryTag); //当前模式为单条消费 channel.basicAck(deliveryTag, multiple); } catch (Exception e) { e.printStackTrace(); //报异常重新投递 channel.basicReject(deliveryTag,true); } } // @RabbitHandler // public void handlerMessage(Journal orderVo) { // log.info("消费者消费消息"+orderVo.toString()); // } }6.实体类对象
import com.fasterxml.jackson.annotation.JsonFormat;
import lombok.Data;

import java.time.LocalDateTime;

/**
 * Message payload with an id, a title and a title description.
 * Accessors are generated by Lombok's {@code @Data}.
 */
@Data
public class Journal {

    /** Identifier of the journal entry. */
    private Long id;

    /** Title text. */
    private String title;

    /** Description accompanying the title. */
    private String titleDesc;
}
7.工具类
/**
 * Snowflake-style generator of 64-bit primary-key ids.
 *
 * <p>Each id is composed, from high to low bits, of: milliseconds elapsed
 * since {@link #TWEPOCH}, a 5-bit datacenter id, a 5-bit worker id and a
 * 12-bit per-millisecond sequence. {@link #nextId()} is {@code synchronized}.
 */
public class SnowflakeIdWorker {
    // ==============================Fields===========================================
    /** Epoch (in milliseconds) subtracted from the current time. */
    private static final long TWEPOCH = 1420041600000L;

    /** Number of bits used for the worker id. */
    private static final long WORKER_ID_BITS = 5L;

    /** Number of bits used for the datacenter id. */
    private static final long DATACENTER_ID_BITS = 5L;

    /** Largest worker id that fits in {@link #WORKER_ID_BITS}. */
    private static final long MAX_WORKER_ID = -1L ^ (-1L << WORKER_ID_BITS);

    /** Largest datacenter id that fits in {@link #DATACENTER_ID_BITS}. */
    private static final long MAX_DATACENTER_ID = -1L ^ (-1L << DATACENTER_ID_BITS);

    /** Number of bits used for the per-millisecond sequence. */
    private static final long SEQUENCE_BITS = 12L;

    /** Left shift applied to the worker id. */
    private static final long WORKER_ID_SHIFT = SEQUENCE_BITS;

    /** Left shift applied to the datacenter id. */
    private static final long DATACENTER_ID_SHIFT = SEQUENCE_BITS + WORKER_ID_BITS;

    /** Left shift applied to the timestamp delta. */
    private static final long TIMESTAMP_LEFT_SHIFT = SEQUENCE_BITS + WORKER_ID_BITS + DATACENTER_ID_BITS;

    /** Mask that wraps the sequence back to zero after it overflows. */
    private static final long SEQUENCE_MASK = -1L ^ (-1L << SEQUENCE_BITS);

    private final long workerId;
    private final long datacenterId;

    /** Sequence within the current millisecond. */
    private long sequence = 0L;

    /** Timestamp of the most recently generated id. */
    private long lastTimestamp = -1L;

    // ==============================Constructors=====================================
    /**
     * Creates a generator for the given worker and datacenter.
     *
     * @param workerId     worker id, in {@code [0, 31]}
     * @param datacenterId datacenter id, in {@code [0, 31]}
     * @throws IllegalArgumentException if either id is out of range
     */
    public SnowflakeIdWorker(long workerId, long datacenterId) {
        if (workerId > MAX_WORKER_ID || workerId < 0) {
            throw new IllegalArgumentException(
                    String.format("worker Id can't be greater than %d or less than 0", MAX_WORKER_ID));
        }
        if (datacenterId > MAX_DATACENTER_ID || datacenterId < 0) {
            throw new IllegalArgumentException(
                    String.format("datacenter Id can't be greater than %d or less than 0", MAX_DATACENTER_ID));
        }
        this.workerId = workerId;
        this.datacenterId = datacenterId;
    }

    // ==============================Methods==========================================
    /**
     * Generates the next id.
     *
     * @return the next id
     * @throws RuntimeException if the current time is earlier than the time of the previous id
     */
    public synchronized long nextId() {
        long timestamp = timeGen();

        // A timestamp earlier than the last one means the clock went backwards: refuse to generate.
        if (timestamp < lastTimestamp) {
            throw new RuntimeException(String.format(
                    "Clock moved backwards. Refusing to generate id for %d milliseconds",
                    lastTimestamp - timestamp));
        }

        if (lastTimestamp == timestamp) {
            // Same millisecond: advance the sequence.
            sequence = (sequence + 1) & SEQUENCE_MASK;
            if (sequence == 0) {
                // Sequence overflowed within this millisecond: spin until the next one.
                timestamp = tilNextMillis(lastTimestamp);
            }
        } else {
            // New millisecond: restart the sequence.
            sequence = 0L;
        }

        // Remember the timestamp of this id.
        lastTimestamp = timestamp;

        // Shift the parts into place and OR them together.
        return ((timestamp - TWEPOCH) << TIMESTAMP_LEFT_SHIFT)
                | (datacenterId << DATACENTER_ID_SHIFT)
                | (workerId << WORKER_ID_SHIFT)
                | sequence;
    }

    /**
     * Busy-waits until {@link #timeGen()} returns a value greater than {@code lastTimestamp}.
     *
     * @param lastTimestamp timestamp that must be exceeded
     * @return the first observed timestamp greater than {@code lastTimestamp}
     */
    protected long tilNextMillis(long lastTimestamp) {
        long timestamp = timeGen();
        while (timestamp <= lastTimestamp) {
            timestamp = timeGen();
        }
        return timestamp;
    }

    /**
     * Returns the current time.
     *
     * @return {@link System#currentTimeMillis()}
     */
    protected long timeGen() {
        return System.currentTimeMillis();
    }

    // ==============================Test=============================================
    /**
     * Demo: prints 100 insert statements, each with a freshly generated id.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        SnowflakeIdWorker idWorker = new SnowflakeIdWorker(0, 0);
        for (int i = 0; i < 100; i++) {
            long id = idWorker.nextId();
            String insertSQL = "insert into orderNumber value('" + id + "');";
            System.out.println(insertSQL);
        }
    }
}
import lombok.Data;

import java.io.Serializable;

/**
 * Response envelope carrying a status code, a message and a payload.
 * Accessors are generated by Lombok's {@code @Data}.
 */
@Data
public class Result implements Serializable {

    /** Status code; the factories use 200 for success and 400 for failure by default. */
    private int code;

    /** Human-readable message. */
    private String msg;

    /** Payload; {@code null} for the default failure result. */
    private Object data;

    /**
     * Builds a success result with code 200 and the default success message.
     *
     * @param data payload
     * @return the result
     */
    public static Result succ(Object data) {
        // The default message had been mangled to " *** 作成功"; restored here.
        return succ(200, "操作成功", data);
    }

    /**
     * Builds a result from explicit values.
     *
     * @param code status code
     * @param msg  message
     * @param data payload
     * @return the result
     */
    public static Result succ(int code, String msg, Object data) {
        Result r = new Result();
        r.setCode(code);
        r.setMsg(msg);
        r.setData(data);
        return r;
    }

    /**
     * Builds a failure result with code 400 and no payload.
     *
     * @param msg message
     * @return the result
     */
    public static Result fail(String msg) {
        return fail(400, msg, null);
    }

    /**
     * Builds a result from explicit values.
     *
     * @param code status code
     * @param msg  message
     * @param data payload
     * @return the result
     */
    public static Result fail(int code, String msg, Object data) {
        Result r = new Result();
        r.setCode(code);
        r.setMsg(msg);
        r.setData(data);
        return r;
    }
}
Postman调用接口生产消息:http://localhost:8084/send
结果: 项目地址:
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)