python - serial communication(串口通信)

python - serial communication(串口通信),第1张

由于测试工作的需要,在C端产品上经常使用串口进行通信,而测试脚本大部分时候又采用python编写,于是就不得不了解并熟悉python下的串口通信实现方法了,整理如下以备随时使用:

一、说明

pyserial封装了python环境下对串口的访问,其兼容各种平台,并有统一的 *** 作接口。通过python属性访问串口设置,并可对串口的各种配置参数(如串口名,波特率、停止校验位、流控、超时等等)做修改,再进行串口通信的类与接口封装后,非常方便地被调用和移植。

二、模块安装

pip insatll pyserial

三、初始化与参数说明

import serial

ser = serial.Serial('COM3', 115200, timeout=0.5, ....................)

下面看看 serial.Serial 原生类

四、不同平台下初始化

ser=serial.Serial("/dev/ttyUSB0",9600,timeout=0.5)#使用USB连接串行口ser=serial.Serial("/dev/ttyAMA0",9600,timeout=0.5)#使用树莓派的GPIO口连接串行口ser=serial.Serial(1,9600,timeout=0.5)#winsows系统使用COM1口连接串行口ser=serial.Serial("COM1",9600,timeout=0.5)#winsows系统使用COM1口连接串行口ser=serial.Serial("/dev/ttyS1",9600,timeout=0.5)#Linux系统使用COM1口连接串行口

五、串口属性

ser.name #串口名称

ser.port #端口号

ser.baudrate #波特率

ser.bytesize #字节大小

ser.parity #校验位N-无校验,E-偶校验,O-奇校验

ser.stopbits #停止位

ser.timeout #读超时设置

ser.writeTimeout #写超时

ser.xonxoff #软件流控

ser.rtscts #硬件流控

ser.dsrdtr #硬件流控

ser.interCharTimeout #字符间隔超时 

六、串口常用方法

isOpen():查看端口是否被打开。

open() :打开端口‘。

close():关闭端口。

read(size=1):从端口读字节数据。默认1个字节。

read_all():从端口接收全部数据。

write(data):向端口写数据。

readline():读一行数据。

readlines():读多行数据。

in_waiting():返回输入缓存中的字节数。

out_waiting():返回输出缓存中的字节数。

flush():等待所有数据写出。

flushInput():丢弃接收缓存中的所有数据。

flushOutput():终止当前写 *** 作,并丢弃发送缓存中的数据。

sendBreadk(duration=0.25):发送BREAK条件,并于duration时间之后返回IDLE

setBreak(level=True):根据level设置break条件。

setRTS(level=True):设置请求发送(RTS)的控制信号

setDTR(level=True):设置数据终端准备就绪的控制信号

七、类与接口封装

import time

import serial

import serial.tools.list_ports

# 串口 *** 作类

class serialCommunication(object):

    def __init__(self, port, bps, timeout):# 可配置更多参数

        port_list =self.show_usable_com()

        if len(port_list) >0:

            if portnot in port_list:

                self.port = port_list[0]

            else:

                self.port = port

        else:

            print("no usable serial, please plugin your serial board")

            return

        self.bps = bps

        self.timeout = timeout

        try:

            # 初始化串口,并得到串口对象,根据需要可拓展更多参数

            self.ser = serial.Serial(self.port, self.bps, 8, 'N', 1, timeout=self.timeout, write_timeout=self.timeout)

        except Exception as e:# 抛出异常

            print("Exception={}".format(e))

    # 显示可用串口列表

    @staticmethod

    def show_usable_com():

        serialport_list = []

        portInfo_list =list(serial.tools.list_ports.comports())

        if len(portInfo_list) <=0:

            print("can not find any serial port!")

        else:

            print(portInfo_list)

            for i in range(len(portInfo_list)):

                plist =list(portInfo_list[i])

                print(plist)

                serialport_list.append(plist[0])

            print(serialport_list)

            return serialport_list

# 输出串口基本信息

    def serial_infor(self):

        print(self.ser.name)# 设备名字

        print(self.ser.port)# 读或者写端口

        print(self.ser.baudrate)# 波特率

        print(self.ser.bytesize)# 字节大小

        print(self.ser.parity)# 校验位

        print(self.ser.stopbits)# 停止位

        print(self.ser.timeout)# 读超时设置

        print(self.ser.writeTimeout)# 写超时

        print(self.ser.xonxoff)# 软件流控

        print(self.ser.rtscts)# 软件流控

        print(self.ser.dsrdtr)# 硬件流控

        print(self.ser.interCharTimeout)# 字符间隔超时

    # 打开串口

    def serial_open(self):

        try:

            if not self.ser.isOpen():

                self.ser.open()

        except Exception as e:# 抛出异常

            print("serial_open Exception={}".format(e))

            self.ser.close()

    # 读取指定大小的数据

    # 从串口读size个字节。如果指定超时,则可能在超时后返回较少的字节;如果没有指定超时,则会一直等到收完指定的字节数。

    def serial_read_with_size(self, size):

        try:

            self.serial_open()

            return self.ser.read(size).decode("utf-8")

        except Exception as e:

            print("serial_read_all Exception={}".format(e))

            self.ser.close()

    # 读取当前串口缓存中的所有数据

    def serial_read_data(self):

        try:

            self.serial_open()

            datalen =self.ser.inWaiting()

            if datalen ==0:

                return None

            return self.ser.read(datalen).decode("utf-8")

        except Exception as e:

            print("serial_read_data Exception={}".format(e))

            self.ser.close()

    # 读串口全部数据,注意timeout的设置

    # 在设定的timeout时间范围内,如果读取的字节数据是有效的(就是非空)那就直接返回,

    # 否则一直会等到设定的timeout时间并返回这段时间所读的全部字节数据。

    def serial_read_all(self):

        try:

            self.serial_open()

            return self.ser.read_all().decode("utf-8")

        except Exception as e:

            print("serial_read_all Exception={}".format(e))

            self.ser.close()

    # 读一行数据

    # 使用readline()时应该注意:打开串口时应该指定超时,否则如果串口没有收到新行,则会一直等待。

    # 如果没有超时,readline会报异常。

    def serial_read_line(self):

        try:

            self.serial_open()

            return self.ser.readline().decode("utf-8")

        except Exception as e:

            print("serial_read_line Exception={}".format(e))

            self.ser.close()

    # 读多行数据,返回行列表

    def serial_read_lines(self):

        try:

            self.serial_open()

            return self.ser.readlines().decode("utf-8")

        except Exception as e:

            print("serial_read_lines Exception={}".format(e))

            self.ser.close()

    # 写数据

    def serial_write_data(self, data):

        try:

            self.serial_open()

            self.ser.flushOutput()

            data_len =self.ser.write(data.encode('utf-8'))

            return data_len

        except Exception as e:

            print("serial_write_data Exception={}".format(e))

            return 0

    # 写行数据,注意参数差异

    def serial_write_lines(self, lines):

        self.ser.writelines(lines)

    # 清除串口缓存

    def serial_clean(self):

        try:

            if self.ser.isOpen():

                self.ser.flush()

        except Exception as e:

            print("serial_clean Exception={}".format(e))

    # 关闭串口

    def serial_close(self):

        try:

            if self.ser.isOpen():

                self.ser.close()

        except Exception as e:

            print("serial_clean Exception={}".format(e))

if __name__ =='__main__':

    testSerial = serialCommunication("COM10", 1500000, 0.5)

    testSerial.serial_open()

    testSerial.serial_infor()

    testSerial.serial_write_data("ifconfig eth0\n")

    time.sleep(0.1)

    data = testSerial.serial_read_all()

    print(data)

    testSerial.serial_close()

八、其他

1)ser.VERSION表示pyserial版本 另外,ser.name表示设备名称

2)端口设置可以被读入字典,也可从字典加载设置:

    getSettingDict():返回当前串口设置的字典

    applySettingDict(d):应用字典到串口设置

3) Readline()是读一行,以/n结束,要是没有/n就一直读,阻塞。注意:打开串口时应该指定超时,否则如果串口没有收到新行,则会一直等待。

4)serial.read_all 与 serial.read_all()区别

    serial.read_all:读取串口所有的参数信息

    serial.read_all():超时时间内从串口读取的所有数据

5) 异常信息

    exception serial.SerialException

    exception serial.SerialTimeoutException

python里面使用serial库来 *** 作串口,serial的使用流程跟平常的类似,也是打开、关闭、读、写

一般就是设置端口,波特率。

使用serial.Serial创建实体的时候会去打开串口,之后可以使用is_open开判断下是否串口是否打开正常。

使用ser.close即可关闭串口

数据的写使用ser.write接口,如果写的是十六进制的数据使用bytearray来定义,如 writebuf = bytearray([0x55, 0xaa, 0x00, 0x01, 0x00, 0x00])

读数据使用ser.read接口,一般会先使用in_waiting来判断下是否有数据,然后开始读

下面举一个例子,说明下我们在实际的使用情况。

一般会单独创建一个进程来作为数据的接收,然后再配合上标记位或者信号量来处理逻辑

打开串口后启动一个线程来监听串口数据的进入,有数据时,就做数据的处理。

用python写串口通信程序的示例:

#coding=gb18030

import sys,threading,time

import serial

import binascii,encodings

import re

import socket

class ReadThread:

def __init__(self, Output=None, Port=0, Log=None, i_FirstMethod=True):

self.l_serial = None

self.alive = False

self.waitEnd = None

self.bFirstMethod = i_FirstMethod

self.sendport = ''

self.log = Log

self.output = Output

self.port = Port

self.re_num = None

def waiting(self):

if not self.waitEnd is None:

self.waitEnd.wait()

def SetStopEvent(self):

if not self.waitEnd is None:

self.waitEnd.set()

self.alive = False

self.stop()

def start(self):

self.l_serial = serial.Serial()

self.l_serial.port = self.port

self.l_serial.baudrate = 9600

self.l_serial.timeout = 2

self.re_num = re.compile('\d')

try:

if not self.output is None:

self.output.WriteTex

if not self.log is None:

self.log.info

self.l_serial.open()

except Exception, ex:

if self.l_serial.isOpen():

self.l_serial.close()

self.l_serial = None

if not self.output is None:

self.output.WriteText

if not self.log is None:

self.log.error(u'%s' % ex)

return False

if self.l_serial.isOpen():

if not self.output is None:

self.output.WriteText

if not self.log is None:

self.log.info

self.waitEnd = threading.Event()

self.alive = True

self.thread_read = None

self.thread_read = threading.Thread(target=self.FirstReader)

self.thread_read.setDaemon(1)

self.thread_read.start()

return True

else:

if not self.output is None:

self.output.WriteText

if not self.log is None:

self.log.info

return False

def InitHead(self):

try:

time.sleep(3)

if not self.output is None:

self.output.WriteText

if not self.log is None:

self.log.info

self.l_serial.flushInput()

self.l_serial.write('\x11')

data1 = self.l_serial.read(1024)

except ValueError,ex:

if not self.output is None:

self.output.WriteText

if not self.log is None:

self.log.error(u'%s' % ex)

self.SetStopEvent()

return

if not self.output is None:

self.output.WriteText

if not self.log is None:

self.log.info

self.output.WriteText(u'===================================\r\n')

def SendData(self, i_msg):

lmsg = ''

isOK = False

if isinstance(i_msg, unicode):

lmsg = i_msg.encode('gb18030')

lmsg = i_msg

pass

except Exception, ex:

pass

return isOK

def FirstReader(self):

data1 = ''

isQuanJiao = True

isFirstMethod = True

isEnd = True

readCount = 0

saveCount = 0

RepPos = 0

#read Head Infor content

self.InitHead()

while self.alive:

try:

data = ''

n = self.l_serial.inWaiting()

if n:

data = data + self.l_serial.read(n)

#print binascii.b2a_hex(data),

for l in xrange(len(data)):

if ord(data[l])==0x8E:

isQuanJiao = True

continue

if ord(data[l])==0x8F:

isQuanJiao = False

continue

if ord(data[l]) == 0x80 or ord(data[l]) == 0x00:

if len(data1)>10:

if not self.re_num.search(data1,1) is None:

saveCount = saveCount + 1

if RepPos==0:

RepPos = self.output.GetInsertionPoint()

self.output.Remove(RepPos,self.output.GetLastPosition())

self.SendData(data1)

data1 = ''

continue

except Exception, ex:

if not self.log is None:

self.log.error(u'%s' % ex)

self.waitEnd.set()

self.alive = False

def stop(self):

self.alive = False

self.thread_read.join()

if self.l_serial.isOpen():

self.l_serial.close()

if not self.output is None:

self.output.WriteText

if not self.log is None:

self.log.info

def printHex(self, s):

s1 = binascii.b2a_hex(s)

print s1

if __name__ == '__main__':

rt = ReadThread()

f = open("sendport.cfg", "r")

rt.sendport = f.read()

f.close()

try:

if rt.start():

rt.waiting()

rt.stop()

else:

pass

except Exception,se:

print str(se)

if rt.alive:

rt.stop()

print 'End OK .'

del rt


欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/yw/11215835.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2023-05-14
下一篇 2023-05-14

发表评论

登录后才能评论

评论列表(0条)

保存