博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
netty 解决粘包 和 分包的问题
阅读量:3727 次
发布时间:2019-05-22

本文共 6795 字,大约阅读时间需要 22 分钟。

netty 解决粘包 和 分包的问题

更多干货

概述

netty和tcp协议的关系

netty → Java Runtime Socket (io、nio、nio2) → OS Socket → TCP (当然也可以是UDP、SCTP);

作系统层的Socket都必须做三次握手(仅对TCP而言),Netty当然无法跳过,只不过它对用户屏蔽了三次握手(当然还有四次挥手)的部分细节。

粘包现象

客户端在一个for循环内连续发送1000个hello给Netty服务器端

Socket socket = new Socket("127.0.0.1", 10101);        for(int i = 0; i < 1000; i++){           socket.getOutputStream().write(“hello”.getBytes());        }        socket.close();

而在服务器端接受到的信息并不是预期的1000个独立的Hello字符串.

实际上是无序的hello字符串混合在一起, 如图所示. 这种现象我们称之为粘包.

  1. 为什么会出现这种现象呢? TCP是个”流”协议,流其实就是没有界限的一串数据。

  2. TCP底层中并不了解上层业务数据的具体含义,它会根据TCP缓冲区的实际情况进行包划分,

  3. 所以在TCP中就有可能一个完整地包会被TCP拆分成多个包,也有可能吧多个小的包封装成一个大的数据包发送。

分包处理

顾名思义, 我们要对传输的数据进行分包. 一个简单的处理逻辑是在发送数据包之前, 先用四个字节占位, 表示数据包的长度.

数据包结构为:

Socket socket = new Socket("127.0.0.1", 10101);		String message = "hello";		byte[] bytes = message.getBytes();		ByteBuffer buffer = ByteBuffer.allocate(4 + bytes.length);		buffer.putInt(bytes.length);		buffer.put(bytes);		byte[] array = buffer.array();		for(int i=0; i<1000; i++){			socket.getOutputStream().write(array);		}		socket.close();

服务器端代码, 我们需要借助于FrameDecoder类来分包.

if(buffer.readableBytes() > 4){			if(buffer.readableBytes() > 2048){				buffer.skipBytes(buffer.readableBytes());			}			//标记			buffer.markReaderIndex();			//长度			int length = buffer.readInt();			if(buffer.readableBytes() < length){				buffer.resetReaderIndex();				//缓存当前剩余的buffer数据,等待剩下数据包到来				return null;			}			//读数据			byte[] bytes = new byte[length];			buffer.readBytes(bytes);			//往下传递对象			return new String(bytes);		}		//缓存当前剩余的buffer数据,等待剩下数据包到来		return null;
  1. 这边可能有个疑问, 为什么MyDecoder中数据没有读取完毕, 需要return null,

  2. 正常的pipeline在数据处理完都是要sendUpstream, 给下一个pipeline的.

  3. 这个需要看下FrameDecoder.messageReceived 的源码. 他在其中缓存了一个cumulation对象,

  4. 如果return了null, 他会继续往缓存里写数据来实现分包

public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {          Object m = e.getMessage();          if (!(m instanceof ChannelBuffer)) {            // 数据读完了, 转下一个pipeline              ctx.sendUpstream(e);          } else {              ChannelBuffer input = (ChannelBuffer)m;              if (input.readable()) {                  if (this.cumulation == null) {                     try {                         this.callDecode(ctx, e.getChannel(), input, e.getRemoteAddress());                     } finally {                         this.updateCumulation(ctx, input);                     }                 } else {                  // 缓存上一次没读完整的数据                     input = this.appendToCumulation(input);                     try {                         this.callDecode(ctx, e.getChannel(), input, e.getRemoteAddress());                     } finally {                         this.updateCumulation(ctx, input);                     }                 }             }         }     }

Socket字节流攻击

在上述代码中, 我们会在服务器端为客户端发送的数据包长度, 预先分配byte数组.

如果遇到恶意攻击, 传入的数据长度与内容 不匹配. 例如声明数据长度为Integer.MAX_VALUE.

这样会消耗大量的服务器资源生成byte[], 显然是不合理的.

因此我们还要加个最大长度限制.

if(buffer.readableBytes() > 2048){             buffer.skipBytes(buffer.readableBytes());           }

新的麻烦也随之而来, 虽然可以跳过指定长度, 但是数据包本身就乱掉了.

因为长度和内容不匹配, 跳过一个长度后, 不知道下一段数据的开头在哪里了.

因此我们自定义数据包里面, 不仅要引入数据包长度, 还要引入一个包头来划分各个包的范围.

包头用任意一段特殊字符标记即可, 例如$$$.

// 防止socket字节流攻击          if(buffer.readableBytes() > 2048){            buffer.skipBytes(buffer.readableBytes());          }          // 记录包头开始的index          int beginReader = buffer.readerIndex();          while(true) {              if(buffer.readInt() == ConstantValue.FLAG) {                 break;             }         }

新的数据包结构为:

|    包头(4字节)    |    长度(4字节)    |    数据    |

Netty自带拆包类

自己实现拆包虽然可以细粒度控制, 但是也会有些不方便, 可以直接调用Netty提供的一些内置拆包类.

  1. FixedLengthFrameDecoder 按照特定长度组包

  2. DelimiterBasedFrameDecoder 按照指定分隔符组包, 例如本文中的$$$

  3. LineBasedFrameDecoder 按照换行符进行组包, \r \n等等

代码

Server

package com.server;import java.net.InetSocketAddress;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import org.jboss.netty.bootstrap.ServerBootstrap;import org.jboss.netty.channel.ChannelPipeline;import org.jboss.netty.channel.ChannelPipelineFactory;import org.jboss.netty.channel.Channels;import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;import org.jboss.netty.handler.codec.string.StringDecoder;import org.jboss.netty.handler.codec.string.StringEncoder;public class Server {	public static void main(String[] args) {		//服务类		ServerBootstrap bootstrap = new ServerBootstrap();		//boss线程监听端口,worker线程负责数据读写		ExecutorService boss = Executors.newCachedThreadPool();		ExecutorService worker = Executors.newCachedThreadPool();		//设置niosocket工厂		bootstrap.setFactory(new NioServerSocketChannelFactory(boss, worker));		//设置管道的工厂		bootstrap.setPipelineFactory(new ChannelPipelineFactory() {			@Override			public ChannelPipeline getPipeline() throws Exception {				ChannelPipeline pipeline = Channels.pipeline();				pipeline.addLast("decoder", new MyDecoder());				pipeline.addLast("handler1", new HelloHandler());				return pipeline;			}		});		bootstrap.bind(new InetSocketAddress(10101));		System.out.println("start!!!");	}}

Client

package com.server;import java.net.Socket;import java.nio.ByteBuffer;public class Client {	public static void main(String[] args) throws Exception {		Socket socket = new Socket("127.0.0.1", 10101);		String message = "hello";		byte[] bytes = message.getBytes();		ByteBuffer buffer = ByteBuffer.allocate(4 + bytes.length);		buffer.putInt(bytes.length);		buffer.put(bytes);		byte[] array = buffer.array();		for(int i=0; i<1000; i++){			socket.getOutputStream().write(array);		}		socket.close();	}}

HelloHandler

package com.server;import org.jboss.netty.channel.ChannelHandlerContext;import org.jboss.netty.channel.MessageEvent;import org.jboss.netty.channel.SimpleChannelHandler;public class HelloHandler extends SimpleChannelHandler {	private int count = 1;	@Override	public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {		System.out.println(e.getMessage() + "  " +count);		count++;	}}

MyDecoder

package com.server;import org.jboss.netty.buffer.ChannelBuffer;import org.jboss.netty.channel.Channel;import org.jboss.netty.channel.ChannelHandlerContext;import org.jboss.netty.handler.codec.frame.FrameDecoder;public class MyDecoder extends FrameDecoder {	@Override	protected Object decode(ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer) throws Exception {		if(buffer.readableBytes() > 4){			if(buffer.readableBytes() > 2048){				buffer.skipBytes(buffer.readableBytes());			}			//标记			buffer.markReaderIndex();			//长度			int length = buffer.readInt();			if(buffer.readableBytes() < length){				buffer.resetReaderIndex();				//缓存当前剩余的buffer数据,等待剩下数据包到来				return null;			}			//读数据			byte[] bytes = new byte[length];			buffer.readBytes(bytes);			//往下传递对象			return new String(bytes);		}		//缓存当前剩余的buffer数据,等待剩下数据包到来		return null;	}}

转载地址:http://tclnn.baihongyu.com/

你可能感兴趣的文章
2021 最新 IntelliJ IDEA配置 远程Docker容器 编写Dockerfile文件 步骤演示(图文版)
查看>>
2021年 最新 多阶段构建dockerfile实现java源码编译打jar包并做成镜像
查看>>
Vue 入门 指令
查看>>
数据结构 线性表(一) 字符串插入
查看>>
数据结构 线性表(二) 多项式加法
查看>>
数据结构 线性表(三) 放苹果
查看>>
数据结构 栈与队列(一) 括号匹配
查看>>
合同法律风险管理 合同的意义
查看>>
数据结构 栈与队列(二) 栈的基本操作
查看>>
合同法律风险管理 合同的精神
查看>>
数据结构 栈与队列(三) 抓住那头牛
查看>>
数据结构 字符串(一) 全在其中
查看>>
合同法律风险管理 合同理念(一)对象与文本并重
查看>>
数据结构 字符串(二)统计字符数
查看>>
合同法律风险管理 合同理念(二)履约与签约并重
查看>>
数据结构 字符串(三)前缀中的周期
查看>>
合同法律风险管理 合同理念(三)预防与救济并重
查看>>
Python爬虫 requests库应用详解
查看>>
数据结构 二叉树(一)Huffman编码树
查看>>
网络安全之密码安全基础
查看>>