webSocket实现推送
webSocket是什么? 需求说明websocket集成步骤
pom.xmlwebSocket实现
自定义处理类MyWebSocketHandLer配置webScoketConfig 仿朋友圈实现
实现自定义执行器(Execute)改造WebSocketconfig定义处理器依赖信息类(存储主要信息)定义消息存储类改造MyWebSocketHandLer
1. 建立成功事件afterConnectionEstablished方法改造2. 接收消息事件handleTextMessage方法改造2.断开连接时afterConnectionClosed方法改造 调用处理器(Execute)
webSocket实现推送参考文献:https://www.cnblogs.com/kiwifly/p/11729304.html
webSocket是什么?阅读文章前请先阅读文献,文献一共提供了4中java可进行webSocket链接的方式,每种方式都有较为简单的说明使用集成和优缺点介绍。本文所使用的为第二种Spring封装
原生注解Spring封装TIOSTOMP
WebSocket 协议在2008年诞生,2011年成为国际标准。所有浏览器都已经支持了。
它的最大特点就是,服务器可以主动向客户端推送信息,客户端也可以主动向服务器发送信息,是真正的双向平等对话,属于服务器推送技术的一种。
通过上图可以观察到webSocket相对于http请求来说,只进行了两次请求交互。首先进行一次http请求来与服务器判断是否进行交互,成功以后再进行长连接。其连接成功后的状态码为101
其他特点包括:
(1)建立在 TCP 协议之上,服务器端的实现比较容易。
(2)与 HTTP 协议有着良好的兼容性。默认端口也是80和443,并且握手阶段采用 HTTP 协议,因此握手时不容易屏蔽,能通过各种 HTTP 代理服务器。
(3)数据格式比较轻量,性能开销小,通信高效。
(4)可以发送文本,也可以发送二进制数据。
(5)没有同源限制,客户端可以与任意服务器通信。
(6)协议标识符是ws(如果加密,则为wss),服务器网址就是 URL。
需求说明websocket集成步骤项目要求需要实现一个类似微信朋友圈的功能,可以实现朋友圈评论、朋友圈、未读消息的实时推送。起初所使用的是http请求进行轮询获取数据,对于服务器是比较大的负担并且效果也不是很好,后调整为使用webSocket进行实时推送取代客户端自主的轮询 *** 作,由服务器主动发送请求
实现思路:每个webSocket在连接成功以后都会生成一个独立的Session(与http中的Session是不一样的,webSocket是通过调用Session的 sendMessage(),方法来进行推送消息的),并且每个Session都会携带独立的ID值。所有我们只需要把连接成功时的Session储存下来,在进行轮询推送判断该session对应的用户是否有新的朋友圈、朋友圈评论、未读消息。如果存在就把新的数据推送给前端,前端在把接受到的数据进行呈现即可。
pom.xml我的想法了每一个类型的webSocket连接会对应一个不同的执行器(Execute),可以通过访问的url来判断是不是同一个类型的webSocket,比如/test和/content其实本质上他们的url是不同的,通过url就可以区分开来了。每一个执行器会对应自身的实现类,通过继承同一个抽象类把各个执行器绑定与一个父类,我们只需要在父类定义公共参数和需要重写的执行方法就可以了做到一致性了。然后每个不同类型的执行器对应了一块不同的线程池进行解耦 *** 作。
webSocket实现 自定义处理类MyWebSocketHandLerorg.springframework.boot spring-boot-starter-websocket2.5.1
public class MyWebSocketHandLer extends AbstractWebSocketHandler { @Override public void afterConnectionEstablished(WebSocketSession session) throws Exception { System.out.println("握手成功后,建立连接时调用该方法"); } @Override protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception { System.out.println("当接受到客户端数据时会调用该方法"); } @Override public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception { System.out.println("当webSocket断开连接时,会进行该方法"); } }配置webScoketConfig
@Configuration //开启websocket @EnableWebSocket public class WebSocketConfig implements WebSocketConfigurer { //配置信息 @Override public void registerWebSocketHandlers(WebSocketHandlerRegistry w) { //配置路径 + 处理器 //w.addHandler(getMyWebSocketHandLet(), WebSocketConfig.pathExecuteMap.keySet().toArray(new String[0])) w.addHandler(getMyWebSocketHandLet(),"/webSocket") .addInterceptors(getMyInterceptor()) .setAllowedOrigins("*"); } //处理器 @Bean public WebSocketHandler getMyWebSocketHandLet(){ return new MyWebSocketHandLer(); } //拦截器 @Bean public HandshakeInterceptor getMyInterceptor(){ //自定义 //return new MyInterceptor(); //jar自带的实现了可以参考实现 HttpSessionHandshakeInterceptor httpSessionHandshakeInterceptor = new HttpSessionHandshakeInterceptor(); httpSessionHandshakeInterceptor.setCreateSession(true); return httpSessionHandshakeInterceptor; } }
通过实现 WebSocketConfigurer 类并覆盖相应的方法进行 websocket 的配置。我们主要覆盖 registerWebSocketHandlers 这个方法。通过向 WebSocketHandlerRegistry 设置不同参数来进行配置。其中**addHandler 方法添加我们上面的写的 ws 的 handler 处理类,第二个参数是你暴露出的 ws 路径。addInterceptors 添加我们写的握手过滤器。setAllowedOrigins("*")**这个是关闭跨域校验,方便本地调试,线上推荐打开。
HttpSessionHandshakeInterceptor类,是org.springframework.web.socket.server.support包下提供的一个webSocket拦截器,主要作用是把第一次握手http中Session所携带的参数都接入到webSocket的Session中,也可以做一些握手校验。但是我统一吧校验都放在handler中去了,所以这儿就不说明。
注:应不需要那么复杂的需求,所以使用的过滤器为jar实现的处理器进行处理,如果需要实现自定义的处理器需要继承HttpSessionHandshakeInterceptor
public class MyInterceptor extends HttpSessionHandshakeInterceptor { @Override public boolean beforeHandshake(ServerHttpRequest serverHttpRequest, ServerHttpResponse serverHttpResponse, WebSocketHandler webSocketHandler, Mapmap) throws Exception { System.out.println("握手前"); return true; } @Override public void afterHandshake(ServerHttpRequest serverHttpRequest, ServerHttpResponse serverHttpResponse, WebSocketHandler webSocketHandler, Exception e) { System.out.println("握手完成"); } }
配置到这儿webSocket基本上就进行连接了,可以在网上找一个webSocket简单测试连接工具,进行调适O(∩_∩)O~~
仿朋友圈实现 实现自定义执行器(Execute)- 定义执行器接口,保证每个实现该执行器的类都需要重写该主要方法
//处理器接口 public interface ExecuteTask{ //处理方法 Runnable sendResult(); }
- 实现执行器的抽象类管理,定义其公共参数,并且统一提供构造方法,取消无参构造保证参数能有对应的数据信息
//统一定义实现类的变量与构造方法 public abstract static class DefaultExecuteTask implements ExecuteTask{ //存储信息类 public MyWebSocketHandLer.HandLerDepend hDepends; public DefaultExecuteTask(MyWebSocketHandLer.HandLerDepend hDepends) { this.hDepends = hDepends; } public MyWebSocketHandLer.HandLerDepend gethDepends() { return hDepends; } public void sethDepends(MyWebSocketHandLer.HandLerDepend hDepends) { this.hDepends = hDepends; } }
- 调整参数字段后续会说明使用
//修饰执行器实现类中的参数信息 @Retention(RetentionPolicy.RUNTIME) @Target(ElementType.FIELD) public @interface ExecuteMainField{ //主要参数 public boolean isMain() default false; //是否重置为默认值 放置 isMain改变后无法重置,所以不携带该参数时,直接使用默认值 public boolean useDefault() default false; //存储的默认值useDefault为true会使用 public String useDefaultValue() default ""; }
- 实现自己的执行器类
@Getter @Setter @Slf4j(topic = "ThreadScheduleTask.DeptCircleExecuteTask") @ToString public static class DeptCircleExecuteTask extends DefaultExecuteTask{ //查询的班级圈 circleId > 0,number失效 @ExecuteMainField(isMain = false) private volatile long circleId = 0; //查询数量 @ExecuteMainField(isMain = true) private volatile int number = -1; //判断是否只查询自己数据 @ExecuteMainField(isMain = true, useDefault = true, useDefaultValue = "false") private volatile boolean mySelf = false; //执行bean类 private final SysDeptCircleServiceImpl deptCircleService = SpringUtils.getBean(SysDeptCircleServiceImpl.class); //返回值模板 private final MapresultTemplate = new HashMap (){{ put("isNewRemark",null); //评语 put("isData", null); //朋友圈数据 }}; public DeptCircleExecuteTask(MyWebSocketHandLer.HandLerDepend hDepends) { super(hDepends); } @Override public Runnable sendResult() { return new Runnable() { @Override public void run() { //实现自身的业务逻辑 } }; } }
到目前为止我们的执行器体系也算是完整起来了,后续迭代的话我们只需要不停的去继承DefaultExecuteTask类并且重写其中的sendResult(),方法就可以了,那么这时候就有新的问题了,虽然我们的执行器实现了,那怎么和我们的url关联起来呢?关联起来以后我们怎么样去调用我们的执行器呢?接下来就该解决这个问题了。
关于线程池配置我也放在了下面因为线程池,但是感觉线程池的使用还有有问题的。
线程池
//自定义内部线程池 //使用jdk自带线程组管理 只执行wss任务调度 @Getter private final static class MyThreadPoolExecutor extends ThreadPoolExecutor { //线程大小 private static final int corePoolSize = 10; //最大线程大小 private static final int maximumPoolSize = 10; // 线程池维护线程所允许的空闲时间 秒单位 private static final long keepAliveTime = 300; public static MyThreadPoolExecutor getThreadPool(){ MyThreadPoolExecutor executor = new MyThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, TimeUnit.SECONDS, new linkedBlockingQueue<>(50)); // 线程池对拒绝任务(无线程可用)的处理策略 executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); return executor; } //corePoolSize + maximumPoolSize = 最终大小 private MyThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue改造WebSocketconfigworkQueue) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue); } }
我们把url和每个对应的执行器关联起来,保证一个url对应一个执行器。最终还是通过键值对把url可类型关联起来。key为对应url,value是我们处理器的class类,别忘记我们其实在抽象类中是有定义一个基础构造方法的,所以我们只需要通过反射去实现抽象类中的构造方法就可以获取到对应的执行类信息了。
@Configuration @EnableWebSocket public class WebSocketConfig implements WebSocketConfigurer { //定义url对应的处理器 //配置路径统一/webSocket 开始 public final static Map定义处理器依赖信息类(存储主要信息)> pathExecuteMap = new HashMap >(){{ this.put("/webSocket",ThreadScheduleTask.DeptCircleExecuteTask.class); }}; //配置信息 @Override public void registerWebSocketHandlers(WebSocketHandlerRegistry w) { //配置路径 + 处理器 w.addHandler(getMyWebSocketHandLet(), WebSocketConfig.pathExecuteMap.keySet().toArray(new String[0])) .addInterceptors(getMyInterceptor()) .setAllowedOrigins("*"); } ..... }
实现自定义的处理器依赖,用来存储登录用户、以及Session的各种信息、配置等
@Getter @Setter public static class HandLerDepend{ //认证token @NonNull private String authorizationToken; //用户类 @NonNull private SysUser user; //存储的session类 @NonNull private WebSocketSession session; //wsSession配置类 //其中有wsSession的主要信息配置类 @NonNull private WsSession wsSession; //异常状态 private CloseReason reason; //反馈的结果集类 private HandLerDataProvide handLerDataProvide; //记录连接的活跃时间 @NonNull private long sessionActiveTime; //任务执行器 private ThreadScheduleTask.ExecuteTask executeTask; public HandLerDepend(String authorizationToken, SysUser user, WebSocketSession session) { this.authorizationToken = authorizationToken; this.user = user; this.session = session; //获取信息类 this.setWsSession(); //配置生命周期半个小时 this.setMaxIdleTimeout(MyWebSocketHandLer.MAX_IDLE_TIMEOUT); //初始化参数类 this.setHandLerDataProvide(null); this.renewalSessionActiveTime(); } //构造异常信息类 public HandLerDepend(CloseReason reason) { this.reason = reason; this.sessionActiveTime = -1; } //定义异常信息返回异常集合 public CloseStatus getCloseStatus(CloseStatus oldStatus){ if (this.reason == null){ return oldStatus; } return new CloseStatus(this.reason.getCloseCode().getCode(),this.reason.getReasonPhrase()); } //设置信息类 public void setWsSession(){ //获取信息类 StandardWebSocketSession socketSession = (StandardWebSocketSession) this.session; this.wsSession = (WsSession)socketSession.getNativeSession(); } //连接懒惰的时间,自动下发会刷新时间不可用于判断连接健康状态 public void setMaxIdleTimeout(long MaxIdleTimeout){ this.wsSession.setMaxIdleTimeout(MaxIdleTimeout); } //初始化参数类 public void setHandLerDataProvide(HandLerDataProvide hdp){ this.handLerDataProvide = hdp == null ? new HandLerDataProvide() : hdp; } //刷新有效时间 public long renewalSessionActiveTime(){ this.sessionActiveTime = System.currentTimeMillis(); return this.sessionActiveTime; } //判断连接是否健康 public boolean checkActive(long maxActiveTimeout){ return System.currentTimeMillis() > this.sessionActiveTime + maxActiveTimeout; } }
定义消息存储类WsSession是webSocket连接的核心,代表了当前这个webSocket的状态是否健康。以及我们消息的发送,消息接受。其实最后本质上都是调用了WsSession这个类去进行处理,有兴趣的小伙伴可以看看源码详细了解一下喔。。。。
统一管理返回值信息,不管我们获取还是给客户端发送统一通过这个消息类就可以了
@AllArgsConstructor @NoArgsConstructor public static class HandLerDataProvide{ //自定义的返回结果集 private volatile Object result = null; public Object getResult() { return result; } public void setResult(Object result) { this.result = result; } //设置并发送 public void setResult(Object result,MyWebSocketHandLer.HandLerDepend hDepends) { if (result == null){ log.error("result is null"); return; } this.result = result; try { final String resultStr = JSON.toJSONString(result); hDepends.getSession().sendMessage(new TextMessage(resultStr)); log.info("发送成功 result = [{}]",resultStr); } catch (IOException e) { log.error("发送失败 e = [{}]",e.getMessage()); } } }
改造MyWebSocketHandLer这个类我本来是想用来存储消息以及管理参数的,所以我其实还定义了一个map
这是我处理连接的主要类,所以修改的部分很多,我把我所定义的公共信息和私有方法都定义到先统一方法这个标题下面,每个方法再各自详细说明
@Slf4j(topic = "MyWebSocketHandLer") public class MyWebSocketHandLer extends AbstractWebSocketHandler { //获取登录类信息 private final TokenService tokenService = SpringUtils.getBean(TokenService.class); //生命周期 public final static long MAX_IDLE_TIMEOUT = 5 * 60 * 1000; //key session——id value 参数类 public static final Map1. 建立成功事件afterConnectionEstablished方法改造handLerDepends = new ConcurrentHashMap<>(10); //获取handLerDepends对应的key值 public static String getSessionKey(WebSocketSession session){ StandardWebSocketSession socketSession = (StandardWebSocketSession) session; final WsSession wsSession = (WsSession)socketSession.getNativeSession(); return wsSession.getRequestURI().getPath().replace("/",".") + "_" + session.getId(); } //通过反射获取url对应的处理器 private ThreadScheduleTask.ExecuteTask getExecuteTask(MyWebSocketHandLer.HandLerDepend depend){ final String path = depend.getWsSession().getRequestURI().getPath(); if (!WebSocketConfig.pathExecuteMap.containsKey(path)){ return null; } Class extends ThreadScheduleTask.DefaultExecuteTask> aClass = WebSocketConfig.pathExecuteMap.get(path); try { Constructor extends ThreadScheduleTask.DefaultExecuteTask> constructor = aClass.getConstructor(MyWebSocketHandLer.HandLerDepend.class); return constructor.newInstance(depend); } catch (ReflectiveOperationException e) { log.error("创建处理构造器失败 = [{}]",e.getMessage()); } return null; } }
实现思路:当我们在建立连接时,我们可以通过先获取webSocket发起的请求url,并且和我们之前在webSocketConfing存储的class匹配起来,最后通过反射,就可以创建一个对应的实例了。在把创建的实例放在我们的消息存储类中,方便后期调用。
@Override public void afterConnectionEstablished(WebSocketSession session) throws Exception { //获取当前的认证类 //这段代码主要是检验登录用户,并且通过Redis获取到用户信息,根据各自业务逻辑实现即可 ....... //存储连接信息 HandLerDepend depend = null; //添加类 final String key = getSessionKey(session); try { Ast.isNull("param authorization is null",authorization); //登录用户信息 LoginUser loginUser = tokenService.getLoginUser(authorization.toString()); Ast.isNull("authorization not find user message",loginUser); //处理器依赖信息类 depend = new HandLerDepend(authorization.toString(), loginUser.getUser(), session); //设置任务执行器 //通过url进行设置,把通过反射获取处理器存储起来 final ThreadScheduleTask.ExecuteTask executeTask = this.getExecuteTask(depend); Ast.isNull("执行器不可为null",executeTask); depend.setExecuteTask(executeTask); log.info("建立成功事件 key = [{}], handLerDepends.Size = [{}], ExecuteTask = [{}]",key,MyWebSocketHandLer.handLerDepends.size(),depend.getExecuteTask().getClass().getName()); }catch (Exception e){ //这里catch的原因是把错误信息统一反馈出来,如果使用webSocket其实对于前端和后期维护其实不很友好 depend = new HandLerDepend(new CloseReason(CloseReason.CloseCodes.UNEXPECTED_CONDITION,e.getMessage())); throw new RuntimeException(e.getMessage()); }finally { MyWebSocketHandLer.handLerDepends.put(key, depend); } }2. 接收消息事件handleTextMessage方法改造
实现思路:因为是接受消息事件,那其实这时候我们的所有准备工作已经所需要的数据都是已经构造完成的了。我们通过webSocketSession获取到对应的key,再去获取消息存储类中的处理器对象。通过反射我们就可以控制这个处理器中我们所需要的参数,比如我们分页page是10条。我们在处理器中定义了一个int page = 10; 这时候前端需要20条了,只需要发送{“page”:20},我们就可以吧page覆盖为20了。
@Override protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception { final String key = getSessionKey(session); if (!MyWebSocketHandLer.handLerDepends.containsKey(key)){ log.error("key is not mapping = [{}]",key); return; } final HandLerDepend hDepend = MyWebSocketHandLer.handLerDepends.get(key); //刷新有效时间 long sessionActiveTime = hDepend.renewalSessionActiveTime(); String mesStr = new String(message.asBytes()); log.info("handle wss sessionKey = [{}], SessionActiveTime = [{}], data = [{}]",key,sessionActiveTime,mesStr); Ast.isNull("服务端接收消息不可为null",mesStr); //参数添加每个session的参数类中 JSONObject map = null; try { //通过反射设置参数信息 map = JSONObject.parseObject(message.getPayload()); final Class extends ThreadScheduleTask.ExecuteTask> aClass = hDepend.getExecuteTask().getClass(); Field[] fields = hDepend.getExecuteTask().getClass().getDeclaredFields(); for (Field field : fields) { ThreadScheduleTask.ExecuteMainField mainField = field.getAnnotation(ThreadScheduleTask.ExecuteMainField.class); if (mainField == null || !mainField.isMain()){ continue; } //赋值参数 field.setAccessible(true); final String nameField = field.getName(); final boolean containsKey = map.containsKey(nameField); final String setMethodName = getMethodName(nameField); if (mainField.useDefault() && !containsKey){ if (StringUtils.isEmpty(mainField.useDefaultValue())){ throw new RuntimeException("未定义需要使用的默认值 = [" + nameField + "]"); } aClass.getMethod(setMethodName,field.getType()).invoke(hDepend.getExecuteTask(), TypeUtils.castToJavaBean(mainField.useDefaultValue(),field.getType())); } if (containsKey){ aClass.getMethod(setMethodName,field.getType()).invoke(hDepend.getExecuteTask(),map.getObject(nameField,field.getType())); } } log.info("当前执行器对象 ExecuteTask = [{}]", hDepend.getExecuteTask()); }catch (Exception e){ log.error("执行器参数异常 = [{}]",e.getMessage()); } }2.断开连接时afterConnectionClosed方法改造
这个就比较简单了就是把我们在通信时候的错误信息打印出来就好了
@Override public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception { final String key = getSessionKey(session); if (MyWebSocketHandLer.handLerDepends.containsKey(key)){ HandLerDepend delHd = MyWebSocketHandLer.handLerDepends.get(key); status = delHd.getCloseStatus(status); MyWebSocketHandLer.handLerDepends.remove(key,delHd); }else if (status.getCode() == 1011 && status.getReason() == null){ //初始话时校验失败,提供返回结果给前端 } log.info("断开连接 sessionId = [{}], size = [{}], status = [{}]",key,MyWebSocketHandLer.handLerDepends.size(),status.toString()); }
到此为止我们已经把我们的处理器类都改造完成了,接下来就是我们调用处理器了,我提供一个我调用实现的方法,可能不是最好的。但是可以直接使用喔
调用处理器(Execute)实现思路:通过实现CommandLineRunner类(CommandLineRunner 不同的可以先百度一下,大概意思就是在springboot加载的时候注入自己需要执行的任务方法),中的run()方法,然后我的想法是,我们定义一个单线程去执行我们的执行器轮询调度任务,主要也是因为我不太清楚CommandLineRunner是否会是单独开辟一个线程执行还是怎么样,所以我自己定义一个是最保险的,就算是出问题了其实也不会影响主线程的任务。
@Slf4j(topic = "ThreadScheduleTask") @Component public class ThreadScheduleTask implements CommandLineRunner { //每个url对应一片线程池。 private static final Map,MyThreadPoolExecutor> THREAD_POOL_EXECUTOR_MAP = new ConcurrentHashMap ,MyThreadPoolExecutor>(){{ this.put(DeptCircleExecuteTask.class,MyThreadPoolExecutor.getThreadPool()); }}; @Override public void run(String... args) throws Exception { handleMessage(); } public void handleMessage(){ Executors.newSingleThreadExecutor().execute(new Runnable() { @SneakyThrows @Override public void run() { while (true){ Thread.sleep(15 * 1000); log.info("开始处理webSocket事件"); MyWebSocketHandLer.handLerDepends.forEach( (k,hDepends) -> { //过滤非健康的请求数据 与 超时数据 if (hDepends.getSessionActiveTime() >= 0 && MyWebSocketHandLer.checkActive(k)){ //执行任务器 // myThreadPoolExecutor.execute(hDepends.getExecuteTask().sendResult()); THREAD_POOL_EXECUTOR_MAP.get(hDepends.getExecuteTask().getClass()).execute(hDepends.getExecuteTask().sendResult()); } }); } } }); } }
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)