随笔分类
消息出站
上层业务调用 ctx.flush()时,最终会来到 HeadContext.flush()
@Override
public void flush(ChannelHandlerContext ctx) {
unsafe.flush();
}
接着,来到 Adstract.flush()
// ctx.flush() - 最终会来到此
@Override
public final void flush() {
assertEventLoop();
ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
if (outboundBuffer == null) {
return;
}
// 预准备刷新操作 - 让 flushedEntry指向第一个需要被刷新的 Entry节点, 统计需要被刷新的 Entry数目, 值记录在 flushed字段内
outboundBuffer.addFlush();
// 真正去执行刷新操作
flush0();
}
这里主要涉及到两个核心方法,addflush()、flush0(),前者是做些预准备刷新的操作,后者则是真正去执行刷新操作
addflush()
ChannelOutboundBuffer.addflush()
这个方法其实就是与 addMessage()向对应
public void addFlush() {
// There is no need to process all entries if there was already a flush before and no new messages
// where added in the meantime.
//
// See https://github.com/netty/netty/issues/2577
Entry entry = unflushedEntry;
if (entry != null) {
// 让 flushedEntry指向第一个需要被刷新的 Entry
if (flushedEntry == null) {
// there is no flushedEntry yet, so start with the entry
flushedEntry = entry;
}
// 迭代链表, 统计需要被刷新的 Entry数目, 值保存在 字段 flushed内
do {
flushed ++;
// true - 更新失败, entry在此之前已经被设置为 取消状态了
if (!entry.promise.setUncancellable()) {
// Was cancelled so make sure we free up memory and notify about the freed bytes
// 获取 取消状态的 Entry自身总大小
int pending = entry.cancel();
// 这一步来更新下出站缓冲区的总数据量
decrementPendingOutboundBytes(pending, false, true);
}
entry = entry.next;
} while (entry != null);
// All flushed so reset unflushedEntry
// 最后, 设置 unflushedEntry值为 null
unflushedEntry = null;
}
}
可以看到,这里无非就是去统计出站缓冲区中待刷新的 Entry数目,将数目保存到字段 flushed中去,并且将 flushedEntry执行第一个待刷新的 Entry,而将 unflushedEntry置为 null
这也是前面所讲着一种情况:消息出站时对应的 flushedEntry != null && unflushedEntry == null
这里还来判断了 Entry是否被取消了,是的话需要去修改下相关的统计信息,即出站缓冲区大小 - 取消了的 Entry自身总大小
对应 decrementPendingOutboundBytes()
此方法与消息入站时调用的 incrementPendingOutboundBytes()相对应,需要注意的是,此方法后面还会被调用
private void decrementPendingOutboundBytes(long size, boolean invokeLater, boolean notifyWritability) {
if (size == 0) {
return;
}
long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, -size);
// 条件一:true
// 条件二:更新后的出站缓冲区总容量小于
if (notifyWritability && newWriteBufferSize < channel.config().getWriteBufferLowWaterMark()) {
// 这里来更新下出站缓冲区为可写状态
setWritable(invokeLater);
}
}
接着来看数据真正的刷新操作
flush0()
protected void flush0() {
// 判断是否处于刷新状态
if (inFlush0) {
// Avoid re-entrance
return;
}
// 获取 Channel对应的出站缓冲区
final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
if (outboundBuffer == null || outboundBuffer.isEmpty()) {
return;
}
inFlush0 = true; // 表示出站缓冲区处于刷新状态
// Mark all pending write requests as failure if the channel is inactive.
// 一般不会成立 - channel处于关闭的阶段
if (!isActive()) {
try {
// Check if we need to generate the exception at all.
if (!outboundBuffer.isEmpty()) {
if (isOpen()) {
outboundBuffer.failFlushed(new NotYetConnectedException(), true);
} else {
// Do not trigger channelWritabilityChanged because the channel is closed already.
outboundBuffer.failFlushed(newClosedChannelException(initialCloseCause, "flush0()"), false);
}
}
} finally {
inFlush0 = false;
}
return;
}
try {
// 核心方法 - 参数:当前 Channel的出站缓冲区
doWrite(outboundBuffer);
} catch (Throwable t) {
handleWriteError(t);
} finally {
// 更新 inFlush0 - 表示出站缓冲区已经刷新完毕
inFlush0 = false;
}
}
可以看到,此方法主要来设置下一个状态变量 inFlush0,表示当前出站缓冲区正处于数据刷新状态
接着,执行了 doWrite()
NioSocketChannel.doWrite()
// 参数:当前 Channel的出站缓冲区
@Override
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
// 获取当前 Netty层面 Channel锁封装着的 JDK层面的 Channel
SocketChannel ch = javaChannel();
// 获取自旋次数 - 16
// 表示下面的 do...while循环最多只会持续 16次
int writeSpinCount = config().getWriteSpinCount();
do {
// 判断出站缓冲区是否还有待刷新的 Entry
if (in.isEmpty()) { // 正常退出 doWrite方法都会从这里退出
// All written so clear OP_WRITE
// 这里来将当前 Channel在 Selector上注册的 OP_WRITE清理掉, 即让当前 Channel不再去对 OP_WRITE事件感兴趣
// 思考:这里为什么要去取消对 OP_WRITE事件的关注?
// 当 Socket可写缓冲区满时, 此时数据写不进去, 此时会去注册 OP_WRITE事件, 通过多路复用,
// 来通知什么数据可写了; so, 当数据全部都写完毕后, 若此时好不去取消对 OP_WRITE事件的关注,
// 那么此时便会导致一个无限的空轮询 (不断唤醒 NioEventLoop可以去写数据啦, 但此时并没有数据可写)
// so, 正常情况下, OP_WRITE不能去进行注册, 否则 Socket写缓冲区空时将会不断触发
clearOpWrite();
// Directly return here so incompleteWrite(...) is not called.
return;
}
// 执行到这里, 说明当前 Channel出站缓冲区内有数据待刷新
// Ensure the pending writes are made of ByteBufs only.
// 限定每次从出站缓冲区内转换多少 bytebuf字节数据的一个变量, 该变量的值会随着 ch的状态不断变化(自适应)
int maxBytesPerGatheringWrite = ((NioSocketChannelConfig) config).getMaxBytesPerGatheringWrite();
// 将出站缓冲区内的部分 Entry.msg转换成 JDK Channel依赖的标准对象 ByteBuffer, 这里返回的便是 ByteBufer数组
// 参数一:1024, 表示最多转换出来 1024个 ByteBuffer对象
// 参数二:nioBuffers方法最多能去转换 maxBytes个字节的 ByteBuf对象
ByteBuffer[] nioBuffers = in.nioBuffers(1024, maxBytesPerGatheringWrite);
// 获取出站缓冲区中待出站的 ByteBuffer数目(对应上一步操作)
int nioBufferCnt = in.nioBufferCount();
// Always use nioBuffers() to workaround data-corruption.
// See https://github.com/netty/netty/issues/2761
switch (nioBufferCnt) {
case 0:
// We have something else beside ByteBuffers to write so fallback to normal writes.
writeSpinCount -= doWrite0(in);
break;
case 1: { // 这里通常会是最普遍的情况 - 我们使用得最多的其实是 ctx.writeAndFlush()。
// Only one ByteBuf so use non-gathering write
// Zero length buffers are not added to nioBuffers by ChannelOutboundBuffer, so there is no need
// to check if the total size of all the buffers is non-zero.
// 获取数组中唯一的 Bytebuffer
ByteBuffer buffer = nioBuffers[0];
// 获取 buffer有效数据量
int attemptedBytes = buffer.remaining();
// 看, 这就就联系起来了
// 使用 JDK层面的 Channel将 ByteBuffer中数据写入到 Socket缓冲区中去
final int localWrittenBytes = ch.write(buffer);
// 条件成立 - 说明 Socket缓冲区满了, 此时数据没有写进去
if (localWrittenBytes <= 0) {
// 参数:true
// 这一步实际上就是去设置 ch在 Selector上感兴趣事件为 OP_WRITE, 这样的话, Socket缓冲区空闲时, 便可继续写数据了
// 这也是为什么上面为什么要在数据全部写完后, 要去取消 ch 对 OP_WRITE事件的关注
// 具体可以去看看 NioEventLoop.run()
incompleteWrite(true);
return;
}
// 执行到这里, 说明 ByteBuffer已经有数据写入到 Socket缓冲区了(全部或者一部分)
// 参数一:ByteBuffer中有效数据量
// 参数二:本次写到 Socket缓冲区中的数据量
// 参数三:限定每次从出站缓冲区内转换多少 bytebuf字节数据的一个变量
// 这里其实去动态调整下参数三的值
adjustMaxBytesPerGatheringWrite(attemptedBytes, localWrittenBytes, maxBytesPerGatheringWrite);
// 核心:将写入到 Socket缓冲区中的数据量从出站缓冲区中移除
// 参数:写入到 Socket缓冲区中的数据量
in.removeBytes(localWrittenBytes);
// 自旋次数自减
--writeSpinCount;
break;
}
default: {
// Zero length buffers are not added to nioBuffers by ChannelOutboundBuffer, so there is no need
// to check if the total size of all the buffers is non-zero.
// We limit the max amount to int above so cast is safe
long attemptedBytes = in.nioBufferSize();
final long localWrittenBytes = ch.write(nioBuffers, 0, nioBufferCnt);
if (localWrittenBytes <= 0) {
incompleteWrite(true);
return;
}
// Casting to int is safe because we limit the total amount of data in the nioBuffers to int above.
adjustMaxBytesPerGatheringWrite((int) attemptedBytes, (int) localWrittenBytes,
maxBytesPerGatheringWrite);
in.removeBytes(localWrittenBytes);
--writeSpinCount;
break;
}
}
} while (writeSpinCount > 0);
// 执行到这, 说明 do...while循环了 16次, 也没有将出站缓冲区待刷新的数据处理完毕
// 参数:false
incompleteWrite(writeSpinCount < 0);
}
方法虽然有点长,但核心做的事情却是清晰明了
先是去获取了 JDK层面的 Channel (Netty底层是 NIO,最终数据都是会通过 Channel.write(ByteBuffer)来写入到 Socket写缓冲区中的)
然后获取一个字段次数,writeSpinCount表示着下面要进行的循环最多持续的次数,这里是 16
接着,便是来到 do...while循环,writeSpinCount变量的值控制着循环次数
先是判断出站缓冲区是否还有数据可写,flushed == 0?
// 判断出站缓冲区是否还有待刷新的 Entry
if (in.isEmpty()) { // 正常退出 doWrite方法都会从这里退出
// All written so clear OP_WRITE
// 这里来将当前 Channel在 Selector上注册的 OP_WRITE清理掉, 即让当前 Channel不再去对 OP_WRITE事件感兴趣
// 思考:这里为什么要去取消对 OP_WRITE事件的关注?
// 当 Socket可写缓冲区满时, 此时数据写不进去, 此时会去注册 OP_WRITE事件, 通过多路复用,
// 来通知什么数据可写了; so, 当数据全部都写完毕后, 若此时好不去取消对 OP_WRITE事件的关注,
// 那么此时便会导致一个无限的空轮询 (不断唤醒 NioEventLoop可以去写数据啦, 但此时并没有数据可写)
// so, 正常情况下, OP_WRITE不能去进行注册, 否则 Socket写缓冲区空时将会不断触发
clearOpWrite();
// Directly return here so incompleteWrite(...) is not called.
return;
}
public boolean isEmpty() {
return flushed == 0;
}
这里有一个关键,当缓冲区中没有数据出站时,会去调用 clearOpWrite()方法
由方法名也可知,其实就是去设置下当前 channel在 selector上感兴趣的事件集中不包含 OP_WRITE
为什么这样做?我们知道,当我们写数据时,Socket写缓冲区是由容量大小的,此时 NioEventLoop再往 Socket中写数据其实是一直写不进去的,此时该怎么办?
基于多路复用,我们会去设置 ch在多路复用器上感兴趣的事件集中包含 OP_WRITE,这样的话,当 Socket写缓冲区空闲时,NioEventLoop便会继续处理相关的写操作,这一方面是出于高效考虑,一方面便是让一个 NioEventLoop不会在一个 ch上处理过长时间
因此,当出站缓冲区中数据全部都写完后,此时便需要取消外层 ch在 selector上对 OP_WRITE事件的关注,否则的话,ch将会一直被唤醒,而此时并没有数据可写,这相当于是一个 bug了
到此为止,我们只是去获取了 JDK层面的 Channel,而我们要真正去进行数据的写,此时还缺少了一个数据的容器,便是 ByteBuffer,而目前位置,Entry内部封装着只是 ByteBuf,因此,我们需要进行转换
对应的便是 nioBuffers()
需要注意的便是,每一个 nioBuffers()方法的调用,可转换的数据量以及可转换出来的 ByteBuffer数目是有限制的,即对应其方法传过来的参数
// 将出站缓冲区内的部分 Entry.msg转换成 JDK Channel依赖的标准对象 ByteBuffer, 这里返回的便是 ByteBufer数组
// 参数一:1024, 表示最多转换出来 1024个 ByteBuffer对象
// 参数二:nioBuffers方法最多能去转换 maxBytes个字节的 ByteBuf对象
public ByteBuffer[] nioBuffers(int maxCount, long maxBytes) {
assert maxCount > 0;
assert maxBytes > 0;
// 本次 nioBuffers()方法调用转换出来的 Buffer总容量
long nioBufferSize = 0;
// 本次 nioBuffers()方法调用转换出来的 Buffer数目
int nioBufferCount = 0;
// 可以简单认为是与当前线程绑定关系的 threadLocalMap
final InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get();
// 为每一个线程分配一个长度为 1024的 ByteBuffer数组, 避免每个线程每次调用 nioBuffers()方法时都去创建一个数组 - 性能考虑
ByteBuffer[] nioBuffers = NIO_BUFFERS.get(threadLocalMap);
// 获取待刷新的第一个节点
Entry entry = flushedEntry;
// 条件一:当前 Entry是可以被刷新的 Entry (entry != null && entry != unflushedEntry)
// 条件二:msg是 ByteBuf类型
while (isFlushedEntry(entry) && entry.msg instanceof ByteBuf) {
// true - 当前 Entry非取消状态
if (!entry.cancelled) {
// 获取需要被处理的数据
ByteBuf buf = (ByteBuf) entry.msg;
final int readerIndex = buf.readerIndex();
// 获取需要被处理的数据量:写偏移量 - 读偏移量
final int readableBytes = buf.writerIndex() - readerIndex;
// true - 有效数据量大于 0
if (readableBytes > 0) {
// 条件一:稍微做些转换 nioBufferSize + readableBytes > maxBytes
// 表示 已转换的数据量 + 本次循环想要去转换的数据量 > 最大可转换的数据量限制 - 退出循环
if (maxBytes - readableBytes < nioBufferSize && nioBufferCount != 0) {
// If the nioBufferSize + readableBytes will overflow maxBytes, and there is at least one entry
// we stop populate the ByteBuffer array. This is done for 2 reasons:
// 1. bsd/osx don't allow to write more bytes then Integer.MAX_VALUE with one writev(...) call
// and so will return 'EINVAL', which will raise an IOException. On Linux it may work depending
// on the architecture and kernel but to be safe we also enforce the limit here.
// 2. There is no sense in putting more data in the array than is likely to be accepted by the
// OS.
//
// See also:
// - https://www.freebsd.org/cgi/man.cgi?query=write&sektion=2
// - https://linux.die.net//man/2/writev
break;
}
// 来到这, 说明本次循环还可以继续来读取数据
// 累计已转换的数据量
nioBufferSize += readableBytes;
// 大多数情况下, 这里获取的会是 -1, 表示 ByteBuf底层是由一个 ByteBuf组成(非 CompositeByteBuf那种玩法)
int count = entry.count;
if (count == -1) {
//noinspection ConstantValueVariableUse
// 这里我们来考虑池化内存管理(Direct Buffer) - entry.count = count = 1
// 通常而言, 这里计算出来的会是 1, 特殊情况下是 CompositeByteBuf的情况
entry.count = count = buf.nioBufferCount();
}
// 计算出需要多大的 ByteBuffer数组
int neededSpace = min(maxCount, nioBufferCount + count);
// 如果计算出的 ByteBuffer数组长度大于 1024的话, 这里会来进行下数组扩容的逻辑
if (neededSpace > nioBuffers.length) {
nioBuffers = expandNioBufferArray(nioBuffers, neededSpace, nioBufferCount);
NIO_BUFFERS.set(threadLocalMap, nioBuffers);
}
// 正常逻辑下, 我们走这条分支
if (count == 1) {
ByteBuffer nioBuf = entry.buf;
// 第一次执行时, 这里会成立, 因为转换后的结果并未被缓存(对应 entry.buf, entry.bufs)
if (nioBuf == null) {
// cache ByteBuffer as it may need to create a new ByteBuffer instance if its a
// derived buffer
// 参数一:读偏移量 参数二:可读数据量
// 获取 ByteBuf封装着的 ByteBuffer
// 最后便是将转换后的 ByteBuffer作为结果缓存了起来 - Entry.buf
// 我们仍然是对池化内存管理进行考虑
entry.buf = nioBuf = buf.internalNioBuffer(readerIndex, readableBytes);
}
// 将刚刚转换出来的 ByteBuffer加入到数组中去
nioBuffers[nioBufferCount++] = nioBuf;
} else {
// The code exists in an extra method to ensure the method is not too big to inline as this
// branch is not very likely to get hit very frequently.
nioBufferCount = nioBuffers(entry, buf, nioBuffers, nioBufferCount, maxCount);
}
// true - 本次循环转换的 ByteBuffer数目已达到了最大限制
if (nioBufferCount >= maxCount) {
break;
}
}
}
entry = entry.next; // 迭代下一个 Entry
}
// 更新统计字段
// 更新转换出来的 ByteBuffer数目
this.nioBufferCount = nioBufferCount;
// 更新已转换的总数据量
this.nioBufferSize = nioBufferSize;
return nioBuffers;
}
该方法实际上也是通过一个 while循环去控制着数据的转换持续次数,对应的方法:isFlushedEntry()
private boolean isFlushedEntry(Entry e) {
// e != null - 迭代到了链表末尾
// e != unflushedEntry - 这正是我们之前所讲的那种情况, 存在 flushedEntry -> ... -> unflushedEntry -> ... -> newTailEntry
// 在 unflushedEntry及其之后的数据实际上是在 flush()之后继续往出站缓冲区中所添加的数据, 这些数据不能在本次 flush()中被刷新出去
// 即对应的便是下一次 flush()方法的调用
return e != null && e != unflushedEntry;
}
判定当前迭代的 Entry能否去进行数据的转换操作,这里涉及到了 unflushedEntry != null的考量
循环中实际上就是去进行 Entry所封装着的 ByteBuf数据的转换操作,先是获取 msg,获取其中数据量的一些信息,然后通过 buf.internalNioBuffer()来进行数据的转换,这里我们不考虑 DuplicateBytebuf的情况,此时 ByteBuf是池化内存的
// 参数一:读偏移量 参数二:可读数据量
@Override
public final ByteBuffer internalNioBuffer(int index, int length) {
checkIndex(index, length);
// 参数一:读偏移量 参数二:可读数据量 参数三:false
return _internalNioBuffer(index, length, false);
}
PooledByteBuf._internalNioBuffer()
// 参数一:读偏移量 参数二:可读数据量 参数三:false
final ByteBuffer _internalNioBuffer(int index, int length, boolean duplicate) {
// 这里计算出来的 index实际上是相对于池化那部分内存的自身的一个偏移量
// 添加偏移量后的 index
index = idx(index);
// internalNioBuffer() - 内部 - memory.duplicate()
// 底层 memory.duplicate() - 即 ByteBuffer与 memory管理的是同一块内存
ByteBuffer buffer = duplicate ? newInternalNioBuffer(memory) : internalNioBuffer();
// 这里就是来初始化 buffer对象 - index = 0, limit = length
// 即, 设置 buffer limit为 ByteBuf在共享 memory上最大下标, pos为添加了偏移量后的读索引
buffer.limit(index + length).position(index);
// 经过上面一番操作, ButeBuffer实际上就是一块可操作的固定大小的一块内存
return buffer;
}
设置使用 memory进行零拷贝,生成 ByteBuffer,然后去设置了下一些相关的信息 (对应的便是数据量的标识)
转换出的 ByteBuffer会记录到 Entry字段 buf中去,其实就是简单缓存下而已
这里其实还去做了些数据信息的统计,以来判断是否达到读限制或者转换限制,详细可以去看代码注释
最后,返回的便是一个 ByteBuffer数组
现在,继续将目光返回到 doWrite()方法
之后,便是获取了 ByteBuffer数组的数目 nioBufferCnt,这里我们来考虑一般情况,对应的便是 1,不考虑组合 ByteBuf的玩法
so,接着我们来考虑的便是:
case 1: { // 这里通常会是最普遍的情况 - 我们使用得最多的其实是 ctx.writeAndFlush()。
// Only one ByteBuf so use non-gathering write
// Zero length buffers are not added to nioBuffers by ChannelOutboundBuffer, so there is no need
// to check if the total size of all the buffers is non-zero.
// 获取数组中唯一的 Bytebuffer
ByteBuffer buffer = nioBuffers[0];
// 获取 buffer有效数据量
int attemptedBytes = buffer.remaining();
// 看, 这就就联系起来了
// 使用 JDK层面的 Channel将 ByteBuffer中数据写入到 Socket缓冲区中去
final int localWrittenBytes = ch.write(buffer);
// 条件成立 - 说明 Socket缓冲区满了, 此时数据没有写进去
if (localWrittenBytes <= 0) {
// 参数:true
// 这一步实际上就是去设置 ch在 Selector上感兴趣事件为 OP_WRITE, 这样的话, Socket缓冲区空闲时, 便可继续写数据了
// 这也是为什么上面为什么要在数据全部写完后, 要去取消 ch 对 OP_WRITE事件的关注
// 具体可以去看看 NioEventLoop.run()
incompleteWrite(true);
return;
}
// 执行到这里, 说明 ByteBuffer已经有数据写入到 Socket缓冲区了(全部或者一部分)
// 参数一:ByteBuffer中有效数据量
// 参数二:本次写到 Socket缓冲区中的数据量
// 参数三:限定每次从出站缓冲区内转换多少 bytebuf字节数据的一个变量
// 这里其实去动态调整下参数三的值
adjustMaxBytesPerGatheringWrite(attemptedBytes, localWrittenBytes, maxBytesPerGatheringWrite);
// 核心:将写入到 Socket缓冲区中的数据量从出站缓冲区中移除
// 参数:写入到 Socket缓冲区中的数据量
in.removeBytes(localWrittenBytes);
// 自旋次数自减
--writeSpinCount;
break;
}
可以看到,通过 ch.write(buffer)将数目真正写到 Socket缓冲区了,返回的便是此时写的数据量保存到变量 localWrittenBytes,需要注意的便是 buffer中数据不一定全部都写出去了
这里还有对 ch在 selector感兴趣事件的设置,因为此次 Socket写缓冲区满了,因此,这里利用到了多路复用
if (localWrittenBytes <= 0) {
// 参数:true
// 这一步实际上就是去设置 ch在 Selector上感兴趣事件为 OP_WRITE, 这样的话, Socket缓冲区空闲时, 便可继续写数据了
// 这也是为什么上面为什么要在数据全部写完后, 要去取消 ch 对 OP_WRITE事件的关注
// 具体可以去看看 NioEventLoop.run()
incompleteWrite(true);
return;
}
数据写出来了,因此,这块我们也需要对出站缓冲区中的 Entry做些出站相关操作,对应 in.removeBytes
removeBytes()
// 参数:写入到 Socket缓冲区中的数据量
// writtenBytes 可能是一个 ByteBuffer | 多个 ByteBuffer | 不足一个 ByteBuffer的大小
public void removeBytes(long writtenBytes) {
// 自旋
for (;;) {
// 获取当前 flushedEntry中封装着的数据
Object msg = current();
if (!(msg instanceof ByteBuf)) {
assert writtenBytes == 0;
break;
}
final ByteBuf buf = (ByteBuf) msg;
final int readerIndex = buf.readerIndex();
// 获取 msg中可读数据量
final int readableBytes = buf.writerIndex() - readerIndex;
// true - 说明当前 Entry中的数据已经被写入到 Socket缓冲区中去了
// 这里来移除 flushedEntry对应的 Entry
if (readableBytes <= writtenBytes) {
if (writtenBytes != 0) {
progress(readableBytes);
// 更新 writtenBytes
writtenBytes -= readableBytes;
}
// 关键 - remove()
remove();
} // else - 说明当前 flushedEntry中的数据量并没有全部写入到 Socket缓冲区中去, 此时便不能将 Entry出站
else { // readableBytes > writtenBytes
if (writtenBytes != 0) {
// 更新下 ByteBuf的读偏移量即可
buf.readerIndex(readerIndex + (int) writtenBytes);
progress(writtenBytes);
}
break;
}
}
clearNioBuffers();
}
这块逻辑更为简单,其实就是根据实际写的数据量,然后遍历出站缓冲区中的 Entry,若是写的数据量大于该 Entry所封装着的 ByteBuf的实际数据量,是的话说明该 Entry中数据已经写出去了,因此需要将 Entry从单向链表中移除,对应的便是 Entry的释放操作,归还到对象池 ObjectPool中去;防止,便是当前 Entry中数据未完全写出去,此时只需要更新下 Entry所封装着的 ByteBuf有效数据量的相关信息即可
前者对应的便是 remove()
public boolean remove() {
Entry e = flushedEntry;
if (e == null) {
clearNioBuffers();
return false;
}
Object msg = e.msg;
ChannelPromise promise = e.promise;
// 获取当前 Entry自身总大小
int size = e.pendingSize;
// 从出站缓冲区中移除 Entry, 并且更新了 flushed字段
removeEntry(e);
if (!e.cancelled) {
// only release message, notify and decrement if it was not canceled before.
// ByteBuf实现了引用计数, 这里 safeRelease更新引用计数, 最终可能会触发 byteBuf归还内存的逻辑
ReferenceCountUtil.safeRelease(msg);
// 设置 promise结果
safeSuccess(promise);
// 因为 Entry出站了嘛, 这块来更新下出站缓冲区中的总容量
decrementPendingOutboundBytes(size, false, true);
}
// recycle the entry
// 归还 Entry到对象池(ObjectPool)中去, ObjectPool负责 Entry的申请与释放
e.recycle();
return true;
}
// 从出站缓冲区中移除 Entry
private void removeEntry(Entry e) {
// true - 待刷新的 Entry已经全部刷新完毕
if (-- flushed == 0) {
// processed everything
flushedEntry = null;
if (e == tailEntry) {
tailEntry = null;
unflushedEntry = null;
}
} else {
// 迭代下一个 Entry
flushedEntry = e.next;
}
}
以及对应的 Entry的归还 ops:recycle()
void recycle() {
next = null;
bufs = null;
buf = null;
msg = null;
promise = null;
progress = 0;
total = 0;
pendingSize = 0;
count = -1;
cancelled = false;
handle.recycle(this);
}
再回到原来方法,出了循环,对应着循环已经持续了 16次了,但数据并未完全写出去,此时便会去调用方法 incompleteWrite()
// 参数:true - do... while循环中
// false - 出了 do...while循环
protected final void incompleteWrite(boolean setOpWrite) {
// Did not write completely.
if (setOpWrite) {
setOpWrite();
} else {
// It is possible that we have set the write OP, woken up by NIO because the socket is writable, and then
// use our write quantum. In this case we no longer want to set the write OP because the socket is still
// writable (as far as we know). We will find out next time we attempt to write if the socket is writable
// and set the write OP if necessary.
// 让当前 Channel不再去对 OP_WRITE事件感兴趣
clearOpWrite();
// Schedule flush again later so other tasks can be picked up in the meantime
// 提交了一个任务 - flush0(), 最终还是会继续来处理数据, 直至待刷新的数据全部都处理完毕 - doWrite()
// 这里主要是避免了在多路复用器上其它 ch饥饿等待, 让 NioEventLoop去处理其它 ch上的事件,
// 最后再来处理 NioEventLoop本地任务队列上的任务, 此时便会碰到 flushTask, 便能够继续去处理待发送的数据了
eventLoop().execute(flushTask);
}
}
我们关注的便是 else分支,首先也是先去取消当前 ch对 OP_WRITE操作的关注,然后异步提交了一个任务,对应 flushTask()
private final Runnable flushTask = new Runnable() {
@Override
public void run() {
// Calling flush0 directly to ensure we not try to flush messages that were added via write(...) in the
// meantime.
((AbstractNioUnsafe) unsafe()).flush0();
}
};
可以看到,这里提交了一个普通任务 flush0(),其实就是让 NioEventLoop后续再去进行处理数据的刷新操作
思考一下,Netty这么设计的缘由是什么?
一方面是对应可刷新数据后续能够全部处理完毕,另一方面主要是来避免 NioEventLoop在一个 ch上处理时间过长,导致 Selector上其它就绪的 ch饥饿等待,因此这里来提交了一个异步任务
而经过前面 NioEventLoop工作原理的分析我们可知,NioEventLoop是优先去处理 IO任务的,其次才是本地任务以及对应的需要被调度的任务
可以看到,Netty在这一方面考虑的的确是挺周到的
至此,出站缓冲区解读完毕!