简说Netty

2023年3月30日

1、Netty整体架构

2、poll和epoll

进程通过将一个或多个fd传递给select或poll系统调用,阻塞在select操作上,select/poll是顺序扫描fd是否就绪,需要扫描所有的客户端是否就绪
epoll使用基于事件驱动方式替代顺序扫描,当有fd就绪时,立即回调函数rollback,哪个客户端就绪处理哪个客户端

3、线程模型

NioEventLoopGroup

每个EventLoopGroup里包括一个或多个EventLoop,每个EventLoop中维护一个Selector实例。
NioEventLoopGroup,本质底层就是一个线程池,可以让你从里面获取新的线程,以及他会负责管理这些线程的生命周期

两层线程模型

BossGroup
HeadContext -> ServerBootstrapAcceptor -> TailContext

WorkerGroup
HeadContext -> childHandler -> TailContext

4、NioEventLoop深入分析

里面有网络请求的处理和(普通任务、调度任务的处理),普通任务和调度任务都会放入到taskQueue中,和网络请求等时间的进行处理,就是说,网络请求占用了1分钟,任务处理也处理1分钟(最多)
代码分析 :

io.netty.bootstrap.AbstractBootstrap#initAndRegister

channel = channelFactory.newChannel(); 
反射实例化NioServerSocketChannel,主要做的事情是创建ServerSocketChannel,设置非阻塞并关联一个 pipeline,pipeline又是ChannelHandlerContext组成的

io.netty.bootstrap.ServerBootstrap#init
主要是向BossGroup Channel pipeline添加ServerBootstrapAcceptor(这个很有意思,其实是BossGroup ServerSocketChannel接受网络连接OP_ACCEPT事件(在NioEventLoop中做),一旦有连接事件,将甩到WorkerGroup中(也是以注册的方式,启动子线程池中的线程))

io.netty.bootstrap.ServerBootstrap.ServerBootstrapAcceptor#channelRead
childGroup.register(child).addListener(new ChannelFutureListener() {
    @Override
    public void operationComplete(ChannelFuture future) throws Exception {
        if (!future.isSuccess()) {
            forceClose(child, future.cause());
        }
    }
});


这个是BossGroup的线程池注册channel,将channel注册到selector,这个挺有意思,会启动NioEventLoop线程,无限run,阻塞selector.select()获取事件,处理channel
ChannelFuture regFuture = config().group().register(channel);

io.netty.channel.AbstractChannel.AbstractUnsafe#register,这个是对应的BossGroup中NioEventLoop Channel到Selector的注册

特别注意,如果线程启动了,直接注册进去,如果没有启动,走的是启动一个NioEventLoop,eventLoop.execute方法就是启动线程
然后会在线程里面调用 register0
if (eventLoop.inEventLoop()) {
    register0(promise);
} else {
    try {
        eventLoop.execute(new Runnable() {
            @Override
            public void run() {
                register0(promise);
            }
        });
    } catch (Throwable t) {
        logger.warn(
                "Force-closing a channel whose registration task was not accepted by an event loop: {}",
                AbstractChannel.this, t);
        closeForcibly();
        closeFuture.setClosed();
        safeSetFailure(promise, t);
    }
}

register0中, pipeline.invokeHandlerAddedIfNeeded(); 
针对BossGroup来说,会向pipeline中加 ServerBootstrapAcceptor
对于WorkerGroup来说,会向pipeline中加自定义的Handler

对于BossGrop中的 NioEventLoop是监听 OP_ACCEPT事件

对于WorkerGroup中的 NioEventLoop是监听的 OP_READ事件


最终其实 BossGroup线程池的NioEventLoop监听 OP_ACCEPT,WorkerGruop线程池中的NioEventLoop监听OP_READ事件,通过selector.select()
多路复用的方式接受请求

io.netty.channel.nio.NioEventLoop#processSelectedKey
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
    unsafe.read();
}

如果是accpetor线程的话,则会走 io.netty.channel.nio.AbstractNioMessageChannel.NioMessageUnsafe#read
其实要明白,一旦有新的网络连接走过来就会走到这里,然后会调用 ServerSocketChannel对应的pipeline 中的 ServerBootstrapAcceptor,会调用 
pipeline.fireChannelRead,也就是调用的是 ServerBootstrapAcceptor 中的 channelRead,这个方法可不简单,主要做的事情其实就是启动一个WorkerGroup中的NioEventLoop(里面封装了acceptor传递过来的channel和自己创建的Selector)

io.netty.channel.nio.AbstractNioByteChannel.NioByteUnsafe#read
如果是process线程的话,其实就是每个SocketChannel,会甩到WorkerGroup线程池中的NioEventLoop中,Channel注册到Selector上,然后读事件

5、处理器Handler主要分两种

ChannelInboundHandlerAdapter(入站处理器) : 数据从底层Java NIO Channel到Netty的Channel,典型的比如说decoder
1、注册事件 fireChannelRegistered
2、连接建立事件 fireChannelActive
3、读事件和读完成事件 fireChannelRead、fireChannelReadComplete
4、异常通知事件 fireExeceptionCaught
5、用户自定义事件 fireUserEventTriggered
6、Channel可写状态变化事件 fireChannelWritabilityChanged
7、连接关闭事件 fireChannelInactive

ChannelOutboundHandler(出站管理器) : 通过Netty的Channel来操作底层的Java NIO Channel,典型的比如说encoder
1、端口绑定bind
2、连接服务端connect
3、写事件write
4、刷新事件flush
5、读事件read
6、主动断开连接 disconnect
7、关闭channel 事件close

chanel、chanelpipeline、channelHandlerContext

首先通过NioServerSocketChannel创建 channel的时候,就会创建一个 pipeline 和 unsafe
pipeline(DefaultChannelPipeline),pipeline中的 HeadContext,自定义HandlerContext和TailCotnext,都是继承ChannelHandlerContext的
所以有 ChannelHandlerContext 特性,ChannelHandlerContext 可以获取pipeline和channel

自定义的Handler是有 ChannelHandlerContext 特性的,所以可以拿到 pipeline和channel

6、TailContext & HeadContext

TailContext 会对入站的ByteBuf进行回收
HeadContext 会对出站的ByteBuf进行回收

7、ByteBuf

readerIndex,writeIndex,capacity
readerIndex -> writeIndex 之间是可读取空间
writeIndex -> capacity 是可写空间
如果内存不足,自动扩容

ByteBuf的分配其实很有意思
默认大小是256个字节

HeapBuffer
PooledByteBufAllocator
UnpooledByteBufAllocator

DirectBuffer
PooledByteBufAllocator
UnpooledByteBufAllocator

所以可以组合成 2 * 2 = 4种
Pooled Unpooled
Direct Headp

ByteBuf内存管理 :
PoolThreadCache线程本地内存 -> PoolArena内存池(内存池不够) -> PoolChunk 内存块(16M) -> Unpooled 非池化临时内存(申请大于12M)

8、client doConnect

io.netty.channel.socket.nio.NioSocketChannel#doConnect

// 建立网络连接
@Override
protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
    if (localAddress != null) {
        doBind0(localAddress);
    }

    boolean success = false;
    try {
        boolean connected = SocketUtils.connect(javaChannel(), remoteAddress);
        // 一般不会立马建立网络连接,所以在这里会监听OP_CONNECT事件,接着在NioEventLoop中建立网络连接,取消OP_CONNECT事件
        if (!connected) {
            selectionKey().interestOps(SelectionKey.OP_CONNECT);
        }
        success = true;
        return connected;
    } finally {
        if (!success) {
            doClose();
        }
    }
}

io.netty.channel.nio.NioEventLoop#processSelectedKey
if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
    // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
    // See https://github.com/netty/netty/issues/924
    int ops = k.interestOps();
    ops &= ~SelectionKey.OP_CONNECT;
    k.interestOps(ops);

    unsafe.finishConnect();
}

9、写数据不会一直阻塞

io.netty.channel.nio.AbstractNioByteChannel#doWrite
@Override
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
    // 默认是16
    int writeSpinCount = config().getWriteSpinCount();
    do {
        Object msg = in.current();
        if (msg == null) {
            // Wrote all messages.
            clearOpWrite();
            // Directly return here so incompleteWrite(...) is not called.
            return;
        }
        // 正常情况下,其实如果发送完毕了,这里返回的是1,writeSpinCount=16-1=15,继续循环,msg为空,取消write关注
        // 否则,比如说一个大的数据,然后发送了16次都没有发送完毕,那就会走 incompleteWrite
        writeSpinCount -= doWriteInternal(in, msg);
        // 最多轮询16次
    } while (writeSpinCount > 0);

    // 一旦走到这里,说明消息没有发送完毕,NioEventLoop继续消息发送,继续关注OP_WRITE事件
    incompleteWrite(writeSpinCount 

10、断开连接

SocketChannel channel = (SocketChannel) sk.channel();
try {
    ByteBuffer byteBuffer = ByteBuffer.allocate(200);
    int num;
    //这里只读数据,未作任何处理
    num = channel.read(byteBuffer);
    if(num == -1)
        throw new IOException("读完成");

} catch (IOException e) {
    System.out.println(e.getMessage());
    sk.cancel();
    if (channel != null)
        channel.close();
}

其实会有两种情况断开连接
1、客户端主动断开连接,要明白也是一种读事件,只是num=-1,所以这里可以干点事情
2、超时断开连接,后台任务处理

11、粘包和拆包

本质是TCP是流式协议,消息无边界
粘包主要是比如说减少网络传输的频率,数据挤压到一起进行传输,比如开启nagle算法
拆包主要是表现在消息的不完整性

TCP是一个段(segment)为单位,每发送一个段就需要进行一次确认应答(ack)处理,但如果这么做,缺点是包的往返时间越长性能就越差
为了解决这个问题,引入滑动窗口,窗口大小即决定了无需等待答应可继续发送的数据最大值,起到一个缓冲的作用,同时也得到流量的控制

Netty一般解决粘包拆包使用的是LengthFieldBaseFrameDecoder,说白了就是先传 递数据的长度 + 再是数据

encoder 一般是MessageToByteEncoder
decoder 一般是LengthFieldBasedFrameDecoder,自己写长度 + 内容,使用LengthFieldBasedFrameDecoder可以进行校验,以及截取方式的读取,说白了,其实就是读取有效的数据,我想要的数据

int maxFrameLength 消息最大长度
int lengthFieldOffset 偏移量,从什么地方开始读取,一般是0
int lengthFieldLength 其实在encoder写过来之后,netty会封装成 数据长度 + 数据,这个是指定数据需要多大,一般比如说4个字节,能放很大数据了
int lengthAdjustment 比如说长度 + magic + 数据,这样的话magic就是可以进行调整的,initialBytesToStrip=长度 去掉,lengthAdjustment=magic去掉,这样配置的话,在decoder的时候,只需要读取 真正数据了
int initialBytesToStrip 上面已经说了

对于MessageToByteEncoder encode 方法,类名上必须加@ChannelHandler.Sharable注解,否则只能有一个客户端读服务器发送消息

LengthFieldPrepender 这个会自动给发送的content内容,加上它对应的length

.addLast(new LengthFieldPrepender(4)) 在encoder之前
.addLast(new NettyPacketEncoder())
.addLast(new NettyPacketDecoder())
.addLast(new ClientHandler());

12、空闲检测

连接假死
原因
1、网络设备出现故障,例如网卡、机房等,底层的TCP连接已经断开了,但是应用程序没有感知到,仍然占用着资源
2、公共网络不稳定,出现丢包。如果连续出现丢包,这是线程就是客户端数据发送不出去
服务端需要做的事情
每隔一段时间就检查这段时间内是否接受到客户端数据,没有就可以判断为连接假死,关掉channel即可
使用IdleStateHandler 重新userEventTriggered,判断READER_IDLE

客户端要做的事情
定时心跳,只要这个时间间隔小于服务端定义的空闲检测时间间隔,那么就能防止前面提到的误判,比如说客户端写空闲是3,服务端可以读空闲5来检验,一般客户端的超时是服务端的一半
使用IdleStateHandler 重新userEventTriggered,判断WRITER_IDLE,发送比如说PingMessage,服务端接受可以打印一个日志即可,不用处理,相当于续命

13、FutureTask & ChannelFuture & DefaultChannelPromise

Netty中的Future与JDK中的Future同名,但是是两个接口,Netty中的Future继承自JDK的Future,而Promise又对Netty Future进行了扩展
1、JDK Future只能同步等待任务结束(或成功,或失败)才能得到结果
2、Netty Future可以同步、异步等待得到结果(添加Listener),支持函数回调,类似JDK中的CompletableFuture,但是要等待任务结束
3、NettyPromise,不仅支持Netty Future的功能,而且脱离了任务独立存在,只作为两个线程间结果的传递

14、无锁串行化

说的是WorkerGropu中的事件处理和普通任务,是串行操作的,一般情况下会再加一层Handler线程池,之所以Netty原生不这样做,Hander线程那种设计思路,可能会导致线程与线程之间进行共享资源争抢,一旦发生了锁问题,那么会导致并发能力更差
所以Netty默认是一个IO线程无锁串行化处理各个客户端的请求,处理完了一个返回响应再处理下一个

15、JDK bug

说清楚其实就是,selector.slect(1s),如果1s内有任务,selectCnt=0重置为0,继续统计
如果1s内没有任务,你这么狂轮询(selectCnt默认阈值是512),那肯定是问题哈,做法是重建selector,创建一个新的selector,将之前的在这个selector的channel进行转移
Netty最新源码会判断,有任务情况下,也有可能会这么轮询,只打印日志告诉用户说明可能是JDK bug

服务器托管,北京服务器托管,服务器租用 http://www.hhisp.net

hackdl

咨询热线/微信 13051898268