SpringBoot 集成 Netty
当 SpringBoot 自带的 HTTP 服务满足不了长连接和高并发的需求时,Netty 是一个值得考虑的选择。本文记录了将 Netty 集成到 SpringBoot 项目中的完整过程,包括服务端搭建、客户端通信、心跳机制和自定义协议。
为什么需要 Netty?
做后端开发,迟早会遇到一个问题:SpringBoot 自带的 Tomcat 在处理 HTTP 请求时表现不错,但当你需要长连接、自定义协议、或者极致的网络性能时,它就显得力不从心了。这时候,Netty 就该登场了。
Netty 是一个基于 NIO 的异步非阻塞网络框架,简单来说,它能让你用很少的线程处理大量的并发连接。它的核心优势在于:
- 基于 NIO 的异步非阻塞 I/O 模型,天然适合高并发场景
- 支持
HTTP、WebSocket等多种协议,扩展性极强 - 提供了清晰的编程模型和
API,上手门槛比直接用 NIO 低得多
那么问题来了——如何把 Netty 优雅地集成到 SpringBoot 项目里,让两者各司其职?接下来我们一步步来实现。
第一步:添加依赖
先把基础的依赖加上,这没什么好说的:
<!-- Netty依赖 -->
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.76.Final</version>
</dependency>
<!-- SpringBoot依赖 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
第二步:配置 Netty 服务器参数
把端口和地址抽到配置文件里,方便不同环境切换。在 application.yml 中添加:
netty:
server:
port: 8090 # Netty服务器端口
host: 127.0.0.1 # Netty服务器地址
注意这里 Netty 监听的是 8090 端口,和 SpringBoot 的 8080 互不冲突。两个服务各跑各的,互不干扰。
第三步:搭建服务端
服务端是整个架构的核心。Netty 的服务端由两部分组成:启动类负责配置和启动,处理器负责处理具体的业务逻辑。
服务器启动类
这里有个关键设计:用 @PostConstruct 让 Netty 服务器随 SpringBoot 一起启动,用 @PreDestroy 确保优雅关闭。bossGroup 负责接收连接,workerGroup 负责处理 I/O 事件——这是 Netty 经典的 Reactor 模式。
服务器启动类
@Component
public class NettyServer {
private static final Logger logger = LoggerFactory.getLogger(NettyServer.class);
@Value("${netty.server.port}")
private int port;
private ServerBootstrap serverBootstrap;
private EventLoopGroup bossGroup;
private EventLoopGroup workerGroup;
private ChannelFuture channelFuture;
@PostConstruct
public void start() throws InterruptedException {
bossGroup = new NioEventLoopGroup(1);
workerGroup = new NioEventLoopGroup();
try {
serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 128)
.childOption(ChannelOption.SO_KEEPALIVE, true)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
// 添加编解码器
pipeline.addLast(new StringDecoder());
pipeline.addLast(new StringEncoder());
// 添加自定义处理器
pipeline.addLast(new NettyServerHandler());
}
});
// 绑定端口,启动服务器
channelFuture = serverBootstrap.bind(port).sync();
logger.info("Netty服务器启动成功,监听端口: {}", port);
// 等待服务器关闭
// channelFuture.channel().closeFuture().sync();
} catch (Exception e) {
logger.error("Netty服务器启动异常", e);
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
@PreDestroy
public void destroy() {
logger.info("Netty服务器关闭");
if (channelFuture != null) {
channelFuture.channel().close();
}
if (bossGroup != null) {
bossGroup.shutdownGracefully();
}
if (workerGroup != null) {
workerGroup.shutdownGracefully();
}
}
}
服务器处理器
处理器是真正干活的地方。ChannelGroup 用来管理所有连接的客户端,方便后续做广播推送之类的功能。每个生命周期回调都有明确的职责:连接时加入分组,断开时移除,收到消息时处理业务逻辑。
服务器处理器
public class NettyServerHandler extends SimpleChannelInboundHandler<String> {
private static final Logger logger = LoggerFactory.getLogger(NettyServerHandler.class);
// 用于存储所有连接的客户端Channel
private static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
Channel channel = ctx.channel();
logger.info("客户端连接成功:{}", channel.remoteAddress());
// 将新连接的客户端加入到channelGroup
channelGroup.add(channel);
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
Channel channel = ctx.channel();
logger.info("客户端断开连接:{}", channel.remoteAddress());
// 从channelGroup移除断开的客户端
channelGroup.remove(channel);
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
Channel channel = ctx.channel();
logger.info("收到客户端{}消息: {}", channel.remoteAddress(), msg);
// 处理接收到的消息
String response = "服务器已收到消息: " + msg;
channel.writeAndFlush(response);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
logger.error("Netty服务器异常", cause);
ctx.close();
}
}
第四步:搭建客户端
有了服务端,自然需要客户端来验证通信。客户端的结构和服务端类似,只不过用的是 Bootstrap 而不是 ServerBootstrap,并且它主动发起连接而不是监听端口。
客户端启动类
客户端启动类
@Component
public class NettyClient {
private static final Logger logger = LoggerFactory.getLogger(NettyClient.class);
@Value("${netty.server.host}")
private String host;
@Value("${netty.server.port}")
private int port;
private Bootstrap bootstrap;
private EventLoopGroup group;
private Channel channel;
@PostConstruct
public void start() {
group = new NioEventLoopGroup();
try {
bootstrap = new Bootstrap();
bootstrap.group(group)
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
// 添加编解码器
pipeline.addLast(new StringDecoder());
pipeline.addLast(new StringEncoder());
// 添加自定义处理器
pipeline.addLast(new NettyClientHandler());
}
});
// 连接服务器
ChannelFuture future = bootstrap.connect(host, port).sync();
channel = future.channel();
logger.info("Netty客户端启动成功,连接到服务器: {}:{}", host, port);
} catch (Exception e) {
logger.error("Netty客户端启动异常", e);
group.shutdownGracefully();
}
}
public void sendMessage(String message) {
if (channel != null && channel.isActive()) {
channel.writeAndFlush(message);
}
}
@PreDestroy
public void destroy() {
logger.info("Netty客户端关闭");
if (channel != null) {
channel.close();
}
if (group != null) {
group.shutdownGracefully();
}
}
}
客户端处理器
客户端处理器
public class NettyClientHandler extends SimpleChannelInboundHandler<String> {
private static final Logger logger = LoggerFactory.getLogger(NettyClientHandler.class);
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
logger.info("连接到服务器成功");
// 连接成功后发送消息
ctx.writeAndFlush("Hello, Netty Server!");
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
logger.info("收到服务器消息: {}", msg);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
logger.error("Netty客户端异常", cause);
ctx.close();
}
}
进阶:心跳机制
基础通信跑通之后,下一个必须解决的问题就是心跳检测。在长连接场景下,如果不做心跳,你根本不知道对面是断了还是只是没说话。Netty 内置了 IdleStateHandler,省去了自己写定时器的麻烦。
在 ChannelInitializer 中加上这两行就够了:
// 在 ChannelInitializer 中添加 IdleStateHandler
pipeline.addLast(new IdleStateHandler(0, 0, 5, TimeUnit.SECONDS)); // 5秒没有读写操作就触发事件
pipeline.addLast(new HeartbeatHandler()); // 自定义心跳处理器
然后实现一个简单的心跳处理器,在空闲超时时发送心跳包:
public class HeartbeatHandler extends ChannelInboundHandlerAdapter {
private static final Logger logger = LoggerFactory.getLogger(HeartbeatHandler.class);
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) {
IdleStateEvent event = (IdleStateEvent) evt;
if (event.state() == IdleState.ALL_IDLE) {
logger.info("发送心跳包");
// 发送心跳包
ctx.writeAndFlush("HEARTBEAT").addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
}
} else {
super.userEventTriggered(ctx, evt);
}
}
}
进阶:自定义协议
用字符串直接传输在 demo 阶段没问题,但真实项目里你一定需要自定义协议。原因很简单:你需要处理粘包拆包问题,需要传输结构化数据,需要区分不同类型的消息。
下面是一个基于「长度前缀 + JSON 消息体」的简单协议实现。
消息实体
@Data
public class Message {
private String id;
private String type;
private String content;
private Date createTime;
}
编码器
编码器的职责很明确:把 Java 对象序列化成字节流。这里先写入 4 字节的长度字段,再写入 JSON 内容,接收端就能准确知道一条消息在哪里结束。
public class MessageEncoder extends MessageToByteEncoder<Message> {
@Override
protected void encode(ChannelHandlerContext ctx, Message msg, ByteBuf out) throws Exception {
// 将对象序列化为JSON字符串
String json = new ObjectMapper().writeValueAsString(msg);
byte[] bytes = json.getBytes(StandardCharsets.UTF_8);
// 写入消息长度
out.writeInt(bytes.length);
// 写入消息内容
out.writeBytes(bytes);
}
}
解码器
解码器是编码器的逆过程,但要多考虑一件事:TCP 是流式协议,数据可能分多次到达。所以需要先检查可读字节数是否足够,不够就等下一次数据到来再处理。
public class MessageDecoder extends ByteToMessageDecoder {
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
// 如果可读字节少于4(消息长度字段),等待更多数据
if (in.readableBytes() < 4) {
return;
}
// 标记当前读取位置
in.markReaderIndex();
// 读取消息长度
int length = in.readInt();
// 如果消息体不完整,重置读取位置,等待更多数据
if (in.readableBytes() < length) {
in.resetReaderIndex();
return;
}
// 读取消息内容
byte[] bytes = new byte[length];
in.readBytes(bytes);
// 将JSON字符串反序列化为对象
String json = new String(bytes, StandardCharsets.UTF_8);
Message message = new ObjectMapper().readValue(json, Message.class);
out.add(message);
}
}
运行效果
一切就绪后,启动项目,你会依次看到这样的日志输出:
服务器启动
2023-05-13 10:15:23.456 INFO [main] com.example.netty.NettyServer : Netty服务器启动成功,监听端口: 8090
客户端连接
2023-05-13 10:15:25.789 INFO [nioEventLoopGroup-2-1] com.example.netty.NettyServerHandler : 客户端连接成功:/127.0.0.1:54321
消息交互
2023-05-13 10:15:26.123 INFO [nioEventLoopGroup-2-1] com.example.netty.NettyServerHandler : 收到客户端/127.0.0.1:54321消息: Hello, Netty Server!
2023-05-13 10:15:26.234 INFO [nioEventLoopGroup-1-1] com.example.netty.NettyClientHandler : 收到服务器消息: 服务器已收到消息: Hello, Netty Server!
心跳检测
2023-05-13 10:15:31.456 INFO [nioEventLoopGroup-2-1] com.example.netty.HeartbeatHandler : 发送心跳包
回顾
整个集成过程走下来,核心思路其实很清晰:让 SpringBoot 管理 Netty 的生命周期,让 Netty 专注于网络通信。@PostConstruct 和 @PreDestroy 是衔接两者的关键。
在实际项目中,你可能还需要考虑重连机制、消息确认、序列化性能优化等问题,但本文的基础架构已经足够作为起点。掌握了这套模式,后续不管是做 IM 系统、物联网网关还是游戏服务器,都能快速搭起骨架来。