如何用python写个串口通信的程序

如何用python写个串口通信的程序,第1张

就芦含是打开串口后,启动一个线程来监听串口数据的进入,有数据时,就做数据的处理(也可以发送一个事件,并携带接收到的数据)。

我没有用到串口处理太深的东西。

客户的原程序不能给你,不过我给你改一下吧。

里面的一些东西,已经经过了处理,要运行,可能你要运哗或自己改一下,把没有用的东西去掉。

我这里已经没有串口设备了,不能调了,你自己处理一下吧,不过旁伍基本的东西已经有了。

=================================================================

#coding=gb18030

import sys,threading,time

import serial

import binascii,encodings

import re

import socket

class ReadThread:

def __init__(self, Output=None, Port=0, Log=None, i_FirstMethod=True):

self.l_serial = None

self.alive = False

self.waitEnd = None

self.bFirstMethod = i_FirstMethod

self.sendport = ''

self.log = Log

self.output = Output

self.port = Port

self.re_num = None

def waiting(self):

if not self.waitEnd is None:

self.waitEnd.wait()

def SetStopEvent(self):

if not self.waitEnd is None:

self.waitEnd.set()

self.alive = False

self.stop()

def start(self):

self.l_serial = serial.Serial()

self.l_serial.port = self.port

self.l_serial.baudrate = 9600

self.l_serial.timeout = 2

self.re_num = re.compile('\d')

try:

if not self.output is None:

self.output.WriteText(u'打开通讯端口\r\n')

if not self.log is None:

self.log.info(u'打开通讯端口')

self.l_serial.open()

except Exception, ex:

if self.l_serial.isOpen():

self.l_serial.close()

self.l_serial = None

if not self.output is None:

self.output.WriteText(u'出错:\r\n%s\r\n' % ex)

if not self.log is None:

self.log.error(u'%s' % ex)

return False

if self.l_serial.isOpen():

if not self.output is None:

self.output.WriteText(u'创建接收任务\r\n')

if not self.log is None:

self.log.info(u'创建接收任务')

self.waitEnd = threading.Event()

self.alive = True

self.thread_read = None

self.thread_read = threading.Thread(target=self.FirstReader)

self.thread_read.setDaemon(1)

self.thread_read.start()

return True

else:

if not self.output is None:

self.output.WriteText(u'通讯端口未打开\r\n')

if not self.log is None:

self.log.info(u'通讯端口未打开')

return False

def InitHead(self):

#串口的其它的一些处理

try:

time.sleep(3)

if not self.output is None:

self.output.WriteText(u'数据接收任务开始连接网络\r\n')

if not self.log is None:

self.log.info(u'数据接收任务开始连接网络')

self.l_serial.flushInput()

self.l_serial.write('\x11')

data1 = self.l_serial.read(1024)

except ValueError,ex:

if not self.output is None:

self.output.WriteText(u'出错:\r\n%s\r\n' % ex)

if not self.log is None:

self.log.error(u'%s' % ex)

self.SetStopEvent()

return

if not self.output is None:

self.output.WriteText(u'开始接收数据\r\n')

if not self.log is None:

self.log.info(u'开始接收数据')

self.output.WriteText(u'===================================\r\n')

def SendData(self, i_msg):

lmsg = ''

isOK = False

if isinstance(i_msg, unicode):

lmsg = i_msg.encode('gb18030')

else:

lmsg = i_msg

try:

#发送数据到相应的处理组件

pass

except Exception, ex:

pass

return isOK

def FirstReader(self):

data1 = ''

isQuanJiao = True

isFirstMethod = True

isEnd = True

readCount = 0

saveCount = 0

RepPos = 0

#read Head Infor content

self.InitHead()

while self.alive:

try:

data = ''

n = self.l_serial.inWaiting()

if n:

data = data + self.l_serial.read(n)

#print binascii.b2a_hex(data),

for l in xrange(len(data)):

if ord(data[l])==0x8E:

isQuanJiao = True

continue

if ord(data[l])==0x8F:

isQuanJiao = False

continue

if ord(data[l]) == 0x80 or ord(data[l]) == 0x00:

if len(data1)>10:

if not self.re_num.search(data1,1) is None:

saveCount = saveCount + 1

if RepPos==0:

RepPos = self.output.GetInsertionPoint()

self.output.Remove(RepPos,self.output.GetLastPosition())

self.SendData(data1)

data1 = ''

continue

except Exception, ex:

if not self.log is None:

self.log.error(u'%s' % ex)

self.waitEnd.set()

self.alive = False

def stop(self):

self.alive = False

self.thread_read.join()

if self.l_serial.isOpen():

self.l_serial.close()

if not self.output is None:

self.output.WriteText(u'关闭通迅端口:[%d] \r\n' % self.port)

if not self.log is None:

self.log.info(u'关闭通迅端口:[%d]' % self.port)

def printHex(self, s):

s1 = binascii.b2a_hex(s)

print s1

#测试用部分

if __name__ == '__main__':

rt = ReadThread()

f = open("sendport.cfg", "r")

rt.sendport = f.read()

f.close()

try:

if rt.start():

rt.waiting()

rt.stop()

else:

pass

except Exception,se:

print str(se)

if rt.alive:

rt.stop()

print 'End OK .'

del rt

Python非常适合写一些测试的脚本,如快速的串口通信测试等。如果使用VC++ QT开发,可能用时较多,使用python,如果掌握使用方法,可以直接读写测试,配合设备或是串口助手,很快验证与实现。

Python有没有现成的串口API直接调用呢?经过实践验证,需要安装一个叫 Pyserial的组件即可。这个可以在github上下载。

在windows 7 64bit 上可以使用吗?当然可以使用,我安装的python3.5为64位的。把下载后的文件,其中有一个serial的文件夹,拷贝到python35安装路径, C:\Python35\Lib\site-packages\serial

网上可以搜一下windows的安装包,安装完也是:C:\Python35\Lib\site-packages\serial ,可以用最新的版本,替换即可。

测试的方法:在python IDE里测试:

>>>import serial

这里如果报错,是州岩python版本与pyserial版本没有配合肆腔好。如果正常,不返回,即可以导入serial模块。

>>>ser=serial.Serial("COM5",115200)

这里为COM5,115200的波特率。如果打不开,请检查安装环境。

>>>ser.write('hello,serial test'.encode())

17

发送测试(如果返回字节数,说明返回成功),这里需要转换一个编码为字节。

以上测试,可以使用现在的设备或是串口助手,如安装Virtual Serial Port Driver 7.2 虚拟串口软件,设置一对串口,进行自发自收的测试。

>>>print(ser.read()line())

b'abcdefg\r\n'

这里是串口接收,有接收的超时。设备或是串口助手发送一个字符串,以回车换行结束,这里就可以收到打印出来。

也可以用ser.read(),这里只接收一个字符来实现。

上面已经实现了基本的串口 *** 作。

关闭串口为:裂迹衫

>>>ser.close()

如果使用python,一般写个py文件,就像windows bat 批处理一样,这是python强大的地方。如果写一个py脚本呢?其实只要把上面的命令,一条条写下来,就是一个脚本,测试如下:

import serialser=serial.Serial("COM5",115200,timeout=0.5)for i in range(0,100-1):ser.write('hello\r\n'.encode())print(ser.readline())ser.close()


欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/yw/12351262.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2023-05-24
下一篇 2023-05-24

发表评论

登录后才能评论

评论列表(0条)

保存