我没有用到串口处理太深的东西。
客户的原程序不能给你,不过我给你改一下吧。
里面的一些东西,已经经过了处理,要运行,可能你要运哗或自己改一下,把没有用的东西去掉。
我这里已经没有串口设备了,不能调了,你自己处理一下吧,不过旁伍基本的东西已经有了。
=================================================================
#coding=gb18030
import sys,threading,time
import serial
import binascii,encodings
import re
import socket
class ReadThread:
def __init__(self, Output=None, Port=0, Log=None, i_FirstMethod=True):
self.l_serial = None
self.alive = False
self.waitEnd = None
self.bFirstMethod = i_FirstMethod
self.sendport = ''
self.log = Log
self.output = Output
self.port = Port
self.re_num = None
def waiting(self):
if not self.waitEnd is None:
self.waitEnd.wait()
def SetStopEvent(self):
if not self.waitEnd is None:
self.waitEnd.set()
self.alive = False
self.stop()
def start(self):
self.l_serial = serial.Serial()
self.l_serial.port = self.port
self.l_serial.baudrate = 9600
self.l_serial.timeout = 2
self.re_num = re.compile('\d')
try:
if not self.output is None:
self.output.WriteText(u'打开通讯端口\r\n')
if not self.log is None:
self.log.info(u'打开通讯端口')
self.l_serial.open()
except Exception, ex:
if self.l_serial.isOpen():
self.l_serial.close()
self.l_serial = None
if not self.output is None:
self.output.WriteText(u'出错:\r\n%s\r\n' % ex)
if not self.log is None:
self.log.error(u'%s' % ex)
return False
if self.l_serial.isOpen():
if not self.output is None:
self.output.WriteText(u'创建接收任务\r\n')
if not self.log is None:
self.log.info(u'创建接收任务')
self.waitEnd = threading.Event()
self.alive = True
self.thread_read = None
self.thread_read = threading.Thread(target=self.FirstReader)
self.thread_read.setDaemon(1)
self.thread_read.start()
return True
else:
if not self.output is None:
self.output.WriteText(u'通讯端口未打开\r\n')
if not self.log is None:
self.log.info(u'通讯端口未打开')
return False
def InitHead(self):
#串口的其它的一些处理
try:
time.sleep(3)
if not self.output is None:
self.output.WriteText(u'数据接收任务开始连接网络\r\n')
if not self.log is None:
self.log.info(u'数据接收任务开始连接网络')
self.l_serial.flushInput()
self.l_serial.write('\x11')
data1 = self.l_serial.read(1024)
except ValueError,ex:
if not self.output is None:
self.output.WriteText(u'出错:\r\n%s\r\n' % ex)
if not self.log is None:
self.log.error(u'%s' % ex)
self.SetStopEvent()
return
if not self.output is None:
self.output.WriteText(u'开始接收数据\r\n')
if not self.log is None:
self.log.info(u'开始接收数据')
self.output.WriteText(u'===================================\r\n')
def SendData(self, i_msg):
lmsg = ''
isOK = False
if isinstance(i_msg, unicode):
lmsg = i_msg.encode('gb18030')
else:
lmsg = i_msg
try:
#发送数据到相应的处理组件
pass
except Exception, ex:
pass
return isOK
def FirstReader(self):
data1 = ''
isQuanJiao = True
isFirstMethod = True
isEnd = True
readCount = 0
saveCount = 0
RepPos = 0
#read Head Infor content
self.InitHead()
while self.alive:
try:
data = ''
n = self.l_serial.inWaiting()
if n:
data = data + self.l_serial.read(n)
#print binascii.b2a_hex(data),
for l in xrange(len(data)):
if ord(data[l])==0x8E:
isQuanJiao = True
continue
if ord(data[l])==0x8F:
isQuanJiao = False
continue
if ord(data[l]) == 0x80 or ord(data[l]) == 0x00:
if len(data1)>10:
if not self.re_num.search(data1,1) is None:
saveCount = saveCount + 1
if RepPos==0:
RepPos = self.output.GetInsertionPoint()
self.output.Remove(RepPos,self.output.GetLastPosition())
self.SendData(data1)
data1 = ''
continue
except Exception, ex:
if not self.log is None:
self.log.error(u'%s' % ex)
self.waitEnd.set()
self.alive = False
def stop(self):
self.alive = False
self.thread_read.join()
if self.l_serial.isOpen():
self.l_serial.close()
if not self.output is None:
self.output.WriteText(u'关闭通迅端口:[%d] \r\n' % self.port)
if not self.log is None:
self.log.info(u'关闭通迅端口:[%d]' % self.port)
def printHex(self, s):
s1 = binascii.b2a_hex(s)
print s1
#测试用部分
if __name__ == '__main__':
rt = ReadThread()
f = open("sendport.cfg", "r")
rt.sendport = f.read()
f.close()
try:
if rt.start():
rt.waiting()
rt.stop()
else:
pass
except Exception,se:
print str(se)
if rt.alive:
rt.stop()
print 'End OK .'
del rt
Python非常适合写一些测试的脚本,如快速的串口通信测试等。如果使用VC++ QT开发,可能用时较多,使用python,如果掌握使用方法,可以直接读写测试,配合设备或是串口助手,很快验证与实现。Python有没有现成的串口API直接调用呢?经过实践验证,需要安装一个叫 Pyserial的组件即可。这个可以在github上下载。
在windows 7 64bit 上可以使用吗?当然可以使用,我安装的python3.5为64位的。把下载后的文件,其中有一个serial的文件夹,拷贝到python35安装路径, C:\Python35\Lib\site-packages\serial
网上可以搜一下windows的安装包,安装完也是:C:\Python35\Lib\site-packages\serial ,可以用最新的版本,替换即可。
测试的方法:在python IDE里测试:
>>>import serial
这里如果报错,是州岩python版本与pyserial版本没有配合肆腔好。如果正常,不返回,即可以导入serial模块。
>>>ser=serial.Serial("COM5",115200)
这里为COM5,115200的波特率。如果打不开,请检查安装环境。
>>>ser.write('hello,serial test'.encode())
17
发送测试(如果返回字节数,说明返回成功),这里需要转换一个编码为字节。
以上测试,可以使用现在的设备或是串口助手,如安装Virtual Serial Port Driver 7.2 虚拟串口软件,设置一对串口,进行自发自收的测试。
>>>print(ser.read()line())
b'abcdefg\r\n'
这里是串口接收,有接收的超时。设备或是串口助手发送一个字符串,以回车换行结束,这里就可以收到打印出来。
也可以用ser.read(),这里只接收一个字符来实现。
上面已经实现了基本的串口 *** 作。
关闭串口为:裂迹衫
>>>ser.close()
如果使用python,一般写个py文件,就像windows bat 批处理一样,这是python强大的地方。如果写一个py脚本呢?其实只要把上面的命令,一条条写下来,就是一个脚本,测试如下:
import serialser=serial.Serial("COM5",115200,timeout=0.5)for i in range(0,100-1):ser.write('hello\r\n'.encode())print(ser.readline())ser.close()
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)