Android 实现WebSocket长连接

Android 实现WebSocket长连接,第1张

最近项目中引入了实时接收服务器数据的功能,考量后通过WebSocket长链接来实现。

接下来了解一下webSocket 的特点:
1、建立在 TCP 协议之上,服务器端的实现比较容易。
2、与 HTTP 协议有着良好的兼容性。默认端口也是80和443,并且握手阶段采用 HTTP 协议,因此握手时不容易屏蔽,能通过各种 HTTP 代理服务器。
3、支持双向通信,实时性更强。
4、数据格式比较轻量,性能开销小,通信高效。
5、可以发送文本,也可以发送二进制数据。
6、没有同源限制,客户端可以与任意服务器通信。
7、协议标识符是ws(如果加密,则为wss),服务器网址就是 URL。

在我的项目计划中是通过Okhttp3实现socket长连接,先看一下效果:
依次展示的效果图为开启长链接------关闭长链接-------重连长链接

接下来看一下具体的代码
1、长链接初始化,创建Service服务,通过bindService启动Service服务

/**
     * 获取单例,非appContext,要先init
     */
    public static ImWebSocketBackgroundService getInstance(Context context) {
        if (instance == null) {
            synchronized (ImWebSocketBackgroundService.class) {
                if (instance == null) {
                    instance = new ImWebSocketBackgroundService(context);
                }
            }
        }
        return instance;
    }

    private ImWebSocketBackgroundService(Context context) {
        this.context = context;
    }

    private void onCreate() {
        if(webSocketServiceManager == null){
            webSocketServiceManager = new WebSocketServiceManager(context, this);
            webSocketServiceManager.bindService();
            sendGeneration = 0;
        }
    }
public class MyWebSocketService extends Service implements WebSocketResultListener {

    private WebSocketThread webSocketThread;
    private IWebsocketResponseDispatcher responseDispatcher;
    private SocketResultListenerStorage socketResultListenerStorage = new SocketResultListenerStorage();

    @Override
    public void onCreate() {
        super.onCreate();
        webSocketThread = new WebSocketThread();
        webSocketThread.setWebSocketResultListener(this);
        webSocketThread.start();
        responseDispatcher = new WebsocketResponseDispatcher();
    }

    @Nullable
    @Override
    public IBinder onBind(Intent intent) {
        return new WebSocketBinder();
    }

    @Override
    public void onDestroy() {
        super.onDestroy();
        if (webSocketThread.getHandler() != null)
            webSocketThread.getHandler().sendEmptyMessage(MessageType.QUIT);
    }

    public boolean sendText(String text) {
        if (webSocketThread.getHandler() != null) {
            Message message = Message.obtain();
            message.obj = text;
            message.what = MessageType.SEND;
            return webSocketThread.getHandler().sendMessage(message);
        }
        return false;
    }
}

在WebSocketServiceManager中创建ServiceConnection类型实例,并重写onServiceConnected()方法和onServiceDisconnected()方法。
当执行到onServiceConnected回调时通过IBinder实例得到Service,实现client与Service的连接
onServiceDisconnected回调被执行时,表示client与Service断开连接

public WebSocketServiceManager(Context context, WebSocketResultListener socketResultListener) {
        this.context = context;
        this.socketResultListener = socketResultListener;
    }

    private ServiceConnection serviceConnection = new ServiceConnection() {
        @Override
        public void onServiceConnected(ComponentName name, IBinder service) {
            Log.e(WebSocketThread.TAG, "WebSocketService 已经连接");
            serviceBindSucc = true;
            binding = false;
            bindTime = 0;   //连接成功后归零
            myWebSocketService = ((MyWebSocketService.WebSocketBinder)service).getWebSocketService();
            myWebSocketService.addSocketListener(socketResultListener);
            if(myWebSocketService.isSendConnected()) {
                if (socketResultListener != null) {
                    socketResultListener.connection();
                }
            }
        }

        @Override
        public void onServiceDisconnected(ComponentName name) {     //service异常被关闭 回调该方法
            binding = false;
            serviceBindSucc = false;
            if (bindTime<5 && !binding){
                Log.e(WebSocketThread.TAG, String.format("WebSocketService 连接断开,开始第%s次重连", bindTime));
                bindService();
            }
        }
    };

    //绑定服务
    public void bindService(){
        serviceBindSucc = false;
        binding = true;
        Intent intent = new Intent(context, MyWebSocketService.class);
        context.bindService(intent,serviceConnection, Context.BIND_AUTO_CREATE);
        bindTime++;
    }

2、在service创建时初始化Thread线程,通过Handler发送消息建立连接
   2.1 配置OkHttpClient
其中head中需要的参数依照大家项目中需要参数随时改变

private void initHeader() {
        headerMap.put("sourse", "android");
        headerMap.put("appVersion", Constants.VERSION_CODE);
        headerMap.put("token", Constants.TOKEN);
        headerMap.put("devCode",Constants.DevCode);
    }
OkHttpClient mClient = new OkHttpClient.Builder()
        .readTimeout(3, TimeUnit.SECONDS)//设置读取超时时间
        .writeTimeout(3, TimeUnit.SECONDS)//设置写的超时时间
        .connectTimeout(3, TimeUnit.SECONDS)//设置连接超时时间
        .build();

 2.2 使用Url,构建WebSocket请求
WEB_SOCKET_URL = "ws://47.74.235.101:8190/ws-community-im-websocket";
WEB_SOCKET_URL 依照大家项目中具体路径改变

private void initRequest() {
        Request.Builder requestBuilder = new Request.Builder().url(WEB_SOCKET_URL);
        if(headerMap == null){
            initHeader();
        }
        if (headerMap != null && !headerMap.isEmpty()) {
            Set strings = headerMap.keySet();
            StringBuffer stringBuffer = new StringBuffer();
            for (String string : strings) {
                stringBuffer.append(string).append(" ").append(headerMap.get(string)).append("\t");
            }
            Log.e("web", stringBuffer.toString());
            requestBuilder.headers(Headers.of(headerMap));
        }
        request = requestBuilder.build();
    }

2.3 发起连接,配置回调。

onOpen(),连接成功onMessage(String text),收到字符串类型的消息,一般我们都是使用这个onMessage(ByteString bytes),收到字节数组类型消息,我这里没有用到onClosed(),连接关闭onFailure(),连接失败,一般都是在这里发起重连 *** 作
//开始连接
WebSocket websocket = mClient.newWebSocket(request, new WebSocketListener() {
    @Override
    public void onOpen(WebSocket webSocket, Response response) {
        super.onOpen(webSocket, response);
        //连接成功...
    }

    @Override
    public void onMessage(WebSocket webSocket, String text) {
        super.onMessage(webSocket, text);
        //收到消息...(一般是这里处理json)
    }

    @Override
    public void onMessage(WebSocket webSocket, ByteString bytes) {
        super.onMessage(webSocket, bytes);
        //收到消息...(一般很少这种消息)
    }

    @Override
    public void onClosed(WebSocket webSocket, int code, String reason) {
        super.onClosed(webSocket, code, reason);
        //连接关闭...
    }

    @Override
    public void onFailure(WebSocket webSocket, Throwable throwable, Response response) {
        super.onFailure(webSocket, throwable, response);
        //连接失败...
    }
});

2.4 使用WebSocket对象发送消息,msg为消息内容,send方法会马上返回发送结果

//发送消息
webSocket.send(msg);


3、通过第2步连接成功后启动心跳,每隔10秒钟发起一次心跳,在onMessageReceive中接收到后台返回的json信息,在本项目中通过后台返回的code值判断此时时在线收到消息还是断开连接,大家依照各自项目改变

SEND_MESSAGE = 601;  //在线接收消息
private static final int DISCONNECT_MESSAGE = 602; //断开连接
   //开始心跳
    public void startHeartBeat() {
        Log.e(WebSocketThread.TAG, "启动心跳");
        runHeartBeat = true;
        if (scheduler == null) {
            scheduler = Executors.newScheduledThreadPool(1);
            scheduler.schedule(this, getDelay(), TimeUnit.SECONDS);
        }
    }
    @Override
    public void onMessageReceive(String jsonText) {
        try {
            JSONObject jsonObject = new JSONObject(jsonText);
            Log.i(WebSocketThread.TAG,jsonObject+"");

            if(jsonObject.has("msgType") && Integer.valueOf(jsonObject.getInt("msgType")) != null){
                int msgType = jsonObject.getInt("msgType");
                if (msgType == SEND_MESSAGE) {     //发送消息
                    String msg = jsonObject.getString("msg");
                    Log.d(WebSocketThread.TAG, msg);
                } else if (msgType == DISCONNECT_MESSAGE) {     //断开连接
                    stopHeartBeat();
                }
            }
        } catch (JSONException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void connection() {
        Log.e(WebSocketThread.TAG, "连接成功");
        startHeartBeat();
    }

    @Override
    public void run() {
        if (runHeartBeat) {
            if (sendText(defineText())) {
                sendGeneration++;
                Log.d(WebSocketThread.TAG,"发送心跳成功"+sendGeneration);
            }
            scheduler.schedule(this, getDelay(), TimeUnit.SECONDS);
        }
    }
    private int getDelay() {
        return 10;
    }

4、重连机制----处理切换网络,被挤掉线等情况
在第3步中提到的onFailure()方法中进行重连,连接成功后重新接收到后台返回的json信息

public class ReconnectWebSocketManager {

    private WebSocketThread webSocketThread;
    private volatile boolean retrying;      //正在重新连接
    private volatile boolean destroyed ;
    private final ExecutorService singleThreadPool = Executors.newSingleThreadExecutor();       //单线程

    public ReconnectWebSocketManager(WebSocketThread webSocketThread) {
        this.webSocketThread = webSocketThread;
        this.retrying = false;
        this.destroyed = false;
    }

    synchronized void performReconnect(){
        retrying = false;
        retry();
    }

    private synchronized void retry() {
        if (!retrying){
            retrying = true;
            synchronized (singleThreadPool){
                singleThreadPool.execute(new ReconnectRunnable());
            }
        }
    }

    //销毁
    public void destroy(){
        destroyed = true;
        if (singleThreadPool !=null)
            singleThreadPool.shutdownNow();
        webSocketThread = null;
    }

    public class ReconnectRunnable implements Runnable {
        public void run() {
            retrying = true;
            for (int i = 0;i <20 ; i++){
                if (destroyed){
                    retrying = false;
                    return;
                }
                Handler handler = webSocketThread.getHandler();
                if (handler !=null){
                    if (webSocketThread.getConnectStatus() ==2) //已连接
                        break;
                    if (webSocketThread.getConnectStatus() ==1) //正在连接
                        continue;
                    if (webSocketThread.getConnectStatus() ==0)
                        handler.sendEmptyMessage(MessageType.CONNECT);
                }else {
                    break;
                }
                try {
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                retrying = false;
            }
        }
    }
}

到此长链接主要代码已经讲完了,别忘了在清单文件中注册service,添加网络权限



在build.gradle中添加okhttp3依赖

implementation 'com.squareup.okhttp3:okhttp:3.10.0'

希望能对于长链接需求的同学帮助,欢迎讨论,附上项目的代码地址demo
 

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/web/992542.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-05-21
下一篇 2022-05-21

发表评论

登录后才能评论

评论列表(0条)

保存