package com.wang; import com.rabbitmq.client.Channel; import com.rabbitmq.client./confirm/iCallback; import com.wang.conf.Config; import org.springframework.stereotype.Component; import java.io.IOException; import java.util.concurrent.ConcurrentNavigableMap; import java.util.concurrent.ConcurrentSkipListMap; @Component public class Producer { private static final String QUEUE_NAME="hello"; public static void main(String[] args) throws IOException, InterruptedException { Channel channel = Config.getChannel(); //处理并发的有序map集合,ConcurrentHashMap是无序的 ConcurrentSkipListMapmap = new ConcurrentSkipListMap<>(); //开启发送确认模式 channel.confirmSelect(); ConfirmCallback confirmCallback =(deliveryTag, multiple )->{ System.out.println(deliveryTag+"success received"); //返回的消息有时候是批量确认的,批量确认需要判断!应该可以关闭批量确认的,但是不知道如何关闭 if (multiple){ ConcurrentNavigableMap confirmed = map.headMap(deliveryTag,true); /confirm/ied.clear(); }else { map.remove(deliveryTag); } if (map.size()==0){ System.out.println("success send all message"); } }; ConfirmCallback /confirm/iCallback1 = (deliveryTag, multiple)->{ System.out.println(deliveryTag+"failed received"); }; channel.addConfirmListener(/confirm/iCallback,/confirm/iCallback1); long begin = System.currentTimeMillis(); try { channel.queueDeclare(QUEUE_NAME,true,false,false,null); for (int i = 0; i < 100 ; i++) { String mes = "hello world"+i; channel.basicPublish("",QUEUE_NAME,null,mes.getBytes()); //channel.getNextPublishSeqNo()获得的是下一个发布的序列号,当前序列号需要减一! map.put(channel.getNextPublishSeqNo()-1,mes); } long end = System.currentTimeMillis(); System.out.println("time:"+(begin-end)); } catch (IOException e) { e.printStackTrace(); } } }
采用生产者创建监听器方式监听传来的异步确认消息
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)