上一篇RabbitMQ 搭建MQTT服务 中,创建了admin-test 用户,并创建 test-host 虚拟主机 专门处理MQTT业务,接着我们自己开发一个连接RabbitMQ MQTT服务的客户端
GitHub - harrylsp/MQTTClientContribute to harrylsp/MQTTClient development by creating an account on GitHub.https://github.com/harrylsp/MQTTClient
1.安装MQTTnet库 2. 连接MQTT服务 2.1 创建连接对象var mqttFactory = new MqttFactory(); mqttClient = mqttFactory.CreateMqttClient() as MqttClient;2.2 构建连接对象连接参数
var options = new MqttClientOptions { ClientId = clientId, ProtocolVersion = MQTTnet.Formatter.MqttProtocolVersion.V311, ChannelOptions = new MqttClientTcpOptions { Server = tcpServer, Port = tcpPort }, WillDelayInterval = 10, WillMessage = new MqttApplicationMessage { Topic = $"LastWill/{clientId}", Payload = Encoding.UTF8.GetBytes("I Lost the Connection!"), QualityOfServiceLevel = MQTTnet.Protocol.MqttQualityOfServiceLevel.Exactlyonce } }; if (options.ChannelOptions == null) { throw new InvalidOperationException(); } if (!string.IsNullOrWhiteSpace(mqttUser)) { options.Credentials = new MqttClientCredentials { Username = mqttUser, Password = Encoding.UTF8.GetBytes(mqttPassword) }; } // 设置为false,表示创建一个持久会话,在客户端断开连接时,会话仍然保持并保存离线消息,直到会话超时注销。 // 设置为true,表示创建一个新的临时会话,在客户端断开时,会话自动销毁。 options.CleanSession = true; // 连接保活心跳 options.KeepAlivePeriod = TimeSpan.FromSeconds(10);2.3 注册监听回调事件
mqttClient.ConnectedHandler = new MqttClientConnectedHandlerDelegate(new Action2.4 发起连接(e => { LogManager.WriteLogEx(LOGLEVEL.INFO, "客户端已连接"); })); mqttClient.DisconnectedHandler = new MqttClientDisconnectedHandlerDelegate(new Action (e => { LogManager.WriteLogEx(LOGLEVEL.INFO, "客户端已断开连接"); })); mqttClient.ApplicationMessageReceivedHandler = new MqttApplicationMessageReceivedHandlerDelegate(new Action (e => { string text = Encoding.UTF8.GetString(e.ApplicationMessage.Payload); string topic = e.ApplicationMessage.Topic; string qos = e.ApplicationMessage.QualityOfServiceLevel.ToString(); string retained = e.ApplicationMessage.Retain.ToString(); LogManager.WriteLogEx(LOGLEVEL.INFO, $"客户端接收消息 >>Topic:{topic}; Qos:{qos}; Retained:{retained}"); LogManager.WriteLogEx(LOGLEVEL.INFO, $"客户端接收消息 >>Msg:{text}"); }));
await mqttClient.ConnectAsync(options); LogManager.WriteLogEx(LOGLEVEL.INFO, $"客户端{options.ClientId}尝试连接...");2.5 连接MQTT服务 3. 订阅主题
await mqttClient.SubscribeAsync(topic);
4. 发布消息
var message = new MqttApplicationMessage { Topic = topic, Payload = Encoding.UTF8.GetBytes(payload), QualityOfServiceLevel = MQTTnet.Protocol.MqttQualityOfServiceLevel.AtMostOnce, Retain = true }; await mqttClient.PublishAsync(message, CancellationToken.None);
5.其他 5.1 MQTT协议介绍
https://docs.emqx.cn/broker/v4.3/development/protocol.html#mqtt%E5%8D%8F%E8%AE%AE
5.2 源码GitHub - harrylsp/MQTTClient
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)