RabbitMQ集成JAVA(简单模式,工作模式)

RabbitMQ集成JAVA(简单模式,工作模式),第1张

RabbitMQ集成JAVA(简单模式,工作模式)

第一步导入依赖:

    com.rabbitmq
    amqp-client
    5.7.3

1.简单模式连接(1个消费者获取一个队列):
发布者代码:
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Providers {
    public static void main(String[] args) throws IOException, TimeoutException {
        
        //创建连接
        ConnectionFactory factory = new ConnectionFactory();
        //地址
        factory.setHost("192.168.17.8");
        //用户的【Can access virtual hosts】值
        factory.setVirtualHost("/");
        //服务器端口
        factory.setPort(5672);
        //用户名
        factory.setUsername("guest");
        //密码
        factory.setPassword("llh123456");
        //获取连接
        Connection connection = factory.newConnection();
        //创建频道
        Channel channel = connection.createChannel();
        //申明队列(对列名称,是否持久化,是否独占频道,是否自动销毁,其他参数Map集合)
        channel.queueDeclare("First_llh",true,false,false,null);
        //设置消息
        String massges = "hellow rabbitmq";
        //发送消息(对列名称,BasicProperties基础配置,消息的字节数组)
        channel.basicPublish("","First_llh",null,massges.getBytes());
        //关闭频道
        channel.close();
        //关闭连接
        connection.close();
    }
}
接收者代码:
import com.rabbitmq.client.*;
import util.RabbitConnection;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

public class Conmmons {
    public static void main(String[] args) throws IOException, TimeoutException {
        
        //获取连接
        Connection geust = RabbitConnection.getConection("192.168.17.8", "/", 5672, "guest", "llh123456");
        //创建频道
        Channel channel = geust.createChannel();
        //接受消息
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            //获取消息(delivery.getBody() = [B@4322ce5)然后设置编码UTF_8编码,在转换string类型
            String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
            System.out.println("massges:[" + message +"]");
        };
        //对列名称,是否持久化,
        channel.basicConsume("First_llh",true,deliverCallback,(consumerTag)->{});
    }
}
考虑到创建连接的代码冗余:

写一个连接的工具类

package util;

import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class RabbitConnection {
    public static Connection getConection(String host,String SSL,Integer port,String user,String pwd) throws IOException, TimeoutException {
        //创建连接
        ConnectionFactory factory = new ConnectionFactory();
        //地址
        factory.setHost(host);
        //用户的【Can access virtual hosts】值
        factory.setVirtualHost(SSL);
        //服务器端口
        factory.setPort(port);
        //用户名
        factory.setUsername(user);
        //密码
        factory.setPassword(pwd);
        //获取连接
        Connection connection = factory.newConnection();
        return connection;
    }
}

调用工具类获取连接

 //获取连接
Connection geust = RabbitConnection.getConection("192.168.17.8", "/", 5672, "guest", "llh123456");
注:接收者启动会监视队列是否发送了新的消息,实时刷新的
2.工作模式连接(2个消费者获取一个队列):
接收者1类代码
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DeliverCallback;
import util.RabbitConnection;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

public class ConmmonsTwo {
    public static void main(String[] args) throws IOException, TimeoutException {
        
        //获取连接
        Connection geust = RabbitConnection.getConection("192.168.17.8", "/", 5672, "guest", "llh123456");
        //创建频道
        Channel channel = geust.createChannel();
        //接受消息
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            //获取消息(delivery.getBody() = [B@4322ce5)然后设置编码UTF_8编码,在转换string类型
            String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
            System.out.println("massges:[" + message +"]");
        };
        //对列名称,是否持久化,
        channel.basicConsume("First_llh",true,deliverCallback,(consumerTag)->{});
    }
}
接收者2类代码
import com.rabbitmq.client.*;
import util.RabbitConnection;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

public class Conmmons {
    public static void main(String[] args) throws IOException, TimeoutException {
        
        //获取连接
        Connection geust = RabbitConnection.getConection("192.168.17.8", "/", 5672, "guest", "llh123456");
        //创建频道
        Channel channel = geust.createChannel();
        //接受消息
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            //获取消息(delivery.getBody() = [B@4322ce5)然后设置编码UTF_8编码,在转换string类型
            String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
            System.out.println("massges:[" + message +"]");
        };
        //对列名称,是否持久化,
        channel.basicConsume("First_llh",true,deliverCallback,(consumerTag)->{});
    }
}
注:工作模式下对列会均匀分配资源

1.同时运行 接收者1类 和 接收者2类

发送10条数据

 读取者1的结果

 

读取者2的结果

 

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5686433.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存