心跳机制每隔一段时间进行一次连接关闭,连接重连保证websocket实现长连接不断开。
我这里事件设置1小时,js代码如下
//js代码
var pathTotal="102020"
var url="ws://172.16.28.250:8088/websocket/"+pathTotal
var socket;
var lockReconnect = false; //避免socket重复连接
if (!window.WebSocket) {
window.WebSocket = window.MozWebSocket;
}
if (window.WebSocket) {//判断当前设备是否支持websocket
socket = new WebSocket(url);
socket.onclose = function (event) {
reconnect(url); //尝试重新连接
console.log(event);
console.log('连接关闭')
};
socket.onerror = function () {
reconnect(url); //尝试重新连接
console.log(event);
console.log('连接错误')
};
socket.onopen = function (event) {
heartCheck.reset().start(); //心跳检测重置
console.log(event);
console.log('连接开启')
};
socket.onmessage = function (event) {
heartCheck.reset().start(); //心跳检测重置
console.log(event);
console.log('消息接收到了,只要有接收到消息,连接都是正常的')
if (event.data != 'Msg') {
let data = JSON.parse(event.data);
}
setMessageInnerHTML2(event.data,suffix)
};
} else {
alert.log("你的浏览器不支持WebSocket!");
}
// // 监听窗口关闭事件,当窗口关闭时,主动去关闭websocket连接,防止连接还没断开就关闭窗口,server端会抛异常。
window.onbeforeunload = function () {
socket.close();
}
// 重新连接
function reconnect(url) {
console.log("重新连接")
if (lockReconnect) return;
lockReconnect = true;
setTimeout(function () { //没连接上会一直重连,设置延迟避免请求过多
socket = new WebSocket(url);
lockReconnect = false;
}, 2000);
}
//心跳检测
var heartCheck = {
timeout: 55000*60, //1分钟发一次心跳,时间设置小一点较好(50-60秒之间)
timeoutObj: null,
serverTimeoutObj: null,
reset: function () {
clearInterval(this.timeoutObj);
clearTimeout(this.serverTimeoutObj);
return this;
},
start: function () {
var self = this;
this.timeoutObj = setInterval(function () {
console.log("重新连接")
//这里发送一个心跳,后端收到后,返回一个心跳消息,
//onmessage拿到返回的心跳就说明连接正常
socket.send("Msg");
self.serverTimeoutObj = setTimeout(function () {//如果超过一定时间还没重置,说明后端主动断开了
socket.close(); //如果onclose会执行reconnect,我们执行socket.close()就行了.如果直接执行reconnect 会触发onclose导致重连两次
}, self.timeout)
}, this.timeout)
}
}
分发策略,消息接受使用rabbitmq通过no来判断位置信息,分发到各个大屏
/**
* 连接建立成功调用的方法
*
* @param session 可选的参数。session为与某个客户端的连接会话,需要通过它来给客户端发送数据
*/
@OnOpen
public void onOpen(@PathParam(value = "no") String no,Session session) {
this.session = session;
sessions.add(session);
webSocketSet.remove(no);
webSocketSet.put(no,this); //加入set中
addOnlineCount(); //在线数加1
log.info("有新连接加入!当前在线人数为" + getOnlineCount());
String QUEUE_NAME = "GREATHIIT_FACE_TERMINAL";
// String QUEUE_NAME = "closedPlace";
try {
//创建连接连接到MabbitMQ
ConnectionFactory factory = new ConnectionFactory();
//设置MabbitMQ所在主机ip或者主机名
factory.setHost();
factory.setPort();
factory.setUsername();
factory.setPassword();
//创建一个连接
Connection connection = factory.newConnection();
//创建一个频道
Channel channel = connection.createChannel();
//指定一个队列
/*
durable:true、false.true:在服务器重启时,能够存活
exclusive :是否为当前连接的专用队列,在连接断开后,会自动删除该队列,生产环境中应该很少用到吧。
autodelete:当没有任何消费者使用时,自动删除该队列。this means that the queue will be deleted when there are no more processes consuming messages from it.
*/
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
//每次从队列获取的数量,保证一次只分发一个
channel.basicQos(1);
//监听队列
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//处理监听得到的消息
String message = null;
try {
message = new String(body, "UTF-8");
// if(){} 业务逻辑判断
//将字符串转换成MsgDa对象
MsgDa msgDa = GsonJsonUtil.stringToObject(message, MsgDa.class);
LaboratoryService laboratoryService = (LaboratoryService) SpringContextUtil.getBean("laboratoryService");
ConcurrentHashMap<String, Object> deviceTerminalType = laboratoryService.getDataLaboratorBymsg(msgDa);
if(deviceTerminalType.size()>0){
String key =(String) deviceTerminalType.get("key");
String value = (String) deviceTerminalType.get("value");
if(webSocketSet.get(key)!=null){
synchronized (key){
int length = value.length();
if(length>80&&session.isOpen()){
sendMessage(key,value);
}
}
}
}
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
channel.abort();
} finally {
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
};
//autoAck是否自动回复,如果为true的话,每次生产者只要发送信息就会从内存中删除,那么如果消费者程序异常退出,
// 那么就无法获取数据,我们当然是不希望出现这样的情况,所以才去手动回复,每当消费者收到并处理信息然后在通知
// 生成者。最后从队列中删除这条信息。如果消费者异常退出,如果还有其他消费者,那么就会把队列中的消息发送给其
// 他消费者,如果没有,等消费者启动时候再次发送。
boolean autoAck = false;
//消息消费完成确认
channel.basicConsume(QUEUE_NAME, autoAck, consumer);
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
/**
* 连接关闭调用的方法
*/
@OnClose
public void onClose() {
webSocketSet.remove(this); //从set中删除
sessions.remove(session);
subOnlineCount(); //在线数减1
log.info("有一连接关闭!当前在线人数为" + getOnlineCount());
}
/**
* 收到客户端消息后调用的方法
*
* @param message 客户端发送过来的消息
* @param session 可选的参数
*/
// @OnMessage
// public void onMessage(String message, Session session) {
// log.info("来自客户端的消息:" + message);
// //群发消息
// for (WebSocketServer item : webSocketSet) {
// try {
// item.sendMessage(message);
// } catch (IOException e) {
// e.printStackTrace();
// continue;
// }
// }
// }
/**
* 发生错误时调用
*
* @param session
* @param error
*/
@OnError
public void onError(Session session, Throwable error) {
log.error("发生错误");
error.printStackTrace();
}
/**
* 这个方法与上面几个方法不一样。没有用注解,是根据自己需要添加的方法。
*
* @param message
* @throws IOException
*/
public void sendMessage(String no, String message) throws IOException {
//阻塞式的(同步的)
// if (sessions.size() != 0) {
// for (Session s : sessions) {
// if (s != null) {
// s.getBasicRemote().sendText(message);
// }
// }
// }
webSocketSet.get(no).sendMessage(message);
//非阻塞式的(异步的)
// this.session.getAsyncRemote().sendText(message);
// log.info("[x] 推送消息"+message);
}
private void sendMessage(String message) throws IOException {
this.session.getBasicRemote().sendText(message);
}
public static synchronized int getOnlineCount() {
return onlineCount;
}
public static synchronized void addOnlineCount() {
WebSocketServer.onlineCount++;
}
public static synchronized void subOnlineCount() {
WebSocketServer.onlineCount--;
}
消息处理分发
首先将人脸识别终端发过来的数据进行JSON解析成msgDa对象通过对象中设备id判断设备所在位置通过第一次初始化设定的规则发送到不同楼层的大屏初始化补充,通过key value形式存入redis
List<Terminal> list = terminalService.findTerminal(new Terminal().setTerminalCategory("face"));
for (Terminal terminal : list) {
map.put(terminal.getTerminalSerialNumber(),terminal);
}
String msgDaJSON = "";
String msgDaJSON2 = "";
Terminal terminal = map.get(msgDa.getDevice_id());
String key = terminal.getTerminalPlace();
if (msgDa != null) {
msgDaJSON = JSON.toJSONString(msgDa);
msgDa.setImageFile("");
msgDaJSON2 = JSON.toJSONString(msgDa);
int length = msgDaJSON2.length();
if(length>80){
t5(key.substring(0, 6)+".data", "\n" + msgDaJSON2);
}
}
ConcurrentHashMap<String, Object> concurrentHashMap = new ConcurrentHashMap<>();
Object str = redisUtil.get(key.substring(0, 6));
String[] split = str.toString().split(",");
// String[] splita = ;
for (String o : split) {
if(o.equals(key)){
concurrentHashMap.put("key", key.substring(0,6));
concurrentHashMap.put("value", msgDaJSON);
}
}
return concurrentHashMap;
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)