cd ..

SpringBoot 集成 Netty

当 SpringBoot 自带的 HTTP 服务满足不了长连接和高并发的需求时,Netty 是一个值得考虑的选择。本文记录了将 Netty 集成到 SpringBoot 项目中的完整过程,包括服务端搭建、客户端通信、心跳机制和自定义协议。

为什么需要 Netty?

做后端开发,迟早会遇到一个问题:SpringBoot 自带的 Tomcat 在处理 HTTP 请求时表现不错,但当你需要长连接、自定义协议、或者极致的网络性能时,它就显得力不从心了。这时候,Netty 就该登场了。

Netty 是一个基于 NIO 的异步非阻塞网络框架,简单来说,它能让你用很少的线程处理大量的并发连接。它的核心优势在于:

  • 基于 NIO 的异步非阻塞 I/O 模型,天然适合高并发场景
  • 支持 HTTPWebSocket 等多种协议,扩展性极强
  • 提供了清晰的编程模型和 API,上手门槛比直接用 NIO 低得多

那么问题来了——如何把 Netty 优雅地集成到 SpringBoot 项目里,让两者各司其职?接下来我们一步步来实现。

第一步:添加依赖

先把基础的依赖加上,这没什么好说的:

<!-- Netty依赖 -->
<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.1.76.Final</version>
</dependency>

<!-- SpringBoot依赖 -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>

第二步:配置 Netty 服务器参数

把端口和地址抽到配置文件里,方便不同环境切换。在 application.yml 中添加:

netty:
  server:
    port: 8090 # Netty服务器端口
    host: 127.0.0.1 # Netty服务器地址

注意这里 Netty 监听的是 8090 端口,和 SpringBoot 的 8080 互不冲突。两个服务各跑各的,互不干扰。

第三步:搭建服务端

服务端是整个架构的核心。Netty 的服务端由两部分组成:启动类负责配置和启动,处理器负责处理具体的业务逻辑。

服务器启动类

这里有个关键设计:用 @PostConstruct 让 Netty 服务器随 SpringBoot 一起启动,用 @PreDestroy 确保优雅关闭。bossGroup 负责接收连接,workerGroup 负责处理 I/O 事件——这是 Netty 经典的 Reactor 模式。

服务器启动类
@Component
public class NettyServer {

  private static final Logger logger = LoggerFactory.getLogger(NettyServer.class);

  @Value("${netty.server.port}")
  private int port;

  private ServerBootstrap serverBootstrap;
  private EventLoopGroup bossGroup;
  private EventLoopGroup workerGroup;
  private ChannelFuture channelFuture;

  @PostConstruct
  public void start() throws InterruptedException {
      bossGroup = new NioEventLoopGroup(1);
      workerGroup = new NioEventLoopGroup();

      try {
          serverBootstrap = new ServerBootstrap();
          serverBootstrap.group(bossGroup, workerGroup)
              .channel(NioServerSocketChannel.class)
              .option(ChannelOption.SO_BACKLOG, 128)
              .childOption(ChannelOption.SO_KEEPALIVE, true)
              .childHandler(new ChannelInitializer<SocketChannel>() {
                  @Override
                  protected void initChannel(SocketChannel ch) throws Exception {
                      ChannelPipeline pipeline = ch.pipeline();
                      // 添加编解码器
                      pipeline.addLast(new StringDecoder());
                      pipeline.addLast(new StringEncoder());
                      // 添加自定义处理器
                      pipeline.addLast(new NettyServerHandler());
                  }
              });

          // 绑定端口,启动服务器
          channelFuture = serverBootstrap.bind(port).sync();
          logger.info("Netty服务器启动成功,监听端口: {}", port);

          // 等待服务器关闭
          // channelFuture.channel().closeFuture().sync();
      } catch (Exception e) {
          logger.error("Netty服务器启动异常", e);
          bossGroup.shutdownGracefully();
          workerGroup.shutdownGracefully();
      }
  }

  @PreDestroy
  public void destroy() {
      logger.info("Netty服务器关闭");
      if (channelFuture != null) {
          channelFuture.channel().close();
      }
      if (bossGroup != null) {
          bossGroup.shutdownGracefully();
      }
      if (workerGroup != null) {
          workerGroup.shutdownGracefully();
      }
  }
}

服务器处理器

处理器是真正干活的地方。ChannelGroup 用来管理所有连接的客户端,方便后续做广播推送之类的功能。每个生命周期回调都有明确的职责:连接时加入分组,断开时移除,收到消息时处理业务逻辑。

服务器处理器
public class NettyServerHandler extends SimpleChannelInboundHandler<String> {

  private static final Logger logger = LoggerFactory.getLogger(NettyServerHandler.class);

  // 用于存储所有连接的客户端Channel
  private static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

  @Override
  public void channelActive(ChannelHandlerContext ctx) throws Exception {
      Channel channel = ctx.channel();
      logger.info("客户端连接成功:{}", channel.remoteAddress());

      // 将新连接的客户端加入到channelGroup
      channelGroup.add(channel);
  }

  @Override
  public void channelInactive(ChannelHandlerContext ctx) throws Exception {
      Channel channel = ctx.channel();
      logger.info("客户端断开连接:{}", channel.remoteAddress());

      // 从channelGroup移除断开的客户端
      channelGroup.remove(channel);
  }

  @Override
  protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
      Channel channel = ctx.channel();
      logger.info("收到客户端{}消息: {}", channel.remoteAddress(), msg);

      // 处理接收到的消息
      String response = "服务器已收到消息: " + msg;
      channel.writeAndFlush(response);
  }

  @Override
  public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
      logger.error("Netty服务器异常", cause);
      ctx.close();
  }
}

第四步:搭建客户端

有了服务端,自然需要客户端来验证通信。客户端的结构和服务端类似,只不过用的是 Bootstrap 而不是 ServerBootstrap,并且它主动发起连接而不是监听端口。

客户端启动类

客户端启动类
@Component
public class NettyClient {

  private static final Logger logger = LoggerFactory.getLogger(NettyClient.class);

  @Value("${netty.server.host}")
  private String host;

  @Value("${netty.server.port}")
  private int port;

  private Bootstrap bootstrap;
  private EventLoopGroup group;
  private Channel channel;

  @PostConstruct
  public void start() {
      group = new NioEventLoopGroup();

      try {
          bootstrap = new Bootstrap();
          bootstrap.group(group)
              .channel(NioSocketChannel.class)
              .option(ChannelOption.TCP_NODELAY, true)
              .handler(new ChannelInitializer<SocketChannel>() {
                  @Override
                  protected void initChannel(SocketChannel ch) throws Exception {
                      ChannelPipeline pipeline = ch.pipeline();
                      // 添加编解码器
                      pipeline.addLast(new StringDecoder());
                      pipeline.addLast(new StringEncoder());
                      // 添加自定义处理器
                      pipeline.addLast(new NettyClientHandler());
                  }
              });

          // 连接服务器
          ChannelFuture future = bootstrap.connect(host, port).sync();
          channel = future.channel();
          logger.info("Netty客户端启动成功,连接到服务器: {}:{}", host, port);

      } catch (Exception e) {
          logger.error("Netty客户端启动异常", e);
          group.shutdownGracefully();
      }
  }

  public void sendMessage(String message) {
      if (channel != null && channel.isActive()) {
          channel.writeAndFlush(message);
      }
  }

  @PreDestroy
  public void destroy() {
      logger.info("Netty客户端关闭");
      if (channel != null) {
          channel.close();
      }
      if (group != null) {
          group.shutdownGracefully();
      }
  }
}

客户端处理器

客户端处理器
public class NettyClientHandler extends SimpleChannelInboundHandler<String> {

  private static final Logger logger = LoggerFactory.getLogger(NettyClientHandler.class);

  @Override
  public void channelActive(ChannelHandlerContext ctx) throws Exception {
      logger.info("连接到服务器成功");
      // 连接成功后发送消息
      ctx.writeAndFlush("Hello, Netty Server!");
  }

  @Override
  protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
      logger.info("收到服务器消息: {}", msg);
  }

  @Override
  public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
      logger.error("Netty客户端异常", cause);
      ctx.close();
  }
}

进阶:心跳机制

基础通信跑通之后,下一个必须解决的问题就是心跳检测。在长连接场景下,如果不做心跳,你根本不知道对面是断了还是只是没说话。Netty 内置了 IdleStateHandler,省去了自己写定时器的麻烦。

ChannelInitializer 中加上这两行就够了:

// 在 ChannelInitializer 中添加 IdleStateHandler
pipeline.addLast(new IdleStateHandler(0, 0, 5, TimeUnit.SECONDS)); // 5秒没有读写操作就触发事件
pipeline.addLast(new HeartbeatHandler()); // 自定义心跳处理器

然后实现一个简单的心跳处理器,在空闲超时时发送心跳包:

public class HeartbeatHandler extends ChannelInboundHandlerAdapter {

    private static final Logger logger = LoggerFactory.getLogger(HeartbeatHandler.class);

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent) {
            IdleStateEvent event = (IdleStateEvent) evt;
            if (event.state() == IdleState.ALL_IDLE) {
                logger.info("发送心跳包");
                // 发送心跳包
                ctx.writeAndFlush("HEARTBEAT").addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
            }
        } else {
            super.userEventTriggered(ctx, evt);
        }
    }
}

进阶:自定义协议

用字符串直接传输在 demo 阶段没问题,但真实项目里你一定需要自定义协议。原因很简单:你需要处理粘包拆包问题,需要传输结构化数据,需要区分不同类型的消息。

下面是一个基于「长度前缀 + JSON 消息体」的简单协议实现。

消息实体

@Data
public class Message {
    private String id;
    private String type;
    private String content;
    private Date createTime;
}

编码器

编码器的职责很明确:把 Java 对象序列化成字节流。这里先写入 4 字节的长度字段,再写入 JSON 内容,接收端就能准确知道一条消息在哪里结束。

public class MessageEncoder extends MessageToByteEncoder<Message> {

    @Override
    protected void encode(ChannelHandlerContext ctx, Message msg, ByteBuf out) throws Exception {
        // 将对象序列化为JSON字符串
        String json = new ObjectMapper().writeValueAsString(msg);
        byte[] bytes = json.getBytes(StandardCharsets.UTF_8);

        // 写入消息长度
        out.writeInt(bytes.length);
        // 写入消息内容
        out.writeBytes(bytes);
    }
}

解码器

解码器是编码器的逆过程,但要多考虑一件事:TCP 是流式协议,数据可能分多次到达。所以需要先检查可读字节数是否足够,不够就等下一次数据到来再处理。

public class MessageDecoder extends ByteToMessageDecoder {

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        // 如果可读字节少于4(消息长度字段),等待更多数据
        if (in.readableBytes() < 4) {
            return;
        }

        // 标记当前读取位置
        in.markReaderIndex();

        // 读取消息长度
        int length = in.readInt();

        // 如果消息体不完整,重置读取位置,等待更多数据
        if (in.readableBytes() < length) {
            in.resetReaderIndex();
            return;
        }

        // 读取消息内容
        byte[] bytes = new byte[length];
        in.readBytes(bytes);

        // 将JSON字符串反序列化为对象
        String json = new String(bytes, StandardCharsets.UTF_8);
        Message message = new ObjectMapper().readValue(json, Message.class);

        out.add(message);
    }
}

运行效果

一切就绪后,启动项目,你会依次看到这样的日志输出:

服务器启动

2023-05-13 10:15:23.456 INFO [main] com.example.netty.NettyServer : Netty服务器启动成功,监听端口: 8090

客户端连接

2023-05-13 10:15:25.789 INFO [nioEventLoopGroup-2-1] com.example.netty.NettyServerHandler : 客户端连接成功:/127.0.0.1:54321

消息交互

2023-05-13 10:15:26.123 INFO [nioEventLoopGroup-2-1] com.example.netty.NettyServerHandler : 收到客户端/127.0.0.1:54321消息: Hello, Netty Server!
2023-05-13 10:15:26.234 INFO [nioEventLoopGroup-1-1] com.example.netty.NettyClientHandler : 收到服务器消息: 服务器已收到消息: Hello, Netty Server!

心跳检测

2023-05-13 10:15:31.456 INFO [nioEventLoopGroup-2-1] com.example.netty.HeartbeatHandler : 发送心跳包

回顾

整个集成过程走下来,核心思路其实很清晰:让 SpringBoot 管理 Netty 的生命周期,让 Netty 专注于网络通信。@PostConstruct@PreDestroy 是衔接两者的关键。

在实际项目中,你可能还需要考虑重连机制、消息确认、序列化性能优化等问题,但本文的基础架构已经足够作为起点。掌握了这套模式,后续不管是做 IM 系统、物联网网关还是游戏服务器,都能快速搭起骨架来。