本项目的目的是实时监控室外广告屏的亮度、界面、声音、开关机等状态。由于室外的网络状况随时可能变化,所以采用了 MQTT 协议。对 Android 客户端来说,用 MQTT 发送消息过于繁琐,因此我们的做法是:客户端只通过 MQTT 接收命令,然后用 HTTP 进行数据反馈。这个项目近期已经做完,故在此记录一下。
第一步:导入在线库
// mqtt 包导入 implementation 'org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.1.0' implementation 'org.eclipse.paho:org.eclipse.paho.android.service:1.1.1'
第二步:在 AndroidManifest.xml 中声明服务(即在线库自带的 org.eclipse.paho.android.service.MqttService,直接声明即可,不用自己实现这个 service)
第三步:MqttManager的全部代码
public class MqttManager { public final static String TAG = MqttManager.class.getSimpleName(); @SuppressLint("StaticFieldLeak") private static volatile MqttManager mInstance = null; private MqttCallback mCallback; public MqttClient client; private MqttConnectOptions conOpt; private Context context; public String[] topic; private MqttManager(Context context) { mCallback = new MqttCallbackBus(context); this.context = context; } public static MqttManager getInstance(Context context) { if (mInstance == null) { synchronized (MqttManager.class) { if (mInstance == null) { mInstance = new MqttManager(context); } } } return mInstance; } /** * 释放单例, 及其所引用的资源 */ public static void release() { try { if (mInstance != null) { mInstance.disConnect(); mInstance = null; } } catch (Exception e) { e.printStackTrace(); } } /** * 创建Mqtt 连接 * * @param brokerUrl Mqtt服务器地址(tcp://xxxx:1863) * @param userName 用户名 * @param password 密码 * @return */ public boolean createConnect(String brokerUrl, String userName, String password) { topic = new String[]{PUBLISH_TOPIC}; String deviceId = Utils.getDeviceNo(); if (client != null && client.isConnected()) { return true; } boolean flag = false; try { conOpt = new MqttConnectOptions(); conOpt.setCleanSession(true); //不接收离线期间的消息 每次都是重新登陆 conOpt.setAutomaticReconnect(true); //自动重连 if (!TextUtils.isEmpty(password)) { conOpt.setPassword(password.toCharArray()); } if (!TextUtils.isEmpty(userName)) { conOpt.setUserName(userName); } client = new MqttClient(brokerUrl, deviceId, new MemoryPersistence()); client.setCallback(mCallback); flag = doConnect(); client.subscribe(topic); } catch (MqttException e) { e.printStackTrace(); //重连 String msg = "exception_reconnect"; EventBus.getDefault().post(msg); } return flag; } /** * 建立连接 * * @return */ private boolean doConnect() { boolean flag = false; if (client != null) { try { client.connect(conOpt); flag = true; } catch (Exception e) { e.printStackTrace(); } } return flag; } public boolean publish(String topicName, 
int qos, byte[] payload) { boolean flag = false; try { if (client == null) { createConnect(HOST, USERNAME, PASSWORD); } if (!client.isConnected()) { this.reconnect(); } MqttMessage message = new MqttMessage(payload); message.setQos(qos); client.publish(topicName, message); flag = true; } catch (MqttException e) { e.printStackTrace(); } return flag; } private boolean subscribe(String topicName, int qos) { boolean flag = false; if (client != null && client.isConnected()) { try { client.subscribe(topicName, qos); flag = true; } catch (MqttException e) { e.printStackTrace(); } } return flag; } private boolean subscribe(String[] topicName, int qos[]) { boolean flag = false; if (client != null && client.isConnected()) { try { client.subscribe(topicName, qos); flag = true; } catch (MqttException e) { e.printStackTrace(); } } return flag; } /** * 取消连接 * * @throws MqttException */ public void disConnect() throws MqttException { if (client != null && client.isConnected()) { client.disconnect(); } } /** * 关闭连接 */ public void close() { if (client != null && client.isConnected()) { try { client.disconnect(); } catch (MqttException e) { e.printStackTrace(); } } } /** * 重新连接 */ private void reconnect() { if (client != null && !client.isConnected()) { try { client.setCallback(mCallback); client.connect(conOpt); client.subscribe(topic); } catch (MqttException e) { e.printStackTrace(); } } } }
其中用 EventBus 做了重连处理(连接异常时会发送 "exception_reconnect" 消息),对应的事件响应如下:
boolean isHavNet = isConnectIsNomarl(); //判断是否有网 if (isHavNet) { handler.removeMessages(4); MqttManager.getInstance(BaseApplication.getContext()).createConnect(HOST, USERNAME, PASSWORD); }else{ handler.sendEmptyMessageDelayed(4, 10000);//无网等待有网再连 }
第四步:mqtt消息回调类
public class MqttCallbackBus implements MqttCallbackExtended { private Context mContext; public MqttCallbackBus(Context context) { this.mContext = context; } @Override public void connectComplete(boolean reconnect, String serverURI) { Log.e("MqttCallbackBus", "MQTT_connectComplete:"); //断开连接必须重新订阅才能收到之前订阅的session连接的消息 if(reconnect){ Log.e("MqttCallbackBus_重连订阅主题", "MQTT_connectComplete:"); //这里是发送消息去重新订阅 String msg = "reconnect"; EventBus.getDefault().postSticky(msg); } } @Override public void connectionLost(Throwable cause) { //掉线 Log.e("MqttCallbackBus>>>", "MQTT_connectionLost 掉线原因:"+cause.getMessage()); cause.printStackTrace(); } @Override public void messageArrived(String topic, MqttMessage message) throws Exception { MqttReceivedMessage msg = (MqttReceivedMessage) message; String s = msg.toString(); Log.e("MQTT_messageArrived_msg", "MQTT_msg"+topic); HashMap hs = new Gson().fromJson(s, HashMap.class); EventBus.getDefault().post(hs); //抛出收到的消息 eventbus响应做自己的业务处理 } @Override public void deliveryComplete(IMqttDeliveryToken token) { //(发布)publish后会执行到这里,发送状态 token.isComplete() Log.e("MqttCallbackBus>>>", "MQTT_deliveryComplete:"); } }
其中,connectComplete 回调在 reconnect 为 true 时会发出 "reconnect" 消息,对应的 EventBus 响应如下:
//重连 能进重连 说明必有网了 不必再else 定时重试网络 String[] topic = new String[]{PUBLISH_TOPIC}; try { MqttManager.getInstance(BaseApplication.getContext()).client.subscribe(topic); } catch (MqttException e) { e.printStackTrace(); }
MainActivity的使用如下:
//网络变化监听 首次连接mqtt boolean isHavNet = isConnectIsNomarl(); if (isHavNet) { handler.removeMessages(1); MqttManager.getInstance(BaseApplication.getContext()).createConnect(HOST, USERNAME, PASSWORD); } else { handler.sendEmptyMessageDelayed(1, 5000); }
MQTT全局的配置参数:
附赠一个判断是否有网的工具方法:
/**
 * Checks whether the device currently has an active, connected network.
 *
 * @return {@code true} if an active network exists and reports itself as connected;
 *         {@code false} otherwise, including when the connectivity service is unavailable
 */
private boolean isConnectIsNomarl() {
    ConnectivityManager connectivityManager = (ConnectivityManager) this.getApplicationContext()
            .getSystemService(Context.CONNECTIVITY_SERVICE);
    if (connectivityManager == null) {
        // getSystemService() may return null when the service is not available.
        return false;
    }
    @SuppressLint("MissingPermission")
    NetworkInfo info = connectivityManager.getActiveNetworkInfo();
    return info != null && info.isConnected();
}
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)