Java网络编程进化史:从IO到NIO再到Netty

Java网络编程进化史:从IO到NIO再到Netty,第1张

Java网络编程进化史:从IO到NIO再到Netty

目录
  • 基于IO、NIO、Netty的Java网络程序
    • IO
    • NIO
    • Netty
    • 基于Web的聊天室(Springboot+netty)
  • 动态网页的信息爬取
    • 自动填充百度网页的查询关键字,完成自动搜索
    • 爬取一个动态网页的数据
    • 爬取京东网站上的感兴趣书籍信息(如关键字“python编程”的前200本图书),并保存
  • 参考

基于IO、NIO、Netty的Java网络程序

IO
/**
 * Blocking-IO demo server.
 *
 * <p>Binds a {@link ServerSocket} to port 50000, accepts exactly one client,
 * performs a single read of at most 1024 bytes and prints the received text.
 */
public class Main {

    /** Port the demo server listens on. */
    private static final int PORT = 50000;

    /** Maximum number of bytes taken from the client in the single read. */
    private static final int BUFFER_SIZE = 1024;

    /**
     * Accepts one connection, prints what the client sent and exits.
     *
     * @param args unused
     * @throws IOException if binding, accepting or reading fails
     */
    public static void main(String[] args) throws IOException {
        // try-with-resources releases the stream and both sockets (in reverse
        // order) even when accept() or read() throws.
        try (ServerSocket ss = new ServerSocket(PORT);
             Socket s = ss.accept(); // blocks until a client connects
             InputStream is = s.getInputStream()) {

            byte[] bys = new byte[BUFFER_SIZE];
            int len = is.read(bys);

            // read() returns -1 when the peer closed without sending anything;
            // passing that length to the String constructor would throw.
            // The bytes are decoded as UTF-8 explicitly, so the sender must
            // encode with UTF-8 as well instead of its platform default.
            String data = len > 0
                    ? new String(bys, 0, len, java.nio.charset.StandardCharsets.UTF_8)
                    : "";
            System.out.println("数据是:" + data);
        }
    }
}

/**
 * Blocking-IO demo client.
 *
 * <p>Connects to the demo server, sends one greeting and disconnects.
 */
public class Main {

    /** Address of the machine running the demo server. */
    private static final String HOST = "10.160.89.70";

    /** Port the demo server listens on. */
    private static final int PORT = 50000;

    /**
     * Sends a single greeting to the server.
     *
     * @param args unused
     * @throws IOException if the connection cannot be established or the write fails
     */
    public static void main(String[] args) throws IOException {
        // try-with-resources closes the stream and the socket even if write() throws.
        try (Socket s = new Socket(HOST, PORT);
             OutputStream os = s.getOutputStream()) {
            // Encode explicitly as UTF-8: the no-arg getBytes() uses the platform
            // charset, which garbles the Chinese text when client and server
            // run with different defaults.
            os.write("hello,物联网19级".getBytes(java.nio.charset.StandardCharsets.UTF_8));
            os.flush();
        }
    }
}


NIO
/**
 * NIO demo server.
 *
 * <p>One thread multiplexes the listening channel and every accepted client
 * channel through a single {@link Selector}. Each client channel carries its
 * own 1024-byte {@link ByteBuffer} as the key attachment.
 */
public class Main {
    /** Listening TCP channel; registered with the selector for accept events. */
    private ServerSocketChannel serverSocketChannel;

    /** Selector shared by the listening channel and all client channels. */
    private Selector selector;

    /**
     * Opens and binds the listening channel, then runs the event loop.
     * Does not return unless the loop throws.
     *
     * @param port TCP port to listen on
     * @throws Exception if opening, binding or selecting fails
     */
    public void start(Integer port) throws Exception {
        serverSocketChannel = ServerSocketChannel.open();
        selector = Selector.open();
        serverSocketChannel.socket().bind(new InetSocketAddress(port));
        // A channel must be non-blocking before it can be registered with a selector.
        serverSocketChannel.configureBlocking(false);
        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
        startListener();
    }

    /**
     * Event loop: waits up to one second for ready channels and dispatches
     * accept and read events.
     */
    private void startListener() throws Exception {
        while (true) {
            // select() returns the number of keys whose ready set was updated.
            if (selector.select(1000) == 0) {
                System.out.println("当前没有任务!!!");
                continue;
            }
            Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
            while (iterator.hasNext()) {
                SelectionKey key = iterator.next();
                // The selector never clears the selected-key set itself; a key
                // left in it would be reported again on the next pass.
                iterator.remove();
                if (!key.isValid()) {
                    continue;
                }
                if (key.isAcceptable()) {
                    handleConnection();
                } else if (key.isReadable()) {
                    handleMsg(key);
                }
            }
        }
    }

    /** Accepts a pending connection and registers it for read events. */
    private void handleConnection() throws Exception {
        SocketChannel socketChannel = serverSocketChannel.accept();
        if (socketChannel == null) {
            // Non-blocking accept() returns null when no connection is pending.
            return;
        }
        socketChannel.configureBlocking(false);
        socketChannel.register(selector, SelectionKey.OP_READ, ByteBuffer.allocate(1024));
    }

    /**
     * Reads the bytes currently available on the key's channel and prints them.
     * Closes the channel when the peer has disconnected.
     *
     * @param key selection key of a readable client channel
     */
    private void handleMsg(SelectionKey key) throws Exception {
        SocketChannel channel = (SocketChannel) key.channel();
        ByteBuffer attachment = (ByteBuffer) key.attachment();

        int read;
        try {
            read = channel.read(attachment);
        } catch (java.io.IOException e) {
            // A reset connection must not take the whole server down; treat it as EOF.
            read = -1;
        }
        if (read < 0) {
            // End of stream: without closing, the key stays readable forever
            // and the loop spins. Closing the channel also cancels its key.
            channel.close();
            return;
        }
        if (read == 0) {
            return;
        }

        // Decode only the bytes just received (not the whole backing array),
        // then reset the buffer so the next read starts from an empty buffer.
        attachment.flip();
        String text = java.nio.charset.StandardCharsets.UTF_8.decode(attachment).toString();
        attachment.clear();
        System.out.println("当前信息: " + text);
    }

    /**
     * Starts the demo server on port 8080.
     *
     * @param args unused
     */
    public static void main(String[] args) throws Exception {
        Main server = new Main();
        server.start(8080);
    }
}


/**
 * NIO demo client.
 *
 * <p>Connects to the demo server in non-blocking mode, sends one greeting and
 * then waits for a key press so the connection stays open until the user
 * ends the program.
 */
public class Main {

    /**
     * Connects, sends the greeting and waits on standard input.
     *
     * @param args unused
     * @throws IOException if connecting or writing fails
     */
    public static void main(String[] args) throws IOException {
        // try-with-resources closes the channel on every exit path.
        try (SocketChannel socketChannel = SocketChannel.open()) {
            socketChannel.configureBlocking(false);

            // A non-blocking connect() usually returns false; the handshake
            // must then be completed with finishConnect().
            if (!socketChannel.connect(new InetSocketAddress("10.160.89.70", 8080))) {
                while (!socketChannel.finishConnect()) {
                    System.out.println("connecting...");
                }
            }

            String str = "hello,物联网19级";
            // Explicit UTF-8 so the bytes do not depend on the platform charset.
            ByteBuffer byteBuffer =
                    ByteBuffer.wrap(str.getBytes(java.nio.charset.StandardCharsets.UTF_8));
            // In non-blocking mode one write() may send only part of the
            // buffer; keep writing until it is drained.
            while (byteBuffer.hasRemaining()) {
                socketChannel.write(byteBuffer);
            }

            // Keep the process (and the connection) alive until a key is pressed.
            System.in.read();
        }
    }
}


Netty
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

import java.net.InetSocketAddress;

/**
 * Netty demo server.
 *
 * <p>Listens on port 8080, exchanges messages as strings and echoes every
 * received message back to the sender prefixed with {@code "server write"}.
 */
public class Main {

    /** TCP port the server binds to. */
    private final int port = 8080;

    /**
     * Starts the server.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        new Main().start();
    }

    /**
     * Configures and binds the server, then blocks until the server channel
     * is closed. Both event loop groups are shut down on exit.
     */
    public void start() {
        // First group (one thread) is passed as the parent/acceptor group,
        // the second as the child group for accepted channels.
        EventLoopGroup boss = new NioEventLoopGroup(1);
        EventLoopGroup work = new NioEventLoopGroup();
        try {
            ServerBootstrap server = new ServerBootstrap()
                    .group(boss, work)
                    .channel(NioServerSocketChannel.class)
                    .localAddress(new InetSocketAddress(port))
                    .option(ChannelOption.SO_BACKLOG, 128)
                    .childOption(ChannelOption.SO_KEEPALIVE, true)
                    // The type argument is required: with the raw type,
                    // initChannel(SocketChannel) does not override the
                    // abstract initChannel(Channel) and the class fails to compile.
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline()
                                    .addLast("decoder", new StringDecoder())
                                    .addLast("encoder", new StringEncoder())
                                    .addLast(new HelloWorldServerHandler());
                        }
                    });
            // Bind and wait for the bind to complete.
            ChannelFuture future = server.bind().sync();
            System.out.println("server started and listen " + port);
            // Block until the server channel is closed.
            future.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers can observe the interruption.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            boss.shutdownGracefully();
            work.shutdownGracefully();
        }
    }

    /** Logs each inbound message and echoes it back to the sender. */
    public static class HelloWorldServerHandler extends ChannelInboundHandlerAdapter {

        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            System.out.println("HelloWorldServerHandler active");
        }

        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            System.out.println("server channelRead..");
            System.out.println(ctx.channel().remoteAddress() + "->Server :" + msg.toString());
            ctx.writeAndFlush("server write" + msg);
        }
    }
}


import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

/**
 * Netty demo client.
 *
 * <p>Connects to the demo server, sends one greeting and prints every
 * message received until the connection is closed.
 */
public class Main {
    private static final String HOST = "localhost";
    private static final int PORT = 8080;

    /**
     * Starts the client against {@code localhost:8080}.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        new Main().start(HOST, PORT);
    }

    /**
     * Connects to the given server, sends the greeting and blocks until the
     * channel is closed. The event loop group is shut down on exit.
     *
     * @param host server host name or address
     * @param port server TCP port
     */
    public void start(String host, int port) {
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap client = new Bootstrap()
                    .group(group)
                    .channel(NioSocketChannel.class)
                    .option(ChannelOption.TCP_NODELAY, true)
                    // The type argument is required: with the raw type,
                    // initChannel(SocketChannel) does not override the
                    // abstract initChannel(Channel) and the class fails to compile.
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline()
                                    .addLast("decoder", new StringDecoder())
                                    .addLast("encoder", new StringEncoder())
                                    .addLast(new HelloWorldClientHandler());
                        }
                    });
            // Wait for the connection to be established before writing.
            ChannelFuture future = client.connect(host, port).sync();
            future.channel().writeAndFlush("Hello Netty Server ,I am a netty client");
            // Block until the channel is closed.
            future.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers can observe the interruption.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            group.shutdownGracefully();
        }
    }

    /** Prints a line when the channel becomes active and for every inbound message. */
    public static class HelloWorldClientHandler extends ChannelInboundHandlerAdapter {
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            System.out.println("HelloWorldClientHandler Active");
        }

        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            System.out.println("HelloWorldClientHandler read Message:" + msg);
        }
    }
}


基于Web的聊天室(Springboot+netty)

主要代码:

  • MyApplication
package com.example.demo;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.core.env.Environment;

import java.net.InetAddress;
import java.net.UnknownHostException;

/**
 * Spring Boot entry point of the chat room.
 *
 * <p>Starts the Spring application, prints the URL of the web page and then
 * starts the WebSocket server on port 53134.
 */
@SpringBootApplication
public class MyApplication {

    /** Port the WebSocket server is started on. */
    private static final int WEBSOCKET_PORT = 53134;

    /**
     * Boots the application.
     *
     * @param args command-line arguments forwarded to Spring Boot
     * @throws UnknownHostException if the local host address cannot be resolved
     */
    public static void main(String[] args) throws UnknownHostException {
        // Boot this class: it is the one carrying @SpringBootApplication.
        // NOTE(review): the listing previously referenced Demo1Application,
        // which is not the class shown here - confirm no such class is intended.
        ConfigurableApplicationContext application =
                SpringApplication.run(MyApplication.class, args);
        Environment env = application.getEnvironment();
        String host = InetAddress.getLocalHost().getHostAddress();
        // Fall back to Spring Boot's default port when server.port is not
        // configured, instead of printing "null" in the URL.
        String port = env.getProperty("server.port", "8080");
        System.out.println("[----------------------------------------------------------]");
        System.out.println("聊天室启动成功!点击进入:\t http://" + host + ":" + port);
        System.out.println("[----------------------------------------------------------]");
        WebSocketServer.inst().run(WEBSOCKET_PORT);
    }

}


  • SessionGroup
package com.example.demo;

import com.google.gson.Gson;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.ChannelGroupFuture;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.util.concurrent.ImmediateEventExecutor;
import org.springframework.util.StringUtils;

import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public final class SessionGroup {

    private static SessionGroup singleInstance = new SessionGroup();

    // 组的映射
    private ConcurrentHashMap groupMap = new ConcurrentHashMap<>();

    public static SessionGroup inst() {
        return singleInstance;
    }

    public void shutdownGracefully() {

        Iterator groupIterator = groupMap.values().iterator();
        while (groupIterator.hasNext()) {
            ChannelGroup group = groupIterator.next();
            group.close();
        }
    }

    public void sendToOthers(Map result, SocketSession s) {
        // 获取组
        ChannelGroup group = groupMap.get(s.getGroup());
        if (null == group) {
            return;
        }
        Gson gson=new Gson();
        String json = gson.toJson(result);
        // 自己发送的消息不返回给自己
//      Channel channel = s.getChannel();
        // 从组中移除通道
//      group.remove(channel);
        ChannelGroupFuture future = group.writeAndFlush(new TextWebSocketframe(json));
        future.addListener(f -> {
            System.out.println("完成发送:"+json);
//          group.add(channel);//发送消息完毕重新添加。

        });
    }

    public void addSession(SocketSession session) {

        String groupName = session.getGroup();
        if (StringUtils.isEmpty(groupName)) {
            // 组为空,直接返回
            return;
        }
        ChannelGroup group = groupMap.get(groupName);
        if (null == group) {
            group = new DefaultChannelGroup(ImmediateEventExecutor.INSTANCE);
            groupMap.put(groupName, group);
        }
        group.add(session.getChannel());
    }

    
    public void closeSession(SocketSession session, String echo) {
        ChannelFuture sendFuture = session.getChannel().writeAndFlush(new TextWebSocketframe(echo));
        sendFuture.addListener(new ChannelFutureListener() {
            public void operationComplete(ChannelFuture future) {
                System.out.println("关闭连接:"+echo);
                future.channel().close();
            }
        });
    }

    
    public void closeSession(SocketSession session) {

        ChannelFuture sendFuture = session.getChannel().close();
        sendFuture.addListener(new ChannelFutureListener() {
            public void operationComplete(ChannelFuture future) {
                System.out.println("发送所有完成:"+session.getUser().getNickname());
            }
        });

    }

    
    public void sendMsg(ChannelHandlerContext ctx, String msg) {
        ChannelFuture sendFuture = ctx.writeAndFlush(new TextWebSocketframe(msg));
        sendFuture.addListener(f -> {//发送监听
            System.out.println("对所有发送完成:"+msg);
        });
    }
}


  • SocketSession
package com.example.demo;

import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.util.AttributeKey;

import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

/**
 * Server-side state of one connected client.
 *
 * <p>A session wraps the client's {@link Channel}, stores itself on that
 * channel under {@link #SESSION_KEY} so it can be looked up from the channel
 * again, and carries the user, the group name and a free-form attribute map.
 */
public class SocketSession {

    /** Channel attribute under which the session is stored. */
    public static final AttributeKey<SocketSession> SESSION_KEY =
            AttributeKey.valueOf("SESSION_KEY");

    /** Channel of the client this session belongs to. */
    private final Channel channel;

    /** User attached to the session; {@code null} until {@link #setUser} is called. */
    private User user;

    /** Unique identifier of the session. */
    private final String sessionId;

    /** Name of the group the session belongs to. */
    private String group;

    /** Free-form attributes; accessed only through the synchronized accessors. */
    private final Map<String, Object> map = new HashMap<>();

    /**
     * Creates a session for the given channel and stores it on the channel.
     *
     * @param channel channel of the client; each client has its own channel
     */
    public SocketSession(Channel channel) {
        this.channel = channel;
        this.sessionId = buildNewSessionId();
        channel.attr(SocketSession.SESSION_KEY).set(this);
    }

    /**
     * Looks up the session stored on the context's channel.
     *
     * @param ctx handler context of the client
     * @return the session stored on the channel, or {@code null} if none was set
     */
    public static SocketSession getSession(ChannelHandlerContext ctx) {
        return getSession(ctx.channel());
    }

    /**
     * Looks up the session stored on the channel.
     *
     * @param channel channel of the client
     * @return the session stored on the channel, or {@code null} if none was set
     */
    public static SocketSession getSession(Channel channel) {
        return channel.attr(SocketSession.SESSION_KEY).get();
    }

    /**
     * @return the unique session identifier
     */
    public String getId() {
        return sessionId;
    }

    /** Builds a random identifier: a UUID with the dashes removed. */
    private static String buildNewSessionId() {
        return UUID.randomUUID().toString().replace("-", "");
    }

    /**
     * Stores an attribute on the session.
     *
     * @param key   attribute name
     * @param value attribute value
     */
    public synchronized void set(String key, Object value) {
        map.put(key, value);
    }

    /**
     * Returns an attribute, cast to the type expected by the caller.
     *
     * @param key attribute name
     * @param <T> type the caller expects; a mismatch surfaces as a
     *            {@link ClassCastException} at the call site
     * @return the stored value, or {@code null} if the key is absent
     */
    @SuppressWarnings("unchecked") // the map is heterogeneous; the caller picks the type
    public synchronized <T> T get(String key) {
        return (T) map.get(key);
    }

    /**
     * @return {@code true} when a user has been attached to the session
     */
    public boolean isValid() {
        return getUser() != null;
    }

    public User getUser() {
        return user;
    }

    public void setUser(User user) {
        this.user = user;
    }

    public String getGroup() {
        return group;
    }

    public void setGroup(String group) {
        this.group = group;
    }

    public Channel getChannel() {
        return channel;
    }
}

动态网页的信息爬取

自动填充百度网页的查询关键字,完成自动搜索
# Opens Baidu in Chrome, types a keyword into the search box and clicks the
# search button.
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.common.by import By

# Selenium 4 takes the chromedriver location through a Service object; the
# positional path argument is no longer accepted by current releases.
driver = webdriver.Chrome(
    service=Service(r"C:/Program Files/Google/Chrome/Application/chromedriver.exe"))
# Open the page
driver.get("https://www.baidu.com/")

# Fill in the search box. find_element(By.ID, ...) replaces the removed
# find_element_by_id helper.
search = driver.find_element(By.ID, "kw")
search.send_keys("重庆交通大学")

# Simulate a click on the search button
send_button = driver.find_element(By.ID, "su")
send_button.click()

爬取一个动态网页的数据
# Collects (author, quote) pairs from the JavaScript-rendered quotes page.
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.common.by import By

# Selenium 4 takes the chromedriver location through a Service object; the
# positional path argument is no longer accepted by current releases.
driver = webdriver.Chrome(
    service=Service(r"C:/Program Files/Google/Chrome/Application/chromedriver.exe"))
# Site that hosts the quotes
driver.get("https://quotes.toscrape.com/js/")
# CSV header row
csvHeaders = ['作者', '名言']
# All collected rows
subjects = []
# Single row being built
subject = []
# Every element carrying the "quote" class. find_elements(By..., ...) replaces
# the removed find_elements_by_class_name helper.
res_list = driver.find_elements(By.CLASS_NAME, "quote")

# Extract author and text from each quote element
for tmp in res_list:
    subject.append(tmp.find_element(By.CLASS_NAME, "author").text)
    subject.append(tmp.find_element(By.CLASS_NAME, "text").text)
    print(subject)
    subjects.append(subject)
    # Start a fresh list so the stored row is not mutated by the next iteration
    subject = []

爬取京东网站上的感兴趣书籍信息(如关键字“python编程”的前200本图书),并保存
# Searches jd.com for "python编程", scrolls through the result pages collecting
# book titles and prices until 200 distinct titles are found, then writes them
# to ./jd_books.csv.
import csv
import time

from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.common.by import By

# Selenium 4 takes the chromedriver location through a Service object; the
# positional path argument is no longer accepted by current releases.
driver = webdriver.Chrome(
    service=Service(r"C:/Program Files/Google/Chrome/Application/chromedriver.exe"))
driver.set_window_size(1920, 1080)
# JD home page
driver.get("https://www.jd.com/")

# Type the search keyword
driver.find_element(By.ID, "key").send_keys("python编程")
time.sleep(1)

# Click the search button
driver.find_element(By.CLASS_NAME, "button").click()
time.sleep(1)

# All open windows
windows = driver.window_handles
# Switch to the most recently opened window
driver.switch_to.window(windows[-1])
time.sleep(1)

# Height of <body>, rounded down to a multiple of the 1000px scroll step
js = 'return document.body.scrollHeight'
max_height = driver.execute_script(js)
max_height = (int(max_height / 1000)) * 1000
# Current scroll position
tmp_height = 1000
# title -> price; using the title as key de-duplicates books seen on several scrolls
res_dict = {}

# Number of books to collect
num = 200
while len(res_dict) < num:
    # Reset the scroll position after switching to a new page
    tmp_height = 1000
    while tmp_height < max_height:
        # Scroll down one step so more items are loaded
        driver.execute_script("window.scrollBy(0,1000)")
        tmp_height += 1000

        # Result list container and its <li> items
        J_goodsList = driver.find_element(By.ID, "J_goodsList")
        ul = J_goodsList.find_element(By.TAG_NAME, "ul")
        res_list = ul.find_elements(By.TAG_NAME, "li")
        # Record every book (re-recording an already seen title is harmless)
        for res in res_list:
            name = res.find_element(By.CLASS_NAME, 'p-name').find_element(By.TAG_NAME, 'em').text
            # The leading "." keeps the XPath relative to this <li>; a bare
            # "//div..." searches the whole document and returns the first
            # price on the page for every book.
            price = res.find_element(By.XPATH, ".//div[@class='p-price']//i").text
            res_dict[name] = price
            if len(res_dict) == num:
                break
        time.sleep(2)
        if len(res_dict) == num:
            break
    # Enough books collected: do not click through to another page
    if len(res_dict) >= num:
        break
    # Parent element of the "next page" button
    J_bottomPage = driver.find_element(By.ID, "J_bottomPage")
    # Go to the next page
    J_bottomPage.find_element(By.CLASS_NAME, "pn-next").click()
    # Switch to the newest window
    windows = driver.window_handles
    driver.switch_to.window(windows[-1])
    time.sleep(3)

# CSV header row
csvHeaders = ['书名', '价格']
# One [title, price] row per book
csvRows = [[title, price] for title, price in res_dict.items()]

# Save the result. utf-8-sig avoids UnicodeEncodeError on platforms whose
# default encoding cannot represent every title, and the BOM lets Excel
# detect the encoding.
with open('./jd_books.csv', 'w', newline='', encoding='utf-8-sig') as file:
    fileWriter = csv.writer(file)
    fileWriter.writerow(csvHeaders)
    fileWriter.writerows(csvRows)

参考

https://blog.csdn.net/qq_45659777/article/details/121730888
https://blog.csdn.net/m0_51120713/article/details/121855991

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5678105.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-16
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存