一、说明
pyserial封装了python环境下对串口的访问,其兼容各种平台,并有统一的 *** 作接口。通过python属性访问串口设置,并可对串口的各种配置参数(如串口名,波特率、停止校验位、流控、超时等等)做修改,再进行串口通信的类与接口封装后,非常方便地被调用和移植。
二、模块安装
pip insatll pyserial
三、初始化与参数说明
import serial
ser = serial.Serial('COM3', 115200, timeout=0.5, ....................)
下面看看 serial.Serial 原生类
四、不同平台下初始化
ser=serial.Serial("/dev/ttyUSB0",9600,timeout=0.5)#使用USB连接串行口ser=serial.Serial("/dev/ttyAMA0",9600,timeout=0.5)#使用树莓派的GPIO口连接串行口ser=serial.Serial(1,9600,timeout=0.5)#winsows系统使用COM1口连接串行口ser=serial.Serial("COM1",9600,timeout=0.5)#winsows系统使用COM1口连接串行口ser=serial.Serial("/dev/ttyS1",9600,timeout=0.5)#Linux系统使用COM1口连接串行口
五、串口属性
ser.name #串口名称
ser.port #端口号
ser.baudrate #波特率
ser.bytesize #字节大小
ser.parity #校验位N-无校验,E-偶校验,O-奇校验
ser.stopbits #停止位
ser.timeout #读超时设置
ser.writeTimeout #写超时
ser.xonxoff #软件流控
ser.rtscts #硬件流控
ser.dsrdtr #硬件流控
ser.interCharTimeout #字符间隔超时
六、串口常用方法
isOpen():查看端口是否被打开。
open() :打开端口‘。
close():关闭端口。
read(size=1):从端口读字节数据。默认1个字节。
read_all():从端口接收全部数据。
write(data):向端口写数据。
readline():读一行数据。
readlines():读多行数据。
in_waiting():返回输入缓存中的字节数。
out_waiting():返回输出缓存中的字节数。
flush():等待所有数据写出。
flushInput():丢弃接收缓存中的所有数据。
flushOutput():终止当前写 *** 作,并丢弃发送缓存中的数据。
sendBreadk(duration=0.25):发送BREAK条件,并于duration时间之后返回IDLE
setBreak(level=True):根据level设置break条件。
setRTS(level=True):设置请求发送(RTS)的控制信号
setDTR(level=True):设置数据终端准备就绪的控制信号
七、类与接口封装
import time
import serial
import serial.tools.list_ports
# 串口 *** 作类
class serialCommunication(object):
def __init__(self, port, bps, timeout):# 可配置更多参数
port_list =self.show_usable_com()
if len(port_list) >0:
if portnot in port_list:
self.port = port_list[0]
else:
self.port = port
else:
print("no usable serial, please plugin your serial board")
return
self.bps = bps
self.timeout = timeout
try:
# 初始化串口,并得到串口对象,根据需要可拓展更多参数
self.ser = serial.Serial(self.port, self.bps, 8, 'N', 1, timeout=self.timeout, write_timeout=self.timeout)
except Exception as e:# 抛出异常
print("Exception={}".format(e))
# 显示可用串口列表
@staticmethod
def show_usable_com():
serialport_list = []
portInfo_list =list(serial.tools.list_ports.comports())
if len(portInfo_list) <=0:
print("can not find any serial port!")
else:
print(portInfo_list)
for i in range(len(portInfo_list)):
plist =list(portInfo_list[i])
print(plist)
serialport_list.append(plist[0])
print(serialport_list)
return serialport_list
# 输出串口基本信息
def serial_infor(self):
print(self.ser.name)# 设备名字
print(self.ser.port)# 读或者写端口
print(self.ser.baudrate)# 波特率
print(self.ser.bytesize)# 字节大小
print(self.ser.parity)# 校验位
print(self.ser.stopbits)# 停止位
print(self.ser.timeout)# 读超时设置
print(self.ser.writeTimeout)# 写超时
print(self.ser.xonxoff)# 软件流控
print(self.ser.rtscts)# 软件流控
print(self.ser.dsrdtr)# 硬件流控
print(self.ser.interCharTimeout)# 字符间隔超时
# 打开串口
def serial_open(self):
try:
if not self.ser.isOpen():
self.ser.open()
except Exception as e:# 抛出异常
print("serial_open Exception={}".format(e))
self.ser.close()
# 读取指定大小的数据
# 从串口读size个字节。如果指定超时,则可能在超时后返回较少的字节;如果没有指定超时,则会一直等到收完指定的字节数。
def serial_read_with_size(self, size):
try:
self.serial_open()
return self.ser.read(size).decode("utf-8")
except Exception as e:
print("serial_read_all Exception={}".format(e))
self.ser.close()
# 读取当前串口缓存中的所有数据
def serial_read_data(self):
try:
self.serial_open()
datalen =self.ser.inWaiting()
if datalen ==0:
return None
return self.ser.read(datalen).decode("utf-8")
except Exception as e:
print("serial_read_data Exception={}".format(e))
self.ser.close()
# 读串口全部数据,注意timeout的设置
# 在设定的timeout时间范围内,如果读取的字节数据是有效的(就是非空)那就直接返回,
# 否则一直会等到设定的timeout时间并返回这段时间所读的全部字节数据。
def serial_read_all(self):
try:
self.serial_open()
return self.ser.read_all().decode("utf-8")
except Exception as e:
print("serial_read_all Exception={}".format(e))
self.ser.close()
# 读一行数据
# 使用readline()时应该注意:打开串口时应该指定超时,否则如果串口没有收到新行,则会一直等待。
# 如果没有超时,readline会报异常。
def serial_read_line(self):
try:
self.serial_open()
return self.ser.readline().decode("utf-8")
except Exception as e:
print("serial_read_line Exception={}".format(e))
self.ser.close()
# 读多行数据,返回行列表
def serial_read_lines(self):
try:
self.serial_open()
return self.ser.readlines().decode("utf-8")
except Exception as e:
print("serial_read_lines Exception={}".format(e))
self.ser.close()
# 写数据
def serial_write_data(self, data):
try:
self.serial_open()
self.ser.flushOutput()
data_len =self.ser.write(data.encode('utf-8'))
return data_len
except Exception as e:
print("serial_write_data Exception={}".format(e))
return 0
# 写行数据,注意参数差异
def serial_write_lines(self, lines):
self.ser.writelines(lines)
# 清除串口缓存
def serial_clean(self):
try:
if self.ser.isOpen():
self.ser.flush()
except Exception as e:
print("serial_clean Exception={}".format(e))
# 关闭串口
def serial_close(self):
try:
if self.ser.isOpen():
self.ser.close()
except Exception as e:
print("serial_clean Exception={}".format(e))
if __name__ =='__main__':
testSerial = serialCommunication("COM10", 1500000, 0.5)
testSerial.serial_open()
testSerial.serial_infor()
testSerial.serial_write_data("ifconfig eth0\n")
time.sleep(0.1)
data = testSerial.serial_read_all()
print(data)
testSerial.serial_close()
八、其他
1)ser.VERSION表示pyserial版本 另外,ser.name表示设备名称
2)端口设置可以被读入字典,也可从字典加载设置:
getSettingDict():返回当前串口设置的字典
applySettingDict(d):应用字典到串口设置
3) Readline()是读一行,以/n结束,要是没有/n就一直读,阻塞。注意:打开串口时应该指定超时,否则如果串口没有收到新行,则会一直等待。
4)serial.read_all 与 serial.read_all()区别
serial.read_all:读取串口所有的参数信息
serial.read_all():超时时间内从串口读取的所有数据
5) 异常信息
exception serial.SerialException
exception serial.SerialTimeoutException
python里面使用serial库来 *** 作串口,serial的使用流程跟平常的类似,也是打开、关闭、读、写
一般就是设置端口,波特率。
使用serial.Serial创建实体的时候会去打开串口,之后可以使用is_open开判断下是否串口是否打开正常。
使用ser.close即可关闭串口
数据的写使用ser.write接口,如果写的是十六进制的数据使用bytearray来定义,如 writebuf = bytearray([0x55, 0xaa, 0x00, 0x01, 0x00, 0x00])
读数据使用ser.read接口,一般会先使用in_waiting来判断下是否有数据,然后开始读
下面举一个例子,说明下我们在实际的使用情况。
一般会单独创建一个进程来作为数据的接收,然后再配合上标记位或者信号量来处理逻辑
打开串口后启动一个线程来监听串口数据的进入,有数据时,就做数据的处理。
用python写串口通信程序的示例:
#coding=gb18030
import sys,threading,time
import serial
import binascii,encodings
import re
import socket
class ReadThread:
def __init__(self, Output=None, Port=0, Log=None, i_FirstMethod=True):
self.l_serial = None
self.alive = False
self.waitEnd = None
self.bFirstMethod = i_FirstMethod
self.sendport = ''
self.log = Log
self.output = Output
self.port = Port
self.re_num = None
def waiting(self):
if not self.waitEnd is None:
self.waitEnd.wait()
def SetStopEvent(self):
if not self.waitEnd is None:
self.waitEnd.set()
self.alive = False
self.stop()
def start(self):
self.l_serial = serial.Serial()
self.l_serial.port = self.port
self.l_serial.baudrate = 9600
self.l_serial.timeout = 2
self.re_num = re.compile('\d')
try:
if not self.output is None:
self.output.WriteTex
if not self.log is None:
self.log.info
self.l_serial.open()
except Exception, ex:
if self.l_serial.isOpen():
self.l_serial.close()
self.l_serial = None
if not self.output is None:
self.output.WriteText
if not self.log is None:
self.log.error(u'%s' % ex)
return False
if self.l_serial.isOpen():
if not self.output is None:
self.output.WriteText
if not self.log is None:
self.log.info
self.waitEnd = threading.Event()
self.alive = True
self.thread_read = None
self.thread_read = threading.Thread(target=self.FirstReader)
self.thread_read.setDaemon(1)
self.thread_read.start()
return True
else:
if not self.output is None:
self.output.WriteText
if not self.log is None:
self.log.info
return False
def InitHead(self):
try:
time.sleep(3)
if not self.output is None:
self.output.WriteText
if not self.log is None:
self.log.info
self.l_serial.flushInput()
self.l_serial.write('\x11')
data1 = self.l_serial.read(1024)
except ValueError,ex:
if not self.output is None:
self.output.WriteText
if not self.log is None:
self.log.error(u'%s' % ex)
self.SetStopEvent()
return
if not self.output is None:
self.output.WriteText
if not self.log is None:
self.log.info
self.output.WriteText(u'===================================\r\n')
def SendData(self, i_msg):
lmsg = ''
isOK = False
if isinstance(i_msg, unicode):
lmsg = i_msg.encode('gb18030')
lmsg = i_msg
pass
except Exception, ex:
pass
return isOK
def FirstReader(self):
data1 = ''
isQuanJiao = True
isFirstMethod = True
isEnd = True
readCount = 0
saveCount = 0
RepPos = 0
#read Head Infor content
self.InitHead()
while self.alive:
try:
data = ''
n = self.l_serial.inWaiting()
if n:
data = data + self.l_serial.read(n)
#print binascii.b2a_hex(data),
for l in xrange(len(data)):
if ord(data[l])==0x8E:
isQuanJiao = True
continue
if ord(data[l])==0x8F:
isQuanJiao = False
continue
if ord(data[l]) == 0x80 or ord(data[l]) == 0x00:
if len(data1)>10:
if not self.re_num.search(data1,1) is None:
saveCount = saveCount + 1
if RepPos==0:
RepPos = self.output.GetInsertionPoint()
self.output.Remove(RepPos,self.output.GetLastPosition())
self.SendData(data1)
data1 = ''
continue
except Exception, ex:
if not self.log is None:
self.log.error(u'%s' % ex)
self.waitEnd.set()
self.alive = False
def stop(self):
self.alive = False
self.thread_read.join()
if self.l_serial.isOpen():
self.l_serial.close()
if not self.output is None:
self.output.WriteText
if not self.log is None:
self.log.info
def printHex(self, s):
s1 = binascii.b2a_hex(s)
print s1
if __name__ == '__main__':
rt = ReadThread()
f = open("sendport.cfg", "r")
rt.sendport = f.read()
f.close()
try:
if rt.start():
rt.waiting()
rt.stop()
else:
pass
except Exception,se:
print str(se)
if rt.alive:
rt.stop()
print 'End OK .'
del rt
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)