- 解决方案
- 一、EMQX 规则引擎配置
- 二、订阅端代码
- 1、订阅端代码
- 2、订阅端回调代码
- 三、发布端代码
该规则引擎为获取指定主题的所有信息,并转发到指定接口。如果未按上述解决方案设置编码,此处 select 获取到的中文为乱码。
package com.mqtt.client; import java.text.SimpleDateFormat; import java.util.Date; import java.util.UUID; import org.eclipse.paho.client.mqttv3.*; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; public class MyMqttClient { public static final String MQTT_BROKER_HOST = "tcp://localhost:1883"; public static final String MQTT_CLIENT_ID = UUID.randomUUID().toString().substring(0, 8).toUpperCase(); public static final String USERNAME = "123456"; public static final String PASSWORD = "123456"; public static final String TOPIC_FILTER = "/wxhntmy/mqtt/#"; private volatile static MqttClient mqttClient; private static MqttConnectOptions options; public static void main(String[] args) { // TODO 自动生成的方法存根 try { // host为主机名,clientid即连接MQTT的客户端ID,一般以客户端唯一标识符表示, // MemoryPersistence设置clientid的保存形式,默认为以内存保存 mqttClient = new MqttClient(MQTT_BROKER_HOST, MQTT_CLIENT_ID, new MemoryPersistence()); // 配置参数信息 options = new MqttConnectOptions(); // 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录, // 这里设置为true表示每次连接到服务器都以新的身份连接 options.setCleanSession(true); // 设置用户名 options.setUserName(USERNAME); // 设置密码 options.setPassword(PASSWORD.toCharArray()); // 设置超时时间 单位为秒 options.setConnectionTimeout(10); // 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送个消息判断客户端是否在线,但这个方法并没有重连的机制 options.setKeepAliveInterval(20); // 连接 mqttClient.connect(options); // 订阅 mqttClient.subscribe(TOPIC_FILTER); // 设置回调 mqttClient.setCallback(new MyMqttCallback()); Date date = new Date(); SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); String datef = sdf.format(date); System.out.println("[" + datef + "] MQTT_Client Started!"); System.out.println("[" + datef + "] MQTT_Client Host: " + MQTT_BROKER_HOST); System.out.println("[" + datef + "] MQTT_Client ID: " + MQTT_CLIENT_ID); System.out.println("[" + datef + "] MQTT_Client Topic_Filter: " + TOPIC_FILTER); } catch (Exception e) { e.printStackTrace(); } } }2、订阅端回调代码
package com.mqtt.client; import java.text.SimpleDateFormat; import java.util.Date; import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; import org.eclipse.paho.client.mqttv3.MqttCallback; import org.eclipse.paho.client.mqttv3.MqttMessage; public class MyMqttCallback implements MqttCallback { @Override public void connectionLost(Throwable arg0) { // TODO 自动生成的方法存根 Date date = new Date(); SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); String datef = sdf.format(date); System.out.println("[" + datef + "] Connection Lost!"); } @Override public void deliveryComplete(IMqttDeliveryToken arg0) { // TODO 自动生成的方法存根 Date date = new Date(); SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); String datef = sdf.format(date); System.out.println("[" + datef + "] Delivery Complete!"); } @Override public void messageArrived(String arg0, MqttMessage arg1) throws Exception { // TODO 自动生成的方法存根 Date date = new Date(); SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); String datef = sdf.format(date); //必须设置获取消息的字符编码,否则获取到的是乱码 String msgString = new String(arg1.getPayload(), "UTF-8"); System.out.println( "[" + datef + "] Topic: " + arg0 + " Qos: " + arg1.getQos() + " Message: " + msgString); } }三、发布端代码
package com.mqtt.publish; import java.io.UnsupportedEncodingException; import java.text.SimpleDateFormat; import java.util.Date; import java.util.UUID; import org.eclipse.paho.client.mqttv3.MqttClient; import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.eclipse.paho.client.mqttv3.MqttException; import org.eclipse.paho.client.mqttv3.MqttMessage; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; public class PublishSample { public static void main(String[] args) throws UnsupportedEncodingException { // TODO 自动生成的方法存根 // 主题 String topic = "/wxhntmy/mqtt"; // 内容 String content = "hello 哈哈"; int qos = 1; String broker = "tcp://localhost:1883"; String userName = "123456"; String password = "123456"; String clientId = String.valueOf(System.currentTimeMillis()); // 内存存储 MemoryPersistence persistence = new MemoryPersistence(); Date date = new Date(); SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); String datef = sdf.format(date); System.out.println("[" + datef + "] MQTT_Publish Started!"); System.out.println("[" + datef + "] MQTT_Publish Broker: " + broker); System.out.println("[" + datef + "] MQTT_Publish ClientID: " + clientId); System.out.println("[" + datef + "] MQTT_Publish Topic: " + topic); System.out.println("[" + datef + "] MQTT_Publish Content: " + content); // 创建客户端 MqttClient sampleClient = null; try { sampleClient = new MqttClient(broker, clientId, persistence); } catch (MqttException e1) { // TODO 自动生成的 catch 块 e1.printStackTrace(); } // 创建链接参数 MqttConnectOptions connOpts = new MqttConnectOptions(); try { // 在重新启动和重新连接时记住状态 connOpts.setCleanSession(false); // 设置连接的用户名 connOpts.setUserName(userName); connOpts.setPassword(password.toCharArray()); connOpts.setKeepAliveInterval(20); // 建立连接 sampleClient.connect(connOpts); // 创建消息 //规则引擎获取到的中文乱码原因在于这里,没有设置获取Bytes的字符编码 MqttMessage message = new MqttMessage(content.getBytes("UTF-8")); // 设置消息的服务质量 message.setQos(qos); String tp = topic + "/" + String.valueOf(System.currentTimeMillis()); while(true) { // 发布消息 sampleClient.publish(tp, message); date = new Date(); sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); datef = sdf.format(date); System.out.println("[" + datef + "] topic " + tp); System.out.println("[" + datef + "] message " + message); try { Thread.sleep(10 * 1000); } catch (InterruptedException e) { // TODO 自动生成的 catch 块 e.printStackTrace(); } } } catch (MqttException me) { try { // 断开连接 sampleClient.disconnect(); // 关闭客户端 sampleClient.close(); } catch (MqttException e) { // TODO 自动生成的 catch 块 e.printStackTrace(); } date = new Date(); sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); datef = sdf.format(date); System.out.println("[" + datef + "] reason " + me.getReasonCode()); System.out.println("[" + datef + "] msg " + me.getMessage()); System.out.println("[" + datef + "] loc " + me.getLocalizedMessage()); System.out.println("[" + datef + "] cause " + me.getCause()); System.out.println("[" + datef + "] excep " + me); me.printStackTrace(); } } }
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)