Vert.x MQTT消息发送, 客户端和服务端
1. maven项目依赖
- io.vertx:vertx-web
- io.vertx:vertx-mqtt
- io.vertx:vertx-config-yaml
- com.fasterxml.jackson.core:jackson-databind
- com.lance.common:vertx-common-core:0.0.1-SNAPSHOT
2. YAML文件配置
server:
  host: 127.0.0.1
  port: 18003
3. MQTT服务端配置
public class MqttServerApp extends AbstractVerticle { private final static String CLIENT_ID = "clientHello"; @Override public void start(Promise4.MQTT客户端配置startPromise) throws Exception { ConfigProperties properties = config().mapTo(ConfigProperties.class); int port = properties.getServer().getPort(); log.info("===>json: {}, port: {}", properties, port); MqttServer mqttServer = MqttServer.create(vertx, create(properties)); mqttServer.endpointHandler(endpoint -> { // shows main connect info log.info("MQTT client [{}] request to connect, clean session = {}", endpoint.clientIdentifier(), endpoint.isCleanSession()); if (endpoint.auth() != null) { log.info("[username = {}, password = {}]", endpoint.auth().getUsername(), endpoint.auth().getPassword()); } log.info("[properties = {}]", endpoint.connectProperties()); if (endpoint.will() != null) { log.info("[will topic: {}, msg: {}, QoS: {}, isRetain: {}]", endpoint.will().getWillTopic(), endpoint.will().getWillMessageBytes(), endpoint.will().getWillQos(), endpoint.will().isWillRetain()); } log.info("[keep alive timeout = {}]", endpoint.keepAliveTimeSeconds()); // accept connection from the remote client endpoint.accept(true); receiver(endpoint); endpoint.disconnectMessageHandler(disconnectMessage -> log.info("Received disconnect from client, reason code = {}", disconnectMessage.code())); }) .exceptionHandler(t -> log.error("MQTT exception fail: ", t)) .listen(ar -> { if (ar.succeeded()) { log.warn("MQTT server is listening on port: {}", ar.result().actualPort()); } else { log.error("Fail on starting the server: ", ar.cause()); } }); } private void receiver(MqttEndpoint endpoint) { endpoint.publishHandler(p -> { log.info("Server received message [{}] with QoS [{}]", p.payload().toString(Charset.defaultCharset()), p.qosLevel()); if (p.qosLevel() == MqttQoS.AT_LEAST_ONCE) { endpoint.publishAcknowledge(p.messageId()); } else if (p.qosLevel() == MqttQoS.EXACTLY_ONCE) { endpoint.publishReceived(p.messageId()); } send(endpoint); 
}) .publishReleaseHandler(endpoint::publishComplete); } private void send(MqttEndpoint endpoint) { Buffer payload = Buffer.buffer("server: hello world."); endpoint.publish(MqttClientApp.MQTT_TOPIC, payload, MqttQoS.AT_MOST_ONCE, false, false, s -> { if (s.succeeded()) { log.info("===>Server publish success: {}", s.result()); } else { log.error("===>Server publish fail: ", s.cause()); } }); } private MqttServerOptions create(ConfigProperties configProperties) { MqttServerOptions options = new MqttServerOptions(); options.setPort(configProperties.getServer().getPort()); options.setHost(configProperties.getServer().getHost()); return options; } }
public class MqttClientApp extends AbstractVerticle { public static final String MQTT_TOPIC = "hello_topic"; @Override public void start() { MqttClient client = MqttClient.create(vertx, create()); // handler will be called when we have a message in topic we subscribe for client.publishHandler(p -> { log.info("Client received message on [{}] payload [{}] with QoS [{}]", p.topicName(), p.payload().toString(Charset.defaultCharset()), p.qosLevel()); }); client.connect(18003, "127.0.0.1", s -> { if (s.succeeded()) { log.info("Client connect success."); subscribe(client); } else { log.error("Client connect fail: ", s.cause()); } }).exceptionHandler(event -> { log.error("client fail: ", event.getCause()); }); } private void subscribe(MqttClient client) { client.subscribe(MQTT_TOPIC, 0, e -> { if (e.succeeded()) { log.info("===>subscribe success: {}", e.result()); vertx.setPeriodic(10_000, l -> publish(client)); } else { log.error("===>subscribe fail: ", e.cause()); } }); } private void publish(MqttClient client) { Buffer payload = Buffer.buffer("client: hello world."); client.publish(MQTT_TOPIC, payload, MqttQoS.AT_MOST_ONCE, false, false, s -> { if (s.succeeded()) { log.info("===>Client publish success: {}", s.result()); } else { log.error("===>Client publish fail: ", s.cause()); } }); } private MqttClientOptions create() { MqttClientOptions options = new MqttClientOptions(); options.setClientId("ClientId_" + RandomStringUtils.randomAlphanumeric(6)); options.setMaxMessageSize(100_000_000); options.setKeepAliveInterval(2); return options; } }5. 结果
2022-01-25 19:06:53.244 WARN 21 --- [ntloop-thread-1] lver.dns.DnsServerAddressStreamProviders---[ 70] : Can not find io.netty.resolver.dns.macos.MacOSDnsServerAddressStreamProvider in the classpath, fallback to system defaults. This may result in incorrect DNS resolutions on MacOS.
2022-01-25 19:06:53.291 INFO 21 --- [ntloop-thread-1] io.vertx.mqtt.impl.MqttClientImpl ---[ ] : Connection with 127.0.0.1:18003 established successfully
2022-01-25 19:06:53.432 INFO 21 --- [ntloop-thread-1] org.lance.mqtt.MqttClientApp ---[ 34] : Client connect success.
2022-01-25 19:06:53.512 INFO 21 --- [ntloop-thread-1] org.lance.mqtt.MqttClientApp ---[ 47] : ===>subscribe success: 1
2022-01-25 19:07:03.537 INFO 21 --- [ntloop-thread-1] org.lance.mqtt.MqttClientApp ---[ 59] : ===>publish success: 2
2022-01-25 19:07:03.551 INFO 21 --- [ntloop-thread-1] org.lance.mqtt.MqttClientApp ---[ 29] : Client received message on [hello_topic] payload [server: hello world.] with QoS [AT_MOST_ONCE]
2022-01-25 19:07:13.518 INFO 21 --- [ntloop-thread-1] org.lance.mqtt.MqttClientApp ---[ 59] : ===>publish success: 3
2022-01-25 19:07:13.521 INFO 21 --- [ntloop-thread-1] org.lance.mqtt.MqttClientApp ---[ 29] : Client received message on [hello_topic] payload [server: hello world.] with QoS [AT_MOST_ONCE]
2022-01-25 19:07:23.521 INFO 21 --- [ntloop-thread-1] org.lance.mqtt.MqttClientApp ---[ 59] : ===>publish success: 4
6. 项目完整地址
Vertx之MQTT客户端服务端发送 Github 地址
Vertx之MQTT客户端服务端发送 Gitee 地址
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)