Netty知识点汇总

前言

本篇博客主要是自己梳理了一些Netty的技术点,通过梳理可以夯实自己的知识框架

【云原生 | 中间件】Netty是干什么的

Netty 是一个异步事件驱动的网络应用框架,用于开发高性能、高可靠性的网络 IO 程序。
Netty 主要用于:

  1. 服务器和客户端的开发:用于TCP、UDP的Socket服务器/客户端开发,HTTP服务器开发等。
  2. 例如可以用Netty开发构建一个自定义的HTTP服务器,实现对HTTP请求的处理。
  3. 远程过程调用(RPC)框架的开发:像Dubbo框架就使用Netty来构建其通信模块。
  4. 消息推送系统的开发:比如使用Netty来开发一个推送服务,用于推送消息给客户端。
    Netty的主要功能和特点:
  5. 程序开发简单,API友好,大大简化了TCP和UDPsockets编程。
  6. 高性能,吞吐量高,延时低。Netty基于异步非阻塞IO模型开发。
  7. 稳定,使用广泛,有许多知名公司和开源项目都是基于Netty开发的。
  8. 支持多种协议:TCP、UDP、HTTP、WebSocket等。
  9. 具有较好的拓展性,可以很容易扩展出新的协议。
  10. 成熟的生态圈,有许多开源项目构建在Netty之上。
    所以简单来说,Netty是一个异步事件驱动的网络应用框架,用来快速开发高性能、高可靠性的网络IO程序,如HTTP服务、RPC框架等。它极大地简化和流行了TCP和UDPSOCKET编程。

    搭建第一个Netty服务器

    这里我们搭建一个简单的Netty Echo服务器。服务端接收客户端发来的消息,并直接发送回复。
    服务器端:

    public class EchoServer {
    public void start() throws Exception {
        // 配置服务端NIO线程组
        EventLoopGroup bossGroup = new NioEventLoopGroup(); 
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            // Netty服务端启动器
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)  // 设置线程组
             .channel(NioServerSocketChannel.class) // 设置服务端通道的类型
             .localAddress(8080)   // 服务端监听端口 
             .childHandler(new ChannelInitializer<SocketChannel>() {  
                 // 自定义ChannelHandler处理器
                 @Override  
                 public void initChannel(SocketChannel ch) throws Exception {  
                     ch.pipeline().addLast(new EchoServerHandler());  
                 }  
             });  
            // 启动服务端
            ChannelFuture f = b.bind().sync(); 
            f.channel().closeFuture().sync();
        } finally {
            // 优雅关闭线程组
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
    }

    EchoServerHandler 处理器:

    public class EchoServerHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ctx.write(msg);      // 发送数据回客户端
        ctx.flush();        // 刷新
    }
    } 

    客户端:

    public class EchoClient { 
    public void start() throws Exception {
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap(); 
            b.group(group)    
             .channel(NioSocketChannel.class)  
             .remoteAddress(8080)   // 连接服务端地址和端口
             .handler(new ChannelInitializer<SocketChannel>() {  
                 @Override  
                 public void initChannel(SocketChannel ch) throws Exception {  
                     ch.pipeline().addLast(new EchoClientHandler());  
                 }  
             });
            ChannelFuture f = b.connect().sync();
            f.channel().closeFuture().sync();
        } finally {
            group.shutdownGracefully();
        }
    }
    }

    EchoClientHandler:

    public class EchoClientHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception { 
        ctx.writeAndFlush("Netty rocks!");    // 发送数据到服务端
    }
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("Received message: " + msg);
    }
    } 

    启动服务器和客户端,就可以看到客户端发送"Netty rocks!"消息,服务端返回相同消息。

    使用Netty开发聊天室功能

  11. 服务端CHANNEL初始化,添加CHATSERVERHANDLER处理器。
    b.childHandler(new ChannelInitializer<SocketChannel>() {  
    @Override
    public void initChannel(SocketChannel ch) throws Exception {  
        ch.pipeline().addLast(new ChatServerHandler());
    }
    });
  12. ChatServerHandler处理器继承ChannelInboundHandlerAdapter,负责读取客户端发送的消息,并转发给所有客户端。
    public class ChatServerHandler extends ChannelInboundHandlerAdapter {
    // 客户端消息集合
    private static ChannelGroup clients = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); 
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception { 
        // 当有新的客户端接入时,把新的channel加入到clients
        clients.add(ctx.channel());
    }
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        // 读取客户端发送的消息,并转发给所有客户端
        clients.writeAndFlush(msg);
    }
    }
  13. 客户端CHANNEL初始化,添加ChatClientHandler处理器。
    b.handler(new ChannelInitializer<SocketChannel>() {
    @Override
    public void initChannel(SocketChannel ch) throws Exception {
        ch.pipeline().addLast(new ChatClientHandler());
    }
    });
  14. ChatClientHandler处理器继承SimpleChannelInboundHandler,负责读取服务端转发的消息并打印。同时发送消息给服务端。
    public class ChatClientHandler extends SimpleChannelInboundHandler<String> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
        // 读取服务端传回的消息并打印
        System.out.println(msg); 
    }
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        // 向服务端发送消息
        ctx.writeAndFlush("Hello from " + InetAddress.getLocalHost().getHostName());
    } 
    }
  15. 启动服务器和多个客户端,就可以在各个客户端进行聊天了。
    这样一个简单的Netty聊天室就完成了。我们使用Netty的ChannelGroup来将所有客户端的channel集中管理,然后就可以 very 方便的群发消息给所有的客户端。

    使用Netty远程传输文件

  16. 文件传输协议设计。我们使用自定义的文件传输协议,由文件名长度、文件名、文件长度、文件内容四部分构成。
    |文件名长度|文件名|文件长度|文件内容|
  17. 服务端初始化,添加FileServerHandler处理器。
    b.childHandler(new ChannelInitializer<SocketChannel>() {
    @Override
    public void initChannel(SocketChannel ch) throws Exception {
        ch.pipeline().addLast(new FileServerHandler());
    } 
    });
  18. FileServerHandler负责解析客户端发来的文件传输请求,读取文件并发送给客户端。
    public class FileServerHandler extends SimpleChannelInboundHandler<ByteBuf> {
    private static final String FILE_PATH = "/Users/zcw/file/";
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
        // 解析文件传输请求协议
        // 1. 文件名长度
        int fileNameLength = msg.readInt();
        // 2. 文件名
        byte[] fileName = new byte[fileNameLength];
        msg.readBytes(fileName);
        // 3. 文件长度
        int fileLength = msg.readInt();  
        //读取本地文件,发送给客户端
        RandomAccessFile raf = new RandomAccessFile(FILE_PATH + new String(fileName), "r");
        raf.seek(raf.length());
        FileRegion region = new DefaultFileRegion(raf.getChannel(), 0, fileLength);
        ctx.writeAndFlush(region);
    }  
    }
  19. 客户端初始化,添加FileClientHandler处理器。
    b.handler(new ChannelInitializer<SocketChannel>() {
    @Override
    public void initChannel(SocketChannel ch) throws Exception {
        ch.pipeline().addLast(new FileClientHandler());
    }
    }); 
  20. FileClientHandler负责构建文件传输请求,接受服务端发送的文件内容并保存。

    public class FileClientHandler extends SimpleChannelInboundHandler<ByteBuf> {
    
    private static final String FILE_PATH = "/Users/zcw/download/";
    private static final String FILE_NAME = "test.txt";
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        // 构建文件传输请求协议
        ByteBuf req = ctx.alloc().buffer();
        req.writeInt(FILE_NAME.length());     //文件名长度
        req.writeBytes(FILE_NAME.getBytes()); //文件名
        req.writeInt((int) new File(FILE_PATH + FILE_NAME).length()); //文件总长度
        ctx.writeAndFlush(req);
    }
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {      
        //将接收到的文件内容写入到本地文件中
        FileOutputStream fos = new FileOutputStream(FILE_PATH + FILE_NAME);
        fos.getChannel().transferFrom(msg.nioBuffer(), 0, msg.readableBytes());
        fos.close();
    }
    }
  21. 启动服务器和客户端,就可以实现文件从服务器到客户端的传输了。
    这样一个基于Netty的简单文件传输示例就完成了。我们使用自定义的文件传输协议,在服务端读取本地文件并发送,客户端接收文件内容并写入本地,实现远程文件传输。

    使用Netty实现心跳检测机制

    在Netty中,可以使用IdleStateHandler和IdleStateEvent实现心跳检测机制。主要步骤如下:

  22. 服务端和客户端CHANNEL初始化都添加IdleStateHandler处理器。
    ch.pipeline().addLast(new IdleStateHandler(0, 0, 10)); // 10秒无数据包触发事件 
  23. 服务端和客户端都添加IdleStateEvent处理器,用于监听IdleStateHandler产生的IdleState事件。
    ch.pipeline().addLast(new IdleStateHandler(0, 0, 10));
    ch.pipeline().addLast(new IdleStateEventDemoHandler());
  24. IdleStateEventDemoHandler处理器继承ChannelDuplexHandler,并实现userEventTriggered方法。在该方法中检查是否是IdleStateEvent事件,如果是则发送心跳包。
    public class IdleStateEventDemoHandler extends ChannelDuplexHandler {
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent) {
            //发送心跳包
            ctx.writeAndFlush(Unpooled.wrappedBuffer("heartbeat".getBytes()));
        } else {
            super.userEventTriggered(ctx, evt);
        }
    }
    }
  25. 启动服务器和客户端,当10秒内没有收发数据包时,各自的IdleStateEventDemoHandler会发送心跳包,这样就实现了心跳检测机制。
    如果一定时间没有接收到心跳包,说明连接异常已断开,这时就可以关闭CONNECTION,释放资源。
    所以利用Netty的IdleStateHandler和IdleStateEvent,很容易实现客户端和服务端的心跳检测机制。当连接一定时间无数据通信时,定期发送心跳包以检测连接是否正常。
    一个完整的Netty心跳检测示例如下:
    服务端:

    b.childHandler(new ChannelInitializer<SocketChannel>() {
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ch.pipeline().addLast(new IdleStateHandler(0, 0, 10));  // 10s无数据触发
        ch.pipeline().addLast(new IdleStateEventDemoHandler());
        ch.pipeline().addLast(new ChatServerHandler());
    }
    });

    客户端:

    b.handler(new ChannelInitializer<SocketChannel>() {
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ch.pipeline().addLast(new IdleStateHandler(0, 0, 10));  
        ch.pipeline().addLast(new IdleStateEventDemoHandler());
        ch.pipeline().addLast(new ChatClientHandler());
    }  
    });

    IdleStateEventDemoHandler:

    public class IdleStateEventDemoHandler extends ChannelDuplexHandler {
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent) {
            ctx.writeAndFlush(Unpooled.wrappedBuffer("Heartbeat".getBytes()));
        } else {
            super.userEventTriggered(ctx, evt);  
        }
    }
    }
Netty知识点汇总

发表回复

您的电子邮箱地址不会被公开。 必填项已用*标注

滚动到顶部