《RabbitMQ 实战指南》第一章 RabbitMQ 简介

《RabbitMQ 实战指南》第一章 RabbitMQ 简介,第1张

《RabbitMQ 实战指南》第一章 RabbitMQ 简介 《RabbitMQ 实战指南》第一章 RabbitMQ 简介

文章目录
  • 《RabbitMQ 实战指南》第一章 RabbitMQ 简介
  • 一、什么是消息中间件
  • 二、消息中间件的作用
  • 三、RabbitMQ 的特点
  • 四、RabbitMQ 的安装及简单使用
    • 1.安装 Erlang
    • 2.RabbitMQ 的安装
    • 3.RabbitMQ 的运行
    • 4.生产和消费消息

一、什么是消息中间件

消息(Message)是指在应用间传送的数据,消息类型可以是文本字符串、JSON 或者内嵌对象

消息队列中间件(Message Queue Middleware,简称为 MQ)是指利用高效可靠的消息传递机制进行与平台无关的数据交流,并基于数据通信来进行分布式系统的集成。通过提供消息传递和消息排队模型,它可以在分布式环境下扩展进程间的通信

MQ 一般有两种传递模式:点对点(P2P,Point-to-Point)模式和发布/订阅(Pub/Sub)模式。点对点模式是基于队列的,消息生产者发送消息到队列,消息消费者从队列中接收消息,队列的存在使得消息的异步传输成为可能。发布订阅模式定义了如何向一个内容节点发布和订阅消息,这个内容节点称为主题(topic),主题可以认为是消息传递的中介,消息发布者将消息发布到某个主题,而消息订阅者则从主题中订阅消息。主题使得消息的订阅者与消息的发布者相互保持独立,不需要进行接触即可保证消息的传递,发布/订阅模式在消息的一对多广播时采用

面向消息的中间件(简称为 MOM,Message Oriented Middleware)提供了以松耦合的灵活方式集成应用程序的一种机制。它们提供了基于存储和转发的应用程序之间的一部数据传输,即应用程序彼此不直接通信,而是与作为中介的 MQ 通信。MQ 提供了有保证的消息发送,应用程序开发人员无需了解远程过程调用(RPC)和网络通信协议的细节

MQ 适用于需要可靠的数据传输的分布式环境。采用 MQ 的系统中,不同的对象之间通过传递消息来激活对方的时间,以完成相应的 *** 作。发送者将消息发送给消息服务器,消息服务器将消息存放在若干队列中,在合适的时候再将消息转发给接收者。MQ 能在不同平台之间通信,它常被用来屏蔽各种平台及协议之间的特性,实现应用程序之间的协同,其优点在于能够在客户和服务器之间提供同步和异步的连接,并且在任何时刻都可以将消息进行传送或者存储转发,这也是它比远程过程调用更进步的原因

以图 1-1 为例,应用程序 A 与应用程序 B 通过使用 MQ 的应用程序编程接口(API,Application Program Interface)发送消息来进行通信

MQ 将消息路由给应用程序 B,这样消息就可以存在于完全不同的计算机上。消息中间件负责处理网络通信,如果网络连接不可用,MQ会存储消息,直到连接变得可用,再将消息转发给应用程序 B。灵活性的另一方面体现在,当应用程序 A 发送其消息时,应用程序 B 甚至可以处于不运行状态,MQ 将保留这份消息,直到应用程序 B 开始执行并消费消息,这样还防止了应用程序 A 因为等待应用程序 B 消费消息而出现阻塞

二、消息中间件的作用
  • 解耦:在项目启动之初来预测将来会碰到什么需要是极其困难的。消息中间件在处理过程中插入了一个隐含的、基于数据的接口层,两边的处理过程都要实现这一接口,这允许你独立地扩展或修改两边的处理过程,只要确保它们遵守同样的接口约束即可
  • 冗余(存储):有些情况下,处理数据的过程会失败。消息中间件可以把数据进行持久化,直到它们已经被完全处理,通过这一方式可以规避数据丢失的风险
  • 扩展性:因为消息中间件解耦了应用的处理过程,所以提高消息入队和处理的效率是很容易的,只要另外增加处理过程即可,不需要改变代码,也不需要调节参数
  • 削峰:在访问量剧增的情况下,应用仍然需要继续发挥作用,但是这样的突发流量并不常见。如果以能处理这类峰值为标准而投入硬件资源,无疑是巨大的浪费。使用消息中间件使得关键组件能够支撑突发访问压力,不会因为突发的超负荷请求而完全崩溃
  • 可恢复性:当系统一部分组件失效时,不会影响到整个系统。消息中间件降低了进程间的耦合度,所以即使一个处理消息的进程挂掉,加入消息中间件中的消息仍然可以在系统恢复后进行处理
  • 顺序保证:在大多数使用场景下,数据处理的顺序很重要,大部分消息中间件支持一定程度上的顺序性
  • 缓冲:在任何重要的系统中,都会存在需要不同处理时间的元素。消息中间件通过一个缓冲层来帮助任务最高效率地执行,写入消息中间件的处理会尽可能快速。该缓冲层有助于控制和优化数据流经过系统的速度
  • 异步通信:在很多时候,应用不想也不需要立即处理消息。消息中间件提供了异步处理机制,允许应用把一些消息放入消息中间件中,但并不立即处理它,在之后需要的时候再慢慢处理
三、RabbitMQ 的特点

RabbitMQ 是采用 Erlang 语言实现 AMQP(Advanced Message Queuing Protocol,高级消息队列协议)的消息中间件,它最初起源于金融系统,用于在分布式系统中存储转发消息。RabbitMQ 发展到今天,被越来越多的人认可,这和它在易用性、扩展性、可靠性等方面的卓著表现是分不开的,RabbitMQ 的具体特点可以概括为以下几点:

  • 可靠性:RabbitMQ 使用一些机制来保证可靠性,如持久化、传输确认及发布确认等
  • 灵活的路由:在消息进入队列之前,通过交换器来路由消息。对于典型的路由功能,RabbitMQ 已经提供了一些内置的交换器来实现。针对更复杂的路由功能,可以将多个交换器绑定在一起,也可以通过插件来实现自己的交换器
  • 扩展性:多个 RabbitMQ 节点可以组成一个集群,也可以根据实际业务情况动态地扩展集群中的节点
  • 高可用性:队列可以在集群中的机器上设置镜像,使得在部分节点出现问题的情况下队列仍然可用
  • 多种协议:RabbitMQ 除了原生支持 AMQP 协议,还支持 STOMP、MQTT 等多种消息中间件协议
  • 多语言客户端:RabbitMQ 几乎支持所有常用语言,比如 Java、Python、Ruby、PHP、C#、Javascript 等
  • 管理界面:RabbitMQ 提供了一个易用的用户界面,使得用户可以监控和管理消息、集群中的节点等
  • 插件机制:RabbitMQ 提供了许多插件,以实现从多方面进行扩展,当然也可以编写自己的插件
四、RabbitMQ 的安装及简单使用 1.安装 Erlang

第一步,解压安装包,并配置安装目录,这里我们预备安装到 /opt/erlang 目录下

第二步,如果出现类似关键报错信息:No curses library functions found。那么此时需要安装 ncurses,安装步骤如下(遇到提示输入 y 后直接回车即可)如下:

第三步,安装 Erlang:

如果在安装的过程中出现类似 “No ***** found” 的提示,可根据提示信息安装相应的包,之后再执行第二或者第三步,直到提示安装完毕为止

第四步,修改 /etc/profile 配置文件,添加下面的环境变量:

最后执行如下命令让配置文件生效:

可以输入 erl 命令来验证 Erlang 是否安装成功

2.RabbitMQ 的安装

RabbitMQ 的安装比 Erlang 的安装要简单,直接将下载的安装包解压到相应的目录下即可。本书撰稿时的最新版本为 3.6.12,本书示例大多采用同一系列的 3.6.x 版本

这里选择将 RabbitMQ 安装到与 Erlang 同一目录(/opt)下面:

同样修改 /etc/profile 文件,添加下面的环境变量:

之后执行 source /etc/profile 命令让配置文件生效

3.RabbitMQ 的运行

在修改了 /etc/profile 配置文件之后,可以任意打开一个 Shell 窗口,输入以下命令以运行 RabbitMQ 服务:

在 rabbitmq-server 命令后面添加一个 “-detached” 参数是为了能够让 RabbitMQ 服务以守护进程的方式在后台运行,这样就不会因为当前 Shell 窗口的关闭而影响服务

运行 rabbitmqctl status 命令可以查看 RabbitMQ 当前状态,是否正常启动

也可以通过 rabbitmqctl cluster_status 命令来查看集群信息,目前只有一个 RabbitMQ 服务节点,可以看作单节点的集群

4.生产和消费消息

本节将演示如何使用 RabbitMQ Java 客户端生产和消费消息

使用的 RabbitMQ Java 客户端版本为 4.2.1,相应的 maven 构建文件如下:


    
        com.rabbitmq
        amqp-client
        4.2.1
    

默认情况下,访问 RabbitMQ 服务的用户名和密码都是 “guest”,这个账户有限制,默认只能通过本地网络(如 localhost)访问,远程网络访问受限,所以在实现生产和消费消息之前,需要另外添加一个用户,并设置相应的访问权限

计算机的世界是从 “Hello World!” 开始的,首先生产者发送一条消息 “Hello World!” 至 RabbitMQ 中,之后由消费者消费。下面演示生产者客户端的代码:

package com.sisyphus.rabbitmq.demo;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class RabbitProducer {
    private static final String EXCHANGE_NAME = "exchange_demo";
    private static final String ROUTING_KEY = "routingkey_demo";
    private static final String QUEUE_NAME = "queue_demo";
    private static final String IP_ADDRESS = "192.168.0.2";
    private static final int PORT = 5672;   //RabbitMQ 服务端默认端口号为 5672

    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(IP_ADDRESS);
        factory.setPort(PORT);
        factory.setUsername("root");
        factory.setPassword("root123");
        Connection connection = factory.newConnection();    //创建连接
        Channel channel = connection.createChannel();       //创建信道
        //创建一个 type="direct"、持久化的、非自动删除的交换器
        channel.exchangeDeclare(EXCHANGE_NAME, "direct", true, false, null);
        //创建一个持久化、非排他的、非自动删除的队列
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);
        //将交换器与队列通过路由键绑定
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);
        //发送一条持久化的消息:Hello World!
        String message = "Hello World!";
        channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
        //关闭资源
        channel.close();
        connection.close();
    }
}

上面的生产者客户端的代码首先和 RabbitMQ 服务器建立一个连接(Connection),然后在这个连接之上创建一个信道(Channel)。之后创建一个交换器(Exchange)和一个队列(Queue),并通过路由键进行绑定。然后发送一条消息,最后关闭资源

消费者客户端代码:

package com.sisyphus.rabbitmq.demo;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Address;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class RabbitConsumer {
    private static final String QUEUE_NAME = "queue_demo";
    private static final String IP_ADDRESS = "192.168.0.2";
    private static final int PORT = 5672;

    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        Address[] addresses = new Address[]{
                new Address(IP_ADDRESS, PORT)
        };
        ConnectionFactory factory = new ConnectionFactory();
        factory.setUsername("root");
        factory.setPassword("root123");
        //这里的连接方式与生产者的 demo 略有不同,注意辨别区别
        Connection connection = factory.newConnection();    //创建连接
        final Channel channel = connection.createChannel(); //创建信道
        channel.basicQos(64);   //设置客户端最多接收未被 ack 的消息的个数
        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag,
                                       Envelope envelope,
                                       AMQP.BasicProperties properties,
                                       byte[] body)
                throws IOException{
                System.out.println("recv message: " + new String(body));
                try{
                    TimeUnit.SECONDS.sleep(1);
                }catch (InterruptedException e){
                    e.printStackTrace();
                }
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };
        channel.basicConsume(QUEUE_NAME, consumer);
        //等待回调函数执行完毕之后,关闭资源
        TimeUnit.SECONDS.sleep(5);
        channel.close();
        connection.close();
    }
}

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5681986.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存