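The client subscribes to messages from the server over a long-lived WebSocket connection, and the messages it receives are determined by the parameters it passes when connecting.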
2. Server
2.1 Core dependency coordinates
I use the Spring Boot starter dependency here.
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-websocket</artifactId>
    </dependency>
2.2 WebSocket utility class
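This class keeps the WebSocketSession objects of connected clients and provides the operations to add, remove, close, and look them up.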
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.web.socket.WebSocketSession;

    import java.io.IOException;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentMap;

    @Slf4j
    public class WsSessionUtil {

        // Session pool: key -> WebSocketSession
        private static final ConcurrentMap<String, WebSocketSession> SESSION_POOL = new ConcurrentHashMap<>();

        public static void add(String key, WebSocketSession session) {
            // Add a session
            SESSION_POOL.put(key, session);
        }

        public static WebSocketSession remove(String key) {
            // Remove a session
            return SESSION_POOL.remove(key);
        }

        public static void removeAndClose(String key) {
            WebSocketSession session = remove(key);
            if (session != null) {
                try {
                    // Close the connection
                    session.close();
                } catch (IOException e) {
                    log.error("Failed to close WebSocket session", e);
                }
            }
        }

        public static WebSocketSession get(String key) {
            // Look up a session
            return SESSION_POOL.get(key);
        }

        public static ConcurrentMap<String, WebSocketSession> get() {
            return SESSION_POOL;
        }
    }
2.3 WebSocket message handler class
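Extend the AbstractWebSocketHandler class and override its callbacks for connection establishment, connection close, incoming text messages, and transport errors.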
    import com.paratera.console.biz.utils.WsSessionUtil;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.stereotype.Component;
    import org.springframework.web.socket.CloseStatus;
    import org.springframework.web.socket.TextMessage;
    import org.springframework.web.socket.WebSocketSession;
    import org.springframework.web.socket.handler.AbstractWebSocketHandler;

    @Component
    @Slf4j
    public class WarningWsHandler extends AbstractWebSocketHandler {

        @Override
        public void afterConnectionEstablished(WebSocketSession session) throws Exception {
            log.info("WebSocket connection established");
            // Register the session under its own id
            WsSessionUtil.add(session.getId(), session);
        }

        @Override
        protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
            log.info("Handling text message");
            // Read the message sent by the client
            String payload = message.getPayload();
            log.info("server received message " + payload);
            // Echo the payload back to the sender
            session.sendMessage(new TextMessage(payload));
        }

        @Override
        public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception {
            log.error("WebSocket transport error", exception);
            WsSessionUtil.removeAndClose(session.getId());
        }

        @Override
        public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception {
            log.info("WebSocket connection closed");
            WsSessionUtil.removeAndClose(session.getId());
        }
    }
2.4 WebSocket configuration class
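Register the handler created in the previous step with the WebSocket service, and add a custom handshake interceptor whose beforeHandshake method validates the request parameter.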
    import com.paratera.console.biz.handler.WarningWsHandler;
    import org.apache.commons.lang3.StringUtils;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.http.server.ServerHttpRequest;
    import org.springframework.http.server.ServerHttpResponse;
    import org.springframework.http.server.ServletServerHttpRequest;
    import org.springframework.web.socket.WebSocketHandler;
    import org.springframework.web.socket.config.annotation.EnableWebSocket;
    import org.springframework.web.socket.config.annotation.WebSocketConfigurer;
    import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry;
    import org.springframework.web.socket.server.HandshakeInterceptor;

    import java.util.Map;

    @Configuration
    @EnableWebSocket
    public class WebSocketConfig implements WebSocketConfigurer {

        @Autowired
        private WarningWsHandler warningWsHandler;

        @Override
        public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
            registry
                    .addHandler(warningWsHandler, "warningWs")
                    // Allow cross-origin requests; this can be restricted to specific client origins
                    .setAllowedOrigins("*")
                    .addInterceptors(new MyHandshakeInterceptor());
        }

        private static class MyHandshakeInterceptor implements HandshakeInterceptor {

            @Override
            public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response,
                                           WebSocketHandler wsHandler, Map<String, Object> attributes) throws Exception {
                String collector = ((ServletServerHttpRequest) request).getServletRequest().getParameter("userid");
                if (StringUtils.isEmpty(collector)) {
                    // Reject the handshake when no userid is supplied
                    return false;
                } else {
                    // Store the userid so it is available from WebSocketSession.getAttributes()
                    attributes.put("userid", collector);
                    return true;
                }
            }

            @Override
            public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response,
                                       WebSocketHandler wsHandler, Exception exception) {
            }
        }
    }
2.5 Message sending
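Pushing to a single user comes down to filtering the session pool by the `userid` attribute stored during the handshake. As a rough sketch of that filter in plain Java, the snippet below uses a `Map<String, Object>` as a stand-in for each session's attributes; the class `UserSessionFilter` and the method `sessionsFor` are hypothetical names, not part of the original project.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

class UserSessionFilter {

    // Returns the ids of the sessions whose "userid" attribute equals userId.
    // pool maps a session id to that session's attribute map
    // (a stand-in for WebSocketSession.getAttributes()).
    static List<String> sessionsFor(Map<String, Map<String, Object>> pool, String userId) {
        List<String> matches = new ArrayList<>();
        for (Map.Entry<String, Map<String, Object>> entry : pool.entrySet()) {
            Object owner = entry.getValue().get("userid");
            if (userId.equals(owner)) {
                matches.add(entry.getKey());
            }
        }
        return matches;
    }
}
```

Comparing against the `userid` key specifically avoids selecting a session that merely holds the same string under some other attribute, which a value-only check such as `containsValue` would also match.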
    private void broadcastMsg(String text, String userId) {
        for (WebSocketSession session : WsSessionUtil.get().values()) {
            // Push only to sessions whose "userid" attribute matches the target user
            if (userId.equals(session.getAttributes().get("userid"))) {
                try {
                    session.sendMessage(new TextMessage(text));
                } catch (IOException e) {
                    log.error("Failed to send warning message to client: " + e.getMessage());
                }
            }
        }
    }
3. Client
The client subscribes to messages through the browser's built-in WebSocket API.
[Client HTML page; title: "Java WebSocket implementation", body text: "Welcome"]
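To exercise the endpoint without a browser, the same subscription can be sketched in Java with the JDK's `java.net.http.WebSocket` client (Java 11+). This is only an illustration under assumptions: the class `WarningWsClient`, the host and port passed in, and the absence of a servlet context path are all hypothetical; only the `warningWs` path and the `userid` parameter come from the server code above.

```java
import java.net.URI;
import java.net.URLEncoder;
import java.net.http.HttpClient;
import java.net.http.WebSocket;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CompletionStage;

class WarningWsClient {

    // Builds the endpoint URL. The handshake interceptor rejects
    // requests that carry no "userid" parameter.
    static URI endpoint(String hostAndPort, String userId) {
        String encoded = URLEncoder.encode(userId, StandardCharsets.UTF_8);
        return URI.create("ws://" + hostAndPort + "/warningWs?userid=" + encoded);
    }

    // Prints each complete text message pushed by the server.
    static class PrintingListener implements WebSocket.Listener {
        private final StringBuilder buffer = new StringBuilder();

        @Override
        public CompletionStage<?> onText(WebSocket webSocket, CharSequence data, boolean last) {
            buffer.append(data);
            if (last) {
                System.out.println("received: " + buffer);
                buffer.setLength(0);
            }
            webSocket.request(1); // ask for the next frame
            return null;          // data may be reclaimed immediately
        }
    }

    // Opens the connection and blocks until the handshake completes.
    // Call this only when a server is actually running.
    static WebSocket connect(String hostAndPort, String userId) {
        return HttpClient.newHttpClient()
                .newWebSocketBuilder()
                .buildAsync(endpoint(hostAndPort, userId), new PrintingListener())
                .join();
    }
}
```

With a server listening locally, a call such as `WarningWsClient.connect("localhost:8080", "1001")` would open the connection for user 1001, and text pushed to that user would then be printed by the listener.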