软件方面:主要是Erlang+RabbitMQ得提前安装好。不知道如何安装的可以转到我的文章查看。Erlang及RabbitMQ下载安装
二、设计思路①集群的实现我使用spring-cloud-gateway、Spring-cloud-alibaba等框架实现,设计基于微服务架构的集群。网关可以使用nacos配置中心实现负载均衡。
②集群方案解决了,接着便是集群通信了,在这之前我看了很多博文,因为websocket的session不支持序列化,导致不能使用Redis存储session二次利用,大佬们提供了许多解决集群通信的方式:比如提取公共服务创建Map
③实现流程:做的一个小案例,用户之间处于不同的服务器上,最终实现了指定用户发送消息和群体发送消息。
三、代码实现 1、父项目exam-web-net依赖2、子项目 ①exam-web-gatewayexam-web-gateway exam-web-websocket 1.16.18 2.1.6.RELEASE 2.1.0.RELEASE Greenwich.SR5 org.springframework.boot spring-boot-dependencies${spring-boot.version} pom import org.springframework.cloud spring-cloud-dependencies${spring-cloud.version} pom import com.alibaba.cloud spring-cloud-alibaba-dependencies${spring-cloud-alibaba.version} pom import
网关主要是用于统一管理集群入口,根据断言进行匹配转发至对应的集群服务器,比较重要的是yaml文件配置,同时创建一个启动类即可:com.alibaba.cloud spring-cloud-alibaba-nacos-discoverycom.alibaba.cloud spring-cloud-alibaba-nacos-configorg.springframework.cloud spring-cloud-starter-gateway2.1.5.RELEASE
spring: application: name: gateway-server cloud: nacos: discovery: # 服务地址和端口 server-addr: localhost:8812 namespace: 05c28632-9826-4852-83ff-9315fb619084 #dev config: server-addr: localhost:8812 namespace: 05c28632-9826-4852-83ff-9315fb619084 #dev file-extension: yaml group: DEV_GROUP #dev gateway: #路由规则 routes: - id: setstatus_route # - 表示该数据为一个集合 存在多个 # lb负载均衡 从注册中心读取服务进行使用 uri: lb://websocket-server #websocket服务ID predicates: - Path=/demo @SpringBootApplication public class GateWayApplication { public static void main(String[] args) { SpringApplication.run(GateWayApplication.class,args); } }②exam-web-websocke
这里RabbitMQ配置用到了该博主的代码,在自己的需求下进行了改造
pom依赖
org.springframework.boot spring-boot-starter-webcom.alibaba.cloud spring-cloud-alibaba-nacos-discoverycom.alibaba.cloud spring-cloud-alibaba-nacos-configorg.projectlombok lombokorg.springframework.boot spring-boot-starter-websocketorg.springframework.boot spring-boot-starter-amqpcom.alibaba fastjson1.2.9
websocket配置类
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.web.socket.server.standard.ServerEndpointExporter; @Configuration public class WebSocketConfig { @Bean public ServerEndpointExporter serverEndpointExporter(){ return new ServerEndpointExporter(); } }websocket服务
import *.*.*.websocket.result.MessageResult; import *.*.*.websocket.service.WebSocketService; import com.alibaba.fastjson.JSONObject; import lombok.extern.slf4j.Slf4j; import org.springframework.context.ApplicationContext; import org.springframework.stereotype.Component; import javax.websocket.*; import javax.websocket.server.PathParam; import javax.websocket.server.ServerEndpoint; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicInteger; //encoder为自定义解码器,因为websocket返回对象默认只能为String,byte[]和可序列化的对象,可实现自定义类实现序列化接口并编写该接口的解码器 @ServerEndpoint(value = "/webSocket/{sid}",encoders = {EncoderServer.class}) @Component @Slf4j public class WebSocketServer { public static ConcurrentHashMapMessageResult实体类electricSocketMap = new ConcurrentHashMap (); private static ApplicationContext applicationContext; private AtomicInteger ati = new AtomicInteger(); public static void setApplicationContext(ApplicationContext applicationContext) { WebSocketServer.applicationContext = applicationContext; } @onOpen public void open(@PathParam("sid")String sid,Session session){ electricSocketMap.put(sid,session); Set > entries = electricSocketMap.entrySet(); for (Map.Entry entry : entries) { System.out.println(entry.getKey()+"的session值为"+entry.getValue()); } System.out.println("websocket连接成功,用户编号为"+sid); } @onClose public void close(@PathParam("sid")String sid,Session session){ Set > entries = electricSocketMap.entrySet(); for (Map.Entry entry : entries) { int value = ati.getAndIncrement(); System.out.println(value+"号为"+entry.getValue()+"Key为"+entry.getKey()); } if(electricSocketMap.containsKey(sid)){ electricSocketMap.remove(sid); System.out.println(String.format("--{%s}{%s}---",sid,"已断开链接")); } } @onMessage public void onMessage(@PathParam("sid")String sid, String message, Session session) { WebSocketService websocketService = applicationContext.getBean(WebSocketService.class); System.out.println("websocket received message:"+message); MessageResult messageResult = JSONObject.parseObject(message,MessageResult.class); if(messageResult.getToUserId() != null){ sid = messageResult.getToUserId(); } if(electricSocketMap.containsKey(sid)){ if(messageResult.getMsgType() == 1){ session = electricSocketMap.get(sid); sendMsg(session,messageResult); }else { Set > entries = electricSocketMap.entrySet(); for (Map.Entry entry : entries) { sendMsg(entry.getValue(),messageResult); } } } else{ if(messageResult.getMsgType() == 1){ websocketService.send(sid,messageResult); }else{ websocketService.sendAll(electricSocketMap,messageResult); } } } @onError public void onError(Session session, Throwable error) { error.printStackTrace(); System.out.println("发生错误"); } public static void sendMsg(Session session, MessageResult message) { try { session.getBasicRemote().sendObject(message); } catch (Exception e) { e.printStackTrace(); } } private void sendAll(MessageResult message) { WebSocketService websocketService = applicationContext.getBean(WebSocketService.class); websocketService.sendAll(electricSocketMap,message); } }
import lombok.Data; import java.io.Serializable; @Data public class MessageResult implements Serializable { private static final long serialVersionUID = 7849057146501910808L; private String toUserId; private String msgInfo; private int msgType; public MessageResult(){ } public MessageResult(String toUserId,String msgInfo,int msgType){ this.toUserId = toUserId; this.msgInfo = msgInfo; this.msgType = msgType; } }EncoderServer解码器
import *.*.*.websocket.result.MessageResult; import net.sf.json.JSONObject; import javax.websocket.EncodeException; import javax.websocket.Encoder; import javax.websocket.EndpointConfig; public class EncoderServer implements Encoder.TextWebSocketService{ @Override public String encode(MessageResult messageResult) throws EncodeException { JSonObject jsonObject = JSONObject.fromObject(messageResult); return jsonObject.toString(); } @Override public void init(EndpointConfig endpointConfig) { } @Override public void destroy() { } }
import *.*.*.websocket.result.MessageResult; import *.*.*.websocket.sender.FanoutSender; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import javax.websocket.Session; import java.util.HashMap; import java.util.Map; @Service public class WebSocketService { @Autowired private FanoutSender fanoutSender; public void send(String id, MessageResult message) { MapExchangeConfiguration创建交换机和队列map=new HashMap<>(); map.put(id,message); fanoutSender.sendMessage(map); } public void sendAll(Map electricSocketMap, MessageResult message) { Map map=new HashMap<>(); for (Map.Entry sessionEntry : electricSocketMap.entrySet()) { map.put(sessionEntry.getKey(),message); } fanoutSender.sendMessage(map); } }
import *.*.*.websocket.config.ServerConfig; import *.*.*.websocket.entity.Constants; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.FanoutExchange; import org.springframework.amqp.core.Queue; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class ExchangeConfiguration { private static Logger logger = LoggerFactory.getLogger(ExchangeConfiguration.class); @Autowired private ServerConfig serverConfig; @Bean public FanoutExchange fanoutExchange() { logger.info("【【【交换机实例创建成功】】】"); return new FanoutExchange(Constants.FANOUT_EXCHANGE_NAME); } @Bean public Queue queue() { logger.info("【【【队列实例创建成功】】】"); //动态名称 return new Queue(Constants.TEST_QUEUE1_NAME+"——"+serverConfig.getUrl()); } @Bean public Binding bingQueue1ToExchange() { logger.info("【【【绑定队列到交换机成功】】】"); return BindingBuilder.bind(queue()).to(fanoutExchange()); } }Constants常量
package *.*.*.websocket.entity; public class Constants { public static final String FANOUT_EXCHANGE_NAME = "fanout.exchange.name"; public static final String TEST_QUEUE1_NAME = "test.queue1.name"; }FanoutSender发布消息
package *.*.*.websocket.sender; import *.*.*.websocket.entity.Constants; import *.*.*.websocket.result.MessageResult; import net.sf.json.JSONObject; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import java.util.Map; @Component public class FanoutSender { private static Logger logger = LoggerFactory.getLogger(FanoutSender.class); @Autowired private RabbitTemplate rabbitTemplate; public void sendMessage(MapFanoutReceiver订阅消息rabbitMessageList) { logger.info("【消息发送者】发送消息到fanout交换机,"); rabbitTemplate.convertAndSend(Constants.FANOUT_EXCHANGE_NAME, "", JSONObject.fromObject(rabbitMessageList).toString()); } }
package *.*.*.websocket.receiver; import *.*.*.websocket.result.MessageResult; import *.*.*.websocket.server.WebSocketServer; import com.alibaba.fastjson.JSONObject; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; import javax.websocket.Session; import java.util.Map; import java.util.Set; @Component public class FanoutReceiver { private static Logger logger = LoggerFactory.getLogger(FanoutReceiver.class); @RabbitHandler @RabbitListener(queues = "#{queue.name}")//动态绑定 public void receiveMessage(String jsonObject) { Maphtml代码map = JSONObject.parseObject(jsonObject,Map.class); Set > entries = map.entrySet(); for (Map.Entry entry : entries) { logger.info("消息接收者接收到来自【队列一】的消息,消息内容: "+jsonObject); Map electricSocketMap = WebSocketServer.electricSocketMap; if(electricSocketMap.containsKey(entry.getKey())){ Object obj = map.get(entry.getKey()); MessageResult result = (MessageResult) JSONObject.parseObject(obj.toString(),MessageResult.class); System.out.println(entry.getKey()+"的值为"+entry.getValue()); WebSocketServer.sendMsg(electricSocketMap.get(entry.getKey()),result); } } } }
四、出现的问题及解决方案WebSocket hello socket【userId】:
【toUserId】:
【msgType】:
【msgInfo】:
【 *** 作】:
【 *** 作】:
1、websocketServer中无法注入WebSocketService的实例。
可以在主启动类中对sebsocketServer的静态变量websocketService进行手动赋值,把websocketService实例手动存放到容器中去,再在容器中根据类名进行引用即可。
主启动类:
import *.*.*.websocket.server.WebSocketServer; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.context.ConfigurableApplicationContext; @SpringBootApplication public class WebSocketApplication { public static void main(String[] args) { SpringApplication springApplication = new SpringApplication(WebSocketApplication.class); ConfigurableApplicationContext configurableApplicationContext = springApplication.run(args); //解决websocket无法注入问题 WebSocketServer.setApplicationContext(configurableApplicationContext); } }
2、fastJson转换VO对象失败
原因是fastJson转换后的对象与原对象的toString方法冲突,导致fastJson提示cantconvert错误,可以先把fastJson转换后的对象先转为Object类,再使用根据类强制转换的方法:JSONObjecet.parseObject(String text,Class clazz)进行转换。
public void receiveMessage(String jsonObject) { Mapmap = JSONObject.parseObject(jsonObject,Map.class); Set > entries = map.entrySet(); for (Map.Entry entry : entries) { logger.info("消息接收者接收到来自【队列一】的消息,消息内容: "+jsonObject); Map electricSocketMap = WebSocketServer.electricSocketMap; if(electricSocketMap.containsKey(entry.getKey())){ Object obj = map.get(entry.getKey()); MessageResult result = (MessageResult) JSONObject.parseObject(obj.toString(),MessageResult.class); System.out.println(entry.getKey()+"的值为"+entry.getValue()); WebSocketServer.sendMsg(electricSocketMap.get(entry.getKey()),result); } } }
以上就是简单集群通信的准备和实现,作为一个学习记录。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)