Rust小结, 端到端聊天,半完成版

Rust小结, 端到端聊天,半完成版,第1张

Rust小结,端到端聊天

没有跟老师走,选择了难度跟老师类似的另一个选题,真的有点要命(warnning有点多,我跪了)
使用前记得再cargo.toml下面的dependencies依赖装上encoding这个包
跪了跪了

// Rust更偏向Server端, 当然也可以当用户端用
// main.rs主函数
mod Record;
mod message;
mod udp_;
mod default_listen_send;

use std::io::{ErrorKind, Read, Write};
use std::net::{AddrParseError, IpAddr, Ipv4Addr, SocketAddr, TcpListener, TcpStream};
use std::{net, thread};
use std::borrow::BorrowMut;
use std::collections::HashMap;
use std::io::stdin;
use std::rc::Rc;
use std::sync::{Arc, Mutex};
use std::thread::Thread;
use std::time::Duration;
use std::io;
use std::mem::uninitialized;
use std::os;
use std::str::FromStr;
use encoding::{DecoderTrap, Encoding};
use crate::Record::{Client, operator, record};
use std::sync::*;
use std::sync::mpsc::{Receiver, Sender};
use encoding::all::GBK;
use crate::default_listen_send::*;
use crate::message::*;
use crate::udp_::*;


// 主函数,整体逻辑
fn main() {
    let (sender, receiver) = mpsc::channel();
    let mut udp_server = thread::spawn(move || { udp_server(sender) });
    let mut information_flush = thread::spawn(move || { message_main(receiver) });
    let mut listener = TcpListener::bind("127.0.0.1:7878").unwrap();
    thread::sleep(Duration::from_secs(2));
    let mut x = 1;
    for stream in listener.incoming() {
        let mut stream = stream.unwrap();
        stream.set_nonblocking(true);
        let mut m = Arc::new(Mutex::new(stream));
        let mut n = Arc::clone(&m);
        let mut p = thread::spawn(move || { sender_thread(m, x.clone()) });
        let (a, b) = mpsc::channel();
        // 这一些参数主要是为了代码复用,毕竟我讨厌函数编程的继承,再说本来就不是一个类
        let mut q = thread::spawn(move || { listen_thread(n, false, a, String::from(""), String::from("")) });
        // println!("123");
        x += 1;
    }
}


//Record.rs  全部数据结构与trait
use std::borrow::BorrowMut;
use std::cell::RefCell;
use std::collections::HashMap;
use std::io::Write;
use std::net::*;
use std::net::Shutdown;
use encoding::all::GBK;
use encoding::{EncoderTrap, Encoding};


// this part's theme is data_stuct with data_method

pub trait operator {
    fn new() -> record;
    fn flush_record(&self, addr: SocketAddr);
}

pub struct record {
    pub(crate) liuyan: HashMap<String, Vec<String>>,
}


impl operator for record {
    fn new() -> record {
        record { liuyan: HashMap::new() }
    }
    // 将留言用TCp连接传给登录的用户
    // message transfer to the user who logged and with message not read/receievd yet
    fn flush_record(&self, addr: SocketAddr) {
        let mut n = TcpStream::connect(addr).unwrap();
        for name in self.liuyan.keys() {
            let mut str = String::from("接下来的留言来自") + name;
            str = str + "\n";
            let h = GBK.encode(&str, EncoderTrap::Strict).unwrap();
            n.write(&h);
            for words in &self.liuyan[name] {
                let h = GBK.encode(words, EncoderTrap::Strict).unwrap();
                n.write(&h);
                n.write(&GBK.encode("\n", EncoderTrap::Strict).unwrap());
            }
        }
        // 我再另一端加了判断,收到"close_"结束监听这样的判断
        // The code on Client need a sign to end listen , And I choose to use "close_"
        let h = GBK.encode("close_", EncoderTrap::Strict).unwrap();
        n.write(&h);
        n.shutdown(Shutdown::Write);
        return ();
    }
}

pub struct Client {
    pub name_: String,
    pub socket: SocketAddr,
}

impl Client {
    pub fn new(name: String, addr_: SocketAddr) -> Client {
        Client { name_: name, socket: addr_ }
    }
}

#[derive(Debug)]
pub struct ChangeMessStru {
    pub name1: String,
    pub name2: String,
    pub message: Vec<String>,
}

impl ChangeMessStru {
    pub fn new(name1: String, name2: String, message: Vec<String>) -> ChangeMessStru {
        ChangeMessStru { name1, name2, message }
    }
}


// udp.rs, 完全作为服务器的功能,用户端使用不到。当然,如果是纯p2p是用户端可以用上的,毕竟纯p2p每个人都是client
use std::collections::HashMap;
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use std::str::FromStr;
use std::sync::mpsc::Sender;
use encoding::{DecoderTrap, Encoding};
use crate::Client;

//以下是服务器端的UDP连接, 负责给出地址与服务器登录, 并给留言模块中的flush留言进程一个信号用来释放留言
// This is the part of udp_server , with the fuction of Login, Give recore, And give record_flush fuction/Thread a signal
pub fn udp_server(sender: Sender<Client>) {
    use std::net::UdpSocket;
    let mut name_hash_: HashMap<String, SocketAddr> = HashMap::new();
    // 服务器端设置了一个默认用户default一直在线,就是default_listen_send里面的内容
    // I create a user named default to test and display listen and send fuction in default_listen_send.rs.
    // default start with main Thread and will always be online
    let default_socket = SocketAddr::new(IpAddr::from_str("127.0.0.1").unwrap(), 7878);
    name_hash_.insert(String::from("default"), default_socket);
    use encoding::all::GBK;
    // 这里的端口都是我写死的udp服务器端口
    // This is Server Udp settings and Sorry for there's no api for you to change it easily
    let mut listener = UdpSocket::bind("127.0.0.1:7978").unwrap();
    // 这玩应要一直在线的所以直接死循环不让他停下来
    // listener will be always online So I use loop
    loop {
        // listen data from udp request
        // buf存储udp传过来的数据
        let mut buf = [0; 128];
        let (size, addr) = listener.recv_from(&mut buf).unwrap();
        let mut buf = buf.to_vec();
        let (buf1, buf2) = buf.split_at(size);
        let info = GBK.decode(buf1, DecoderTrap::Strict).unwrap();
        println!("udp请求: {}", info);
        // Ths following is some judjing on the data transfered. Data need to be formated as we can parse Or we just return 403.
        // 下面是对数据的规范性判断。只有符合我们需要的格式才返回结果否则统统403
        // 数据格式要求:  request someone_name  请求someone_name存储的addr
        //               give addr someone_name  我们在本地的Hashmap中增添someone_name的记录,且数据为addr
        if info.len() <= 8 {
            listener.send_to("403 Not a good request".as_bytes(), addr);
            continue;
        }
        if &info[0..7] == "request" {
            let name = &info[8..];
            match name_hash_.get(name) {
                None => { listener.send_to("No such user".as_bytes(), addr); }
                Some(k) => { listener.send_to(k.to_string().as_bytes(), addr); }
            }
        } else if info[0..4] == *"give" {
            let mut ip_String = String::from("");
            let mut jishu = 5;
            for p in info[5..].chars() {
                if p != ':' {
                    jishu += 1;
                } else {
                    break;
                }
            }
            ip_String += &info[5..jishu];
            let ip_String = &ip_String;
            let mut Ip: Ipv4Addr;
            match Ipv4Addr::from_str(ip_String) {
                Ok(s) => { Ip = s }
                Err(_) => {
                    listener.send_to("403 Not a good request".as_bytes(), addr);
                    continue;
                }
            }
            jishu += 1;
            let mut z = jishu;
            for p in info[jishu..].chars() {
                if p != ' ' {
                    z += 1;
                } else {
                    break;
                }
            }
            let mut port = &info[jishu..z];
            let port: u16 = port.trim().parse().unwrap();
            let mut name = &info[z + 1..];
            let h = SocketAddr::new(IpAddr::from(Ip), port);
            name_hash_.insert(String::from(name), h);
            listener.send_to("Success_receive".as_bytes(), addr);
            //When we success get a give message, That means someone must online 
            //so we give flush_message fuction/thread a signal
            //当我们收到give的消息的时候,我们在Client端定义好了要么登录,要么连接成功,欢聚话说就是give的信息中的user必定online
            //所以我们给释放留言的函数或叫做运行线程一个信号让他把留言统统给释放
            sender.send(Client::new(String::from(name), h));
        } else {
            listener.send_to("403 Not a good request".as_bytes(), addr);
        }
    }
}


// message.rs 处理留言的功能
use std::borrow::BorrowMut;
use std::collections::HashMap;
use std::io::{Read, Write};
use std::net::{Shutdown, TcpListener, TcpStream};
use std::sync::{Arc, Mutex};
use std::sync::mpsc::*;
use std::sync::mpsc;
use std::thread;
use std::thread::JoinHandle;
use encoding::all::GBK;
use encoding::{DecoderTrap, EncoderTrap, Encoding};
use crate::{Client, operator, record};
use crate::listen_thread;
use crate::Record::ChangeMessStru;

// this mode is all about message-options
// 这整个一part主要就是message模块

// This is the main fuction of message-options, and will create three(at_least)  son-thread to do all fuctions.
//主函数主要负责进程的创建以及一些主要参数逻辑调度。在主函数中将创建3个(至少)的子线程完成整个message *** 作
pub fn message_main(listener: Receiver<Client>) {
    // information is message-data
    // 这个information就是message_data存储的地方
    let mut Information: HashMap<String, record> = HashMap::new();
    let (changer, fixer) = mpsc::channel();
    let mut information_can_change1 = Arc::new(Mutex::new(Information));
    let mut information_can_change2 = Arc::clone(&information_can_change1);
    thread::spawn(move || { listen_message_thread(changer); });
    thread::spawn(move || { change_message(fixer, information_can_change1); });
    thread::spawn(move || { information_flush(listener, information_can_change2); });
}

// When someone online will flush the message leave to him
// 上线留言清空模块
pub fn information_flush(listener: Receiver<Client>, information: Arc<Mutex<HashMap<String, record>>>) {
    loop {
        let socket_ = listener.recv().unwrap();
        // Fisrt block at signal then lock the field to prevent dead_lock
        // 先信号阻塞再锁住information防止死锁
        let mut Information = information.lock().unwrap();
        match (*Information).get(&socket_.name_) {
            // 没有相关的留言
            // No message leave will continue, and
            None => {
                let mut n = TcpStream::connect(socket_.socket).unwrap();
                n.write(&GBK.encode("close_",EncoderTrap::Strict).unwrap());
                n.shutdown(Shutdown::Write);
                // continue;
                }
            Some(h) => {
                let mut p = h;
                // I define the record trait in Record.rs
                // 这里是我在这个类型(record)中定义了将所有留言发送给用户的trait,所以直接调用即可
                p.flush_record(socket_.socket);
                // Data need to be remove after flush
                // flush数据后肯定要清空
                (*Information).remove(&socket_.name_);
            }
        }
    }
}

// 监听留言模块
// listen message when somepeople leave
pub fn listen_message_thread(changer: Sender<ChangeMessStru>) {
    let mut listener = TcpListener::bind("127.0.0.1:8008").unwrap();
    for stream in listener.incoming() {
        let mut stream = stream.unwrap();
        let mut buf = [0; 128];
        let size = stream.read(&mut buf).unwrap();
        let (buf1, buf2) = buf.split_at(size);
        let info = GBK.decode(buf1, DecoderTrap::Strict).unwrap();
        let mut size = 0;
        for c in info.chars() {
            if c == ' ' {
                break;
            }
            size += 1
        }
        // pre-processing data to get name1(who will reveive meassage)  and name2(who leave the message)
        // name1 是要发送的人, 是Information的键, name2 是发送的人, 是Information[name1].liuyan的键
        let name1 = String::from(&info[..size]);
        let name2 = String::from(&info[size + 1..]);
        let p = changer.clone();
        thread::spawn(move || {
            //复用listen代码顺便让他传数据
            let p = listen_thread(Arc::new(Mutex::new(stream)), true, p, name1, name2);
        });
    }
}

// 收到留言的信号协同改变数据
// Update message data whene receive signal with data
pub fn change_message(listener: Receiver<ChangeMessStru>, Information: Arc<Mutex<HashMap<String, record>>>) {
    loop {
        let data = listener.recv();
        let mut information = Information.lock().unwrap();
        let ChangeMessStru { mut name1, mut name2, mut message } = data.unwrap();
        match information.get(&name1) {
            None => {
                let mut p = record::new();
                p.liuyan.insert(name2, message);
                (*information).insert(name1, p);
                continue;
            }
            Some(_) => {
                let mut p = information.get(&name1).unwrap().liuyan.clone();
                match p.get(&name2) {
                    None => {
                        p.insert(name2, message);
                    }
                    Some(_) => {
                        let mut h = p.get(&name2).unwrap().clone();
                        h.append(&mut message);
                        p.insert(name2, h);
                    }
                };
                let mut q = record::new();
                q.liuyan = p;
                (*information).insert(name1, q);
            }
        }
    }
}


// default_listen_send.rs  因为我完全是在本机127.0.0.1上跑的,所以Rust端要写一个p2pTCP能建立连接的演示,就是这个功能
use std::io;
use std::io::{ErrorKind, Read, Write};
use std::net::TcpStream;
use std::sync::{Arc, Mutex};
use std::sync::mpsc::Sender;
use crate::{operator, record};
use crate::Record::ChangeMessStru;


// These two fuction is a diasplay of client , can do listen and send.
// 这个相当于Client 接收端, 其实主要是演示而存在的default一直在线的用户,就是监听和发送的功能
// 这些复杂的参数不是default用的,为了代码复用,主要是为了留言监听功能的参数。只是为了default功能的话只保留第一个参数就可以了

// The first args I need to use Arc to make the tcpstream , or a tcp_connection, can be shared by listen and send two thread.
// Because listen's read and send's write will change the tcp_stream, and this is Rust, So I only can use Arc Plus Mutex done it.
// 第一个参数是这两个thread的核心, 我用了Arc只能指针再加上Mutex只能指针。因为针对同一个TCP连接我们肯定希望listen和send是在同
// 一个TCP流下面完成的,而且这两个功能势必要对TCP流变量进行修改,而且,这里是Rust.只能用Arc智能指针增加所有权人数,让后用Mutex锁
// 智能指针保证TCP流变量可修改,真的 *** 碎了心

pub fn listen_thread(listen_mux: Arc<Mutex<TcpStream>>, robot: bool, sender: Sender<ChangeMessStru>, n1: String, n2: String) {
    let mut message: Vec<String> = Vec::new();
    loop {
        // 作用域在loop里面,第二次循环应该会自动解锁
        // The mutexs lock filed in loop, when turns next loop will auto unlock
        {
            let mut listen_lock = listen_mux.lock().unwrap();
            let mut p = [0; 64];
            match (*listen_lock).read(&mut p) {
                Ok(_) => {}
                Err(e) => {
                    // 连接终端回收线程
                    // Kill the Thread whene TCP connection disconnect
                    if e.kind() == ErrorKind::ConnectionReset {
                        // 判断是不是要发信号的线程
                        // This fuction is just for message listener. Because message leave code I just choose to use this listener
                        // When done message leave, This thread will signal message_change thread with data
                        // so message_change thread will update message_leave data
                        if robot {
                            sender.send(ChangeMessStru { name1: n1, name2: n2, message });
                            return ();
                        } else {
                            return ();
                        }
                    }
                }
            }
            if p == [0; 64] {
                continue;
            }
            let mut s = String::from("");
            for h in p {
                if h == 0 {
                    break;
                }
                s += &(String::from(h as char))
            }
            // When receive data, print it
            // 打印读取到的流数据
            println!("Read Data: {}", s);
            if robot {
                message.push(s)
            }
        }
    }
}

// send data, No special
// 用Tcp流发送数据,也没啥好说的
pub fn sender_thread(sender_: Arc<Mutex<TcpStream>>, thread_num: i32) {
    loop {
        let mut a = String::from("");
        io::stdin().read_line(&mut a).expect("Can't read for some_reason");
        {
            let mut sender_lock = sender_.lock().unwrap();
            // 连接中断回收线程
            match (*sender_lock).write(a.as_bytes()) {
                Ok(_) => {}
                Err(_) => {
                    println!("This is a new connection, Please resend your information");
                    return ();
                }
            }
            println!("Send data");
        }
    }
}
# Python主要负责轻量客户端,很简单, main.py
import sys
import socket
from multiprocessing import Process, Queue
from request_user import *
from socket_setting import *

self_name = ""


class send_to_server(Process):
    def __init__(self, socket, queue):
        super().__init__()
        # 创建socket
        self.socket = socket
        self.queue = queue

    def send_message(self, message):
        self.socket.send(message.encode('ascii'))

    def close(self):
        self.socket.close()
        return super().close()

    def run(self):
        while 1:
            p = self.queue.get()
            self.send_message(p)


class listen_self(Process):
    def __init__(self, socket, listen_queue):
        super().__init__()
        # 创建socket
        self.socket = socket

    def listen(self):
        listen__ = self.socket.recv(1024)
        listen__ = listen__.decode('ascii').replace('\n', '')
        print(listen__)

    # def close(self):
    #     self.sender_server.close()
    def run(self):
        while 1:
            self.listen()

    def close(self):
        self.socket.close()
        return super().close()


def login():
    global self_name
    self_name = input("请输入你要登录的用户名,第一次登录会为您注册")
    host_ip = socket.gethostbyname(socket.gethostname())
    # port一般来说是应用直接指定的
    # host_ip = '127.0.0.1'
    port = default_port
    # port = 3706
    print('默认为您选择打开' + str(port) + '端口')
    addr_src = host_ip + ':' + str(port)
    name = 'give ' + addr_src + ' ' + self_name
    data = get_attr_from_server(name)
    if data == 'Success_receive':
        ss = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        ss.bind((str(host_ip), port))
        ss.listen(1)
        connection_, addr_ = ss.accept()
        while 1:
            buf = connection_.recv(128).decode('GBK')
            if 'close_' in buf:
                buf = buf.split('\n')
                for k in buf:
                    if k != 'close_':
                        print(k)
                connection_.close()
                ss.close()
                break
            print(buf)
        return True
    else:
        return False


def get_usr_ipaddr(name):
    type = 'request '
    return get_attr_from_server(type + name)


def connect_to_some():
    name = input('请选择需要交流的用户名')
    p = get_usr_ipaddr(name)
    while ':' not in p:
        print(p)
        name = input('用户不存在, 请选择另一个需要交流的用户名')
        p = get_usr_ipaddr(name)
    [ip, port] = p.split(':')
    try:
        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        time.sleep(1)
        s.connect((ip, eval(port)))
        print('对方在线,可直接进行沟通')
        return s, True
    except:
        print("对方暂不在线,启动留言模式,将数据留言到服务器上,用户上线时会收到留言")
        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        s.connect(tcp_server)
        global self_name
        data = name + ' ' + self_name
        s.sendto(data.encode('gbk'), tcp_server)
        return s, False


import time

# 先登录,之后判断有没有留言,有留言直接接收到,之后选择你要连线聊天的用户,判断是否在线,不在线留言,在线直接聊天
def main():
    p = login()
    while p is False:
        print("登录失败,请重新登录")
        p = login()
    while 1:
        socket, online_judge = connect_to_some()
        send_queue = Queue(50)
        send_ = send_to_server(socket, send_queue)
        if online_judge:
            listen_queue = Queue(50)
            listen_ = listen_self(socket, listen_queue)
            listen_.start()
        send_.start()
        print("输入exit将退出本次会话, 输入ctrl + c 将强制退出程序")
        while 1:
            time.sleep(1)
            h = input("")
            if h == 'exit':
                break
            send_queue.put(h)
        k = input("是否退出?")
        if k == 'exit':
            send_.socket.close()
            send_.terminate()
            try:
                listen_.socket.close()
                listen_.terminate()
            except:
                pass
            return

if __name__ == '__main__':
    main()

    
##request_user.py
import socket
from socket_setting import *


def get_attr_from_server(user_name):
    udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    udp_socket.bind(('127.0.0.1', 4886))
    data = user_name
    data = data.encode('gbk')
    udp_socket.sendto(data, udp_server)
    data_ = udp_socket.recv(1024)
    data_ = data_.decode('ascii')
    udp_socket.close()
    return data_

##socket_setting.py
udp_server = ('127.0.0.1', 7978)
tcp_server = ('127.0.0.1', 8008)
test_online_user_server = ('127.0.0.1', 7878)
default_port = 3705

运行的样子:左边Rust 右边Python

留言功能:

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/langs/797277.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-05-06
下一篇 2022-05-06

发表评论

登录后才能评论

评论列表(0条)

保存