activeMq部署安装及在springboot中实现消息的发布订阅

activeMq部署安装及在springboot中实现消息的发布订阅,第1张

activeMq部署安装及在springboot中实现消息的发布订阅 一、linux环境部署

1、下载安装包(地址戳这里)apache-activemq-5.16.3-bin.tar.gz并上传到云服务器。

2、解压:tar -zvxf apache-activemq-5.16.3-bin.tar.gz

3、相关配置文件

activemq.xml可以修改各协议连接的ip地址:

jetty.xml可以修改管理后台的ip及端口

jetty-realm.properties可查看账户名及密码

4、进入bin目录执行./activemq start命令启动activeMq,其他命令如下:

 5、访问http://xxxx:xxxx/8161/admin进入管理页面

注:如不能访问请开启防火墙端口,如是云服务器,只要在入方向开放对应端口就可以,更多安装及启动方式请 戳这里

二、springboot集成activemq

1、添加依赖包


    org.apache.activemq
    activemq-all
    5.8.0

 2、发布者

 ActiveMqttClient(单例模式连接类)

package com.mq.server.busi.config;

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.Session;
import javax.jms.TopicConnection;
import javax.jms.TopicSession;


public class ActiveMqttClient {

    private static volatile ActiveMqttClient instance = null;
    private static TopicConnection connection = null;
    
    private static final String USERNAME = env("ACTIVEMQ_USER", "admin");
    
    private static final String PASSWORD = env("ACTIVEMQ_PASSWORD", "password");
    
    private static final String HOST = env("ACTIVEMQ_HOST", "118.31.168.121");
    
    private static final int PORT = Integer.parseInt(env("ACTIVEMQ_PORT", "61616"));

    
    private ActiveMqttClient() {
        try {
            //创建连接工厂
            ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://" + HOST + ":" + PORT);
            //创建连接
            connection = connectionFactory.createTopicConnection(USERNAME, PASSWORD);
            //开启连接
            connection.start();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public static synchronized ActiveMqttClient getInstance() {
        if (instance == null) {
            instance = new ActiveMqttClient();
        }
        return instance;
    }

    
    public TopicSession getSession() {
        try {
            //创建会话,不需要事务则传入false,注释session.commit(),如果是需要事务则传入true,放开session.commit()
            return connection.createTopicSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return null;
    }

    private static String env(String key, String defaultValue) {
        String rc = System.getenv(key);
        if( rc== null ) {
            return defaultValue;
        }
        return rc;
    }
}
MqServerBusiServiceImpl(发送消息)
package com.mq.server.busi.service.impl;

import com.mq.server.busi.config.ActiveMqttClient;
import com.mq.server.busi.service.IMqServerBusiService;
import org.springframework.stereotype.Service;

import javax.jms.MapMessage;
import javax.jms.Topic;
import javax.jms.TopicPublisher;
import javax.jms.TopicSession;


@Service
public class MqServerBusiServiceImpl implements IMqServerBusiService {

    private ActiveMqttClient activeMqttClient = ActiveMqttClient.getInstance();

    @Override
    public void sendMessage(String msg) {
        try {
            TopicSession session = activeMqttClient.getSession();
            //创建topic
            Topic topic = session.createTopic("topic-test");
            TopicPublisher publisher = session.createPublisher(topic);
            String message1 = "发送消息:测试activemq用mqtt协议以Topic主题发布和订阅方式发送消息";
            MapMessage message = session.createMapMessage();
            message.setString("text", message1);
            publisher.send(message);
            session.commit();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

3、消费者

package com.sys.server.busi.config;

import org.apache.activemq.command.ActiveMQBytesMessage;
import org.apache.activemq.command.ActiveMQMapMessage;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.stereotype.Component;

import javax.jms.*;
import java.util.Map;


@Component
public class ActiveMqttConsume implements ApplicationListener {

    private ActiveMqttClient activeMqttClient = ActiveMqttClient.getInstance();


    public void reveiveMessage() {
        try {
            TopicSession session = activeMqttClient.getSession();
            //创建topic
            Topic topic = session.createTopic("topic-test");
            MessageConsumer consumer = session.createConsumer(topic);
            do {
                Message msg = consumer.receive();
                if (msg instanceof TextMessage) {
                    String body = ((TextMessage) msg).getText();
                    System.out.println(body);
                } if (msg instanceof ActiveMQMapMessage) {
                    Map body = ((ActiveMQMapMessage) msg).getContentMap();
                    System.out.println(body);
                }  if (msg instanceof ActiveMQBytesMessage) {
                    String body = ((ActiveMQBytesMessage) msg).readUTF();
                    System.out.println(body);
                } else {
                    System.out.println("Unexpected message type: " + msg.getClass());
                }
            } while (true);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    public void onApplicationEvent(ContextRefreshedEvent contextRefreshedEvent) {
        this.reveiveMessage();
    }
}

启动服务验证

PS:大家可以参考部署那一块examples下面的demo,里面有各个协议各种语言的demo;另外,activeMq也可以换成apollo,apollo 是 ActiveMQ的子工程,是 ActiveMQ的下一代消息代理,是一个更快、更可靠、更容易维护的消息代理。

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5596396.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-15
下一篇 2022-12-15

发表评论

登录后才能评论

评论列表(0条)

保存