随笔分类
解读 出站缓冲区 (ChannelOutboundBuffer)
数据是怎么进行出站的?我们直观上认为,数据经过处理器的层层传递,最终会传递到 HeadContext,由 HeadContext负责将数据写入到与 Socket相关的写缓冲区中去,实现数据的发送
然,事实并非如此,so,此篇来对 "出站缓冲区"进行解读!
首先,需要清楚一个概念:当我们将数据通过 ctx进行 write操作时,数据并没有直接写到对端,而当我们真正去调用 ctx.flush()方法时,才会去将数据真正发送到对端,当然,这也只是一种情况,当数据量达到某种量时,也可能会触发数据的发送
其次,对于 Netty层面,当我们在操作数据时,直接获取到的数据会是 ByteBuf类型,当然,这期间我们会通过一些编码译码器的处理将数据转换为某种状态来去做些其它操作,但我们知道,Netty底层是 NIO,即最终我们发送数据时,是通过 ByteBuffer的形式进行写入 Channel后再去发送的,这期间有时如何来做些变化的,so,本期来探秘出站缓冲区
由上一期事件在 pipeline中传播的解析可知,当我们调用 ctx.write()时,数据写事件最终会传递到 HeadContext中去,即对应 HeadContext.wirte()
消息入站
// 参数一:ctx, 当前 HeadContext自身
// 参数二:一般情况下会是 ByteBuf, 当然也有其它情况 如 FileRegion(暂时还不怎么了解...)
// 参数三:与当前动作相关的 promise, 一般而言, 若是业务关心本次写动作, 会去创建一个 Promise, 并去注册一些相关的监听器,
// 以来实现一些相关的操作
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
unsafe.write(msg, promise);
}
AbstractUnsafe.wirte
// 参数一:一般情况下会是 ByteBuf, 当然也有其它情况 如 FileRegion(暂时还不怎么了解...)
// 参数二:与当前动作相关的 promise, 一般而言, 若是业务关心本次写动作, 会去创建一个 Promise, 并去注册一些相关的监听器,
@Override
public final void write(Object msg, ChannelPromise promise) {
assertEventLoop();
ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
// 一般不会走这条分支
if (outboundBuffer == null) {
try {
// release message now to prevent resource-leak
ReferenceCountUtil.release(msg);
} finally {
// If the outboundBuffer is null we know the channel was closed and so
// need to fail the future right away. If it is not null the handling of the rest
// will be done in flush0()
// See https://github.com/netty/netty/issues/2362
safeSetFailure(promise,
newClosedChannelException(initialCloseCause, "write(Object, ChannelPromise)"));
}
return;
}
// 表示 msg数据量大小
int size;
try {
// 一般而言, msg是 ByteBuf对象, ByteBuf对象根据内存归属 分为 heap 和 direct类型
// 如果 ByteBuf类型是 heap的话, 这里会将它转为 direct类型
msg = filterOutboundMessage(msg);
// 这里便是以一种通用的方式去获取 msg中的数据量 - msg不一定就是 ByteBuf类型
size = pipeline.estimatorHandle().size(msg);
if (size < 0) {
size = 0;
}
} catch (Throwable t) { // 若发生了异常, 会去释放 msg中管理的内存
try {
ReferenceCountUtil.release(msg);
} finally {
safeSetFailure(promise, t);
}
return;
}
// 将 msg中数据添加到 出站缓冲区中去
// 参数一:msg, 我们考虑 ByteBuf类型, 并且为 DirectBuffer
// 参数二:msg中有效数据量大小
// 参数三:与当前 write动作相关的 promise
outboundBuffer.addMessage(msg, size, promise);
}
这里我们关注的两个方法:
AbstractNioByteChannel.filterOutboundMessage()
@Override
protected final Object filterOutboundMessage(Object msg) {
if (msg instanceof ByteBuf) {
ByteBuf buf = (ByteBuf) msg;
// 如果 msg是直接内存方式分配的 ByteBuf, 直接返回即可
if (buf.isDirect()) {
return msg;
}
// 这块便是以直接内存方式对其再分配
return newDirectBuffer(buf);
}
if (msg instanceof FileRegion) {
return msg;
}
throw new UnsupportedOperationException(
"unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
}
DefaultMessageSizeEstimator.HandlerImpl.size()
// 根据 msg具体类型来去获取其数据量
@Override
public int size(Object msg) {
if (msg instanceof ByteBuf) {
return ((ByteBuf) msg).readableBytes();
}
if (msg instanceof ByteBufHolder) {
return ((ByteBufHolder) msg).content().readableBytes();
}
if (msg instanceof FileRegion) {
return 0;
}
return unknownSize;
}
这一步主要做的事情:如果 msg不是 DirectByteBuf类型,则进行转换;并且以一种比较通用的方式去计算出 msg内部的可读数据量
接着,去调用了 addMessage(),这里是我们所关注的重点
在开始解读该方法之前,先对 ChannelOutBoundBuffer做些必要的说明
由上层调用知,每个 Channel会对应着自己的 Unsafe实例,而每一个 Unsafe实例对应着一个 ChannelOutboundBuffer,so,对于每一个 Channel而言,实际上都持有着一个 "出站缓冲区"
首先来看看其对应的一些字段解读
字段
// 出站缓冲区 - 在上层应用调用 ctx.write()数据最终都会写入到 ChannelOutboundBuffer中去
public final class ChannelOutboundBuffer {
// Assuming a 64-bit JVM:
// - 16 bytes object header
// - 6 reference fields
// - 2 long fields
// - 2 int fields
// - 1 boolean field
// - padding
static final int CHANNEL_OUTBOUND_BUFFER_ENTRY_OVERHEAD =
SystemPropertyUtil.getInt("io.netty.transport.outboundBufferEntrySizeOverhead", 96);
private static final InternalLogger logger = InternalLoggerFactory.getInstance(ChannelOutboundBuffer.class);
private static final FastThreadLocal<ByteBuffer[]> NIO_BUFFERS = new FastThreadLocal<ByteBuffer[]>() {
@Override
protected ByteBuffer[] initialValue() throws Exception {
return new ByteBuffer[1024];
}
};
// 表示当前出站缓冲区归属的 channel
private final Channel channel;
/**
* 可能存在的几种情况:
* a. flushedEntry == null && unflushedEntry != null - 此时出站缓冲区处于数据入站阶段
* b. flushedEntry != null && unflushedEntry == null - 此时出站缓冲区处于数据出站阶段, 即调用了 addFlush方法之后, 会去将 flushedEntry 指向
* unflushedEntry的值, 并且会计算出待刷新的节点数量的值 flushed
*
* c. flushedEntry != null && unflushedEntry != null - 这种情况比较特殊 (特指:数据未 flush完毕又有新的数据入站了)
* 假设业务层面不停地调用 ctx.wirte(msg), msg最终都会调用 unsafe.write(msg) -> ChannelOutboundBuffer.addMessage(msg)
* e1 - e2 - e3 - e4 - e5 -> ... -> eN
* 此时, flushedEntry -> null, unflushedEntry -> e1, tailEntry -> eN
*
* 业务层面接下来调用了 ctx.flush(), 最终会触发 unsafe.flush()
* unsafe.flush() {
* 1. ChannelOutboundBuffer.addFlush() - 将 flushedEntry指向 unflushedEntry, 并且计算出链表中需要被 flush的 Entry数目
* 2. ChannelOutboundBuffer.nioBuffers(...) - 此方法实际上就是去做转换, 最终返回 ByteBuffer[] 数组供下面逻辑使用
* 3. 遍历 byteBuffer数组, 调用 JDK层面 Channel.write(buffer), 该方法会返回真正写入 socket写缓冲区的字节数量, 如 res
* 4. 根据 res去移除出站缓冲区内对应的 entry
* }
*
* socket缓冲区可能写满, 假设写到 byteBuffer[3]时, socket缓冲区写满了, 那么此时 nioEventLoop再去重写也没有, 此时多路复用就起作用了(前提:Channel设置了
* 感兴趣事件为 write), 这样的话, 当 socket写缓冲区空闲时, selector会在此唤醒 NioEventLoop线程去处理。。。
* 假设此时 flushedEntry指向 e4, 业务层面再去调用 ctx.write(msg)时, 此时 unflushedEntry便会指向最新的 e(N + 1)
*
*/
// Entry(flushedEntry) --> ... Entry(unflushedEntry) --> ... Entry(tailEntry) - 这也是上述所说的 case:c, 虽发生情况小, 但还是可能发生的
//
// The Entry that is the first in the linked-list structure that was flushed
// 表示待刷新的第一个节点
private Entry flushedEntry;
// The Entry which is the first unflushed in the linked-list structure
// 表示未刷新的第一个节点
private Entry unflushedEntry;
// The Entry which represents the tail of the buffer
// 表示尾结点
private Entry tailEntry;
// The number of flushed entries that are not written yet
// 记录链表中需要被刷新的 Entry数目 - addFlush() 中迭代链表直至末尾来计算得出
private int flushed;
private int nioBufferCount;
private long nioBufferSize;
private boolean inFail;
// 使用字段更新器去更新字段 totalPendingSize - CAS
private static final AtomicLongFieldUpdater<ChannelOutboundBuffer> TOTAL_PENDING_SIZE_UPDATER =
AtomicLongFieldUpdater.newUpdater(ChannelOutboundBuffer.class, "totalPendingSize");
// 表示出站缓冲区共有的字节数据量 - 注:包含 Entry自身字段占用的空间 - Entry -> msg + entry.fields
@SuppressWarnings("UnusedDeclaration")
private volatile long totalPendingSize;
private static final AtomicIntegerFieldUpdater<ChannelOutboundBuffer> UNWRITABLE_UPDATER =
AtomicIntegerFieldUpdater.newUpdater(ChannelOutboundBuffer.class, "unwritable");
// 表示出站缓冲区是否可写 - 0 -> 可写, 1 -> 不可写
// 如果业务层面不去检查 unwritable的话, 不受限制
@SuppressWarnings("UnusedDeclaration")
private volatile int unwritable;
private volatile Runnable fireChannelWritabilityChangedTask;
ChannelOutboundBuffer(AbstractChannel channel) {
this.channel = channel;
}
由上分析可知,出站缓冲区中存储的并不是 msg本身,而是 Entry,并且 Entry之间形成了个单向链表,这里稍微注意下 flushedEntry、unflushedEntry、TailEntry之间的联系,并且其还来应该一些字段更新器去更新一些字段
Entry
Entry中封装着 msg,及对应 ByteBuf的封装
由于 Entry使用频率高,过多的创建 Entry必然会导致性能的损耗,so,Netty也考虑到了这一点,因此其使用一个对象池 ObjectPool对其申请与释放进行了管理
这里需要注意的一个点:其字段 pendingSize
记录了当前 Entry自身的空间占用大小
static final class Entry { // 对 ByteBuf的封装
// 由于 Entry使用频率特别高, 因此 Netty在此对其做了优化
// 使用 ObjectPool对 Entry来进行管理(申请和释放) - 优化了程序性能
private static final ObjectPool<Entry> RECYCLER = ObjectPool.newPool(new ObjectCreator<Entry>() {
@Override
public Entry newObject(Handle<Entry> handle) {
return new Entry(handle);
}
});
// 归还 Entry到 ObjectPool使用的 "句柄"
private final Handle<Entry> handle;
// 由此可见, 单向链表
Entry next;
// 业务层面的信息, 这里通常会是 ByteBuf对象
Object msg;
// 当 unsafe调用出站缓冲区对应的 nioBuffers()方法时, 被涉及到的 entry被转换成 ByteBuffer, 这里来缓存结果使用
ByteBuffer[] bufs;
ByteBuffer buf;
// 业务层面关注 write动作执行结果时提交的 promise
ChannelPromise promise;
// 进度
long progress;
// msg有效数据量
long total;
// ByteBuf有效数据量大小 + 96(对应的便是 Entry中其它字段总大小)
// Assuming a 64-bit JVM:
// - 16 bytes object header
// - 6 reference fields (这里似乎不考虑指针压缩的情况, 故为 8)
// - 2 long fields -
// - 2 int fields
// - 1 boolean field
// - padding
// so, 16 + 48 + 2 * 8 + 2 * 4 + 1 = 89, 由于 padding(必须为 8的倍数), 故 96
int pendingSize; // 即, 表示的其实是 Entry字段本身空间总大小
// 表示当前 msg ByteBuf底层由多少个 ByteBuffer组成, 一般情况下会是 1
// 但考虑到 CompositeByteBuf, 底层可以由多个 ByteBuf组成, 对应的便是多个 ByteBuffer
int count = -1;
// 表示当前 Entry是否取消刷新至 socket, 默认情况下是 false
boolean cancelled;
private Entry(Handle<Entry> handle) {
this.handle = handle;
}
addMessage()
// 将 msg中数据添加到 出站缓冲区中去
// 参数一:msg, 我们考虑 ByteBuf类型, 并且为 DirectBuffer
// 参数二:msg中有效数据量大小
// 参数三:与当前 write动作相关的 promise
public void addMessage(Object msg, int size, ChannelPromise promise) {
// 参数一:msg, 我们考虑 ByteBuf类型, 并且为 DirectBuffer
// 参数二:msg中有效数据量大小
// 参数三:其实也是有效数据量大小 == size
// 参数四:与当前 write动作相关的 promise
// 这一步获取到包装着 msg的 Entry对象
Entry entry = Entry.newInstance(msg, size, total(msg), promise);
// 下面便是将 Entry对象加入到 Entry链表中去, 表示数据入站到出站缓冲区中
if (tailEntry == null) {
flushedEntry = null;
} else {
Entry tail = tailEntry;
tail.next = entry;
}
tailEntry = entry;
if (unflushedEntry == null) {
unflushedEntry = entry;
}
// increment pending bytes after adding message to the unflushed arrays.
// See https://github.com/netty/netty/issues/1619
// 累加出站缓冲区总大小
// 参数一:Entry自身总大小
// 参数二:false
incrementPendingOutboundBytes(entry.pendingSize, false);
}
newInstance(),向 ObjectPool申请 Entry
// 最终返回一个 Entry, 里头包装着 msg
static Entry newInstance(Object msg, int size, long total, ChannelPromise promise) {
// 从对象池 ObjectPool中获取一个空闲的 Entry对象, 若 ObjectPool中不存在空闲的 Entry对象, 则去创建一个 Entry对象
Entry entry = RECYCLER.get();
// 以下就是对 Entry中字段进行赋值操作
entry.msg = msg;
// 即, msg有效数据量 + 96 -- 对应 Entry本身空间总大小
entry.pendingSize = size + CHANNEL_OUTBOUND_BUFFER_ENTRY_OVERHEAD;
entry.total = total;
entry.promise = promise;
return entry;
}
即,从对象池 ObjectPool中获取一个空闲的 Entry对象, 若 ObjectPool中不存在空闲的 Entry对象, 则去创建一个 Entry对象,这块设计设计太常见了...
当封装着 msg的 Entry申请成功后,添加到链表中去,其实相应的便是对 unflushedEntry、TailEntry进行更新
接着,其实际上还去做了一件事情:去累计当前出站缓冲区的总大小
increatmentPendingOutboundBytes
// 累加出站缓冲区总大小
// 参数一:Entry自身总大小
// 参数二:false
private void incrementPendingOutboundBytes(long size, boolean invokeLater) {
if (size == 0) {
return;
}
// 使用字段更新器去更新字段 totalPendingSize - CAS
long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, size);
// 如果累加完之后的值, 大于出站缓冲区的高水位, 则设置 unWritable字段 表示不可写, 并且向 channel pipeline发起 unwritable更改事件
if (newWriteBufferSize > channel.config().getWriteBufferHighWaterMark()) {
setUnwritable(invokeLater);
}
}
// 设置出站缓冲区为不可写状态
private void setUnwritable(boolean invokeLater) {
for (;;) {
// 表示旧值
final int oldValue = unwritable;
// 表示新值 1
final int newValue = oldValue | 1;
// 通过 CAS去更新字段值 unwritable
if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) {
// true - 说明 Channel状态已经发生了改变
if (oldValue == 0) {
// 参数:false
// 往 pipeline中广播 当前 Channel可写状态已经发生改变的一个事件
// 这里需要注意的是:这里虽然是去设置了不可写状态, 向 pipeline中去传播了该事件, 若是没有对应 handler去进行处理的话,
// 实际上这块数据还是不断往出站缓冲区中写, 因此, 若是业务层面有需要的话, 可以去设置些 handler去关注此可写状态的改变,
// 如:可以修改写的一个速率之类的...
fireChannelWritabilityChanged(invokeLater);
}
break;
}
}
}
// 参数:false
private void fireChannelWritabilityChanged(boolean invokeLater) {
final ChannelPipeline pipeline = channel.pipeline();
if (invokeLater) {
Runnable task = fireChannelWritabilityChangedTask;
if (task == null) {
fireChannelWritabilityChangedTask = task = new Runnable() {
@Override
public void run() {
pipeline.fireChannelWritabilityChanged();
}
};
}
channel.eventLoop().execute(task);
} else {
// 往 pipeline中广播 当前 Channel可写状态已经发生改变的一个事件
pipeline.fireChannelWritabilityChanged();
}
}
这一块,我认为是 Netty提供给我们的一个处理消息的扩展点
即,由上可以看出,当出站缓冲区内数据达到一定量时 (这里可以认为是,达到了 "高水位"),便会去设置其状态为不可写状态,然后往 pipeline中去广播了事件,即对应的是 Channel可写状态发生改变的一个事件
因此,在上层业务层面进行操作时,即对应在 ctx.write()操作之前,我们可以先去判断 channel当前的可写状态,若可写状态发生了改变,可以去做些写操作相关的控制:如,控制写速率的大小等
若是上层应用不去做任何事情的话,实际上,这块数据的累加,水位判断,状态传播就不会发挥出任何作用