python基础(21)-线程通信

python基础(21)-线程通信,第1张

到这里,我们要聊一下线程通信的内容;

首先,我们抛开语言不谈,先看看比较基础的东西,线程间通信的方式;其实也就是哪几种(我这里说的,是我的所谓的知道的。。。)事件,消息队列,信号量,条件变量(锁算不算?我只是认为是同步的一种);所以我们也就是要把这些掌握了,因为各有各的好处嘛;

条件变量我放到了上面的线程同步里面讲了,我总感觉这算是同步的一种,没有很多具体信息的沟通;同时吧,我认为条件变量比较重要,因为这种可以应用于线程池的 *** 作上;所以比较重要;这里,抛开条件变量不谈,我们看看其他的东西;

1、消息队列:

queue 模块下提供了几个阻塞队列,这些队列主要用于实现线程通信。在 queue 模块下主要提供了三个类,分别代表三种队列,它们的主要区别就在于进队列、出队列的不同。

关于这三个队列类的简单介绍如下:

queue.Queue(maxsize=0):代表 FIFO(先进先出)的常规队列,maxsize 可以限制队列的大小。如果队列的大小达到队列的上限,就会加锁,再次加入元素时就会被阻塞,直到队列中的元素被消费。如果将 maxsize 设置为 0 或负数,则该队列的大小就是无限制的。

queue.LifoQueue(maxsize=0):代表 LIFO(后进先出)的队列,与 Queue 的区别就是出队列的顺序不同。

PriorityQueue(maxsize=0):代表优先级队列,优先级最小的元素先出队列。

这三个队列类的属性和方法基本相同, 它们都提供了如下属性和方法:

Queue.qsize():返回队列的实际大小,也就是该队列中包含几个元素。

Queue.empty():判断队列是否为空。

Queue.full():判断队列是否已满。

Queue.put(item, block=True, timeout=None):向队列中放入元素。如果队列己满,且 block 参数为 True(阻塞),当前线程被阻塞,timeout 指定阻塞时间,如果将 timeout 设置为 None,则代表一直阻塞,直到该队列的元素被消费;如果队列己满,且 block 参数为 False(不阻塞),则直接引发 queue.FULL 异常。

Queue.put_nowait(item):向队列中放入元素,不阻塞。相当于在上一个方法中将 block 参数设置为 False。

Queue.get(item, block=True, timeout=None):从队列中取出元素(消费元素)。如果队列已满,且 block 参数为 True(阻塞),当前线程被阻塞,timeout 指定阻塞时间,如果将 timeout 设置为 None,则代表一直阻塞,直到有元素被放入队列中; 如果队列己空,且 block 参数为 False(不阻塞),则直接引发 queue.EMPTY 异常。

Queue.get_nowait(item):从队列中取出元素,不阻塞。相当于在上一个方法中将 block 参数设置为 False。

其实我们想想,这个队列,是python进行封装的,那么我们可以用在线程间的通信;同时也是可以用做一个数据结构;先进先出就是队列,后进先出就是栈;我们用这个栈写个十进制转二进制的例子:

没毛病,可以正常的打印;其中需要注意的就是,maxsize在初始化的时候如果是0或者是个负数的话,那么就会是不限制大小;

那么其实我们想想,我们如果用做线程通信的话,我们两个线程,可以把队列设置为1的大小,如果是1对多,比如是创建者和消费者的关系,我们完全可以作为消息队列,比如说创建者一直在创建一些东西,然后放入到消息队列里面,然后供消费着使用;就是一个很好的例子;所以,其实说是消息队列,也就是队列,没差;

=====================================================================

下面来看一下事件

Event 是一种非常简单的线程通信机制,一个线程发出一个 Event,另一个线程可通过该 Event 被触发。

Event 本身管理一个内部旗标,程序可以通过 Event 的 set() 方法将该旗标设置为 True,也可以调用 clear() 方法将该旗标设置为 False。程序可以调用 wait() 方法来阻塞当前线程,直到 Event 的内部旗标被设置为 True。

Event 提供了如下方法:

is_set():该方法返回 Event 的内部旗标是否为True。

set():该方法将会把 Event 的内部旗标设置为 True,并唤醒所有处于等待状态的线程。

clear():该方法将 Event 的内部旗标设置为 False,通常接下来会调用 wait() 方法来阻塞当前线程。

wait(timeout=None):该方法会阻塞当前线程。

这里我想解释一下;其实对于事件来说,事件可以看成和条件变量是一样的,只是我们说说不一样的地方;

1、对于事件来说,一旦触发了事件,也就是说,一旦set为true了,那么就会一直为true,需要clear调内部的标志,才能继续wait;但是conditon不是,他是一次性的唤醒其他线程;

2、conditon自己带锁;事件呢?不是的;没有自己的锁;比如说有一个存钱的线程,有一个是取钱的线程;那么存钱的线程要存钱;需要怎么办呢?1、发现银行没有钱了(is_set判断);2、锁住银行;3、存钱;4、释放银行;5、唤醒事件;对于取钱的人;1、判断是否有钱;2、被唤醒了,然后锁住银行;3、开始取钱;4、清理告诉存钱的人,我没钱了(clear);5、释放锁;6、等着钱存进去;

其实说白了,就是记住一点;这个旗标需要自己clear就对了

写个例子,怕以后忘了怎么用;

其实时间和信号量比较像;但是信号量不用自己清除标志位;但是事件是需要的;

通过python的网络通信支持,通过网络模块,python程序可以非常方便地相互访问互联网上的HTTP服务和FTP服务等。可以直接获取互联网上的远程资源,还可以向远程资源发送GET POST请求。

计算机网络是线代通信技术与计算机技术相结合的产物,计算机网络主要可以提供

通信协议一般由三部分组成:一是语义部分,用于决定双方对话类型;二是语法部分,用于决定双方对话的格式;三是变化规则,用于决定通信双方的应答关系。

应用层:与其它计算机进行通讯的一个应用,它是对应应用程序的通信服务的。有HTTP, FTP , NFS, SMTP, TELNET

表示层:这一层主要是定义数据格式及加密。如加密, ASCII

会话层:它定义了如何开始、控制和结束一个会话,包括对多个双向消息的控制和管理,以便在只完成连续消息的一部分时可以通知应用,从而使表示层看到的数据是连续的。如 RPC,SQL

传输层:这层的功能包括是否选择差错恢复协议还是无差错恢复协议,及在泳衣主机上对不同应用的数据流的输入进行复用,还包括对收到的顺序不对的数据包的重新排序功能,如 TCP UDP SPX

网络层:这层对端对端的包传输进行定义,它定义了能够标识所有结点的逻辑地址,还定义了路由实现的方式和学习的方式。如IP

数据链路层:它定义了在单个链路上如何传输数据。这些协议与被讨论的各种介质有关

物理层:OSI的物理层规范是有关传输介质的特性,这些规范通常也参考了其他组织制定的标准。

IP地址用于唯一标识网络中的一个通信实体,这个通信实体既可以是一个主机,也可以是路由器的某个端口,。而在基于IP协议的网络中传输数据包都必须使用IP地址来进行标识。

端口,程序与外界进行交互的出入口。

Tcp/IP通信协议是一种可靠的网络协议,他在通信的两端建立一个socket,从而形成虚拟的网络链路。一旦建立了虚拟网络链路,两端的程序就可以通过该链路进行通信。

IP 是Internet上使用的一个关键协议,通过IP协议,使internet成为一个允许连接不同类型的计算机和不同 *** 作系统的网络。同时还需要TCP协议来提供可靠且无差错的服务。

TCP协议被称为端对端协议,这是因为他在两台计算机的连接中起了非常重要的角色,当一台计算机需要与另外一台计算机连接时,TCP协议会让他们之间建立一个虚拟链路,用于发送和接受数据。

TCP协议负责收集这些数据包,并将其按照适当的顺序传送,接收端收到数据包后将其正确的还原。TCP保证数据包在传送过程中准确无误。TCP协议采用重发机制,当一个通信实体发送一个消息给另外一个通信实体后,需要接收到另外一个通信实体的确认信息,如果没有接收到该确认信息,则会重发信息。

使用socket之前,必须先创建socket对象,可通过该类的构造器来创建socket实例。

socket.socket(family = AF_INET, type= SOCK_STREAM, proto=0, fileno= None)

socket对象常用的方法:

基本步骤

创建客户端的步骤:

小实例:服务端

客户端:

通过这样就可以实现socket之间的通信。

打开串口后启动一个线程来监听串口数据的进入,有数据时,就做数据的处理。

用python写串口通信程序的示例:

#coding=gb18030

import sys,threading,time

import serial

import binascii,encodings

import re

import socket

class ReadThread:

def __init__(self, Output=None, Port=0, Log=None, i_FirstMethod=True):

self.l_serial = None

self.alive = False

self.waitEnd = None

self.bFirstMethod = i_FirstMethod

self.sendport = ''

self.log = Log

self.output = Output

self.port = Port

self.re_num = None

def waiting(self):

if not self.waitEnd is None:

self.waitEnd.wait()

def SetStopEvent(self):

if not self.waitEnd is None:

self.waitEnd.set()

self.alive = False

self.stop()

def start(self):

self.l_serial = serial.Serial()

self.l_serial.port = self.port

self.l_serial.baudrate = 9600

self.l_serial.timeout = 2

self.re_num = re.compile('\d')

try:

if not self.output is None:

self.output.WriteTex

if not self.log is None:

self.log.info

self.l_serial.open()

except Exception, ex:

if self.l_serial.isOpen():

self.l_serial.close()

self.l_serial = None

if not self.output is None:

self.output.WriteText

if not self.log is None:

self.log.error(u'%s' % ex)

return False

if self.l_serial.isOpen():

if not self.output is None:

self.output.WriteText

if not self.log is None:

self.log.info

self.waitEnd = threading.Event()

self.alive = True

self.thread_read = None

self.thread_read = threading.Thread(target=self.FirstReader)

self.thread_read.setDaemon(1)

self.thread_read.start()

return True

else:

if not self.output is None:

self.output.WriteText

if not self.log is None:

self.log.info

return False

def InitHead(self):

try:

time.sleep(3)

if not self.output is None:

self.output.WriteText

if not self.log is None:

self.log.info

self.l_serial.flushInput()

self.l_serial.write('\x11')

data1 = self.l_serial.read(1024)

except ValueError,ex:

if not self.output is None:

self.output.WriteText

if not self.log is None:

self.log.error(u'%s' % ex)

self.SetStopEvent()

return

if not self.output is None:

self.output.WriteText

if not self.log is None:

self.log.info

self.output.WriteText(u'===================================\r\n')

def SendData(self, i_msg):

lmsg = ''

isOK = False

if isinstance(i_msg, unicode):

lmsg = i_msg.encode('gb18030')

lmsg = i_msg

pass

except Exception, ex:

pass

return isOK

def FirstReader(self):

data1 = ''

isQuanJiao = True

isFirstMethod = True

isEnd = True

readCount = 0

saveCount = 0

RepPos = 0

#read Head Infor content

self.InitHead()

while self.alive:

try:

data = ''

n = self.l_serial.inWaiting()

if n:

data = data + self.l_serial.read(n)

#print binascii.b2a_hex(data),

for l in xrange(len(data)):

if ord(data[l])==0x8E:

isQuanJiao = True

continue

if ord(data[l])==0x8F:

isQuanJiao = False

continue

if ord(data[l]) == 0x80 or ord(data[l]) == 0x00:

if len(data1)>10:

if not self.re_num.search(data1,1) is None:

saveCount = saveCount + 1

if RepPos==0:

RepPos = self.output.GetInsertionPoint()

self.output.Remove(RepPos,self.output.GetLastPosition())

self.SendData(data1)

data1 = ''

continue

except Exception, ex:

if not self.log is None:

self.log.error(u'%s' % ex)

self.waitEnd.set()

self.alive = False

def stop(self):

self.alive = False

self.thread_read.join()

if self.l_serial.isOpen():

self.l_serial.close()

if not self.output is None:

self.output.WriteText

if not self.log is None:

self.log.info

def printHex(self, s):

s1 = binascii.b2a_hex(s)

print s1

if __name__ == '__main__':

rt = ReadThread()

f = open("sendport.cfg", "r")

rt.sendport = f.read()

f.close()

try:

if rt.start():

rt.waiting()

rt.stop()

else:

pass

except Exception,se:

print str(se)

if rt.alive:

rt.stop()

print 'End OK .'

del rt


欢迎分享,转载请注明来源:内存溢出

原文地址: https://outofmemory.cn/yw/11561679.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2023-05-17
下一篇 2023-05-17

发表评论

登录后才能评论

评论列表(0条)

保存