Python3 Windows多处理传递套接字来处理

Python3 Windows多处理传递套接字来处理,第1张

概述Python3 Windows多处理传递套接字来处理

我正在尝试使多处理ServerApp在windows上工作。 我猜这个问题是缺lessos.fork()function,所以我将不得不通过socket不能pickleable(?!)。

我已经看到,这可能使用reduce_handle和rebuild_handle从multiprocessing.reduction如下所示,但是这些方法在Python 3( reduce_handle中不可用。 虽然我有可用的duplicate和steal_handle可用我找不到一个示例如何使用它们,或者我是否需要它们。

另外,我想知道在创build新stream程时是否会出现logging问题?

这是我的ServerApp示例:

使用IPython Notebook在windows上的类中使用multiprocessing.Pool

父进程和subprocess,如果退出,如何通知其他退出?

PyInstaller构buildwindows EXE失败,多处理

在C中同步进程和信号量和信号

Python多处理stdininput

import logging import socket from select import select from threading import Thread from multiprocessing import Queue from multiprocessing import Process from sys import stdout from time import sleep class ServerApp(object): logger = logging.getLogger(__name__) logger.setLevel(logging.DEBUG) handler = logging.StreamHandler(stdout) formatter = logging.Formatter('%(asctime)s %(levelname)s %(message)s') handler.setFormatter(formatter) logger.addHandler(handler) def conn_handler(self,connection,address,buffer): self.logger.info("[%d] - Connection from %s:%d",self.ID,address[0],address[1]) try: while True: command = None received_data = b'' readable,writable,exceptional = select([connection],[],0) # Check for clIEnt commands if readable: # Get Command ... There is more code here command = 'Something' if command == 'Something': connection.sendall(command_response) else: print(':(') except Exception as e: print(e) finally: connection.close() self.clIEnt_buffers.remove(buffer) self.logger.info("[%d] - Connection from %s:%d has been closed.",address[1]) def join(self): while self.Listener.is_alive(): self.Listener.join(0.5) def acceptor(self): while True: self.logger.info("[%d] - Waiting for connection on %s:%d",self.ip,self.port) # Accept a connection on the bound socket and fork a child process to handle it. conn,address = self.socket.accept() # Create Queue which will represent buffer for specific clIEnt and add it o List of all clIEnt buffers buffer = Queue() self.clIEnt_buffers.append(buffer) process = Process(target=self.conn_handler,args=(conn,buffer)) process.daemon = True process.start() self.clIEnts.append(process) # Close the connection fd in the parent,since the child process has its own reference. conn.close() def __init__(self,ID,port=4545,ip='127.0.0.1',method='tcp',buffer_size=2048): self.ID = ID self.port = port self.ip = ip self.socket = None self.Listener = None self.buffer_size = buffer_size # Additional attributes here.... self.clIEnts = [] self.clIEnt_buffers = [] def run(self): # Create TCP socket,bind port and Listen for incoming connections self.socket = socket.socket(socket.AF_INET,socket.soCK_STREAM) self.socket.bind((self.ip,self.port)) self.socket.Listen(5) self.Listener = Thread(target=self.acceptor) # Run acceptor thread to handle new connection self.Listener.daemon = True self.Listener.start()

在linux下读写PIPE

只能在一个处理器上运行的进程可以在其他处理器上运行吗?

Python在windows中使用多处理进行日志logging

获取互斥量,偏好升级过程

Bash:在多个核心上运行相同的程序

为了允许python3连接酸洗(包括套接字),你应该使用mulitprocessing.allow_connection_pickling 。 它为ForkingPickler套接字注册reducers。 例如:

import socket import multiprocessing as mp mp.allow_connection_pickling() def _test_connection(conn): msg = conn.recv(2) conn.send(msg) conn.close() print("ok") if __name__ == '__main__': server,clIEnt = socket.socketpair() p = mp.Process(target=_test_connection,args=(server,)) p.start() clIEnt.settimeout(5) msg = b'42' clIEnt.send(msg) assert clIEnt.recv(2) == msg p.join() assert p.exitcode == 0 clIEnt.close() server.close()

我还注意到,你还有其他一些问题没有被清理的socket 。

当使用self.conn_handler作为目标时,多处理将尝试腌制整个对象self 。 这是一个问题,因为你的对象包含一些不能被酸洗的Thread 。 因此,您应该从目标函数的关闭中删除self 。 这可以通过使用@staticmethod装饰器,并删除函数中的所有self提到。

另外, logging模块没有完成处理多个进程。 基本上,启动Process所有日志都将丢失您当前的代码。 要解决这个问题,您可以在启动第二个Process (在conn_handler的开始conn_handler )或使用multiprocessing日志记录实用程序时启动新的logging记录。

这可以给出这样的事情:

import logging import socket from select import select from threading import Thread from multiprocessing import util,get_context from sys import stdout from time import sleep util.log_to_stderr(20) ctx = get_context("spawn") class serverApp(object): logger = logging.getLogger(__name__) logger.setLevel(logging.DEBUG) handler = logging.StreamHandler(stdout) formatter = logging.Formatter('%(asctime)s %(levelname)s %(message)s') handler.setFormatter(formatter) logger.addHandler(handler) def __init__(self,buffer_size=2048): self.ID = ID self.port = port self.ip = ip self.socket = None self.Listener = None self.buffer_size = buffer_size # Additional attributes here.... self.clIEnts = [] self.clIEnt_buffers = [] @staticmethod def conn_handler(ID,buffer): print("test") util.info("[%d] - Connection from %s:%d",address[1]) try: while True: command = None received_data = b'' # Check for clIEnt commands readable,0) if readable: # Get Command ... There is more code here command = 'Something' if command == 'Something': connection.sendall(b"Coucouc") break else: print(':(') sleep(.1) except Exception as e: print(e) finally: connection.close() util.info("[%d] - Connection from %s:%d has been closed.",address[1]) print("Close") def join(self): while self.Listener.is_alive(): self.Listener.join(0.5) def acceptor(self): while True: self.logger.info("[%d] - Waiting for connection on %s:%d",self.port) # Accept a connection on the bound socket and fork a child process # to handle it. conn,address = self.socket.accept() # Create Queue which will represent buffer for specific clIEnt and # add it o List of all clIEnt buffers buffer = ctx.Queue() self.clIEnt_buffers.append(buffer) process = ctx.Process(target=self.conn_handler,args=(self.ID,conn,since the child process # has its own reference. conn.close() def run(self): # Create TCP socket,self.port)) self.socket.Listen(5) # Run acceptor thread to handle new connection self.Listener = Thread(target=self.acceptor) self.Listener.daemon = True self.Listener.start() self.Listener.join() def main(): app = serverApp(0) app.run() if __name__ == '__main__': main()

我只测试它在Unix和python3.6,但它不应该有太多不同的行为,因为我使用派生上下文,which should behave like the windows中,which should behave like the进程。

总结

以上是内存溢出为你收集整理的Python3 Windows多处理传递套接字来处理全部内容,希望文章能够帮你解决Python3 Windows多处理传递套接字来处理所遇到的程序开发问题。

如果觉得内存溢出网站内容还不错,欢迎将内存溢出网站推荐给程序员好友。

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/langs/1289933.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-06-09
下一篇 2022-06-09

发表评论

登录后才能评论

评论列表(0条)

保存