Netty 核心原理与高性能网络编程 ⭐⭐
🎯 面试题:Netty 为什么性能这么高?它解决了 NIO 的哪些问题?
Netty 是一个异步事件驱动的网络应用框架,广泛用于 RPC 框架(Dubbo、gRPC)、即时通讯、游戏服务器等场景。面试中常考察线程模型、粘包半包问题、零拷贝等核心原理。
一、为什么需要 Netty?
原生 NIO 的三大痛点
❌ 痛点一:API 复杂难用
Selector.select() / SelectionKey 体系过于底层
需要手动管理多个 Channel,处理各种事件
❌ 痛点二:粘包半包问题
TCP 是流式协议,不保留消息边界
一次 read() 可能读到半个包,也可能读到多个包
❌ 痛点三:线程模型不完善
Selector 的空轮询 bug(事件通知了但实际没有事件)
多线程下 Channel 的线程安全问题
Netty 对 NIO 进行了全面封装,提供简洁易用的 API,同时解决了上述所有问题。
二、线程模型演进
1. 阻塞 IO(BIO)
主线程: 主线程:
│ │
├─ accept() → 等待连接 ──────────────────────────→ │
│ │
├─ read() → 等待数据 ──────────────────────────→ │ ← 每个客户端一个线程
│ 读取数据 │ 线程资源浪费严重
├─ write() → 写回数据 ──────────────────────────→ │
│ │
└─ 回到 accept 继续等待下一个连接 ────────────────→ │
每个连接一个线程,线程资源严重浪费,无法支撑高并发。
2. 非阻塞 IO(NIO)
单个线程管理所有 Channel:
Selector ←── 监听所有 Channel 的事件
│
├── Channel A: OP_ACCEPT
├── Channel B: OP_READ ←── 有数据可读
├── Channel C: OP_WRITE ←── 可以写数据
└── Channel D: 无事件
优点:一个线程管理多个连接
缺点:API 复杂、粘包半包、线程安全、空轮询 bug
3. Netty 线程模型(Reactor)
┌──────────────────┐
│ BossGroup │ ← Boss线程:处理连接
│ (NioEventLoop) │ accept() → 新连接
└────────┬─────────┘
│
┌────────▼─────────┐
│ ChannelHandler │ ← 将连接注册到 WorkerGroup
│ (pipline) │
└────────┬─────────┘
│
┌──────────────┼──────────────┐
│ │ │
┌────────▼────┐ ┌──────▼─────┐ ┌─────▼──────┐
│ WorkerGroup │ │ WorkerGroup│ │ WorkerGroup│
│ Worker-1 │ │ Worker-2 │ │ Worker-N │
│ NioEventLoop│ │ NioEventLoop│ │ NioEventLoop│
└─────────────┘ └────────────┘ └────────────┘
│ │ │
处理读写 处理读写 处理读写
BossGroup:线程数 = CPU 核数,通常 1-2 个
WorkerGroup:线程数 = CPU 核数 × 2,处理所有 IO 事件
4. 线程模型核心概念
// Netty 启动代码
EventLoopGroup bossGroup = new NioEventLoopGroup(1); // 处理连接
EventLoopGroup workerGroup = new NioEventLoopGroup(); // 默认 = CPU*2
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup) // 绑定线程组
.channel(NioServerSocketChannel.class) // NIO 实现
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
ch.pipeline() // ChannelPipeline
.addLast(new HttpServerCodec())
.addLast(new HttpObjectAggregator(65536))
.addLast(new MyBusinessHandler());
}
});
ChannelFuture f = bootstrap.bind(8080).sync();
f.channel().closeFuture().sync();
三、核心组件
Channel
// Channel 是连接的抽象,类似 NIO 的 SelectableChannel
// 常见类型:NioSocketChannel(客户端)、NioServerSocketChannel(服务端)
// OioSocketChannel(阻塞 BIO)、EpollSocketChannel(Linux Epoll)
Channel channel = ctx.channel();
channel.writeAndFlush(msg); // 写并发送
channel.close(); // 关闭连接
channel.isActive(); // 连接是否活跃
EventLoop 与 EventLoopGroup
EventLoop = Selector + TaskQueue(单线程)
EventLoopGroup = 多个 EventLoop
核心保证:
1. 一个 Channel 在其生命周期内始终由同一个 EventLoop 处理
2. 所有 ChannelHandler 的事件处理都在同一个线程中
3. 无需加锁,天然线程安全
// Boss EventLoop:处理 OP_ACCEPT
// Worker EventLoop:处理 OP_READ / OP_WRITE
EventLoopGroup group = new NioEventLoopGroup(4); // 4 个 EventLoop
// 处理耗时任务:不要阻塞 EventLoop!
// ❌ 错误:在 EventLoop 中执行耗时操作
channel.pipeline().addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
db.query(); // 数据库操作耗时,会阻塞 EventLoop
}
});
// ✅ 正确:提交到专门的业务线程池
channel.pipeline().addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ctx.channel().eventLoop().parent().execute(() -> {
db.query(); // 在单独的线程池执行
});
}
});
ChannelPipeline 与 ChannelHandler
Pipeline 是责任链模式的应用:
入站事件(数据从网络到应用):
SocketChannel.read() → Pipeline.fireChannelRead()
→ Handler1.channelRead() → Handler2.channelRead() → ...
出站事件(数据从应用到网络):
ctx.writeAndFlush(msg)
→ Handler2.write() → Handler1.write() → SocketChannel.write()
每个 Handler 处理完可以决定:
- 传递给下一个 Handler(ctx.fireChannelRead() / ctx.write())
- 或者中断链条,不继续传递
// Inbound Handler(处理入站事件)
public class MyServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ByteBuf buf = (ByteBuf) msg;
System.out.println("收到数据: " + buf.toString(StandardCharsets.UTF_8));
// 处理完后传递下去
ctx.fireChannelRead(msg);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close(); // 发生异常关闭连接
}
}
// Outbound Handler(处理出站事件)
public class MyEncoder extends MessageToByteEncoder<String> {
@Override
protected void encode(ChannelHandlerContext ctx, String msg, ByteBuf out) {
out.writeBytes(msg.getBytes(StandardCharsets.UTF_8));
}
}
四、ByteBuf:Netty 的字节容器
ByteBuf vs ByteBuffer
| 特性 | ByteBuffer | ByteBuf |
|---|---|---|
| 长度 | 固定(创建时指定) | 自动扩容 |
| 读写指针 | 共用 position,flip() 切换 | 分离:readerIndex / writerIndex |
| 池化 | 无(JVM 直接分配) | 有(池化技术,减少 GC) |
| 引用计数 | 无 | 有(reference count,用于池回收) |
ByteBuf 结构
┌──────────────────────────┌───────────────┬────────────┐
│ 可丢弃字节 │ 可读字节 │ 可写字节 │
│ (discardable bytes) │ (readable) │ (writable) │
└──────────────────────────┴───────────────┴────────────┘
0 readerIndex writerIndex capacity
maxCapacity
读操作:readerIndex++(不移动数据)
写操作:writerIndex++(容量不够自动扩容)
池化与零拷贝
// ByteBuf 的三种分配方式
// 堆内存(Heap ByteBuf):分配在 JVM 堆上,GC 回收
ByteBuf heapBuf = Unpooled.buffer(1024);
// 直接内存(Direct ByteBuf):分配在堆外,操作系统内存
// 优点:Socket 读写时少一次内存拷贝
ByteBuf directBuf = Unpooled.directBuffer(1024);
// 池化堆内存(推荐):减少 GC 压力
ByteBuf pooledHeapBuf = PooledHeapByteBufAllocator.DEFAULT.buffer(1024);
// 零拷贝技术: CompositeByteBuf 组合多个 ByteBuf
ByteBuf buf1 = Unpooled.buffer(10);
ByteBuf buf2 = Unpooled.buffer(20);
CompositeByteBuf composite = Unpooled.compositeBuffer();
composite.addComponents(buf1, buf2); // 不拷贝数据,逻辑组合
引用计数
ByteBuf buf = ctx.alloc().buffer();
// ... 使用 buf ...
buf.release(); // 很重要!减少引用计数
// 内存泄漏检测(开发环境开启)
-Dio.netty.leakDetection.level=PARANOID
// 常见泄漏等级:
// DISABLED - 禁用
// SIMPLE - 采样 1% 检测
// ADVANCED - 全量检测(生产禁用,性能差)
// PARANOID - 全量 + 立即报告
五、粘包半包问题
什么是粘包半包?
粘包:多个消息粘在一起
发送:A、B、C → 接收:A、B、C(一次收到三个)
原因:Nagle 算法合并小包 / 接收方 read buffer 足够大
半包:一个消息被拆成多次接收
发送:A(很大) → 接收:A(前半段)、A(后半段)(分两次收到)
原因:TCP buffer 不够大,需要分片传输
自定义协议粘包:
发送:{"name":"Alice"}|{"name":"Bob"}
接收:{"name":"Alice"},{"name":"Bob"} ❌ 粘包
解决方案
解决方案一:固定长度
每个消息固定 N 字节,不够补空格
优点:简单
缺点:浪费带宽
解决方案二:固定分隔符(如 \n)
每个消息以换行符结尾
优点:简单直观
缺点:消息内容不能包含分隔符
解决方案三:LengthFieldBased(推荐)
消息格式:[长度字段(4字节)] + [消息内容]
先读长度,再根据长度读内容
Netty 中的解决方案
// LengthFieldBasedFrameDecoder 配置
// 消息格式:Header(2字节长度) + Body
pipeline.addLast(new LengthFieldBasedFrameDecoder(
1024, // 最大帧长度
0, // 长度字段偏移量
2, // 长度字段占的字节数
0, // 长度字段之后有多少字节要跳过
2 // 长度的调整值(lengthFieldOffset + lengthFieldLength + adjustedLengthFieldOffset)
));
// 服务端
public class NettyServer {
public static void main(String[] args) {
NioEventLoopGroup boss = new NioEventLoopGroup(1);
NioEventLoopGroup worker = new NioEventLoopGroup();
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(boss, worker)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
ch.pipeline()
// 粘包半包处理器(按长度字段解析)
.addLast(new LengthFieldBasedFrameDecoder(65535, 0, 4, 0, 4))
// 自定义解码器
.addLast(new MyDecoder())
// 业务 Handler
.addLast(new MyBusinessHandler());
}
});
bootstrap.bind(8080).sync().channel().closeFuture().sync();
}
}
六、编解码器
自定义协议设计
// 协议格式:[魔数(4)] [版本(1)] [类型(1)] [长度(4)] [内容(N)]
// |---- 12 字节头部 ----| |--- 变长内容 ---|
public class MyEncoder extends MessageToByteEncoder<MyMessage> {
@Override
protected void encode(ChannelHandlerContext ctx, MyMessage msg, ByteBuf out) {
byte[] content = msg.getContent().getBytes(StandardCharsets.UTF_8);
out.writeInt(0x12345678) // 魔数
.writeByte(1) // 版本
.writeByte(msg.getType())
.writeInt(content.length) // 内容长度
.writeBytes(content); // 内容
}
}
public class MyDecoder extends ByteToMessageDecoder {
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
// 必须有 12 字节才能解析头部
if (in.readableBytes() < 12) return;
in.markReaderIndex(); // 标记当前位置
int magic = in.readInt();
if (magic != 0x12345678) {
throw new IllegalArgumentException("非法魔数: " + magic);
}
byte version = in.readByte();
byte type = in.readByte();
int length = in.readInt();
if (in.readableBytes() < length) {
in.resetReaderIndex(); // 数据不够,重置
return;
}
byte[] content = new byte[length];
in.readBytes(content);
MyMessage msg = new MyMessage();
msg.setVersion(version);
msg.setType(type);
msg.setContent(new String(content, StandardCharsets.UTF_8));
out.add(msg); // 添加到输出列表
}
}
七、心跳检测
// IdleStateHandler:检测读写空闲
// 触发条件:超过指定时间没有读/写/读写操作
pipeline.addLast(new IdleStateHandler(
30, // 读空闲:30 秒没读到数据触发
0, // 写空闲:不检测
60 // 读写空闲:60 秒没有读或写触发
));
// 心跳处理器
public class HeartBeatHandler extends ChannelInboundHandlerAdapter {
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
if (evt instanceof IdleStateEvent) {
IdleStateEvent idleEvent = (IdleStateEvent) evt;
if (idleEvent.state() == IdleState.READER_IDLE) {
System.out.println("读空闲,连接超时,关闭连接");
ctx.channel().close();
} else if (idleState.state() == IdleState.ALL_IDLE) {
System.out.println("读写空闲,发送心跳");
ctx.writeAndFlush(Unpooled.copiedBuffer("ping".getBytes()));
}
}
}
}
八、Netty 高性能的原因
1. 无锁化设计
→ EventLoop 单线程处理所有事件,不用加锁
→ 不同 Channel 分配给不同 EventLoop,天然隔离
2. 串行化设计
→ 每个 Channel 由固定的 EventLoop 处理
→ 避免了锁竞争,也保证了消息顺序
3. 零拷贝
→ DirectBuffer:减少一次 JVM 堆到操作系统内存的拷贝
→ CompositeByteBuf:多个 ByteBuf 逻辑组合,不需要拷贝
→ FileRegion:文件传输直接通过内核拷贝(sendfile)
4. 内存池化
→ 减少频繁的内存分配和 GC
→ PooledByteBufAllocator 默认使用
5. Reactor 线程模型
→ Boss 处理连接,Worker 处理 IO,充分利用多核
→ 事件驱动,非阻塞
6. 减少系统调用
→ Selector 批量处理事件
→ write() 批量写,减少 syscall
九、高频面试题
Q1: Netty 的线程模型是怎样的?
Netty 基于 Reactor 模式,采用 BossGroup + WorkerGroup 两组 EventLoopGroup。BossGroup 负责处理连接(accept),WorkerGroup 负责处理读写事件。每个 Channel 在生命周期内由固定的 EventLoop(对应一个线程)处理,所有 ChannelHandler 的事件都在同一线程执行,天然无锁。EventLoop 数量默认为 CPU 核数 × 2。
Q2: 什么是粘包半包?如何解决?
粘包是多个消息被合并成一个 TCP 包接收,半包是一个消息被拆成多个 TCP 包接收。根本原因是 TCP 是流式协议不保留消息边界。解决方案:① 固定长度(浪费带宽);② 分隔符(如换行符);③ 长度字段(推荐):在消息头加一个长度字段,先读长度再读内容。Netty 提供 LengthFieldBasedFrameDecoder 零配置解决。
Q3: Netty 为什么性能高?
四大优化:① 无锁化:每个 Channel 由固定 EventLoop 处理,不用加锁;② 零拷贝:DirectBuffer 减少一次内存拷贝,CompositeByteBuf 组合 ByteBuf 不拷贝,FileRegion 内核级传输;③ 内存池:PooledByteBuf 减少 GC 压力;④ 串行化:Channel 与 EventLoop 绑定,保证消息顺序同时避免竞争。
Q4: ByteBuf 和 ByteBuffer 的区别是什么?
三个核心区别:① ByteBuf 自动扩容,ByteBuffer 固定长度;② ByteBuf 有分离的读/写指针(readerIndex/writerIndex),不需要 flip() 切换;③ ByteBuf 支持池化(PooledByteBuf),减少 GC,而 ByteBuffer 每次分配都在堆上。ByteBuf 还有引用计数用于精确内存管理。
Q5: ChannelPipeline 的执行顺序是怎样的?
Pipeline 是责任链模式。入站事件按添加顺序从头部到尾部执行(Handler1 → Handler2 → …);出站事件按添加逆序从尾部到头部执行。可以通过
ctx.fireChannelRead()跳过后续 Handler,也可以调用ctx.channel().pipeline().remove()动态删除 Handler。