首先需要一个聊天服务器,这里继承asyncore的dispatcher类来实现,代码如下
class ChatServer(dispatcher):
"""
聊天服务器
"""
def __init__(self, port):
dispatcher.__init__(self)
self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
self.set_reuse_addr()
self.bind(('', port))
self.listen(5)
self.users = {}
self.main_room = ChatRoom(self)
def handle_accept(self):
conn, addr = self.accept()
ChatSession(self, conn)
2.会话类
有了服务器类还需要能维护每个用户的连接会话,这里继承asynchat的async_chat类来实现,代码如下:
class ChatSession(async_chat):
"""
负责和单用户通信
"""
def __init__(self, server, sock):
async_chat.__init__(self, sock)
self.server = server
self.set_terminator('n')
self.data = []
self.name = None
self.enter(LoginRoom(server))
def enter(self, room):
'从当前房间移除自身,然后添加到指定房间'
try:
cur = self.room
except AttributeError:
pass
else:
cur.remove(self)
self.room = room
room.add(self)
def collect_incoming_data(self, data):
'接受客户端的数据'
self.data.append(data)
def found_terminator(self):
'当客户端的一条数据结束时的处理'
line = ''.join(self.data)
self.data = []
try:
self.room.handle(self, line)
except EndSession:
self.handle_close()
def handle_close(self):
async_chat.handle_close(self)
self.enter(LogoutRoom(self.server))
3.命令解释器
现在就需要一个命令解释器能够解释用户的命令,例如登录、查询在线用户和发消息等,代码如下:
class CommandHandler:
"""
命令处理类
"""
def unknown(self, session, cmd):
'响应未知命令'
session.push('Unknown command: %sn' % cmd)
def handle(self, session, line):
'命令处理'
if not line.strip():
return
parts = line.split(' ', 1)
cmd = parts[0]
try:
line = parts[1].strip()
except IndexError:
line = ''
meth = getattr(self, 'do_' + cmd, None)
try:
meth(session, line)
except TypeError:
self.unknown(session, cmd)
4.房间
接下来就需要实现聊天室的房间了,这里我们定义了三种房间,分别是用户刚登录时的房间、聊天的房间和退出登录的房间,这三种房间都有一个公共的父类,代码如下:
class Room(CommandHandler):
"""
包含多个用户的环境,负责基本的命令处理和广播
"""
def __init__(self, server):
self.server = server
self.sessions = []
def add(self, session):
'一个用户进入房间'
self.sessions.append(session)
def remove(self, session):
'一个用户离开房间'
self.sessions.remove(session)
def broadcast(self, line):
'向所有的用户发送指定消息'
for session in self.sessions:
session.push(line)
def do_logout(self, session, line):
'退出房间'
raise EndSession
class LoginRoom(Room):
"""
刚登录的用户的房间
"""
def add(self, session):
'用户连接成功的回应'
Room.add(self, session)
session.push('Connect Success')
def do_login(self, session, line):
'登录命令处理'
name = line.strip()
if not name:
session.push('UserName Empty')
elif name in self.server.users:
session.push('UserName Exist')
else:
session.name = name
session.enter(self.server.main_room)
class ChatRoom(Room):
"""
聊天用的房间
"""
def add(self, session):
'广播新用户进入'
session.push('Login Success')
self.broadcast(session.name + ' has entered the room.n')
self.server.users[session.name] = session
Room.add(self, session)
def remove(self, session):
'广播用户离开'
Room.remove(self, session)
self.broadcast(session.name + ' has left the room.n')
def do_say(self, session, line):
'客户端发送消息'
self.broadcast(session.name + ': ' + line + 'n')
def do_look(self, session, line):
'查看在线用户'
session.push('Online Users:n')
for other in self.sessions:
session.push(other.name + 'n')
class LogoutRoom(Room):
"""
用户退出时的房间
"""
def add(self, session):
'从服务器中移除'
try:
del self.server.users[session.name]
except KeyError:
pass
5.服务器端完整代码
#!/usr/bin/python
# encoding: utf-8
from asyncore import dispatcher
from asynchat import async_chat
import socket, asyncore
PORT = 6666 #端口
class EndSession(Exception):
"""
自定义会话结束时的异常
"""
pass
class CommandHandler:
"""
命令处理类
"""
def unknown(self, session, cmd):
'响应未知命令'
session.push('Unknown command: %sn' % cmd)
def handle(self, session, line):
'命令处理'
if not line.strip():
return
parts = line.split(' ', 1)
cmd = parts[0]
try:
line = parts[1].strip()
except IndexError:
line = ''
meth = getattr(self, 'do_' + cmd, None)
try:
meth(session, line)
except TypeError:
self.unknown(session, cmd)
class Room(CommandHandler):
"""
包含多个用户的环境,负责基本的命令处理和广播
"""
def __init__(self, server):
self.server = server
self.sessions = []
def add(self, session):
'一个用户进入房间'
self.sessions.append(session)
def remove(self, session):
'一个用户离开房间'
self.sessions.remove(session)
def broadcast(self, line):
'向所有的用户发送指定消息'
for session in self.sessions:
session.push(line)
def do_logout(self, session, line):
'退出房间'
raise EndSession
class LoginRoom(Room):
"""
刚登录的用户的房间
"""
def add(self, session):
'用户连接成功的回应'
Room.add(self, session)
session.push('Connect Success')
def do_login(self, session, line):
'登录命令处理'
name = line.strip()
if not name:
session.push('UserName Empty')
elif name in self.server.users:
session.push('UserName Exist')
else:
session.name = name
session.enter(self.server.main_room)
class ChatRoom(Room):
"""
聊天用的房间
"""
def add(self, session):
'广播新用户进入'
session.push('Login Success')
self.broadcast(session.name + ' has entered the room.n')
self.server.users[session.name] = session
Room.add(self, session)
def remove(self, session):
'广播用户离开'
Room.remove(self, session)
self.broadcast(session.name + ' has left the room.n')
def do_say(self, session, line):
'客户端发送消息'
self.broadcast(session.name + ': ' + line + 'n')
def do_look(self, session, line):
'查看在线用户'
session.push('Online Users:n')
for other in self.sessions:
session.push(other.name + 'n')
class LogoutRoom(Room):
"""
用户退出时的房间
"""
def add(self, session):
'从服务器中移除'
try:
del self.server.users[session.name]
except KeyError:
pass
class ChatSession(async_chat):
"""
负责和单用户通信
"""
def __init__(self, server, sock):
async_chat.__init__(self, sock)
self.server = server
self.set_terminator('n')
self.data = []
self.name = None
self.enter(LoginRoom(server))
def enter(self, room):
'从当前房间移除自身,然后添加到指定房间'
try:
cur = self.room
except AttributeError:
pass
else:
cur.remove(self)
self.room = room
room.add(self)
def collect_incoming_data(self, data):
'接受客户端的数据'
self.data.append(data)
def found_terminator(self):
'当客户端的一条数据结束时的处理'
line = ''.join(self.data)
self.data = []
try:
self.room.handle(self, line)
except EndSession:
self.handle_close()
def handle_close(self):
async_chat.handle_close(self)
self.enter(LogoutRoom(self.server))
class ChatServer(dispatcher):
"""
聊天服务器
"""
def __init__(self, port):
dispatcher.__init__(self)
self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
self.set_reuse_addr()
self.bind(('', port))
self.listen(5)
self.users = {}
self.main_room = ChatRoom(self)
def handle_accept(self):
conn, addr = self.accept()
ChatSession(self, conn)
if __name__ == '__main__':
s = ChatServer(PORT)
try:
asyncore.loop()
except KeyboardInterrupt:
最简单的办法是使用内置的httpserver,通过多线程socketserver,和simplehttpserver实现简单的交互。
python聊天室(python2.7版本):
暂时先给出两种版本的,tcp+udp
都是分别运行server.py和client.py,就可以进行通讯了。
别外还有websocket版本,这个是有web界面的和基本web服务的,如果需要的话,我会把基本的代码贴一版上来。
TCP版本:
socket-tcp-server.py(服务端):#-*- encoding:utf-8 -*-
#socket.getaddrinfo(host, port, family=0, socktype=0, proto=0, flags=0)
#根据给定的参数host/port,相应的转换成一个包含用于创建socket对象的五元组,
#参数host为域名,以字符串形式给出代表一个IPV4/IPV6地址或者None.
#参数port如果字符串形式就代表一个服务名,比如“http”"ftp""email"等,或者为数字,或者为None
#参数family为地主族,可以为AF_INET ,AF_INET6 ,AF_UNIX.
#参数socktype可以为SOCK_STREAM(TCP)或者SOCK_DGRAM(UDP)
#参数proto通常为0可以直接忽略
#参数flags为AI_*的组合,比如AI_NUMERICHOST,它会影响函数的返回值
#附注:给参数host,port传递None时建立在C基础,通过传递NULL。
#该函数返回一个五元组(family, socktype, proto, canonname, sockaddr),同时第五个参数sockaddr也是一个二元组(address, port)
#更多的方法及链接请访问
# Echo server program
from socket import *
import sys
import threading
from time import ctime
from time import localtime
import traceback
import time
import subprocess
reload(sys)
sys.setdefaultencoding("utf8")
HOST='127.0.0.1'
PORT=8555 #设置侦听端口
BUFSIZ=1024
class TcpServer():
def __init__(self):
self.ADDR=(HOST, PORT)
try:
self.sock=socket(AF_INET, SOCK_STREAM)
print '%d is open' % PORT
self.sock.bind(self.ADDR)
self.sock.listen(5)
#设置退出条件
self.STOP_CHAT=False
# 所有监听的客户端
self.clients = {}
self.thrs = {}
self.stops = []
except Exception,e:
print "%d is down" % PORT
return False
def IsOpen(ip, port):
s = socket(AF_INET, SOCK_STREAM)
try:
s.connect((ip, int(port)))
# s.shutdown(2)
# 利用shutdown()函数使socket双向数据传输变为单向数据传输。shutdown()需要一个单独的参数,
# 该参数表示s了如何关闭socket。具体为:0表示禁止将来读;1表示禁止将来写;2表示禁止将来读和写。
print '%d is open' % port
return True
except:
print '%d is down' % port
return False
def listen_client(self):
while not self.STOP_CHAT:
print(u'等待接入,侦听端口:%d' % (PORT))
self.tcpClientSock, self.addr=self.sock.accept()
print(u'接受连接,客户端地址:',self.addr)
address = self.addr
#将建立的client socket链接放到列表self.clients中
self.clients[address] = self.tcpClientSock
#分别将每个建立的链接放入进程中,接收且分发消息
self.thrs[address] = threading.Thread(target=self.readmsg, args=[address])
self.thrs[address].start()
time.sleep(0.5)
def readmsg(self,address):
#如果地址不存在,则返回False
if address not in self.clients:
return False
#得到发送消息的client socket
client = self.clients[address]
while True:
try:
#获取到消息内容data
data=client.recv(BUFSIZ)
except:
print(e)
self.close_client(address)
break
if not data:
break
#python3使用bytes,所以要进行编码
#s='%s发送给我的信息是:[%s] %s' %(addr[0],ctime(), data.decode('utf8'))
#对日期进行一下格式化
ISOTIMEFORMAT='%Y-%m-%d %X'
stime=time.strftime(ISOTIMEFORMAT, localtime())
s=u'%s发送给我的信息是:%s' %(str(address),data.decode('utf8'))
#将获得的消息分发给链接中的client socket
for k in self.clients:
self.clients[k].send(s.encode('utf8'))
self.clients[k].sendall('sendall:'+s.encode('utf8'))
print str(k)
print([stime], ':', data.decode('utf8'))
#如果输入quit(忽略大小写),则程序退出
STOP_CHAT=(data.decode('utf8').upper()=="QUIT")
if STOP_CHAT:
print "quit"
self.close_client(address)
print "already quit"
break
def close_client(self,address):
try:
client = self.clients.pop(address)
self.stops.append(address)
client.close()
for k in self.clients:
self.clients[k].send(str(address) + u"已经离开了")
except:
pass
print(str(address)+u'已经退出')
if __name__ == '__main__':
tserver = TcpServer()
tserver.listen_client()
——————————华丽的分割线——————————
socket-tcp-client.py (客户端):
#-*- encoding:utf-8 -*-
from socket import *
import sys
import threading
import time
reload(sys)
sys.setdefaultencoding("utf8")
#测试,连接本机
HOST='127.0.0.1'
#设置侦听端口
PORT=8555
BUFSIZ=1024
class TcpClient:
ADDR=(HOST, PORT)
def __init__(self):
self.HOST = HOST
self.PORT = PORT
self.BUFSIZ = BUFSIZ
#创建socket连接
self.client = socket(AF_INET, SOCK_STREAM)
self.client.connect(self.ADDR)
#起一个线程,监听接收的信息
self.trecv = threading.Thread(target=self.recvmsg)
self.trecv.start()
def sendmsg(self):
#循环发送聊天消息,如果socket连接存在则一直循环,发送quit时关闭链接
while self.client.connect_ex(self.ADDR):
data=raw_input('>:')
if not data:
break
self.client.send(data.encode('utf8'))
print(u'发送信息到%s:%s' %(self.HOST,data))
if data.upper()=="QUIT":
self.client.close()
print u"已关闭"
break
def recvmsg(self):
#接收消息,如果链接一直存在,则持续监听接收消息
try:
while self.client.connect_ex(self.ADDR):
data=self.client.recv(self.BUFSIZ)
print(u'从%s收到信息:%s' %(self.HOST,data.decode('utf8')))
except Exception,e:
print str(e)
if __name__ == '__main__':
client=TcpClient()
client.sendmsg()
UDP版本:
socket-udp-server.py# -*- coding:utf8 -*-
import sys
import time
import traceback
import threading
reload(sys)
sys.setdefaultencoding('utf-8')
import socket
import traceback
HOST = "127.0.0.1"
PORT = 9555
CHECK_PERIOD = 20
CHECK_TIMEOUT = 15
class UdpServer(object):
def __init__(self):
self.clients = []
self.beats = {}
self.ADDR = (HOST,PORT)
try:
self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.sock.bind(self.ADDR) # 绑定同一个域名下的所有机器
self.beattrs = threading.Thread(target=self.checkheartbeat)
self.beattrs.start()
except Exception,e:
traceback.print_exc()
return False
def listen_client(self):
while True:
time.sleep(0.5)
print "hohohohohoo"
try:
recvData,address = self.sock.recvfrom(2048)
if not recvData:
self.close_client(address)
break
if address in self.clients:
senddata = u"%s发送给我的信息是:%s" %(str(address),recvData.decode('utf8'))
if recvData.upper() == "QUIT":
self.close_client(address)
if recvData == "HEARTBEAT":
self.heartbeat(address)
continue
else:
self.clients.append(address)
senddata = u"%s发送给我的信息是:%s" %(str(address),u'进入了聊天室')
for c in self.clients:
try:
self.sock.sendto(senddata,c)
except Exception,e:
print str(e)
self.close_client(c)
except Exception,e:
# traceback.print_exc()
print str(e)
pass
def heartbeat(self,address):
self.beats[address] = time.time()
def checkheartbeat(self):
while True:
print "checkheartbeat"
print self.beats
try:
for c in self.clients:
print time.time()
print self.beats[c]
if self.beats[c] + CHECK_TIMEOUT <time.time():
print u"%s心跳超时,连接已经断开" %str(c)
self.close_client(c)
else:
print u"checkp%s,没有断开" %str(c)
except Exception,e:
traceback.print_exc()
print str(e)
pass
time.sleep(CHECK_PERIOD)
def close_client(self,address):
try:
if address in self.clients:
self.clients.remove(address)
if self.beats.has_key(address):
del self.beats[address]
print self.clients
for c in self.clients:
self.sock.sendto(u'%s已经离开了' % str(address),c)
print(str(address)+u'已经退出')
except Exception,e:
print str(e)
raise
if __name__ == "__main__":
udpServer = UdpServer()
udpServer.listen_client()
——————————华丽的分割线——————————
socket-udp-client.py:
# -*- coding:utf8 -*-
import sys
import threading
import time
reload(sys)
sys.setdefaultencoding('utf-8')
import socket
HOST = "127.0.0.1"
PORT = 9555
#BEAT_PORT = 43278
BEAT_PERIOD = 5
class UdpClient(object):
def __init__(self):
self.clientsock = socket.socket(socket.AF_INET,socket.SOCK_DGRAM)
self.HOST = HOST
self.ADDR = (HOST,PORT)
self.clientsock.sendto(u'请求建立链接',self.ADDR)
self.recvtrs = threading.Thread(target=self.recvmsg)
self.recvtrs.start()
self.hearttrs = threading.Thread(target=self.heartbeat)
self.hearttrs.start()
def sendmsg(self):
while True:
data = raw_input(">:")
if not data:
break
self.clientsock.sendto(data.encode('utf-8'),self.ADDR)
if data.upper() == 'QUIT':
self.clientsock.close()
break
def heartbeat(self):
while True:
self.clientsock.sendto('HEARTBEAT',self.ADDR)
time.sleep(BEAT_PERIOD)
def recvmsg(self):
while True:
recvData,addr = self.clientsock.recvfrom(1024)
if not recvData:
break
print(u'从%s收到信息:%s' %(self.HOST,recvData.decode('utf8')))
if __name__ == "__main__":
udpClient = UdpClient()
udpClient.sendmsg()
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)