ActiveMQ实现发送消息和异步接收消息代码示例

ActiveMQ实现发送消息和异步接收消息代码示例,第1张

ActiveMQ实现发送消息和异步接收消息代码示例 ActiveMQ实现发送消息和接收消息Demo 1. 添加依赖

    org.apache.activemq
    activemq-all
    5.5.0

2. 代码示例
public class ActiveMQClient {
    
    private Logger logger = LoggerFactory.getLogger(ActiveMQClient.class);
    private String userName = "admin";
    private String password = "admin";
    private String brokerUrl = "127.0.0.1:61616";
    private Connection connection = null;
    private boolean connected = false;
    Destination destination = null;
    private MessageProducer producer = null;
    private MessageConsumer consumer = null;
    private Session session = null;

    
    public boolean connect() {
        ConnectionFactory connectionFactory;
        try {
            connectionFactory = new ActiveMQConnectionFactory(userName,password,"tcp://"+ brokerUrl);
            connection = connectionFactory.createConnection();
            connection.start();
            connected = true;
            logger.info("[ActiveMQ连接服务]服务连接成功");
            return connected;
        } catch (JMSException e) {
            e.printStackTrace();
            connected = false;
            logger.info("[ActiveMQ连接服务]服务连接失败");
            return connected;
        }
    }

    
    public boolean disconnect() {
        try {
            if(producer != null) {
                producer.close();
            }
            if(consumer != null) {
                consumer.close();
            }
            if(session != null) {
                session.close();
            }
            if(connection != null) {
                connection.close();
                connected = false;
            }
            logger.info("[ActiveMQ断开服务]服务断开成功");
            return true;
        } catch (JMSException e) {
            e.printStackTrace();
            logger.info("[ActiveMQ断开服务]服务断开失败,发生异常");
            return false;
        }
    }

    
    public boolean sendMessage(String topic, String message) {
        if(connection == null || !connected) {
            logger.info("[ActiveMQ发送消息]消息发送失败,服务尚未连接。");
            return false;
        }
        try {
            session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
            destination = session.createTopic(topic);
            producer = session.createProducer(destination);
            TextMessage textMessage = session.createTextMessage(message);
            producer.send(textMessage);
            logger.info("[ActiveMQ发送消息]消息发送成功。主题:{},消息:{}", topic,message);
            return true;
        } catch (JMSException e) {
            e.printStackTrace();
            logger.info("[ActiveMQ发送消息]消息发送失败,发生异常。");
            return false;
        }
    }

    
    public boolean receiveMessage(String topic) {
        if(connection == null || !connected) {
            logger.info("[ActiveMQ订阅消息]消息订阅失败,服务尚未连接。");
            return false;
        }
        try {
            session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
            destination = session.createTopic(topic);
            consumer = session.createConsumer(destination);
            consumer.setMessageListener(message -> {
                try {
                    if(message instanceof TextMessage) {
                        TextMessage textMessage = (TextMessage) message;
                        logger.info("[ActiveMQ接收消息]消息主题:{},消息:{}", topic,textMessage.getText());
                    }
                } catch (JMSException e) {
                    e.printStackTrace();
                    logger.info("[ActiveMQ接收消息]失败,发生异常");
                }

            });
            logger.info("[ActiveMQ订阅消息]消息订阅成功。");
            return true;
        } catch (JMSException e) {
            e.printStackTrace();
            logger.info("[ActiveMQ订阅消息]失败,发生异常。");
            return false;
        }
    }
    
}

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5683214.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存