理论基础:
IO多路复用之selectors模块
服务端代码:
import os
import socket
import selectors
import Json
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
buffer_size = 1024
STATUS_CODE = {
700: '路径存在',
701: '路径不存在,请重新新输入',
702: "文件名已存在,请重命名文件",
800: "文件已存在,但是大小不够,是否继续上传",
801: "文件已存在",
802: "准备发送文件",
803: "准备接受数据",
804: "文件不存在",
}
class SelectFtpServer:
def __init__(self):
self.dic = {}
self.sel = selectors.DefaultSelector()
self.create_socket()
self.handle()
def create_socket(self):
server = socket.socket()
server.bind(("127.0.0.1",8885))
server.Listen(5)
server.setblocking(False)
self.sel.register(server,selectors.EVENT_READ,self.accept)
print("服务端开启,等待用户连接...")
def handle(self):
while True:
events = self.sel.select()
for key,mask in events:
callback = key.data
callback(key.fileobj,mask)
def accept(self,sock,mask):
conn,addr = sock.accept()
print("{}连接成功".format(addr))
conn.setblocking(False)
self.sel.register(conn,self.read)
self.dic[conn] = {}
# print(self.dic)
def read(self,conn,mask):
try:
if not self.dic[conn]:
data = Json.loads(conn.recv(buffer_size).decode("utf-8"))
if data:
cmd = data["action"]
file_name = data["file_name"]
file_size = data['file_size']
self.dic[conn] = {"action": cmd,"file_name": file_name,"file_size": file_size,"has_recv": 0}
if cmd == 'put':
data = {
"status_code": 802
}
conn.send(Json.dumps(data).encode())
elif cmd == 'get':
file_path = os.path.join(BASE_DIR,"upload",file_name)
# print(file_path)
if os.path.exists(file_path):
file_size = os.path.getsize(file_path)
data = {
"status_code": 803,
"file_size": file_size
}
else:
data = {
"status_code": 804,
}
conn.send(Json.dumps(data).encode())
else:
pass
else:
self.sel.unregister(conn)
conn.close()
else:
cmd = self.dic[conn].get("action",None)
if cmd:
if hasattr(self,cmd):
func = getattr(self,cmd)
func(conn)
else:
print("错误的命令!")
conn.close()
else:
print("错误的命令!")
conn.close()
except ConnectionresetError as e:
print('error',e)
self.sel.unregister(conn)
conn.close()
def put(self,conn):
file_size = self.dic[conn]['file_size']
path = os.path.join(BASE_DIR,self.dic[conn]['file_name'])
recv_data = conn.recv(buffer_size)
self.dic[conn]['has_recv'] += len(recv_data)
with open(path,'ab') as f:
f.write(recv_data)
if self.dic[conn]['has_recv'] >= file_size:
print("%s上传完毕!" % self.dic[conn]['file_name'])
print(self.dic[conn])
self.dic[conn] = {}
def get(self,conn):
file_name = self.dic[conn]['file_name']
path = os.path.join(BASE_DIR,file_name)
if Json.loads(conn.recv(buffer_size).decode("utf-8"))["status_code"] == 802:
with open(path,'rb') as f:
for line in f:
conn.send(line)
self.dic[conn] = {}
print('文件下载完毕!')
# print(self.dic[conn])
if __name__ == '__main__':
SelectFtpServer()
客户端代码:
import socket
import os,sys
import Json
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
buffer_size = 1024
class SelectFtpClIEnt:
def __init__(self):
self.args = sys.argv
if len(self.args) > 1:
self.port = (self.args[1],int(self.args[2]))
else:
self.port = ("127.0.0.1",8885)
self.create_socket()
self.command()
def create_socket(self):
try:
self.sk = socket.socket()
self.sk.connect(self.port)
print('连接FTP服务器成功!')
except Exception as e:
print("error: ",e)
def command(self):
while True:
cmd = input('>>>').strip()
if cmd == 'exit()':
print("结束运行")
break
try:
cmd,file = cmd.split()
if hasattr(self,cmd):
func = getattr(self,cmd)
func(cmd,file)
else:
print('调用错误!')
except ValueError:
print("请正确输入命令")
def put(self,cmd,file):
if os.path.isfile(file):
file_name = os.path.basename(file)
file_size = os.path.getsize(file)
data = {
"action": cmd,
"file_name": file_name,
"file_size": file_size
}
self.sk.send(Json.dumps(data).encode())
recv_data = Json.loads(self.sk.recv(buffer_size).decode("utf-8"))
print('recvstatus',recv_data)
has_send = 0
if recv_data['status_code'] == 802:
with open(file,'rb') as f:
while file_size > has_send:
contant = f.read(1024)
self.sk.send(contant)
has_send += len(contant)
self.progress_bar(has_send,file_size)
print('\n%s文件上传完毕' % (file_name,))
else:
print('文件不存在')
def get(self,file_name):
data = {
"action": cmd,
"file_name": file_name,
"file_size": 0
}
self.sk.send(Json.dumps(data).encode())
rec_data = Json.loads(self.sk.recv(buffer_size).decode("utf-8"))
if rec_data["status_code"] == 803:
data = {
"status_code": 802
}
self.sk.send(Json.dumps(data).encode("utf-8"))
path = os.path.join(BASE_DIR,file_name)
has_recv = 0
with open(path,'wb') as f_write:
while has_recv < rec_data['file_size']:
content = self.sk.recv(buffer_size)
has_recv += len(content)
f_write.write(content)
self.progress_bar(has_recv,rec_data['file_size'])
print('\nfile %s 下载完成!' % file_name)
else:
print("文件不存在!")
def progress_bar(self,f_size,total):
percent = round(f_size / total * 100)
str = '*' * (percent // 2) + ' ' * ((100 - percent) // 2)
print('\r' + str + ' {}%'.format(percent),end='',flush=True,)
if __name__ == '__main__':
SelectFtpClIEnt()
以上是内存溢出为你收集整理的一个基于selectors开发ftp并发上传下载文件小程序全部内容,希望文章能够帮你解决一个基于selectors开发ftp并发上传下载文件小程序所遇到的程序开发问题。
如果觉得内存溢出网站内容还不错,欢迎将内存溢出网站推荐给程序员好友。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)