打开串口后启动一个线程来监听串口数据的进入,有数据时,就做数据的处理。
用python写串口通信程序的示例:
#coding=gb18030
import sys,threading,time
import serial
import binascii,encodings
import re
import socket
class ReadThread:
def __init__(self, Output=None, Port=0, Log=None, i_FirstMethod=True):
self.l_serial = None
self.alive = False
self.waitEnd = None
self.bFirstMethod = i_FirstMethod
self.sendport = ''
self.log = Log
self.output = Output
self.port = Port
self.re_num = None
def waiting(self):
if not self.waitEnd is None:
self.waitEnd.wait()
def SetStopEvent(self):
if not self.waitEnd is None:
self.waitEnd.set()
self.alive = False
self.stop()
def start(self):
self.l_serial = serial.Serial()
self.l_serial.port = self.port
self.l_serial.baudrate = 9600
self.l_serial.timeout = 2
self.re_num = re.compile('\d')
try:
if not self.output is None:
self.output.WriteTex
if not self.log is None:
self.log.info
self.l_serial.open()
except Exception, ex:
if self.l_serial.isOpen():
self.l_serial.close()
self.l_serial = None
if not self.output is None:
self.output.WriteText
if not self.log is None:
self.log.error(u'%s' % ex)
return False
if self.l_serial.isOpen():
if not self.output is None:
self.output.WriteText
if not self.log is None:
self.log.info
self.waitEnd = threading.Event()
self.alive = True
self.thread_read = None
self.thread_read = threading.Thread(target=self.FirstReader)
self.thread_read.setDaemon(1)
self.thread_read.start()
return True
else:
if not self.output is None:
self.output.WriteText
if not self.log is None:
self.log.info
return False
def InitHead(self):
try:
time.sleep(3)
if not self.output is None:
self.output.WriteText
if not self.log is None:
self.log.info
self.l_serial.flushInput()
self.l_serial.write('\x11')
data1 = self.l_serial.read(1024)
except ValueError,ex:
if not self.output is None:
self.output.WriteText
if not self.log is None:
self.log.error(u'%s' % ex)
self.SetStopEvent()
return
if not self.output is None:
self.output.WriteText
if not self.log is None:
self.log.info
self.output.WriteText(u'===================================\r\n')
def SendData(self, i_msg):
lmsg = ''
isOK = False
if isinstance(i_msg, unicode):
lmsg = i_msg.encode('gb18030')
lmsg = i_msg
pass
except Exception, ex:
pass
return isOK
def FirstReader(self):
data1 = ''
isQuanJiao = True
isFirstMethod = True
isEnd = True
readCount = 0
saveCount = 0
RepPos = 0
#read Head Infor content
self.InitHead()
while self.alive:
try:
data = ''
n = self.l_serial.inWaiting()
if n:
data = data + self.l_serial.read(n)
#print binascii.b2a_hex(data),
for l in xrange(len(data)):
if ord(data[l])==0x8E:
isQuanJiao = True
continue
if ord(data[l])==0x8F:
isQuanJiao = False
continue
if ord(data[l]) == 0x80 or ord(data[l]) == 0x00:
if len(data1)>10:
if not self.re_num.search(data1,1) is None:
saveCount = saveCount + 1
if RepPos==0:
RepPos = self.output.GetInsertionPoint()
self.output.Remove(RepPos,self.output.GetLastPosition())
self.SendData(data1)
data1 = ''
continue
except Exception, ex:
if not self.log is None:
self.log.error(u'%s' % ex)
self.waitEnd.set()
self.alive = False
def stop(self):
self.alive = False
self.thread_read.join()
if self.l_serial.isOpen():
self.l_serial.close()
if not self.output is None:
self.output.WriteText
if not self.log is None:
self.log.info
def printHex(self, s):
s1 = binascii.b2a_hex(s)
print s1
if __name__ == '__main__':
rt = ReadThread()
f = open("sendport.cfg", "r")
rt.sendport = f.read()
f.close()
try:
if rt.start():
rt.waiting()
rt.stop()
else:
pass
except Exception,se:
print str(se)
if rt.alive:
rt.stop()
print 'End OK .'
del rt
就是打开串口后,启动一个线程来监听串口数据的进入,有数据时,就做数据的处理(也可以发送一个事件,并携带接收到的数据)。我没有用到串口处理太深的东西。
客户的原程序不能给你,不过我给你改一下吧。
里面的一些东西,已经经过了处理,要运行,可能你要自己改一下,把没有用的东西去掉。
我这里已经没有串口设备了,不能调了,你自己处理一下吧,不过基本的东西已经有了。
=================================================================
#coding=gb18030
import sys,threading,time
import serial
import binascii,encodings
import re
import socket
class ReadThread:
def __init__(self, Output=None, Port=0, Log=None, i_FirstMethod=True):
self.l_serial = None
self.alive = False
self.waitEnd = None
self.bFirstMethod = i_FirstMethod
self.sendport = ''
self.log = Log
self.output = Output
self.port = Port
self.re_num = None
def waiting(self):
if not self.waitEnd is None:
self.waitEnd.wait()
def SetStopEvent(self):
if not self.waitEnd is None:
self.waitEnd.set()
self.alive = False
self.stop()
def start(self):
self.l_serial = serial.Serial()
self.l_serial.port = self.port
self.l_serial.baudrate = 9600
self.l_serial.timeout = 2
self.re_num = re.compile('\d')
try:
if not self.output is None:
self.output.WriteText(u'打开通讯端口\r\n')
if not self.log is None:
self.log.info(u'打开通讯端口')
self.l_serial.open()
except Exception, ex:
if self.l_serial.isOpen():
self.l_serial.close()
self.l_serial = None
if not self.output is None:
self.output.WriteText(u'出错:\r\n%s\r\n' % ex)
if not self.log is None:
self.log.error(u'%s' % ex)
return False
if self.l_serial.isOpen():
if not self.output is None:
self.output.WriteText(u'创建接收任务\r\n')
if not self.log is None:
self.log.info(u'创建接收任务')
self.waitEnd = threading.Event()
self.alive = True
self.thread_read = None
self.thread_read = threading.Thread(target=self.FirstReader)
self.thread_read.setDaemon(1)
self.thread_read.start()
return True
else:
if not self.output is None:
self.output.WriteText(u'通讯端口未打开\r\n')
if not self.log is None:
self.log.info(u'通讯端口未打开')
return False
def InitHead(self):
#串口的其它的一些处理
try:
time.sleep(3)
if not self.output is None:
self.output.WriteText(u'数据接收任务开始连接网络\r\n')
if not self.log is None:
self.log.info(u'数据接收任务开始连接网络')
self.l_serial.flushInput()
self.l_serial.write('\x11')
data1 = self.l_serial.read(1024)
except ValueError,ex:
if not self.output is None:
self.output.WriteText(u'出错:\r\n%s\r\n' % ex)
if not self.log is None:
self.log.error(u'%s' % ex)
self.SetStopEvent()
return
if not self.output is None:
self.output.WriteText(u'开始接收数据\r\n')
if not self.log is None:
self.log.info(u'开始接收数据')
self.output.WriteText(u'===================================\r\n')
def SendData(self, i_msg):
lmsg = ''
isOK = False
if isinstance(i_msg, unicode):
lmsg = i_msg.encode('gb18030')
else:
lmsg = i_msg
try:
#发送数据到相应的处理组件
pass
except Exception, ex:
pass
return isOK
def FirstReader(self):
data1 = ''
isQuanJiao = True
isFirstMethod = True
isEnd = True
readCount = 0
saveCount = 0
RepPos = 0
#read Head Infor content
self.InitHead()
while self.alive:
try:
data = ''
n = self.l_serial.inWaiting()
if n:
data = data + self.l_serial.read(n)
#print binascii.b2a_hex(data),
for l in xrange(len(data)):
if ord(data[l])==0x8E:
isQuanJiao = True
continue
if ord(data[l])==0x8F:
isQuanJiao = False
continue
if ord(data[l]) == 0x80 or ord(data[l]) == 0x00:
if len(data1)>10:
if not self.re_num.search(data1,1) is None:
saveCount = saveCount + 1
if RepPos==0:
RepPos = self.output.GetInsertionPoint()
self.output.Remove(RepPos,self.output.GetLastPosition())
self.SendData(data1)
data1 = ''
continue
except Exception, ex:
if not self.log is None:
self.log.error(u'%s' % ex)
self.waitEnd.set()
self.alive = False
def stop(self):
self.alive = False
self.thread_read.join()
if self.l_serial.isOpen():
self.l_serial.close()
if not self.output is None:
self.output.WriteText(u'关闭通迅端口:[%d] \r\n' % self.port)
if not self.log is None:
self.log.info(u'关闭通迅端口:[%d]' % self.port)
def printHex(self, s):
s1 = binascii.b2a_hex(s)
print s1
#测试用部分
if __name__ == '__main__':
rt = ReadThread()
f = open("sendport.cfg", "r")
rt.sendport = f.read()
f.close()
try:
if rt.start():
rt.waiting()
rt.stop()
else:
pass
except Exception,se:
print str(se)
if rt.alive:
rt.stop()
print 'End OK .'
del rt
python里面使用serial库来 *** 作串口,serial的使用流程跟平常的类似,也是打开、关闭、读、写
一般就是设置端口,波特率。
使用serial.Serial创建实体的时候会去打开串口,之后可以使用is_open开判断下是否串口是否打开正常。
使用ser.close即可关闭串口
数据的写使用ser.write接口,如果写的是十六进制的数据使用bytearray来定义,如 writebuf = bytearray([0x55, 0xaa, 0x00, 0x01, 0x00, 0x00])
读数据使用ser.read接口,一般会先使用in_waiting来判断下是否有数据,然后开始读
下面举一个例子,说明下我们在实际的使用情况。
一般会单独创建一个进程来作为数据的接收,然后再配合上标记位或者信号量来处理逻辑
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)