2. 代码示例(Maven 依赖:groupId = org.apache.activemq,artifactId = activemq-all,version = 5.5.0)
/**
 * Small JMS client for publishing to and subscribing on ActiveMQ topics.
 *
 * <p>Typical use: {@link #connect()}, then any number of
 * {@link #sendMessage(String, String)} / {@link #receiveMessage(String)} calls,
 * then {@link #disconnect()}.
 *
 * <p>NOTE(review): this class keeps mutable state without synchronization, so it
 * should be driven from a single thread unless callers add their own locking.
 */
public class ActiveMQClient {

    private static final Logger logger = LoggerFactory.getLogger(ActiveMQClient.class);

    // NOTE(review): credentials and broker address are hard-coded; consider
    // supplying them through configuration before using this outside a demo.
    private final String userName = "admin";
    private final String password = "admin";
    private final String brokerUrl = "127.0.0.1:61616";

    private Connection connection = null;
    private boolean connected = false;

    /** Destination resolved by the most recent send or subscribe call. */
    Destination destination = null;

    /** Consumer created by the most recent successful {@link #receiveMessage(String)}. */
    private MessageConsumer consumer = null;

    /** Session owning {@link #consumer}; sending uses its own short-lived session. */
    private Session session = null;

    /**
     * Opens and starts a connection to the broker at {@code tcp://<brokerUrl>}.
     *
     * <p>If a connection is already held, it is closed first so that calling
     * this method repeatedly does not leak broker connections.
     *
     * @return {@code true} if the connection was created and started,
     *         {@code false} if a {@link JMSException} occurred
     */
    public boolean connect() {
        if (connection != null) {
            JMSException staleFailure = releaseResources();
            if (staleFailure != null) {
                logger.warn("[ActiveMQ连接服务]关闭旧连接失败", staleFailure);
            }
        }

        Connection candidate = null;
        try {
            ConnectionFactory connectionFactory =
                    new ActiveMQConnectionFactory(userName, password, "tcp://" + brokerUrl);
            candidate = connectionFactory.createConnection();
            candidate.start();
            connection = candidate;
            connected = true;
            logger.info("[ActiveMQ连接服务]服务连接成功");
            return true;
        } catch (JMSException e) {
            connected = false;
            logger.error("[ActiveMQ连接服务]服务连接失败", e);
            // createConnection() may have succeeded before start() failed;
            // close the half-initialised connection instead of leaking it.
            if (candidate != null) {
                try {
                    candidate.close();
                } catch (JMSException closeFailure) {
                    logger.warn("[ActiveMQ连接服务]关闭连接失败", closeFailure);
                }
            }
            return false;
        }
    }

    /**
     * Closes the consumer, its session and the connection.
     *
     * <p>Every resource is closed even if closing an earlier one fails. After
     * this call the client is always in the "not connected" state.
     *
     * @return {@code true} if everything closed without error, {@code false}
     *         if at least one {@code close()} threw a {@link JMSException}
     */
    public boolean disconnect() {
        JMSException failure = releaseResources();
        if (failure != null) {
            logger.error("[ActiveMQ断开服务]服务断开失败,发生异常", failure);
            return false;
        }
        logger.info("[ActiveMQ断开服务]服务断开成功");
        return true;
    }

    /**
     * Publishes a text message to the given topic.
     *
     * <p>A dedicated session is created for the send and closed afterwards, so
     * repeated calls do not accumulate sessions or producers on the connection.
     *
     * @param topic   name of the topic to publish to
     * @param message text payload
     * @return {@code true} if the message was handed to the broker,
     *         {@code false} if not connected or a {@link JMSException} occurred
     */
    public boolean sendMessage(String topic, String message) {
        if (connection == null || !connected) {
            logger.warn("[ActiveMQ发送消息]消息发送失败,服务尚未连接。");
            return false;
        }

        Session sendSession = null;
        try {
            sendSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            destination = sendSession.createTopic(topic);
            MessageProducer sender = sendSession.createProducer(destination);
            TextMessage textMessage = sendSession.createTextMessage(message);
            sender.send(textMessage);
            logger.info("[ActiveMQ发送消息]消息发送成功。主题:{},消息:{}", topic, message);
            return true;
        } catch (JMSException e) {
            logger.error("[ActiveMQ发送消息]消息发送失败,发生异常。", e);
            return false;
        } finally {
            // Closing the session also closes the producer created from it.
            closeSession(sendSession, "[ActiveMQ发送消息]关闭会话失败");
        }
    }

    /**
     * Subscribes to the given topic and logs every {@link TextMessage} received.
     *
     * <p>Each successful call adds a new subscription; subscriptions made by
     * earlier calls are not cancelled here.
     * NOTE(review): earlier sessions are only released when the connection is
     * closed — confirm this is acceptable for long-lived clients.
     *
     * @param topic name of the topic to subscribe to
     * @return {@code true} if the listener was registered, {@code false} if not
     *         connected or a {@link JMSException} occurred
     */
    public boolean receiveMessage(String topic) {
        if (connection == null || !connected) {
            logger.warn("[ActiveMQ订阅消息]消息订阅失败,服务尚未连接。");
            return false;
        }

        Session subscribeSession = null;
        try {
            subscribeSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            destination = subscribeSession.createTopic(topic);
            MessageConsumer topicConsumer = subscribeSession.createConsumer(destination);
            topicConsumer.setMessageListener(message -> {
                if (!(message instanceof TextMessage)) {
                    return;
                }
                try {
                    String text = ((TextMessage) message).getText();
                    logger.info("[ActiveMQ接收消息]消息主题:{},消息:{}", topic, text);
                } catch (JMSException e) {
                    logger.error("[ActiveMQ接收消息]失败,发生异常", e);
                }
            });
            // Publish to the fields only once the subscription is fully set up.
            session = subscribeSession;
            consumer = topicConsumer;
            logger.info("[ActiveMQ订阅消息]消息订阅成功。");
            return true;
        } catch (JMSException e) {
            logger.error("[ActiveMQ订阅消息]失败,发生异常。", e);
            // Do not leave a half-configured session attached to the connection.
            closeSession(subscribeSession, "[ActiveMQ订阅消息]关闭会话失败");
            return false;
        }
    }

    /**
     * Closes consumer, session and connection independently and resets all
     * state, so one failing {@code close()} cannot keep the others open.
     *
     * @return the first {@link JMSException} encountered, or {@code null} if
     *         every resource closed cleanly
     */
    private JMSException releaseResources() {
        JMSException firstFailure = null;

        if (consumer != null) {
            try {
                consumer.close();
            } catch (JMSException e) {
                firstFailure = e;
            }
        }
        if (session != null) {
            try {
                session.close();
            } catch (JMSException e) {
                if (firstFailure == null) {
                    firstFailure = e;
                }
            }
        }
        if (connection != null) {
            try {
                connection.close();
            } catch (JMSException e) {
                if (firstFailure == null) {
                    firstFailure = e;
                }
            }
        }

        consumer = null;
        session = null;
        connection = null;
        connected = false;
        return firstFailure;
    }

    /** Closes {@code target} if non-null, logging (not propagating) any failure. */
    private void closeSession(Session target, String failureMessage) {
        if (target == null) {
            return;
        }
        try {
            target.close();
        } catch (JMSException e) {
            logger.warn(failureMessage, e);
        }
    }
}
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)