一、我为什么要在flutter项目中使用mqtt?
我的项目是使用flutter开发,然后有一个功能是我们需要和蓝牙网关进行通信,然后网关通信使用的mqtt协议。由于flutter的pub仓库中提供了非常方便的插件——mqtt_client,所以我们可以使用这个插件,非常简单方便的就可以实现和网关的通信。
二、使用
在flutter项目中的pubspec.yaml文件中加入依赖,然后pub get依赖。
mqtt_client: ^8.2.0
版本根据自己的需要选择,因为我的项目比较老,还没有切换到flutter2.0所以不能使用支持空安全的版本。
这个插件的使用非常简单,我们只需要连接上mqtt服务器,连接成功之后,我们就可以发布消息和订阅消息。
先记录一下我封装的工具类:
import 'dart:io';
import 'dart:convert';
import 'package:mqtt_client/mqtt_client.dart';
import 'package:mqtt_client/mqtt_server_client.dart';
import 'package:typed_data/typed_data.dart';
typedef ConnectedCallBack = void Function();
class MqttTool{
MqttQos qos = MqttQos.exactlyOnce;
MqttServerClient mqttClient;
static MqttTool _instance;
static MqttTool getInstance(){
if(_instance == null){
_instance = new MqttTool();
}
return _instance;
}
Future connect(String server,int port,String clientIdentifier,String userName,String password,{bool isSsl = false}){
mqttClient = MqttServerClient.withPort(server, clientIdentifier, port);
mqttClient.onConnected = _onConnected;
mqttClient.onDisconnected = _onDisconnected;
mqttClient.onSubscribed = _onSubscribed;
mqttClient.onSubscribeFail = _onSubscribeFail;
mqttClient.onUnsubscribed = _onUnsubscribed;
mqttClient.pongCallback = _pong;
final connMessage = MqttConnectMessage().authenticateAs(userName, password).keepAliveFor(60).startClean().withWillQos(qos);
mqttClient.connectionMessage = connMessage;
mqttClient.setProtocolV311();
mqttClient.logging(on: false);
if(isSsl){
mqttClient.secure = true;
mqttClient.onBadCertificate = _onBadCertificate;
}
return mqttClient.connect(userName,password);
}
void disConnect(){
mqttClient?.disconnect();
}
///发送string数据
int publishMessage(String pTopic,String msg){
Uint8Buffer data = new Uint8Buffer();
var codeUnits = msg.codeUnits;
data.addAll(codeUnits);
return mqttClient.publishMessage(pTopic, qos, data,retain: false);
}
int publishJsonMessage(String pTopic, json){
return publishMessage(pTopic,jsonEncode(json));
}
///发送数据
int publishRawMessage(String pTopic,List list){
Uint8Buffer data = new Uint8Buffer();
data.addAll(list);
return mqttClient.publishMessage(pTopic, qos, data,retain: false);
}
///订阅消息
Subscription subscribeMessage(String subtopic){
return mqttClient.subscribe(subtopic, qos);
}
///取消消息订阅
void unsubscribeMessage(String unSubtopic){
mqttClient.unsubscribe(unSubtopic);
}
///获取mqtt状态
MqttClientConnectionStatus getMqttStatus(){
return mqttClient.connectionStatus;
}
Stream>> updates(){
return mqttClient.updates;
}
void _onConnected(){
_log("已连接");
}
void _onDisconnected() {
_log("连接已断开");
}
void _onSubscribed(String topic) {
_log("订阅成功--$topic");
}
void _onSubscribeFail(String topic) {
_log("订阅失败--$topic");
}
void _onUnsubscribed(String topic) {
_log("取消订阅成功--$topic");
}
bool _onBadCertificate(X509Certificate certificate) {
_log("证书校验失败");
return true;
}
void _log(String msg){
print("MQTT--->$msg");
}
void _pong() {
print('Ping response client callback invoked');
}
}
封装得很简单,有其他需要可以自己加逻辑。
然后使用如下:
1、创建连接
// 建立连接
_connect() async {
String server = "broker.emqx.io";
int port = 1883;
String clientId = "test";
String userName = "username";
String password = "password";
MqttTool.getInstance()
.connect(server, port, clientId, userName, password)
.then((v) {
if (v.returnCode == MqttConnectReturnCode.connectionAccepted) {
print("mqtt连接成功");
_startListen();
} else if (v.returnCode == MqttConnectReturnCode.badUsernameOrPassword) {
print("mqtt连接失败 --密码错误");
} else {
print(" mqtt连接失败");
}
},onError: (error){
print("连接出错了--$error");
});
}
server是你的mqtt服务,port是你的服务端口,然后clientId决定客户端的唯一性,这个id可以不设置,如果不设置,在emqtt服务端会自动产生一个唯一的id,如果你要用到session,必须有一个唯一个id,你可以用imei。如果你一定要收到离线消息的话,就必须使用确定的id了。如果clientId重复的话,要么会拒绝新的连接尝试,要么会断开之前现有的连接。
2、订阅和取消订阅
// 订阅主题
_subscribeTopic() {
MqttTool.getInstance().subscribeMessage("pubTopic");
}
// 取消订阅
_unSubscribeTopic() {
MqttTool.getInstance().unsubscribeMessage("pubTopic");
}
传入的是约定的主题,就是其他端给你发送什么主题,你就订阅你需要的那个主题。
3、发布消息
// 发布消息
_publishTopic(json) {
int result = MqttTool.getInstance().publishJsonMessage("sendmsg",json );
}
第一个参数就是你发出的主题。别的端可以订阅这个主题,来接收你发出的消息。
5、监听订阅的消息
// 开启监听消息
_startListen() {
_listenSubscription = MqttTool.getInstance().updates().listen(_onData);
}
// 监听消息的具体实现
_onData(List> data) {
final MqttPublishMessage recMess = data[0].payload;
final String topic = data[0].topic;
final String pt = Utf8Decoder().convert(recMess.payload.message);
var result = jsonDecode(pt);
Map p = Map();
p["topic"] = topic;
p["type"] = "json";
p["payload"] = result;
print("收到消息--$p");eak;
}
}
然后做相应的逻辑处理就好了。只是一个笔记,不是教程!!!!!
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)