WebSphere MQ 接收发送
添加mq jar
类介绍:
SendMSG:消息发送类。
Main():主方法。
SendMSG():消息发送方法。
方法描述:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
package test;
public class SendMSG{
MQEnvironmenthostname = "19216810201";
//通道类型为服务器连接通道
MQEnvironmentchannel = "tongdao";
MQEnvironmentCCSID = 1381;
//消息队列端口号
MQEnvironmentport = 10618;
try{
//建立队列管理器QM_SERVER为队列管理器名称
MQQueueManager qMgr = new MQQueueManager("test");
int openOptions = MQCMQOO_INPUT_AS_Q_DEF|MQCMQOO_OUTPUTMQCMQOO_INQUIRE;//建立队列INITQ队列名称INITQ为本地队列
MQQueue queue = qMgraccessQueue("wanghui",openOptions,null,null,null);
Systemoutprintln("成功建立通道");
MQMessage message = new MQMessage();
messageformat = MQCMQFMT_STRING;
messagecharacterSet = 1381;
messagewriteString("王辉");
messageexpiry = -1;//设置消息用不过期
queueput(message);//将消息放入队列
queueclose();//关闭队列
qMgrdisconnect();//断开连接
}catch(EOFExceptione){
eprintStackTrace();
}catch(MQExceptione){
eprintStackTrace();
}catch(Exceptione){
eprintStackTrace();
}
}
ReceiveMSG:消息接收类。
Main():主方法。
ReceiveMSG():消息接收方法。
public class ReceiveMSG {
MQEnvironmenthostname="19216810201";//通道类型为服务器连接通道
MQEnvironmentchannel="tongdao";
MQEnvironmentCCSID=1381;
MQEnvironmentport=10618;
try{
//建立队列管理器QM_SERVER为队列管理器名称
MQQueueManager qMgr = new MQQueueManager("test");
int openOptions=MQCMQOO_INPUT_AS_Q_DEF|MQCMQOO_OUTPUT|MQCMQOO_INQUIRE;//建立队列INITQ队列名称INITQ为本地队列
MQQueue queue=qMgraccessQueue("wanghui",openOptions,null,null,null);
Systemoutprintln("成功建立通道");
MQMessage message= new MQMessage();
messageformat=MQCMQFMT_STRING;
messagecharacterSet=1381;
//从队列中获取消息
MQGetMessage Optionspmo=new MQGetMessageOptions();
queueget(message,pmo);
Stringchars=messagereadLine();
Systemoutprintln(chars);
queueclose();//关闭队列
qMgrdisconnect();//断开连接
}catch(EOFExceptione){
eprintStackTrace();
}catch(MQExceptione){
eprintStackTrace();
}catch(Exceptione){
eprintStackTrace();
}
}
消息队列之 RabbitMQ(推荐)
>
首先消息是网络通讯的载体,队列可以理解是一种先进先出的数据结构,消息队列是存放消息的容器,是分布式系统中的重要组件。消息队列的优势在于:解耦、异步、削峰,把相关性不
强的模块独立分开视为解耦,异步就是非必要逻辑异步方式处理,加快响应速度,削峰是避免短期高并发导致系统问题进行缓冲队列处理。消息队列的缺点在于:加强系统复杂性、系统可用性降低,使
用了消息队列系统出现问题排查的范围就变大、需要考虑消息队列导致的问题。
本文说明主流的消息队列,针对使用过的zeroMQ和rabbitMQ、Kakfa:
zeroMQ :C语言开发,号称最快的消息队列,本着命名zero的含义,中油中间架构使用简单,表面上是基于socket的封装套接字API,在多个节点应用场景下非常灵活、架构的可扩展性很强,
实现N到M的协同处理;
zmq的socket模式: req、rep、push、pull、pub、sub、router、dealer。
(1)req和rep:请求回应模型,req和rep都可以请求和回答,不同的只是req是先send再rec,rep是先rec再send。支持N个请求端一个接收端,也支持N个接收端一个请求端。N个接收端采
用rr负载均衡。 哪个是“一”端,哪个就bind端口,“N”端就只能connect,所以,req+rep无论谁bind端口,肯定要有一个是“一”。
(2) router和dealer:随时可以发送和接收的req和rep,看起来router+dealer跟 req+rep属于同类功能。因为router和dealer可以随时发送接收,所以它们可以用来做路由。一个router用来响
应N个req,然后它在响应处理的时候,再通过另一个socket把请求扔出去,接收者是另外的M个rep,这就做到N:M。
(3)pub和sub :订阅和推送,对应发布者和订阅者。
(4)push和pull:就是管道,一个只推数据,一个只拉数据。
rabbitMQ :使用erlang语言开发,高并发特点,基于AMQP(即Advanced Message Queuing Protocol)的开源高级消费队列,AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/
订阅)、可靠性、安全),企业级适应性和稳定性,并且有WEB管理界面方便用户查看和管理。以下是rabbitMQ的结构图:
(1)Producer:数据发送方,一般一个Message有两个部分:payload(有效载荷)和label(标签),payload是数据实际载体,label是exchange的名字或者一个tag,决定发给哪个Consumer;
(2)Exchange: 内部 消息交换器,exchange从生产者那收到消息后,一般会指定一个Routing Key,来指定这个消息的路由规则,当然Routing Key需要与Exchange Type及Binding key联合使用
才能最终生效,根据路由规则,匹配查询表中的routing key,分发消息到queue中;
(3)binding:即绑定,绑定(Binding)Exchange与Queue的同时,一般会指定一个Binding key,但不一定会生效,依赖于Exchange Type;
(4)Queue:即队列是rabbitmq内部对象,用于存储消息,一个message可以被同时拷贝到多个queue中,queue对load balance的处理是完美的。对于多个Consumer来说,RabbitMQ 使用循
环的方式(round-robin)的方式均衡的发送给不同的Consumer;
(5)Connection与Channel: Connection 就是一个TCP的连接,Producer和Consumer都是通过TCP连接到RabbitMQ Server, Channel 是为了节省开销建立在上述的TCP连接中的接口,大部
分的业务 *** 作是在Channel这个接口中完成的,包括定义Queue、定义Exchange、绑定Queue与Exchange、发布消息等;
(6)Consumer:即数据的接收方,如果有多个消费者同时订阅同一个Queue中的消息,Queue中的消息会被平摊给多个消费者;
(7)Broker: 即RabbitMQ Server,其作用是维护一条从Producer到Consumer的路线,保证数据能够按照指定的方式进行传输;
(8)Virtual host:即虚拟主机,当多个不同的用户使用同一个RabbitMQ server提供的服务时,可以划分出多个vhost,每个用户在自己的vhost创建exchange/queue;
rabbitMQ消息转发中的路由转发是重点,生产者Producer在发送消息时,都需要指定一个RoutingKey和Exchange,Exchange收到消息后可以看到消息中指定的RoutingKey,再根据当前
Exchange的ExchangeType,按一定的规则将消息转发到相应的queue中去。三种Exchage type:
(1)Direct exchange :直接转发路由,原理是通过消息中的routing key,与binding 中的binding-key 进行比对,若二者匹配,则将消息发送到这个消息队列;
比如:消息生成者生成一个message(payload是1,routing key为苹果),两个binding(binding key分别为苹果、香蕉);exchange比对消息的routing key和binding key后,将消息发给了queue1,消息消费者1获得queue1的消息;
(2)Topic exchange: 通配路由,是direct exchange的通配符模式,
比如:消息生成者生成一个message(payload是1,routing key为quickorangerabbit),两个binding(binding key分别为orange 、 rabbit);exchange比对消息的routing key和binding key
后,exchange将消息分发给两个queue,两个消费者获得queue的消息;
(3)Fanout exchange: 复制分发路由,原理是不需要routkey,当exchange收到消息后,将消息复制多份转发给与自己绑定的消息队列,
比如:消息生成者生成一个message(payload是1,routing key为苹果),两个binding(binding key分别为苹果、香蕉);exchange将消息分发给两个queue,两个消费者获得queue的消息;
rabbiMQ如何保证消息的可靠性?
(1)Message durability:消息持久化,非持久化消息保存在内存中,持久化消息写入内存同时也写入磁盘;
(2)Message acknowledgment:消息确认机制,可以要求消费者在消费完消息后发送一个回执给RabbitMQ,RabbitMQ收到消息回执(Message acknowledgment)后才将该消息从Queue中移
除。通过ACK。每个Message都要被acknowledged(确认,ACK)。
(3)生产者消息确认机制:AMQP事务机制、生产者消息确认机制(publisher confirm)。
最后, 对比一下zeroMQ、rabbitMQ、kafka主流的消息队列的性能情况:
对比方向 概要
吞吐量 万级 RabbitMQ 的吞吐量要比 十万级甚至是百万级Kafka 低一个数量级。ZeroMQ号称最快的消息队列系统,尤其针对大吞吐量的需求场景。
可用性 都可以实现高可用。RabbitMQ 都是基于主从架构实现高可用性。 kafka 也是分布式的,一个数据多个副本,少数机器宕机,不会丢失数据,不会导致不可用
时效性 RabbitMQ 基于erlang开发,所以并发能力很强,性能极其好,延时很低,达到微秒级。其他两个个都是 ms 级。
功能支持 Kafka 功能较为简单,主要支持简单的MQ功能,在大数据领域实时计算以及日志采集被大规模使用;ZeroMQ能够 实现RabbitMQ不擅长的高级/复杂 的队列
消息丢失 RabbitMQ有ack模型,也有事务模型,保证至少不会丢数据, Kafka 理论上不会丢失,但不排除批量情况下。
开发环境 RabbitMQ需要erlang支持、kafka基于zookeeper管理部署、zeroMQ程序编译调用即可
封装库 基于c++开发,使用RabbitMQ-C,cppKafka,而zeroMQ基于C语言开发,无需封装
mq_open函数用来打开消息队列,如果mq_open函数未实现,可以尝试使用mq_close函数来关闭打开的消息队列。另外,也可以尝试通过使用mq_getattr函数来获取消息队列的属性,或者使用mq_send函数向消息队列发送消息。
JMS:Java Message Service,java消息服务,是一个消息服务的标准或者说是规范,允许应用程序组件基于JavaEE平台创建、发送、接收和读取消息。它使分布式通信耦合度更低,消息服务更加可靠以及异步性。
JMS是java的消息服务,JMS的客户端之间可以通过JMS服务进行异步的消息传输。
p2p:点对点发送,一个消息只能被消费一次
涉及:
消息队列(Queue)
发送者(Sender)
接收者(Receiver)
每个消息都被发送到一个特定的队列,接收者从队列中获取消息。队列保留着信息,直到它们被消费或超时。
示意图:p2p示意图
特点:
Pub/Sub:发布订阅,一个消息可以被消费多次
涉及角色:
主题(Topic)
发布者(Publisher)
订阅者(Subscriber)
客户端将消息发送到主题。多个发布者将消息发送到Topic,系统将这些消息传递给多个订阅者。
示意图:Pub/Sub示意图
特点:
MQ:消息中间件(MOM:Message Orient middleware),消息队列
作为系统间通信的必备技术,低耦合、可靠传输、流量控制、最终一致性
实现异步消息通信
Apache下
完全支持Java的JMS协议
消息模式:1、点对点 2、发布订阅
Erlang语言实现的开源的MQ中间件,支持多种协议
主要的通信协议是AMQP,即Advanced Message Queuing Protocol,高级消息队列协议,是应用协议的一个开放标准,为面向消息的中间件设计。
Apache下开源项目
高性能分布式消息队列,一般海量数据传输,大数据部门用
单机吞吐量:10w/s
阿里 贡献给了Apache
参考了Kafka实现基于Java 消息中间件
消息传输最快
RabbitMQ是一个开源的AMQP实现,服务端用Erlang语言编写,支持多种客户端,如:Python、Ruby、NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。
消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在。AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。
涉及角色:
可以基于Docker安装RabbitMQ,记住其端口:
15672:网页版可视化服务器数据
5672:客户端连接的端口号
点对点消息
一个消息只能消费一次
只需要队列就可以,不需要交换机
消息发送者和消息接收者者可以不同时在线
RabbitMQ特色就在于Exchange,主要有以下类型:
fanout:只要有消息就转发给绑定的队列,不会进行消息的路由判断
direct:会根据路由匹配规则,将消息发送到指定队列中,注意路由规则不支持特殊字符
topic:会根据路由匹配规则,将消息发送到指定队列中,注意路由规则支持特殊字符,比如: #
kafka是个日志处理缓冲组件,在大数据信息处理中使用。和传统的消息队列相比较简化了队列结构和功能,以流形式处理存储(持久化)消息(主要是日志)。日志数据量巨大,处理组件一般会处理不过来,所以作为缓冲层的kafka,支持巨大吞吐量。为了防止信息丢失,其消息被调用后不直接丢弃,要多存储一段时间,等过期时间过了才丢弃。这是mq和redis不能具备的。主要特点如下:巨型存储量: 支持TB甚至PB级别数据。高吞吐,高IO:一般配置的服务器能实现单机每秒100K以上消息的传输。消息分区,分布式消费:能保消息顺序传输。 支持离线数据处理和实时数据处理。Scale out:支持在线水平扩展,以支持更大数据处理量
redis只是提供一个高性能的、原子 *** 作内存键值对,具有高速访问能力,可用做消息队列的存储,但是不具备消息队列的任何功能和逻辑,要作为消息队列来实现的话,功能和逻辑要通过上层应用自己实现。
我们以RabbitMQ为例介绍。它是用Erlang语言开发的开源的消息队列,支持多种协议,包括AMQP,XMPP, SMTP, STOMP。适合于企业级的开发。
MQ支持Broker构架,消息发送给客户端时需要在中心队列排队。对路由,负载均衡或者数据持久化都有很好的支持。
还有ActiveMq,ZeroMq等。功能基本上大同小异。并发吞吐TPS比较,ZeroMq 最好,RabbitMq 次之, ActiveMq 最差。
原文:
以上就是关于java怎么将mq接收的文件消息提取出来全部的内容,包括:java怎么将mq接收的文件消息提取出来、【rabbitMQ】消息队列之 rabbitMQ、消息队列之zeroMQ、rabbitMQ、kafka等相关内容解答,如果想了解更多相关内容,可以关注我们,你们的支持是我们更新的动力!
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)