1.通过事务机制
RabbitMQ的事务机制 *** 作过程与事务型数据库有些类似:
1.channel.txSelect()用于开启事务 2.channel.txCommit()用于提交事务 3.channel.txRollback()用于回滚事务 -------------------------------------------------------------------------------- 示例: try { channel.txSelect(); String message = ""; channel.basicPublish("",QUEUE_NAME,MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes()); channel.txCommite(); }catch (Exception e) { e.printStackTrace(); channel.toRollback(); } -----------------------------------------------------详细例子-------------------------------------------- package com.dfyang.rabbitmq.tx; import com.dfyang.rabbitmq.RabbitConnectionFactory; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.MessageProperties; public class TXProducer { private static final String EXCHANGE_NAME = "tx.exchange"; private static final String QUEUE_NAME = "tx.queue"; private static final String ROUTING_KEY = "tx"; public static void main(String[] args) throws Exception { Connection connection = RabbitConnectionFactory.getConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "direct"); channel.queueDeclare(QUEUE_NAME, true, false, false, null); channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY); String message = "test!"; try { channel.txSelect(); channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes()); channel.txCommit(); } catch (Exception e) { channel.txRollback(); } channel.close(); connection.close(); } }
1.客户端发送Tx.Select 将信道置为事务
2.Broker回复Tx.Select-Ok 确认已将信道置为事务模式
3.在发送完消息之后,客户端发送Tx.Commit提交事务
4.Broker回复Tx.Commit-Ok确认事务提交。
2.生产者/confirm/i机制
1.单条/confirm/i(发送一条等待确认一条) channel./confirm/iSelect();//将信道置为/confirm/i String message = ""; channel.basicPublish(EXCHANGE_NAME,"",MessageProperties.PERSISTENT_TEXT_PLAN,message.getBytes()); if(!channel.waitFor/confirm/is()){ System.out.println("消息发送失败"); } System.out.println("消息发送成功"); 单条/confirm/i模式的效率仅仅比事务模式高一点,这种模式是阻塞的。 2.批量confirm 批量/confirm/i模式就是先开启/confirm/i模式,发送多条之后再调用waitFor/confirm/is()方法确认,这样发送多条之后才会等待一次确认消息,效率比单条/confirm/i模式高了许多。但是如果返回false或者超时,这一批次的消息就要全部重发,如果经常丢消息,效率并不比单条/confirm/i高。。 -----------------------------------详细代码-------------------------------- package com.dfyang.rabbitmq.tx; import com.dfyang.rabbitmq.RabbitConnectionFactory; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.MessageProperties; import java.util.Queue; public class TXProducer { private static final String EXCHANGE_NAME = "tx.exchange"; private static final String QUEUE_NAME = "tx.queue"; private static final String ROUTING_KEY = "tx"; public static void main(String[] args) throws Exception { Connection connection = RabbitConnectionFactory.getConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "direct"); channel.queueDeclare(QUEUE_NAME, true, false, false, null); channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY); String message = "test!"; channel./confirm/iSelect(); for (int i = 0; i < 10000; i++) { channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes()); } channel.waitFor/confirm/isOrDie(); channel.close(); connection.close(); } } 3.异步/confirm/i模式:采用异步模式将不用阻塞等待borker服务器确认接收到消息就可以继续发送消息 package com.springrabbitmq.comfirm; import com.rabbitmq.client.Channel; import com.rabbitmq.client./confirm/iListener; import com.rabbitmq.client.Connection; import com.springrabbitmq.util.RabbitMQConnectionUtil; import java.io.IOException; import java.util.*; import java.util.concurrent.TimeoutException; public class Send3 { private static final String QUEUE_NAME="test_queue_/confirm/i1"; public static void main(String[] args) throws IOException, TimeoutException, InterruptedException { Connection connection = RabbitMQConnectionUtil.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME,false,false,false,null); //开启/confirm/i模式 //注意已经定义队列为AMQP的事务机制的话,就不能再改成/confirm/i channel./confirm/iSelect(); //未确认的消息标识 final SortedSetconfirmSet = Collections.synchronizedSortedSet(new TreeSet ()); ///confirm/i监听 channel.add/confirm/iListener(new /confirm/iListener() { //没有问题的handleAck,从un/confirm/ied集合里移除元素表示确认收到了 public void handleAck(long deliveryTag, boolean multiple) throws IOException { if (multiple){ System.out.println("------handleAck----multiple------"); //multiple为true时,移除(deliveryTag+1)之前的多个元素 /confirm/iSet.headSet(deliveryTag+1).clear(); }else { System.out.println("------handleAck----multiple------false"); //multiple为false时,移除一个 /confirm/iSet.remove(deliveryTag); } } //RabbitMQ异常时没有收到消息,/confirm/i会回执一条Nack给生产者 public void handleNack(long deliveryTag, boolean multiple) throws IOException { if (multiple){ System.out.println("------handleNack----multiple------"); //multiple为true时,移除(deliveryTag+1)之前的多个元素 /confirm/iSet.headSet(deliveryTag+1).clear(); }else { System.out.println("------handleNack----multiple------false"); //multiple为false时,移除一个 /confirm/iSet.remove(deliveryTag); } } }); String msg="Hello Confirm Message!![异步]"; for (int i=0;i<20;i++){ //channel为每次发布的消息指派一个ID long seqNo = channel.getNextPublishSeqNo(); channel.basicPublish("", QUEUE_NAME, null, msg.getBytes()); System.out.println("Message Send Success!"+i); //每次发布后将ID添加到un/confirm/ied未确定是否发送成功的集合Set中 /confirm/iSet.add(seqNo); System.out.println(/confirm/iSet.size()); } } }
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)