从零学Netty之粘包半包解决-使用特殊分隔符
本文我们就Netty使用中常见的半包/粘包问题进行分析和处理。
解决粘包/半包问题本质方式为:有明确的业务应用数据区分标志,能够按照边界完整的接受Netty传输的数据。
字符串类型消息解决粘包/半包方式汇总
对于字符串类型的消息,Netty提供了多种现成的编解码工具解决粘包/半包问题,具体的工具类组合如下:
- DelimiterBasedFrameDecoder+StringDecoder,通过特殊分隔符作为消息的结束标志
- LineBasedFrameDecoder+StringDecoder,通过换行符作为消息的结束标志
- FixedLengthFrameDecoder+StringDecoder,按照固定长度方式获取消息并解析
本文我们先介绍第一种方式,即:
DelimiterBasedFrameDecoder+StringDecoder
DelimiterBasedFrameDecoder的原理很好理解:通过利用特殊字符作为数据包的结束标志。发送方与接收方通过该标记对数据包进行分割解析即可。
服务端逻辑
首先看一下服务端实现代码。
DelimiterBaseServer.java
public class DelimiterBaseServer {
/**字符串分隔符*/
private static final String DELIMITER_TAG = "@#";
public static void main(String[] args) throws Exception {
int port = 8081;
new DelimiterBaseServer().bind(port);
}
public void bind(int port) throws Exception {
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();1
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
// 设置TCP参数,连接请求的最大队列长度
.option(ChannelOption.SO_BACKLOG, 1024)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
// 设置DelimiterBaseFrameDecoder处理器
ByteBuf delimter = Unpooled.copiedBuffer(DELIMITER_TAG.getBytes());
ch.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, delimter));
// 设置StringDecoder处理器
ch.pipeline().addLast(new StringDecoder());
ch.pipeline().addLast(new DelimiterBaseServerHandler());
}
});
ChannelFuture channelFuture = b.bind(port).sync();
// 等待服务端监听端口关闭
channelFuture.channel().closeFuture().sync();
} catch (Exception e) {
throw e;
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
注意看这几行代码
// 设置DelimiterBaseFrameDecoder处理器
ByteBuf delimter = Unpooled.copiedBuffer(DELIMITER_TAG.getBytes());
ch.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, delimter));
// 设置StringDecoder处理器
ch.pipeline().addLast(new StringDecoder());
ch.pipeline().addLast(new DelimiterBaseServerHandler());
这里设置了DelimiterBaseFrameDecoder处理器与StringDecoder处理器。
我们接着看一下DelimiterBaseServerHandler的实现。
DelimiterBaseServerHandler.java
public class DelimiterBaseServerHandler extends SimpleChannelInboundHandler {
/**字符串分隔符*/
private static final String DELIMITER_TAG = "@#";
/**计数器*/
private static final AtomicInteger counter = new AtomicInteger(0);
@Override
protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
// 接受客户端发送的字符串,打印控制台
String content = (String) msg;
System.out.println("接收到客户端发送字串---[" + content + "], 当前次数=[" + counter.addAndGet(1) + "]");
// 加入分隔符,将字符串重新返回给客户端
content += DELIMITER_TAG;
ByteBuf echo = Unpooled.copiedBuffer(content.getBytes());
ctx.writeAndFlush(echo);
}
...省略不重要代码...
可以看到,channelRead0方法中对客户端的请求包进行了处理,通过对数据包末尾添加分隔符的方式对数据包进行加工并返回给客户端。
客户端逻辑
我们接着看一下客户端的实现。
DelimiterBaseClient.java
public class DelimiterBaseClient {
public DelimiterBaseClient() {}
/**字符串分隔符*/
private static final String DELIMITER_TAG = "@#";
public static void main(String[] args) throws Exception {
int port = 8081;
new DelimiterBaseClient().connect(port, "127.0.0.1");
}
public void connect(int port, String host) throws Exception {
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
b.group(group)
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, Boolean.TRUE)
.handler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
// 设置DelimiterBaseFrameDecoder处理器
ByteBuf delimter = Unpooled.copiedBuffer(DELIMITER_TAG.getBytes());
ch.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, delimter));
// 设置StringDecoder处理器
ch.pipeline().addLast(new StringDecoder());
// TODO 配置客户端处理I/O事件的handler
ch.pipeline().addLast(new DelimiterBaseClientHandler());
}
});
// 发起异步连接操作
ChannelFuture f = b.connect(host, port).sync();
// 循环发送1000次消息
for (int i = 0; i < 1000; i++) {
// 构造客户端发送的数据ByteBuf对象
String content = "Netty探秘!" + DELIMITER_TAG;
byte[] req = content.getBytes();
ByteBuf messageBuffer = Unpooled.buffer(req.length);
messageBuffer.writeBytes(req);
// 向服务端发送数据
ChannelFuture channelFuture = f.channel().writeAndFlush(messageBuffer);
channelFuture.syncUninterruptibly();
}
f.channel().closeFuture().sync();
} catch (Exception e) {
throw e;
} finally {
group.shutdownGracefully();
}
}
}
客户端在初始化完成之后,发送1000次消息,每条消息末尾都添加了分隔符。服务端在收到分隔符之后会按照分隔符进行逐行解析。
可以看到客户端的实现和服务端很相似,关键代码也是这几行:
// 设置DelimiterBaseFrameDecoder处理器
ByteBuf delimter = Unpooled.copiedBuffer(DELIMITER_TAG.getBytes());
ch.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, delimter));
// 设置StringDecoder处理器
ch.pipeline().addLast(new StringDecoder());
// TODO 配置客户端处理I/O事件的handler
ch.pipeline().addLast(new DelimiterBaseClientHandler());
通过对数据包进行添加分割符的方式进行发送。
DelimiterBaseClientHandler.java
public class DelimiterBaseClientHandler extends SimpleChannelInboundHandler {
private static final Logger LOGGER = Logger.getLogger("DelimiterBaseClientHandler");
private static final AtomicInteger counter = new AtomicInteger(0);
@Override
protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
// 获取服务端返回的数据,并打印到控制台
String content = (String) msg;
System.out.println("接收到服务端返回字串---[" + content + "], 当前次数=[" + counter.addAndGet(1) + "]");
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
LOGGER.warning("Unexpected exception from downstream :" + cause.getMessage());
ctx.close();
}
}
这里是客户端IO处理逻辑,对服务端的返回包进行打印。
案例运行
先后启动服务端、客户端,看一下日志打印情况。
服务端日志打印
接收到客户端发送字串---[Netty探秘!], 当前次数=[1]
接收到客户端发送字串---[Netty探秘!], 当前次数=[2]
接收到客户端发送字串---[Netty探秘!], 当前次数=[3]
接收到客户端发送字串---[Netty探秘!], 当前次数=[4]
接收到客户端发送字串---[Netty探秘!], 当前次数=[5]
接收到客户端发送字串---[Netty探秘!], 当前次数=[6]
接收到客户端发送字串---[Netty探秘!], 当前次数=[7]
接收到客户端发送字串---[Netty探秘!], 当前次数=[8]
接收到客户端发送字串---[Netty探秘!], 当前次数=[9]
接收到客户端发送字串---[Netty探秘!], 当前次数=[10]
......
客户端日志打印
接收到服务端返回字串---[Netty探秘!], 当前次数=[1]
接收到服务端返回字串---[Netty探秘!], 当前次数=[2]
接收到服务端返回字串---[Netty探秘!], 当前次数=[3]
接收到服务端返回字串---[Netty探秘!], 当前次数=[4]
接收到服务端返回字串---[Netty探秘!], 当前次数=[5]
接收到服务端返回字串---[Netty探秘!], 当前次数=[6]
接收到服务端返回字串---[Netty探秘!], 当前次数=[7]
接收到服务端返回字串---[Netty探秘!], 当前次数=[8]
接收到服务端返回字串---[Netty探秘!], 当前次数=[9]
接收到服务端返回字串---[Netty探秘!], 当前次数=[10]
......
能够清楚的看到,每一个请求都是单独的一行日志打印,没有再出现上篇文章中的粘包、拆包现象。
每个数据包都是独立被解析的。
小结
通过本文的讲解和demo演示,表明通过 DelimiterBasedFrameDecoder+StringDecoder 即特殊分隔符作为消息的结束标志解决粘包、拆包问题是可行的。
当然除了这种方法外,Netty还支持其他更多的方式解决粘包、拆包问题,我们后续的文章中将会陆续呈现。
版权声明:
原创不易,洗文可耻。除非注明,本博文章均为原创,转载请以链接形式标明本文地址。