RabbitMQ

RabbitMQ,第1张

RabbitMQ 直连模型 项目整体视图

导入依赖
       
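<dependency>
    <groupId>junit</groupId>
    <artifactId>junit</artifactId>
    <version>4.12</version>
</dependency>
<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.14.0</version>
</dependency>
<dependency>
    <groupId>junit</groupId>
    <artifactId>junit</artifactId>
    <version>RELEASE</version>
    <scope>compile</scope>
</dependency>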
创建生产者Producer
package a_direct;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import org.junit.Test;

/**
 * Message producer for the direct ("hello world") model: publishes one text message to the
 * {@code hello} queue through the default exchange.
 */
public class Producer {
    /**
     * Connects to the broker, declares the queue, publishes a single message and releases the
     * channel and connection.
     *
     * @throws Exception if connecting, declaring the queue, publishing or closing fails
     */
    @Test
    public void sendMessage() throws Exception{
        // 1. Create a connection factory.
        ConnectionFactory connectionFactory=new ConnectionFactory();
        // 2. Configure broker address, credentials and virtual host.
        connectionFactory.setHost("39.105.127.232");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("sxt");
        connectionFactory.setPassword("123456");
        connectionFactory.setVirtualHost("/v-sxt");
        // 3./4. Open the connection and a channel. try-with-resources closes the channel first
        // and then the connection, even when declaring or publishing throws.
        try (Connection connection = connectionFactory.newConnection();
             Channel channel = connection.createChannel()) {
            // Arguments: queue name, durable, exclusive, autoDelete, extra arguments.
            channel.queueDeclare("hello",true,false,false,null);
            // An empty exchange name selects the default exchange, so the routing key is the
            // queue name. The charset is explicit so the bytes do not depend on the platform.
            channel.basicPublish("","hello",null,
                    "hello rabbitmq".getBytes(java.nio.charset.StandardCharsets.UTF_8));
        }
        System.out.println("消息发送成功");
    }
}

创建消费者Consumer
package a_direct;

import com.rabbitmq.client.*;
import org.junit.Test;

import java.io.IOException;

/**
 * Message consumer for the direct ("hello world") model: prints every message delivered from the
 * {@code hello} queue.
 */
public class Consumer {
    /**
     * Connects to the broker, subscribes to the queue and keeps running until standard input
     * yields a byte or reaches end of stream; the channel and connection are then closed.
     *
     * <p>The method name is kept unchanged for compatibility although it receives messages.
     *
     * @throws Exception if connecting, declaring the queue, subscribing or closing fails
     */
    @Test
    public void sendMessage() throws Exception{
        // 1. Create a connection factory.
        ConnectionFactory connectionFactory=new ConnectionFactory();
        // 2. Configure broker address, credentials and virtual host.
        connectionFactory.setHost("39.105.127.232");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("sxt");
        connectionFactory.setPassword("123456");
        connectionFactory.setVirtualHost("/v-sxt");
        // 3./4. Open the connection and a channel; both are closed when the block exits.
        try (Connection connection = connectionFactory.newConnection();
             Channel channel = connection.createChannel()) {
            // Same declaration as the producer: queue name, durable, exclusive, autoDelete, arguments.
            channel.queueDeclare("hello",true,false,false,null);

            // Second argument is autoAck=true: deliveries are acknowledged automatically.
            channel.basicConsume("hello",true,new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    // Decode with an explicit charset instead of the platform default.
                    System.out.println("消费者接收到消息:"+new String(body, java.nio.charset.StandardCharsets.UTF_8));
                }
            });
            // Keep the test method alive so the consumer can keep receiving.
            System.in.read();
        }
    }
}

编写工具类优化代码

项目整体视图

生产者Producer
package a_direct;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import org.junit.Test;
import utils.RabbitMQUtils;

//消息的生产者
public class Producer {
    @Test
    public void sendMessage() throws Exception{
//        //1、创建一个连接工厂
//        ConnectionFactory connectionFactory=new ConnectionFactory();
//        //2、设置相关的参数
//        connectionFactory.setHost("39.105.127.232");
//        connectionFactory.setPort(5672);
//        connectionFactory.setUsername("sxt");//设置用户密码
//        connectionFactory.setPassword("123456");
//        connectionFactory.setVirtualHost("/v-sxt");//虚拟主机
//        //3、从链接工厂里面创建一个链接
//        Connection connection = connectionFactory.newConnection();
        Connection connection = RabbitMQUtils.getConnection();
        //4、创建一个通道
        Channel channel = connection.createChannel();
        
         
        channel.queueDeclare("hello",true,false,false,null);要想消息持久化就要设置上面那行的参数
        
          channel.basicPublish("","hello", MessageProperties.PERSISTENT_TEXT_PLAIN,"hello rabbitmq".getBytes());
        //channel.basicPublish("","hello",null,"hello rabbitmq".getBytes());要想消息持久化就要设置上面那行的参数
        //7、关闭通道和连接
//        channel.close();
//        connection.close();
        RabbitMQUtils.closeChannelAndConnection(channel,connection);
        System.out.println("消息发送成功");
    }
}

消费者Consumer
package a_direct;

import com.rabbitmq.client.*;
import org.junit.Test;
import utils.RabbitMQUtils;

import java.io.IOException;

/**
 * Message consumer for the direct model, rewritten on top of {@code RabbitMQUtils}: prints every
 * message delivered from the durable {@code hello} queue.
 */
public class Consumer {
    /**
     * Obtains a connection from the utility class, subscribes to the queue and keeps running until
     * standard input yields a byte or reaches end of stream; resources are then released.
     *
     * <p>The method name is kept unchanged for compatibility although it receives messages.
     *
     * @throws Exception if creating the channel, declaring the queue or subscribing fails
     */
    @Test
    public void sendMessage() throws Exception{
        Connection connection = RabbitMQUtils.getConnection();
        Channel channel = null;
        try {
            // Create a channel on the shared connection.
            channel = connection.createChannel();
            // Same declaration as the producer: queue name, durable, exclusive, autoDelete, arguments.
            channel.queueDeclare("hello",true,false,false,null);

            // Second argument is autoAck=true: deliveries are acknowledged automatically.
            channel.basicConsume("hello",true,new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    // Decode with an explicit charset instead of the platform default.
                    System.out.println("消费者接收到消息:"+new String(body, java.nio.charset.StandardCharsets.UTF_8));
                }
            });
            // Keep the test method alive so the consumer can keep receiving.
            System.in.read();
        } finally {
            RabbitMQUtils.closeChannelAndConnection(channel,connection);
        }
    }
}

工具类RabbitMQUtils
package utils;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
 * Utility class that centralises the RabbitMQ connection settings used by the examples and
 * offers helpers to open a connection and to release a channel/connection pair.
 *
 * <p>NOTE(review): host and credentials are hard-coded for the tutorial; move them to external
 * configuration before using this anywhere else.
 */
public class RabbitMQUtils {
    /** Factory configured once in the static initializer and reused for every connection. */
    private static ConnectionFactory connectionFactory;
    static {
        connectionFactory=new ConnectionFactory();
        // Broker address, credentials and virtual host.
        connectionFactory.setHost("39.105.127.232");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("sxt");
        connectionFactory.setPassword("123456");
        connectionFactory.setVirtualHost("/v-sxt");
    }

    /**
     * Opens a new connection using the shared factory.
     *
     * @return a newly opened connection, never {@code null}
     * @throws IllegalStateException if the connection cannot be established; the original
     *         failure is attached as the cause instead of being printed and replaced by
     *         {@code null}
     */
    public static Connection getConnection(){
        try {
            return connectionFactory.newConnection();
        }catch (java.io.IOException | java.util.concurrent.TimeoutException e){
            throw new IllegalStateException("Failed to open RabbitMQ connection", e);
        }
    }

    /**
     * Closes the channel and then the connection on a best-effort basis. Either argument may be
     * {@code null}. A failure while closing the channel is reported but does not prevent the
     * connection from being closed.
     *
     * @param channel    channel to close, may be {@code null}
     * @param connection connection to close, may be {@code null}
     */
    public static void closeChannelAndConnection(Channel channel,Connection connection){
        if (null!=channel) {
            try {
                channel.close();
            }catch (Exception e){
                System.err.println("Failed to close channel: " + e);
            }
        }
        if (null!=connection) {
            try {
                connection.close();
            }catch (Exception e){
                System.err.println("Failed to close connection: " + e);
            }
        }
    }
}

work queue 整体视图


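注意:这里的 RabbitMQUtils 工具类沿用上一种模型(直连模型)中编写的工具类。另外,本例把队列 hello 声明为非持久化(durable=false),而直连示例中声明为持久化(durable=true);如果服务器上已经存在持久化的 hello 队列,需要先删除该队列,否则 queueDeclare 会因参数不一致而报 PRECONDITION_FAILED。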

创建生产者
package b_workqueue;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import org.junit.Test;
import utils.RabbitMQUtils;

/**
 * Work-queue producer: publishes 100 numbered text messages to the {@code hello} queue.
 */
public class Producer {
    /**
     * Declares the queue, publishes messages numbered 1 to 100 through the default exchange and
     * closes the channel and connection.
     *
     * @throws Exception if creating the channel, declaring the queue or publishing fails
     */
    @Test
    public void sendMessage() throws Exception{
        final Connection conn = RabbitMQUtils.getConnection();
        final Channel ch = conn.createChannel();
        // Arguments: queue name, durable, exclusive, autoDelete, extra arguments.
        ch.queueDeclare("hello",false,false,false,null);
        // Publish one message per sequence number.
        int seq = 1;
        while (seq <= 100) {
            final String text = "hello rabbitmq-----workqueue" + seq;
            ch.basicPublish("","hello",null,text.getBytes());
            seq++;
        }
        // Release resources before reporting success.
        RabbitMQUtils.closeChannelAndConnection(ch,conn);
        System.out.println("消息发送成功");
    }
}

创建消费者1
package b_workqueue;

import com.rabbitmq.client.*;
import org.junit.Test;
import utils.RabbitMQUtils;

import java.io.IOException;

/**
 * Work-queue consumer number 1: prints every message it receives from the {@code hello} queue.
 */
public class Consumer1 {
    /**
     * Subscribes to the queue with automatic acknowledgement and waits on standard input before
     * closing the channel and connection.
     *
     * @throws Exception if creating the channel, declaring the queue or subscribing fails
     */
    @Test
    public void receiveMessage() throws Exception{
        final Connection conn = RabbitMQUtils.getConnection();
        final Channel ch = conn.createChannel();
        // Arguments: queue name, durable, exclusive, autoDelete, extra arguments.
        ch.queueDeclare("hello",false,false,false,null);
        // Callback that prints each delivered body.
        final DefaultConsumer printer = new DefaultConsumer(ch){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                final String text = new String(body);
                System.out.println("消费者【1】接收消息:"+text);
            }
        };
        // autoAck=true
        ch.basicConsume("hello",true,printer);
        // Block so the consumer keeps running.
        System.in.read();
        RabbitMQUtils.closeChannelAndConnection(ch,conn);
    }
}

创建消费者2
package b_workqueue;

import com.rabbitmq.client.*;
import org.junit.Test;
import utils.RabbitMQUtils;

import java.io.IOException;

/**
 * Work-queue consumer number 2: prints every message it receives from the {@code hello} queue.
 */
public class Consumer2 {
    /**
     * Subscribes to the queue with automatic acknowledgement and waits on standard input before
     * closing the channel and connection.
     *
     * @throws Exception if creating the channel, declaring the queue or subscribing fails
     */
    @Test
    public void receiveMessage() throws Exception{
        final Connection conn = RabbitMQUtils.getConnection();
        final Channel ch = conn.createChannel();
        // Arguments: queue name, durable, exclusive, autoDelete, extra arguments.
        ch.queueDeclare("hello",false,false,false,null);
        // Callback that prints each delivered body.
        final DefaultConsumer printer = new DefaultConsumer(ch){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                final String text = new String(body);
                System.out.println("消费者【2】接收消息:"+text);
            }
        };
        // autoAck=true
        ch.basicConsume("hello",true,printer);
        // Block so the consumer keeps running.
        System.in.read();
        RabbitMQUtils.closeChannelAndConnection(ch,conn);
    }
}

测试

注意测试的时候要先运行消费者然后再运行生产者,消费者平均消费

消息的自动确认机制


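默认的自动确认(autoAck=true)下,消息按轮询方式平均分发给各消费者。若要在消费者1消费能力有限时实现"能者多劳",需要关闭自动确认(autoAck=false)、在处理完成后手动调用 basicAck,并通过 channel.basicQos(1) 限制每次只预取一条消息;此时消费者就不再是平均消费,消费者1来不及消费的消息可由消费者2消费。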

fanout

广播

创建生产者
package c_fanout;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import org.junit.Test;
import utils.RabbitMQUtils;

/**
 * Fanout producer: publishes one message to the {@code logs} fanout exchange.
 */
public class Proudcer {
    /**
     * Declares the exchange, publishes a single message and releases the channel and connection.
     *
     * @throws Exception if creating the channel, declaring the exchange or publishing fails
     */
    @Test
    public void senMessage() throws Exception{
        Connection connection = RabbitMQUtils.getConnection();
        Channel channel = null;
        try {
            // Create a channel.
            channel = connection.createChannel();
            // Declare the exchange.
            channel.exchangeDeclare("logs", BuiltinExchangeType.FANOUT);
            // Publish to the exchange with an empty routing key. The payload contains non-ASCII
            // text, so the charset is explicit rather than the platform default.
            channel.basicPublish("logs","",null,
                    "我是一个fanout类型的消息".getBytes(java.nio.charset.StandardCharsets.UTF_8));
        } finally {
            // Release resources even when declaring or publishing failed.
            RabbitMQUtils.closeChannelAndConnection(channel,connection);
        }
        System.out.println("消息发送成功");
    }
}

创建消费者1
package c_fanout;

import com.rabbitmq.client.*;
import org.junit.Test;
import utils.RabbitMQUtils;

import java.io.IOException;

/**
 * Fanout consumer 1: binds a server-named temporary queue to the {@code logs} exchange and prints
 * every message it receives.
 */
public class Consumer1 {
    /**
     * Declares the exchange, binds a temporary queue, subscribes and keeps running until standard
     * input yields a byte or reaches end of stream; the channel and connection are then closed.
     *
     * @throws Exception if declaring, binding, subscribing or closing fails
     */
    @Test
    public void Message() throws Exception{
        try (Connection connection = RabbitMQUtils.getConnection();
             Channel channel = connection.createChannel()) {
            // Declare the exchange.
            channel.exchangeDeclare("logs", BuiltinExchangeType.FANOUT);
            // Ask the broker for a temporary queue and take its generated name.
            String queue = channel.queueDeclare().getQueue();
            // Bind the temporary queue to the exchange.
            channel.queueBind(queue,"logs","");
            // autoAck=true is passed explicitly: the two-argument basicConsume overload requires
            // manual acknowledgement, and this handler never acknowledges.
            channel.basicConsume(queue,true,new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    // Decode with an explicit charset; the payload contains non-ASCII text.
                    System.out.println("消费者【1】接收到消息是:"+new String(body, java.nio.charset.StandardCharsets.UTF_8));
                }
            });
            System.out.println("消费者【1】启动成功");
            // Keep the test method alive so the consumer can keep receiving.
            System.in.read();
        }
    }
}

创建消费者2
package c_fanout;

import com.rabbitmq.client.*;
import org.junit.Test;
import utils.RabbitMQUtils;

import java.io.IOException;

/**
 * Fanout consumer 2: binds a server-named temporary queue to the {@code logs} exchange and prints
 * every message it receives.
 */
public class Consumer2 {
    /**
     * Declares the exchange, binds a temporary queue, subscribes and keeps running until standard
     * input yields a byte or reaches end of stream; the channel and connection are then closed.
     *
     * @throws Exception if declaring, binding, subscribing or closing fails
     */
    @Test
    public void Message() throws Exception{
        try (Connection connection = RabbitMQUtils.getConnection();
             Channel channel = connection.createChannel()) {
            // Declare the exchange.
            channel.exchangeDeclare("logs", BuiltinExchangeType.FANOUT);
            // Ask the broker for a temporary queue and take its generated name.
            String queue = channel.queueDeclare().getQueue();
            // Bind the temporary queue to the exchange.
            channel.queueBind(queue,"logs","");
            // autoAck=true is passed explicitly: the two-argument basicConsume overload requires
            // manual acknowledgement, and this handler never acknowledges.
            channel.basicConsume(queue,true,new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    // Decode with an explicit charset; the payload contains non-ASCII text.
                    System.out.println("消费者【2】接收到消息是:"+new String(body, java.nio.charset.StandardCharsets.UTF_8));
                }
            });
            System.out.println("消费者【2】启动成功");
            // Keep the test method alive so the consumer can keep receiving.
            System.in.read();
        }
    }
}

测试


Routing-Direct

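路由-直连(注意:本例的交换机 logs 类型为 direct,而上一节 fanout 示例中同名交换机的类型为 fanout;如果服务器上已存在 fanout 类型的 logs 交换机,需要先将其删除,否则 exchangeDeclare 会因类型不一致而报 PRECONDITION_FAILED)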

创建生产者
package d_routing_direct;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import org.junit.Test;
import utils.RabbitMQUtils;

/**
 * Direct-routing producer: publishes four messages to the {@code logs} direct exchange, one for
 * each of the routing keys {@code info}, {@code warn}, {@code debug} and {@code error}.
 */
public class Proudcer {
    /**
     * Declares the exchange, publishes the four messages and releases the channel and connection.
     *
     * @throws Exception if creating the channel, declaring the exchange or publishing fails
     */
    @Test
    public void senMessage() throws Exception{
        // The payloads contain non-ASCII text, so they are encoded with an explicit charset.
        final java.nio.charset.Charset utf8 = java.nio.charset.StandardCharsets.UTF_8;
        Connection connection = RabbitMQUtils.getConnection();
        Channel channel = null;
        try {
            // Create a channel.
            channel = connection.createChannel();
            // Declare the exchange.
            channel.exchangeDeclare("logs", BuiltinExchangeType.DIRECT);
            // Publish four messages, each with its own routing key.
            channel.basicPublish("logs","info",null,"我是一个routingkey-Direct类型的消息-info".getBytes(utf8));
            channel.basicPublish("logs","warn",null,"我是一个routingkey-Direct类型的消息-warn".getBytes(utf8));
            channel.basicPublish("logs","debug",null,"我是一个routingkey-Direct类型的消息-debug".getBytes(utf8));
            channel.basicPublish("logs","error",null,"我是一个routingkey-Direct类型的消息-error".getBytes(utf8));
        } finally {
            // Release resources even when declaring or publishing failed.
            RabbitMQUtils.closeChannelAndConnection(channel,connection);
        }
        System.out.println("消息发送成功");
    }
}

创建消费者1
package d_routing_direct;

import com.rabbitmq.client.*;
import org.junit.Test;
import utils.RabbitMQUtils;

import java.io.IOException;

/**
 * Direct-routing consumer 1: binds a temporary queue to the {@code logs} exchange with the routing
 * keys {@code info}, {@code warn} and {@code debug} and prints every message it receives.
 */
public class Consumer1 {
    /**
     * Declares the exchange, binds a temporary queue with three routing keys, subscribes and keeps
     * running until standard input yields a byte or reaches end of stream; the channel and
     * connection are then closed.
     *
     * @throws Exception if declaring, binding, subscribing or closing fails
     */
    @Test
    public void Message() throws Exception{
        try (Connection connection = RabbitMQUtils.getConnection();
             Channel channel = connection.createChannel()) {
            // Declare the exchange.
            channel.exchangeDeclare("logs", BuiltinExchangeType.DIRECT);
            // Ask the broker for a temporary queue and take its generated name.
            String queue = channel.queueDeclare().getQueue();
            // Bind the temporary queue once per routing key of interest.
            channel.queueBind(queue,"logs","info");
            channel.queueBind(queue,"logs","warn");
            channel.queueBind(queue,"logs","debug");
            // autoAck=true is passed explicitly: the two-argument basicConsume overload requires
            // manual acknowledgement, and this handler never acknowledges.
            channel.basicConsume(queue,true,new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    // Decode with an explicit charset; the payload contains non-ASCII text.
                    System.out.println("消费者【info】接收到消息是:"+new String(body, java.nio.charset.StandardCharsets.UTF_8));
                }
            });
            System.out.println("消费者【info】启动成功");
            // Keep the test method alive so the consumer can keep receiving.
            System.in.read();
        }
    }
}

创建消费者2
package d_routing_direct;

import com.rabbitmq.client.*;
import org.junit.Test;
import utils.RabbitMQUtils;

import java.io.IOException;

/**
 * Direct-routing consumer 2: binds a temporary queue to the {@code logs} exchange with the routing
 * key {@code error} and prints every message it receives.
 */
public class Consumer2 {
    /**
     * Declares the exchange, binds a temporary queue, subscribes and keeps running until standard
     * input yields a byte or reaches end of stream; the channel and connection are then closed.
     *
     * @throws Exception if declaring, binding, subscribing or closing fails
     */
    @Test
    public void Message() throws Exception{
        try (Connection connection = RabbitMQUtils.getConnection();
             Channel channel = connection.createChannel()) {
            // Declare the exchange.
            channel.exchangeDeclare("logs", BuiltinExchangeType.DIRECT);
            // Ask the broker for a temporary queue and take its generated name.
            String queue = channel.queueDeclare().getQueue();
            // Bind the temporary queue to the exchange for the "error" routing key only.
            channel.queueBind(queue,"logs","error");
            // autoAck=true is passed explicitly: the two-argument basicConsume overload requires
            // manual acknowledgement, and this handler never acknowledges.
            channel.basicConsume(queue,true,new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    // Decode with an explicit charset; the payload contains non-ASCII text.
                    System.out.println("消费者【error】接收到消息是:"+new String(body, java.nio.charset.StandardCharsets.UTF_8));
                }
            });
            System.out.println("消费者【error】启动成功");
            // Keep the test method alive so the consumer can keep receiving.
            System.in.read();
        }
    }
}

测试


Routing-Topic

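`*` 匹配恰好一个单词(例如 user.* 可匹配 user.insert,但不能匹配 user.insert.a)
`#` 匹配零个或多个单词(例如 user.# 可匹配 user、user.insert、user.insert.a);单词之间以 `.` 分隔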

生产者
package e_routing_topic;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import org.junit.Test;
import utils.RabbitMQUtils;

/**
 * Topic-routing producer: publishes two messages to the {@code topic} topic exchange, with the
 * routing keys {@code user.insert} and {@code user.insert.a}.
 */
public class Proudcer {
    /**
     * Declares the exchange, publishes the two messages and releases the channel and connection.
     *
     * @throws Exception if creating the channel, declaring the exchange or publishing fails
     */
    @Test
    public void senMessage() throws Exception{
        // The payloads contain non-ASCII text, so they are encoded with an explicit charset.
        final java.nio.charset.Charset utf8 = java.nio.charset.StandardCharsets.UTF_8;
        Connection connection = RabbitMQUtils.getConnection();
        Channel channel = null;
        try {
            // Create a channel.
            channel = connection.createChannel();
            // Declare the exchange.
            channel.exchangeDeclare("topic", BuiltinExchangeType.TOPIC);
            // Publish two messages, each with its own routing key.
            channel.basicPublish("topic","user.insert",null,"我是一个routingkey-Topic类型的消息-user.insert".getBytes(utf8));
            channel.basicPublish("topic","user.insert.a",null,"我是一个routingkey-Topic类型的消息-user.insert.a".getBytes(utf8));
        } finally {
            // Release resources even when declaring or publishing failed.
            RabbitMQUtils.closeChannelAndConnection(channel,connection);
        }
        System.out.println("消息发送成功");
    }
}

消费者1
package e_routing_topic;

import com.rabbitmq.client.*;
import org.junit.Test;
import utils.RabbitMQUtils;

import java.io.IOException;

/**
 * Topic-routing consumer 1: binds a temporary queue to the {@code topic} exchange with the pattern
 * {@code user.*} and prints every message it receives.
 */
public class Consumer1 {
    /**
     * Declares the exchange, binds a temporary queue, subscribes and keeps running until standard
     * input yields a byte or reaches end of stream; the channel and connection are then closed.
     *
     * @throws Exception if declaring, binding, subscribing or closing fails
     */
    @Test
    public void Message() throws Exception{
        try (Connection connection = RabbitMQUtils.getConnection();
             Channel channel = connection.createChannel()) {
            // Declare the exchange.
            channel.exchangeDeclare("topic", BuiltinExchangeType.TOPIC);
            // Ask the broker for a temporary queue and take its generated name.
            String queue = channel.queueDeclare().getQueue();
            // Bind the temporary queue with a single-word wildcard pattern.
            channel.queueBind(queue,"topic","user.*");

            // autoAck=true is passed explicitly: the two-argument basicConsume overload requires
            // manual acknowledgement, and this handler never acknowledges.
            channel.basicConsume(queue,true,new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    // Decode with an explicit charset; the payload contains non-ASCII text.
                    System.out.println("消费者【*】接收到消息是:"+new String(body, java.nio.charset.StandardCharsets.UTF_8));
                }
            });
            System.out.println("消费者【*】启动成功");
            // Keep the test method alive so the consumer can keep receiving.
            System.in.read();
        }
    }
}

消费者2
package e_routing_topic;

import com.rabbitmq.client.*;
import org.junit.Test;
import utils.RabbitMQUtils;

import java.io.IOException;

/**
 * Topic-routing consumer 2: binds a temporary queue to the {@code topic} exchange with the pattern
 * {@code user.#} and prints every message it receives.
 */
public class Consumer2 {
    /**
     * Declares the exchange, binds a temporary queue, subscribes and keeps running until standard
     * input yields a byte or reaches end of stream; the channel and connection are then closed.
     *
     * @throws Exception if declaring, binding, subscribing or closing fails
     */
    @Test
    public void Message() throws Exception{
        try (Connection connection = RabbitMQUtils.getConnection();
             Channel channel = connection.createChannel()) {
            // Declare the exchange.
            channel.exchangeDeclare("topic", BuiltinExchangeType.TOPIC);
            // Ask the broker for a temporary queue and take its generated name.
            String queue = channel.queueDeclare().getQueue();
            // Bind the temporary queue with a multi-word wildcard pattern.
            channel.queueBind(queue,"topic","user.#");

            // autoAck=true is passed explicitly: the two-argument basicConsume overload requires
            // manual acknowledgement, and this handler never acknowledges.
            channel.basicConsume(queue,true,new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    // Decode with an explicit charset; the payload contains non-ASCII text.
                    System.out.println("消费者【#】接收到消息是:"+new String(body, java.nio.charset.StandardCharsets.UTF_8));
                }
            });
            System.out.println("消费者【#】启动成功");
            // Keep the test method alive so the consumer can keep receiving.
            System.in.read();
        }
    }
}

测试


集成spring boot-直连

两个spring boot项目

生产者测试类
package com.sxt;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

/**
 * Spring Boot tests for the producer application: one prints the injected template, the other
 * sends a message with the routing key {@code hello}.
 */
@SpringBootTest
class RabbitmqSpringbootProducerApplicationTests {

    /** Template injected by Spring and used to send messages. */
    @Autowired
    private RabbitTemplate rabbitTemplate;

    /** Prints the injected template. */
    @Test
    void contextLoads() {
        final RabbitTemplate injected = rabbitTemplate;
        System.out.println(injected);
    }

    /** Sends the text {@code hello world} with the routing key {@code hello}. */
    @Test
    void testHello(){
        final String routingKey = "hello";
        final String payload = "hello world";
        rabbitTemplate.convertAndSend(routingKey, payload);
        System.out.println("消息发送成功");
    }

}

yml配置文件
# Port setting for the producer application.
server:
  port: 8001
spring:
  application:
    name: producer
  # Connection settings for the RabbitMQ broker.
  rabbitmq:
    host: 39.105.127.232
    port: 5672
    # NOTE(review): the plain Java client examples connect as "sxt", not "user" — confirm which
    # account is valid for the /v-sxt virtual host.
    username: user
    password: 123456
    virtual-host: /v-sxt
编写生产者HelloConfig配置
package com.sxt.config;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Producer-side configuration that registers the {@code hello} queue as a bean.
 */
@Configuration
public class HelloConfig {
    /**
     * Creates the queue named {@code hello}.
     *
     * <p>{@code Queue} has further constructor overloads with additional parameters (shown by
     * the IDE's parameter hint), comparable to the arguments passed to {@code queueDeclare} in
     * the plain-client hello example.
     *
     * @return the queue definition
     */
    @Bean
    public Queue hello(){
        return new Queue("hello");
    }
}

消费者HelloConsumer
package com.sxt.consumer;

import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * Listener component for the {@code hello} queue, which the listener annotation also asks to
 * have declared.
 */
@Component
@RabbitListener(queuesToDeclare = @Queue("hello"))
public class HelloConsumer {
    /**
     * Prints a received text message.
     *
     * @param body the message payload as text
     */
    @RabbitHandler
    public void receive(String body){
        final String line = "消费者收到消息,内容为:" + body;
        System.out.println(line);
    }
}

消费者与生产者的yml配置文件一样,先启动生产者再启动消费者

测试

集成spring boot-work 生产者测试类
    /** Sends ten numbered text messages with the routing key {@code work}. */
    @Test
    void testWork(){
        int seq = 1;
        while (seq <= 10) {
            final String payload = "hello work--" + seq;
            rabbitTemplate.convertAndSend("work", payload);
            seq++;
        }
        System.out.println("消息全部发送成功");
    }
生产者配置类WorkConfig
package com.sxt.config;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Producer-side configuration that registers the {@code work} queue as a bean.
 */
@Configuration
public class WorkConfig {
    /**
     * Creates the queue named {@code work}.
     *
     * <p>{@code Queue} has further constructor overloads with additional parameters (shown by
     * the IDE's parameter hint), comparable to the arguments passed to {@code queueDeclare} in
     * the plain-client hello example.
     *
     * @return the queue definition
     */
    @Bean
    public Queue work(){
        return new Queue("work");
    }
}

消费者类WorkConsumer
package com.sxt.consumer;

import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * Component with two listener methods on the same {@code work} queue, standing in for two
 * competing consumers.
 */
@Component
public class WorkConsumer {
    /**
     * First listener on the {@code work} queue; prints the received text.
     *
     * @param body the message payload as text
     */
    @RabbitListener(queuesToDeclare = @Queue("work"))
    public void receive1(String body){
        final String line = "消费者[1]收到消息,内容为:" + body;
        System.out.println(line);
    }

    /**
     * Second listener on the {@code work} queue; prints the received text.
     *
     * @param body the message payload as text
     */
    @RabbitListener(queuesToDeclare = @Queue("work"))
    public void receive2(String body){
        final String line = "消费者[2]收到消息,内容为:" + body;
        System.out.println(line);
    }
}

测试


欢迎分享,转载请注明来源:内存溢出

原文地址: https://outofmemory.cn/zaji/5688515.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存