可以查看队列中的消息而不消费,没有订阅的功能
package com.zjw.activemq.browser; import java.util.Enumeration; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.QueueBrowser; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.command.ActiveMQQueue; public class BrowserQueue { private static final String ACTIVEMQ_HOST = "failover:(tcp://127.0.0.1:61616,tcp://127.0.0.1:61617,tcp://127.0.0.1:61618)?Randomize=false"; private static final String USERNAME = null; private static final String PASSWORD = null; public static void main(String[] args) throws Exception { ConnectionFactory factory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, ACTIVEMQ_HOST); Connection conn = factory.createConnection(); conn.start(); Session session = conn.createSession(true, Session.CLIENT_ACKNOWLEDGE); QueueBrowser browser = session.createBrowser(new ActiveMQQueue("test")); Enumeration enumeration = browser.getEnumeration(); while (enumeration.hasMoreElements()) { TextMessage textMessage = (TextMessage) enumeration.nextElement(); System.out.println("textMessage:" + textMessage.getText()); } conn.close(); } }JMSCorrelationID
用于消息之间的关联,给人一种会话的感觉
http://activemq.apache.org/how-should-i-implement-request-response-with-jms.html
示例
package com.zjw.activemq.correlation; import com.zjw.activemq.delay.cron.Test; import javax.jms.Connection; import javax.jms.JMSException; import javax.jms.MapMessage; import javax.jms.MessageConsumer; import javax.jms.Queue; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; public class CorrelationIDQueueReceiver { public static void main(String[] args) throws Exception { ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory( ActiveMQConnectionFactory.DEFAULT_USER, ActiveMQConnectionFactory.DEFAULT_PASSWORD, "failover:(tcp://127.0.0.1:61616,tcp://127.0.0.1:61617,tcp://127.0.0.1:61618)?Randomize=false" ); Connection conn = factory.createConnection(); conn.start(); Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); Queue queue = session.createQueue("test"); MessageConsumer consumer = session.createConsumer(queue); consumer.setMessageListener(message -> { if(message instanceof TextMessage) { TextMessage msg = (TextMessage) message; try { System.out.println(msg.getJMSCorrelationID()); } catch (JMSException e) { e.printStackTrace(); } } }); } } package com.zjw.activemq.correlation; import javax.jms.Connection; import javax.jms.Message; import javax.jms.MessageProducer; import javax.jms.Queue; import javax.jms.Session; import org.apache.activemq.ActiveMQConnectionFactory; public class CorrelationIDQueueSender { public static void main(String[] args) throws Exception { ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory( ActiveMQConnectionFactory.DEFAULT_USER, ActiveMQConnectionFactory.DEFAULT_PASSWORD, "failover:(tcp://127.0.0.1:61616,tcp://127.0.0.1:61617,tcp://127.0.0.1:61618)?Randomize=false" ); Connection conn = factory.createConnection(); conn.start(); Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); Queue queue = session.createQueue("test"); MessageProducer producer = session.createProducer(queue); Message message = session.createTextMessage("Message from ServerA" ); message.setJMSCorrelationID("movie"); producer.send(message); // conn.close(); } }JMSReplyTo
发送方可以接受到消息消费确认的地址,达到类似于同步的效果
发送消息中设置replyTo
message.setJMSReplyTo(queue);
在接收端可以获取到replyTo,然后发送确认信息到对应的位置
Destination replyTo = message.getJMSReplyTo();
后面的QueueRequestor就是使用replayTo实现的,只不过发送端client默认添加了一个监听在一个临时队列中,然后replayTo设置为该地址,接收端可以向该地址回复确认信息,达到同步调用效果
QueueRequestor同步消息QueueRequestor发送同步消息,本质违背了mq的异步通讯原则。但是mq还是能够提供应用解耦、异构系统的特性因为使用QueueRequestor发送消息后,会等待接收端的回复,如果收不到回复就会造成死等现象!而且该方法没有设置超时等待的功能。
接收端示例:
package com.zjw.activemq.sync; import javax.jms.Connection; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageConsumer; import javax.jms.MessageProducer; import javax.jms.Session; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.command.ActiveMQQueue; public class QueueRequestorReceiver { private static final String ACTIVEMQ_HOST = "failover:(tcp://127.0.0.1:61616,tcp://127.0.0.1:61617,tcp://127.0.0.1:61618)?Randomize=false"; private static final String USERNAME = null; private static final String PASSWORD = null; public static void main(String[] args) throws Exception { ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, ACTIVEMQ_HOST); Connection conn = factory.createConnection(); conn.start(); Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); MessageConsumer consumer = session.createConsumer(new ActiveMQQueue("test")); consumer.setMessageListener(message -> { System.out.println("接到一条消息并发回确认信息."); try { Destination replyTo = message.getJMSReplyTo(); System.out.println("replyTo:" + replyTo); MessageProducer producer = session.createProducer(replyTo); producer.send(session.createTextMessage(replyTo.toString())); } catch (JMSException e) { e.printStackTrace(); } }); } }
发送端示例:
package com.zjw.activemq.sync; import javax.jms.Queue; import javax.jms.QueueRequestor; import javax.jms.QueueSession; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; public class QueueRequestorSender { private static final String ACTIVEMQ_HOST = "failover:(tcp://127.0.0.1:61616,tcp://127.0.0.1:61617,tcp://127.0.0.1:61618)?Randomize=false"; private static final String USERNAME = null; private static final String PASSWORD = null; public static void main(String[] args) throws Exception { ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, ACTIVEMQ_HOST); ActiveMQConnection conn = (ActiveMQConnection)factory.createConnection(); // 这个不能少,因为要监听回来的消息,如果只是简单的发送消息可以写这个 conn.start(); QueueSession session = conn.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); Queue queue = session.createQueue("test"); QueueRequestor requestor = new QueueRequestor(session, queue); TextMessage message = session.createTextMessage("Hello"); System.out.println("发送同步请求"); TextMessage respMsg = (TextMessage)requestor.request(message); System.out.println("收到请求响应"); System.out.println("Response message content: [" + respMsg.getText() +"]"); } }生产环境中影响性能的几个因素 OOM
activemq启动脚本中配置内存
ACTIVEMQ_OPTS=-Xms1G -Xmx1G
修改配置文件,以及配置文件中的百分比
SystemUsage配置设置了一些系统内存和硬盘容量,当系统消耗超过这些容量设置时,amq会“slow down producer”,还是很重要的。
持久化和非持久化 消息异步发送建议使用默认,强制开启有可能丢失消息
异步发送丢失消息的场景是:生产者设置UseAsyncSend=true,使用producer.send(msg)持续发送消息。由于消息不阻塞,生产者会认为所有send的消息均被成功发送至MQ。如果服务端突然宕机,此时生产者端内存中尚未被发送至MQ的消息都会丢失。
一下方式可以设置开启:
new ActiveMQConnectionFactory("tcp://locahost:61616?jms.useAsyncSend=true");
((ActiveMQConnectionFactory)connectionFactory).setUseAsyncSend(true);
((ActiveMQConnection)connection).setUseAsyncSend(true)批量确认
ActiveMQ缺省支持批量确认消息,批量确认可以提高系统性能
关闭方法:
new ActiveMQConnectionFactory("tcp://locahost:61616?jms.optimizeAcknowledge=false");
((ActiveMQConnectionFactory)connectionFactory).setOptimizeAcknowledge(fase);
((ActiveMQConnection)connection).setOptimizeAcknowledge(true);消费缓冲与消息积压prefetchSize
消费者端,一般来说消费的越快越好,broker的积压越小越好。
但是考虑到事务性和客户端确认的情况,如果一个消费者一次获取到了很多消息却都不确认,这会造成事务上下文变大,broker端这种“半消费状态”的数据变多,所以ActiveMQ有一个prefetchSize参数来控制未确认情况下,最多可以预获取多少条记录。
Pre-fetch默认值
创建连接时整体设置
String ACTIVEMQ_HOST = "failover:(tcp://127.0.0.1:61616,tcp://127.0.0.1:61617,tcp://127.0.0.1:61618)?Randomize=false&jms.prefetchPolicy.all=50"; ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(null, null, ACTIVEMQ_HOST);
创建连接时对topic和queue单独设置
String ACTIVEMQ_HOST = "failover:(tcp://127.0.0.1:61616,tcp://127.0.0.1:61617,tcp://127.0.0.1:61618)?Randomize=false&jms.prefetchPolicy.queuePrefetch=1&jms.prefetchPolicy.topicPrefetch=1"; ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(null, null, ACTIVEMQ_HOST);
针对destination单独设置
Destination topic = session.createTopic("test?consumer.prefetchSize=10");
注意:对destination设置prefetchsize后会覆盖连接时的设置值
消息到底是推还是拉?发送消息时是推向broker
获取消息时:
-
默认是一条一条的推
-
当customer的prefetchSize满的时候停止推消息
-
当customer的prefetchSize==0时拉取消息
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)