前言
本篇博客主要是自己梳理了一些Netty的技术点,通过梳理可以夯实自己的知识框架
【云原生 | 中间件】Netty是干什么的
Netty 是一个异步事件驱动的网络应用框架,用于开发高性能、高可靠性的网络 IO 程序。
Netty 主要用于:
- 服务器和客户端的开发:用于TCP、UDP的Socket服务器/客户端开发,HTTP服务器开发等。
- 例如可以用Netty开发构建一个自定义的HTTP服务器,实现对HTTP请求的处理。
- 远程过程调用(RPC)框架的开发:像Dubbo框架就使用Netty来构建其通信模块。
- 消息推送系统的开发:比如使用Netty来开发一个推送服务,用于推送消息给客户端。
Netty的主要功能和特点: - 程序开发简单,API友好,大大简化了TCP和UDPsockets编程。
- 高性能,吞吐量高,延时低。Netty基于异步非阻塞IO模型开发。
- 稳定,使用广泛,有许多知名公司和开源项目都是基于Netty开发的。
- 支持多种协议:TCP、UDP、HTTP、WebSocket等。
- 具有较好的拓展性,可以很容易扩展出新的协议。
- 成熟的生态圈,有许多开源项目构建在Netty之上。
所以简单来说,Netty是一个异步事件驱动的网络应用框架,用来快速开发高性能、高可靠性的网络IO程序,如HTTP服务、RPC框架等。它极大地简化和流行了TCP和UDPSOCKET编程。搭建第一个Netty服务器
这里我们搭建一个简单的Netty Echo服务器。服务端接收客户端发来的消息,并直接发送回复。
服务器端:public class EchoServer { public void start() throws Exception { // 配置服务端NIO线程组 EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { // Netty服务端启动器 ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) // 设置线程组 .channel(NioServerSocketChannel.class) // 设置服务端通道的类型 .localAddress(8080) // 服务端监听端口 .childHandler(new ChannelInitializer<SocketChannel>() { // 自定义ChannelHandler处理器 @Override public void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new EchoServerHandler()); } }); // 启动服务端 ChannelFuture f = b.bind().sync(); f.channel().closeFuture().sync(); } finally { // 优雅关闭线程组 bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } }
EchoServerHandler 处理器:
public class EchoServerHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ctx.write(msg); // 发送数据回客户端 ctx.flush(); // 刷新 } }
客户端:
public class EchoClient { public void start() throws Exception { EventLoopGroup group = new NioEventLoopGroup(); try { Bootstrap b = new Bootstrap(); b.group(group) .channel(NioSocketChannel.class) .remoteAddress(8080) // 连接服务端地址和端口 .handler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new EchoClientHandler()); } }); ChannelFuture f = b.connect().sync(); f.channel().closeFuture().sync(); } finally { group.shutdownGracefully(); } } }
EchoClientHandler:
public class EchoClientHandler extends ChannelInboundHandlerAdapter { @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { ctx.writeAndFlush("Netty rocks!"); // 发送数据到服务端 } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println("Received message: " + msg); } }
启动服务器和客户端,就可以看到客户端发送"Netty rocks!"消息,服务端返回相同消息。
使用Netty开发聊天室功能
- 服务端CHANNEL初始化,添加CHATSERVERHANDLER处理器。
b.childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new ChatServerHandler()); } });
- ChatServerHandler处理器继承ChannelInboundHandlerAdapter,负责读取客户端发送的消息,并转发给所有客户端。
public class ChatServerHandler extends ChannelInboundHandlerAdapter { // 客户端消息集合 private static ChannelGroup clients = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception { // 当有新的客户端接入时,把新的channel加入到clients clients.add(ctx.channel()); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { // 读取客户端发送的消息,并转发给所有客户端 clients.writeAndFlush(msg); } }
- 客户端CHANNEL初始化,添加ChatClientHandler处理器。
b.handler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new ChatClientHandler()); } });
- ChatClientHandler处理器继承SimpleChannelInboundHandler,负责读取服务端转发的消息并打印。同时发送消息给服务端。
public class ChatClientHandler extends SimpleChannelInboundHandler<String> { @Override protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception { // 读取服务端传回的消息并打印 System.out.println(msg); } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { // 向服务端发送消息 ctx.writeAndFlush("Hello from " + InetAddress.getLocalHost().getHostName()); } }
- 启动服务器和多个客户端,就可以在各个客户端进行聊天了。
这样一个简单的Netty聊天室就完成了。我们使用Netty的ChannelGroup来将所有客户端的channel集中管理,然后就可以 very 方便的群发消息给所有的客户端。使用Netty远程传输文件
- 文件传输协议设计。我们使用自定义的文件传输协议,由文件名长度、文件名、文件长度、文件内容四部分构成。
|文件名长度|文件名|文件长度|文件内容|
- 服务端初始化,添加FileServerHandler处理器。
b.childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new FileServerHandler()); } });
- FileServerHandler负责解析客户端发来的文件传输请求,读取文件并发送给客户端。
public class FileServerHandler extends SimpleChannelInboundHandler<ByteBuf> { private static final String FILE_PATH = "/Users/zcw/file/"; @Override protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception { // 解析文件传输请求协议 // 1. 文件名长度 int fileNameLength = msg.readInt(); // 2. 文件名 byte[] fileName = new byte[fileNameLength]; msg.readBytes(fileName); // 3. 文件长度 int fileLength = msg.readInt(); //读取本地文件,发送给客户端 RandomAccessFile raf = new RandomAccessFile(FILE_PATH + new String(fileName), "r"); raf.seek(raf.length()); FileRegion region = new DefaultFileRegion(raf.getChannel(), 0, fileLength); ctx.writeAndFlush(region); } }
- 客户端初始化,添加FileClientHandler处理器。
b.handler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new FileClientHandler()); } });
-
FileClientHandler负责构建文件传输请求,接受服务端发送的文件内容并保存。
public class FileClientHandler extends SimpleChannelInboundHandler<ByteBuf> { private static final String FILE_PATH = "/Users/zcw/download/"; private static final String FILE_NAME = "test.txt"; @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { // 构建文件传输请求协议 ByteBuf req = ctx.alloc().buffer(); req.writeInt(FILE_NAME.length()); //文件名长度 req.writeBytes(FILE_NAME.getBytes()); //文件名 req.writeInt((int) new File(FILE_PATH + FILE_NAME).length()); //文件总长度 ctx.writeAndFlush(req); } @Override protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception { //将接收到的文件内容写入到本地文件中 FileOutputStream fos = new FileOutputStream(FILE_PATH + FILE_NAME); fos.getChannel().transferFrom(msg.nioBuffer(), 0, msg.readableBytes()); fos.close(); } }
- 启动服务器和客户端,就可以实现文件从服务器到客户端的传输了。
这样一个基于Netty的简单文件传输示例就完成了。我们使用自定义的文件传输协议,在服务端读取本地文件并发送,客户端接收文件内容并写入本地,实现远程文件传输。使用Netty实现心跳检测机制
在Netty中,可以使用IdleStateHandler和IdleStateEvent实现心跳检测机制。主要步骤如下:
- 服务端和客户端CHANNEL初始化都添加IdleStateHandler处理器。
ch.pipeline().addLast(new IdleStateHandler(0, 0, 10)); // 10秒无数据包触发事件
- 服务端和客户端都添加IdleStateEvent处理器,用于监听IdleStateHandler产生的IdleState事件。
ch.pipeline().addLast(new IdleStateHandler(0, 0, 10)); ch.pipeline().addLast(new IdleStateEventDemoHandler());
- IdleStateEventDemoHandler处理器继承ChannelDuplexHandler,并实现userEventTriggered方法。在该方法中检查是否是IdleStateEvent事件,如果是则发送心跳包。
public class IdleStateEventDemoHandler extends ChannelDuplexHandler { @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { if (evt instanceof IdleStateEvent) { //发送心跳包 ctx.writeAndFlush(Unpooled.wrappedBuffer("heartbeat".getBytes())); } else { super.userEventTriggered(ctx, evt); } } }
- 启动服务器和客户端,当10秒内没有收发数据包时,各自的IdleStateEventDemoHandler会发送心跳包,这样就实现了心跳检测机制。
如果一定时间没有接收到心跳包,说明连接异常已断开,这时就可以关闭CONNECTION,释放资源。
所以利用Netty的IdleStateHandler和IdleStateEvent,很容易实现客户端和服务端的心跳检测机制。当连接一定时间无数据通信时,定期发送心跳包以检测连接是否正常。
一个完整的Netty心跳检测示例如下:
服务端:b.childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new IdleStateHandler(0, 0, 10)); // 10s无数据触发 ch.pipeline().addLast(new IdleStateEventDemoHandler()); ch.pipeline().addLast(new ChatServerHandler()); } });
客户端:
b.handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new IdleStateHandler(0, 0, 10)); ch.pipeline().addLast(new IdleStateEventDemoHandler()); ch.pipeline().addLast(new ChatClientHandler()); } });
IdleStateEventDemoHandler:
public class IdleStateEventDemoHandler extends ChannelDuplexHandler { @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { if (evt instanceof IdleStateEvent) { ctx.writeAndFlush(Unpooled.wrappedBuffer("Heartbeat".getBytes())); } else { super.userEventTriggered(ctx, evt); } } }
Netty知识点汇总