1、下载安装包(地址戳这里)apache-activemq-5.16.3-bin.tar.gz并上传到云服务器。
2、解压:tar -zvxf apache-activemq-5.16.3-bin.tar.gz
3、相关配置文件
activemq.xml可以修改各协议连接的ip地址:
jetty.xml可以修改管理后台的ip及端口
jetty-realm.properties可查看账户名及密码
4、进入bin目录执行./activemq start命令启动ActiveMQ,其他常用命令如下:./activemq stop(停止)、./activemq restart(重启)、./activemq status(查看状态)、./activemq console(前台运行并输出日志)
5、访问http://<服务器IP>:8161/admin进入管理页面(8161为jetty.xml中配置的默认管理后台端口)
注:如不能访问请开启防火墙端口,如是云服务器,只要在入方向开放对应端口就可以,更多安装及启动方式请 戳这里
二、springboot集成activemq
1、添加依赖包
<dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-all</artifactId>
    <version>5.8.0</version>
</dependency>
2、发布者
ActiveMqttClient(单例模式连接类)
package com.mq.server.busi.config; import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.Session; import javax.jms.TopicConnection; import javax.jms.TopicSession; public class ActiveMqttClient { private static volatile ActiveMqttClient instance = null; private static TopicConnection connection = null; private static final String USERNAME = env("ACTIVEMQ_USER", "admin"); private static final String PASSWORD = env("ACTIVEMQ_PASSWORD", "password"); private static final String HOST = env("ACTIVEMQ_HOST", "118.31.168.121"); private static final int PORT = Integer.parseInt(env("ACTIVEMQ_PORT", "61616")); private ActiveMqttClient() { try { //创建连接工厂 ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://" + HOST + ":" + PORT); //创建连接 connection = connectionFactory.createTopicConnection(USERNAME, PASSWORD); //开启连接 connection.start(); } catch (Exception e) { e.printStackTrace(); } } public static synchronized ActiveMqttClient getInstance() { if (instance == null) { instance = new ActiveMqttClient(); } return instance; } public TopicSession getSession() { try { //创建会话,不需要事务则传入false,注释session.commit(),如果是需要事务则传入true,放开session.commit() return connection.createTopicSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); } catch (Exception e) { e.printStackTrace(); } return null; } private static String env(String key, String defaultValue) { String rc = System.getenv(key); if( rc== null ) { return defaultValue; } return rc; } }
MqServerBusiServiceImpl(发送消息)
package com.mq.server.busi.service.impl; import com.mq.server.busi.config.ActiveMqttClient; import com.mq.server.busi.service.IMqServerBusiService; import org.springframework.stereotype.Service; import javax.jms.MapMessage; import javax.jms.Topic; import javax.jms.TopicPublisher; import javax.jms.TopicSession; @Service public class MqServerBusiServiceImpl implements IMqServerBusiService { private ActiveMqttClient activeMqttClient = ActiveMqttClient.getInstance(); @Override public void sendMessage(String msg) { try { TopicSession session = activeMqttClient.getSession(); //创建topic Topic topic = session.createTopic("topic-test"); TopicPublisher publisher = session.createPublisher(topic); String message1 = "发送消息:测试activemq用mqtt协议以Topic主题发布和订阅方式发送消息"; MapMessage message = session.createMapMessage(); message.setString("text", message1); publisher.send(message); session.commit(); } catch (Exception e) { e.printStackTrace(); } } }
3、消费者
package com.sys.server.busi.config; import org.apache.activemq.command.ActiveMQBytesMessage; import org.apache.activemq.command.ActiveMQMapMessage; import org.springframework.context.ApplicationListener; import org.springframework.context.event.ContextRefreshedEvent; import org.springframework.stereotype.Component; import javax.jms.*; import java.util.Map; @Component public class ActiveMqttConsume implements ApplicationListener{ private ActiveMqttClient activeMqttClient = ActiveMqttClient.getInstance(); public void reveiveMessage() { try { TopicSession session = activeMqttClient.getSession(); //创建topic Topic topic = session.createTopic("topic-test"); MessageConsumer consumer = session.createConsumer(topic); do { Message msg = consumer.receive(); if (msg instanceof TextMessage) { String body = ((TextMessage) msg).getText(); System.out.println(body); } if (msg instanceof ActiveMQMapMessage) { Map body = ((ActiveMQMapMessage) msg).getContentMap(); System.out.println(body); } if (msg instanceof ActiveMQBytesMessage) { String body = ((ActiveMQBytesMessage) msg).readUTF(); System.out.println(body); } else { System.out.println("Unexpected message type: " + msg.getClass()); } } while (true); } catch (Exception e) { e.printStackTrace(); } } @Override public void onApplicationEvent(ContextRefreshedEvent contextRefreshedEvent) { this.reveiveMessage(); } }
启动服务验证
PS:大家可以参考安装包中examples目录下的demo,里面有各个协议、各种语言的示例;另外,ActiveMQ也可以换成Apollo,Apollo是ActiveMQ的子工程,曾被定位为ActiveMQ的下一代消息代理,目标是更快、更可靠、更容易维护。(注:Apollo项目目前已停止维护,官方的下一代消息代理为ActiveMQ Artemis,选型时请以官方文档为准。)
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)