python使用paho-mqtt库, 连接mqtt服务器进行发布与订阅消息的极简示例
0 安装库sudo pip install paho-mqtt1 连接服务器示例
Paho库采用回调函数的方式来返回连接状态
代码中还设置了遗嘱消息,这条消息会存储在服务器,一旦客户端非正常断开(不使用disconnect断开,最常见的是代码出错卡死),即会发布该消息。
import paho.mqtt.client as mqtt HOST = "192.168.103.174" #服务器ip地址 PORT = 1883 #服务器端口 USER = 'pc' #登陆用户名 PASSWORD = '123' #用户名对应的密码 def on_connect(client, userdata, flags, rc): rc_status = [ "连接成功", "协议版本错误", "无效的客户端标识", "服务器无法使用", "用户密码错误", "无授权" ] print("connect:" , rc_status[rc]) client = mqtt.Client() client.on_connect = on_connect #注册返回连接状态的回调函数 client.username_pw_set(USER, PASSWORD) #如果服务器要求需要账户密码 client.will_set("test/die", "我死了", 0) #设置遗嘱消息 client.connect(HOST, PORT, keepalive=600) # 连接服务器 #client.disconnect() #断开连接,不会触发遗嘱消息2 发布消息的API
较为简单,调用这条API即可发布消息
TOPIC_PUB = "test/py_test" #发布主题 MESSAGE = "这是测试消息" #载荷 client.publish(TOPIC_PUB, MESSAGE, qos=0) #发布消息3 接收数据
接收数据是通过登记回调函数,当收到数据时会调用该函数。
def on_message(client, userdata, msg): print("主题:", msg.topic) print("消息:", str(msg.payload,'utf-8') ,'n' ) client.on_message = on_message #定义回调函数 client.subscribe('test/#', qos=0) #订阅主题test/# client.loop_start() #非阻塞,启动接收线程 #client.loop_forever() #阻塞式,会卡死在这等待接收测试代码 1 编写发送程序
以下代码的功能是,每隔3秒发送一句测试消息
import time import paho.mqtt.client as mqtt HOST = "192.168.103.174" #服务器ip地址 PORT = 1883 #服务器端口 USER = 'pc' #登陆用户名 PASSWORD = '123' #用户名对应的密码 def on_connect(client, userdata, flags, rc): rc_status = [ "连接成功", "协议版本错误", "无效的客户端标识", "服务器无法使用", "用户密码错误", "无授权" ] print("connect:" , rc_status[rc]) client = mqtt.Client() client.on_connect = on_connect #注册返回连接状态的回调函数 client.username_pw_set(USER, PASSWORD) #如果服务器要求需要账户密码 client.will_set("test/die", "我死了", 0) #设置遗嘱消息 client.connect(HOST, PORT, keepalive=600) # 连接服务器 #client.disconnect() #断开连接,不会触发遗嘱消息 TOPIC_PUB = "test/py_test" #发布主题 MESSAGE = "这是测试消息" #载荷 while 1: client.publish(TOPIC_PUB, MESSAGE, qos=0) #发布消息 time.sleep(3)
使用软件mqttx连接至mqtt服务器查看消息
2 编写接收测试程序
以下代码的功能是订阅test/#主题,并将收到的消息print出来。
我是非阻塞党,不能停在那条阻塞接收里,必须停在由自己控制的while内
import paho.mqtt.client as mqtt HOST = "192.168.103.174" #服务器ip地址 PORT = 1883 #服务器端口 USER = 'pc' #登陆用户名 PASSWORD = '123' #用户名对应的密码 def on_connect(client, userdata, flags, rc): rc_status = [ "连接成功", "协议版本错误", "无效的客户端标识", "服务器无法使用", "用户密码错误", "无授权" ] print("connect:" , rc_status[rc]) client = mqtt.Client() client.on_connect = on_connect #注册返回连接状态的回调函数 client.username_pw_set(USER, PASSWORD) #如果服务器要求需要账户密码 client.will_set("test/die", "我死了", 0) #设置遗嘱消息 client.connect(HOST, PORT, keepalive=600) # 连接服务器 #client.disconnect() #断开连接,不会触发遗嘱消息 def on_message(client, userdata, msg): print("主题:", msg.topic) print("消息:", str(msg.payload,'utf-8') ,'n' ) client.on_message = on_message #定义回调函数 client.subscribe('test/#', qos=0) #订阅主题test/# client.loop_start() #非阻塞,启动接收线程 #client.loop_forever() #阻塞式,会卡死在这等待接收 while 1: pass
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)