随笔分类
解读 Pipeline
先来看看 Pipeline的接口设计:
对应 Invoker中实际上来规定了可往 pipeline中传递的出站、入站事件,而 pipeline本身相当于就是一个容器,其提供了许多对容器中存储元素的增删改查方法,以及对 Invoker继承来的方法做了些许修改(返回值)
而对于 ChannelPipeline呢,其里面提供了许多对 ChannelHandlerContext的增删改查的方法,其实由启动流程时我们也知道,管道中存储的并不是 handler本身,而是对 handler的封装,DefaultChannelHandlerContext,其才是真正的关键
其还去重写了从 Invoker中继承而来的方法,修改了返回值为 ChannelPipeline,其实也是为了方便我们使用
/** <h1>以下重写了从 Invoker获取的方法, 修改了下返回值, 这也是为了更加方便我们的使用</h1> **/
@Override
ChannelPipeline fireChannelRegistered();
@Override
ChannelPipeline fireChannelUnregistered();
@Override
ChannelPipeline fireChannelActive();
@Override
ChannelPipeline fireChannelInactive();
@Override
ChannelPipeline fireExceptionCaught(Throwable cause);
@Override
ChannelPipeline fireUserEventTriggered(Object event);
@Override
ChannelPipeline fireChannelRead(Object msg);
@Override
ChannelPipeline fireChannelReadComplete();
@Override
ChannelPipeline fireChannelWritabilityChanged();
@Override
ChannelPipeline flush();
so,现在来看看管道中存储的 handler吧
ChannelHandler
// 可以看到, 顶层接口中主要定义了两个与 handler添加、移除相关的回调方法
// 实际上这两个方法的回调是由 Channel对应的 EventLoop去执行的.
public interface ChannelHandler {
/**
* Gets called after the {@link ChannelHandler} was added to the actual context and it's ready to handle events.
*/
void handlerAdded(ChannelHandlerContext ctx) throws Exception;
/**
* Gets called after the {@link ChannelHandler} was removed from the actual context and it doesn't handle events
* anymore.
*/
// 这个方法中可以来进行一些相关的释放资源相关的操作
void handlerRemoved(ChannelHandlerContext ctx) throws Exception;
/**
* Gets called if a {@link Throwable} was thrown.
*
* @deprecated if you want to handle this event you should implement {@link ChannelInboundHandler} and
* implement the method there.
*/
@Deprecated
void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception;
/**
* Indicates that the same instance of the annotated {@link ChannelHandler}
* can be added to one or more {@link ChannelPipeline}s multiple times
* without a race condition.
* <p>
* If this annotation is not specified, you have to create a new handler
* instance every time you add it to a pipeline because it has unshared
* state such as member variables.
* <p>
* This annotation is provided for documentation purpose, just like
* <a href="http://www.javaconcurrencyinpractice.com/annotations/doc/">the JCIP annotations</a>.
*/
@Inherited
@Documented
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@interface Sharable {
// no value
}
}
其封装对应顶层接口
AbstractChannelHandlerContext
属性
// 我们主要关注于 pipeline中事件的传播机制
abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, ResourceLeakHint {
private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractChannelHandlerContext.class);
// 后驱
volatile AbstractChannelHandlerContext next;
// 前驱
volatile AbstractChannelHandlerContext prev;
private static final AtomicIntegerFieldUpdater<AbstractChannelHandlerContext> HANDLER_STATE_UPDATER =
AtomicIntegerFieldUpdater.newUpdater(AbstractChannelHandlerContext.class, "handlerState");
/**
* {@link ChannelHandler#handlerAdded(ChannelHandlerContext)} is about to be called.
*/
// 添加等待状态 - handlerAdded尚未被执行, 可能是 Channel尚未初始化
private static final int ADD_PENDING = 1;
/**
* {@link ChannelHandler#handlerAdded(ChannelHandlerContext)} was called.
*/
// 已经添加到了管道中 - 对应 handlerAdded方法已经执行了
private static final int ADD_COMPLETE = 2;
/**
* {@link ChannelHandler#handlerRemoved(ChannelHandlerContext)} was called.
*/
// 已经从 pipeline中移除了 - 对应 handlerRemoved已经执行结束了
private static final int REMOVE_COMPLETE = 3;
/**
* Neither {@link ChannelHandler#handlerAdded(ChannelHandlerContext)}
* nor {@link ChannelHandler#handlerRemoved(ChannelHandlerContext)} was called.
*/
// 这也是默认状态 - 刚创建出来的 handler
private static final int INIT = 0;
// 外层管道容器
private final DefaultChannelPipeline pipeline;
// 一般情况下, 不会去指定, 当向 pipeline添加 ctx时, ctx会默认生成
private final String name;
// 一般会是 true
private final boolean ordered;
// 与 handler所重写方法相关的一个掩码值
private final int executionMask;
// Will be set to null if no child executor should be used, otherwise it will be set to the
// child executor.
// 一般会是 null, 若 handler中操作耗时, 可来指定 group执行
final EventExecutor executor;
private ChannelFuture succeededFuture;
// Lazily instantiated tasks used to trigger events to a handler with different executor.
// There is no need to make this volatile as at worse it will just create a few more instances then needed.
private Tasks invokeTasks;
// 默认状态便是 INIT
private volatile int handlerState = INIT;
由属性可知,pipeline中存储的实际上就是双向链表,对应 next、prev
并且对于 ctx,其存在着一些对应的状态,按周期为:INIT、ADD_PENDING、ADD_COMPLETE、REMOVE_COMPLETE
在此稍微解读下,ctx默认创建出来对应状态便是 INIT,当对应 channel尚未注册完时,即对应 executor为 null,此时 ctx为 ADD_PENDING,当 channel已经注册完了,对应状态为 ADD_COMPLETE,当 handler被移除或者说是 channel关闭时(其实,关闭时回去卸载 ctx),ctx状态为 REMOVE_COMPLETE
构造
其实,我们一直好奇的是,在 pipeline中事件时如何来传递了,不急,咋们来慢慢揭露其蒙纱
咋们先来思考下,当事件传播时,ctx是如何来去找到其下一个能去处理该事件的 ctx的,这其实是不是涉及到了某种标识?
// 参数一:pipeline外层容器, 盛装 ctx的管道容器
// 参数二:executor执行器, 一般会是 null - 对于一些操作耗时的 handel, 可以执行 NioEventLoop来去执行, 以来提升程序处理能力
// 参数三:handle在 pipeline中的 name
// 参数四:真正去处理业务的 处理器 class
AbstractChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor,
String name, Class<? extends ChannelHandler> handlerClass) {
this.name = ObjectUtil.checkNotNull(name, "name");
this.pipeline = pipeline;
this.executor = executor;
// 参数:handler真实 class
// mask方法主要来计算出一个掩码, 该掩码的作用是方便 ctx向后传递时查找到下一个合适的 ctx
// 返回值 - 最终 mask中记录的便是 handler中去重写的方法, 即标识其对应的掩码
this.executionMask = mask(handlerClass);
// Its ordered if its driven by the EventLoop or the given Executor is an instanceof OrderedEventExecutor.
// 一般会是 true
ordered = executor == null || executor instanceof OrderedEventExecutor;
}
mask()方法是一个关键,即返回值赋予了 executionMask,由字面可知,这是一个执行相关的掩码标识
参数传递了当前 handlerClass进去,其实到了现在,我们知道这些底层实现,但凡设计了 class相关的,都不可避免使用到了反射,那这里的反射又是体现在哪里?
# ChannelHandlerMask.mask()
这里涉及到了一个新类:ChannelHandlerMask
// 参数:handler真实 class
// mask方法主要来计算出一个掩码, 该掩码的作用是方便 ctx向后传递时查找到下一个合适的 ctx
static int mask(Class<? extends ChannelHandler> clazz) {
// Try to obtain the mask from the cache first. If this fails calculate it and put it in the cache for fast
// lookup in the future.
Map<Class<? extends ChannelHandler>, Integer> cache = MASKS.get();
Integer mask = cache.get(clazz);
if (mask == null) { // 我们考虑这条分支 - 不考虑缓存
// 返回值 - 最终 mask中记录的便是 handler中去重写的方法, 即标识其对应的掩码
mask = mask0(clazz);
cache.put(clazz, mask); // 缓存以来提升程序性能
}
return mask;
}
// 参数:handler真实 class
// 返回值 - 最终 mask中记录的便是 handler中去重写的方法, 即标识其对应的掩码
private static int mask0(Class<? extends ChannelHandler> handlerType) {
// 0b 0000 0000 0000 0000 0000 0000 0000 0001
int mask = MASK_EXCEPTION_CAUGHT;
try {
// 条件成立 - 说明当前 handler是 ChannelInboundHandler的子类
if (ChannelInboundHandler.class.isAssignableFrom(handlerType)) {
// 0b 0000 0000 0000 0000 0000 0001 1111 1111
mask |= MASK_ALL_INBOUND;
// 参数一:handler class
// 参数二:方法名
// 参数三:ChannelHandlerContext.class
// isSkippable - 判断当前 handler是否有去重写了指定方法 - 重写方法上不会有 @Skip注解! - 即只有重写的方法才有意义!
// true - 说明当前 handler未去重写指定方法
if (isSkippable(handlerType, "channelRegistered", ChannelHandlerContext.class)) {
// 0b 0000 0000 0000 0000 0000 0001 1111 1111
// &
// 0b 1111 1111 1111 1111 1111 1111 1111 1101
// -> 0000 0000 0000 0000 0000 0001 1111 1101
mask &= ~MASK_CHANNEL_REGISTERED; // 实际上就是从掩码中去除该方法对应的标识掩码
}
if (isSkippable(handlerType, "channelUnregistered", ChannelHandlerContext.class)) {
mask &= ~MASK_CHANNEL_UNREGISTERED;
}
if (isSkippable(handlerType, "channelActive", ChannelHandlerContext.class)) {
mask &= ~MASK_CHANNEL_ACTIVE;
}
if (isSkippable(handlerType, "channelInactive", ChannelHandlerContext.class)) {
mask &= ~MASK_CHANNEL_INACTIVE;
}
if (isSkippable(handlerType, "channelRead", ChannelHandlerContext.class, Object.class)) {
mask &= ~MASK_CHANNEL_READ;
}
if (isSkippable(handlerType, "channelReadComplete", ChannelHandlerContext.class)) {
mask &= ~MASK_CHANNEL_READ_COMPLETE;
}
if (isSkippable(handlerType, "channelWritabilityChanged", ChannelHandlerContext.class)) {
mask &= ~MASK_CHANNEL_WRITABILITY_CHANGED;
}
if (isSkippable(handlerType, "userEventTriggered", ChannelHandlerContext.class, Object.class)) {
mask &= ~MASK_USER_EVENT_TRIGGERED;
}
}
if (ChannelOutboundHandler.class.isAssignableFrom(handlerType)) {
mask |= MASK_ALL_OUTBOUND;
if (isSkippable(handlerType, "bind", ChannelHandlerContext.class,
SocketAddress.class, ChannelPromise.class)) {
mask &= ~MASK_BIND;
}
if (isSkippable(handlerType, "connect", ChannelHandlerContext.class, SocketAddress.class,
SocketAddress.class, ChannelPromise.class)) {
mask &= ~MASK_CONNECT;
}
if (isSkippable(handlerType, "disconnect", ChannelHandlerContext.class, ChannelPromise.class)) {
mask &= ~MASK_DISCONNECT;
}
if (isSkippable(handlerType, "close", ChannelHandlerContext.class, ChannelPromise.class)) {
mask &= ~MASK_CLOSE;
}
if (isSkippable(handlerType, "deregister", ChannelHandlerContext.class, ChannelPromise.class)) {
mask &= ~MASK_DEREGISTER;
}
if (isSkippable(handlerType, "read", ChannelHandlerContext.class)) {
mask &= ~MASK_READ;
}
if (isSkippable(handlerType, "write", ChannelHandlerContext.class,
Object.class, ChannelPromise.class)) {
mask &= ~MASK_WRITE;
}
if (isSkippable(handlerType, "flush", ChannelHandlerContext.class)) {
mask &= ~MASK_FLUSH;
}
}
if (isSkippable(handlerType, "exceptionCaught", ChannelHandlerContext.class, Throwable.class)) {
mask &= ~MASK_EXCEPTION_CAUGHT;
}
} catch (Exception e) {
// Should never reach here.
PlatformDependent.throwException(e);
}
return mask; // 最终 mask中记录的便是 handler中去重写的方法, 即标识其对应的掩码
}
可以看到,在 mask0()方法中计算出了 mask,关键方法在于 isSkippable()方法的调用
// 参数一:handler class
// 参数二:方法名
// 参数三:ChannelHandlerContext.class
@SuppressWarnings("rawtypes")
private static boolean isSkippable(
final Class<?> handlerType, final String methodName, final Class<?>... paramTypes) throws Exception {
return AccessController.doPrivileged(new PrivilegedExceptionAction<Boolean>() {
@Override
public Boolean run() throws Exception {
Method m;
try {
// 这里不就是通过反射去获取执行参数的方法 ? - 对接 ChannelInboundHandler中方法
m = handlerType.getMethod(methodName, paramTypes);
} catch (NoSuchMethodException e) {
if (logger.isDebugEnabled()) {
logger.debug(
"Class {} missing method {}, assume we can not skip execution", handlerType, methodName, e);
}
return false;
}
// 条件一:找到指定方法
// 条件二:当前 handler是继承了 ChannelHandlerInboundAdapter, 并且 method对应方法并没有去重写
return m != null && m.isAnnotationPresent(Skip.class);
}
});
}
可知,isSkippable()方法中通过传过来的 class,根据 methodName、paraTypes反射去匹配 method,其就是来判断 handler有没有去我们所关心的方法,有的话需要去处理
当然,对应未去重写的 method,其对应父类中仅是传给下一个 ctx,而对于重写的 method,显然才是我们所关注的业务逻辑实现
怎么去判断是不是重写了呢?判断有没有对应注解 @Skip即可,父类中我们可以看到会有 @Skip注解
如,ChannelInboundHandlerAdapter
@Skip // 这个注解是个关键, 如果子类重写该方法, 对应注解则消失 - so, 依据此来去判断子类是否有去重写对应方法
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelRegistered();
}
so,我们通过该反射便可以逐个比对目标方法有没有被重写,没有的话则需要去修改下 mask,即最终 mask会表示出当前 ctx内所封装着的 handler所去实现的方法
而每个目标方法也就对应着一个唯一标识掩码,这也是属性中所预定好的了
// 这是一个辅助类
final class ChannelHandlerMask {
private static final InternalLogger logger = InternalLoggerFactory.getInstance(ChannelHandlerMask.class);
// Using to mask which methods must be called for a ChannelHandler.
static final int MASK_EXCEPTION_CAUGHT = 1;
static final int MASK_CHANNEL_REGISTERED = 1 << 1;
static final int MASK_CHANNEL_UNREGISTERED = 1 << 2;
static final int MASK_CHANNEL_ACTIVE = 1 << 3;
static final int MASK_CHANNEL_INACTIVE = 1 << 4;
static final int MASK_CHANNEL_READ = 1 << 5;
static final int MASK_CHANNEL_READ_COMPLETE = 1 << 6;
static final int MASK_USER_EVENT_TRIGGERED = 1 << 7;
static final int MASK_CHANNEL_WRITABILITY_CHANGED = 1 << 8;
static final int MASK_BIND = 1 << 9;
static final int MASK_CONNECT = 1 << 10;
static final int MASK_DISCONNECT = 1 << 11;
static final int MASK_CLOSE = 1 << 12;
static final int MASK_DEREGISTER = 1 << 13;
static final int MASK_READ = 1 << 14;
static final int MASK_WRITE = 1 << 15;
static final int MASK_FLUSH = 1 << 16;
// 计算出与入站相关的掩码:0000 0000 0000 0001 1111 1110
static final int MASK_ONLY_INBOUND = MASK_CHANNEL_REGISTERED |
MASK_CHANNEL_UNREGISTERED | MASK_CHANNEL_ACTIVE | MASK_CHANNEL_INACTIVE | MASK_CHANNEL_READ |
MASK_CHANNEL_READ_COMPLETE | MASK_USER_EVENT_TRIGGERED | MASK_CHANNEL_WRITABILITY_CHANGED;
// 计算出与入站相关的掩码:(包含 MASK_EXCEPTION_CAUGHT) 0000 0000 0001 1111 1111
private static final int MASK_ALL_INBOUND = MASK_EXCEPTION_CAUGHT | MASK_ONLY_INBOUND;
// 计算出与出站相关的掩码:1111 1110 0000 0000
static final int MASK_ONLY_OUTBOUND = MASK_BIND | MASK_CONNECT | MASK_DISCONNECT |
MASK_CLOSE | MASK_DEREGISTER | MASK_READ | MASK_WRITE | MASK_FLUSH;
// 计算出与出站相关的掩码:(包含 MASK_EXCEPTION_CAUGHT) 1 1111 1110 0000 0001
private static final int MASK_ALL_OUTBOUND = MASK_EXCEPTION_CAUGHT | MASK_ONLY_OUTBOUND;
private static final FastThreadLocal<Map<Class<? extends ChannelHandler>, Integer>> MASKS =
new FastThreadLocal<Map<Class<? extends ChannelHandler>, Integer>>() {
@Override
protected Map<Class<? extends ChannelHandler>, Integer> initialValue() {
return new WeakHashMap<Class<? extends ChannelHandler>, Integer>(32);
}
};
而,现在我们来看看 ctx真正类型,即 DefaultChannelHandlerContext
DefaultChannelHandlerContext
// 实现了 handler() - 这样, ctx的功能就全体现出来了!
final class DefaultChannelHandlerContext extends AbstractChannelHandlerContext {
// 内部封装着的 handler
private final ChannelHandler handler;
// 参数一:pipeline外层容器, 盛装 ctx的管道容器
// 参数二:executor执行器, 一般会是 null - 对于一些操作耗时的 handel, 可以执行 NioEventLoop来去执行, 以来提升程序处理能力
// 参数三:handle在 pipeline中的 name
// 参数四:真正去处理业务的 处理器
DefaultChannelHandlerContext(
DefaultChannelPipeline pipeline, EventExecutor executor, String name, ChannelHandler handler) {
super(pipeline, executor, name, handler.getClass()); // 可以看到, 这里间接地将 handler暴露给了 AbstractChannelHandlerContext
this.handler = handler;
}
@Override
public ChannelHandler handler() {
return handler;
}
}
由此可见,其内部封装着 handler,至此,我们大概知道了 ctx的作用了
DefaultChannelPipeline
这也是我们所关注的 pipeline
addLast
我们比较好奇与 handler添加到 pipeline的一些逻辑
// 参数:业务层面的处理器 - 这也是我们使用得最多的添加 method
public final ChannelPipeline addLast(ChannelHandler handler) {
// 此时, name - null
return addLast(null, handler);
}
@Override
public final ChannelPipeline addLast(String name, ChannelHandler handler) {
// 参数一:group - null
// 参数二:name - 不显式指定这里也会是 null
// 参数三:业务层面 handler
return addLast(null, name, handler);
}
// 参数一:group - null - 不指定的话, 这里会是 null
// 参数二:name - 不显式指定这里也会是 null
// 参数三:业务层面 handler
@Override
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
// 桥梁 ctx
final AbstractChannelHandlerContext newCtx;
synchronized (this) {
// 设置 added属性为 true, 表示当前 handler已经添加到某个 pipeline中去了
checkMultiplicity(handler);
// 这一步将 ChannelInitializer封装成了 DefaultChannelHandlerContext
// 参数一:事件执行器组, 一般如果未指定, 来到这时会是 null
// 参数二:filterName(name, handler) 如果 name == null, 此时为 ctx指定一个名字
// 生成的名字:handlerType + "#" + 编号(这里的编号会来防止命名的重复 - 自增)
// 参数三:当前 handler
newCtx = newContext(group, filterName(name, handler), handler);
// 此时便是需要将 ctx加入到 pipeline中!
addLast0(newCtx);
// If the registered is false it means that the channel was not registered on an eventLoop yet.
// In this case we add the context to the pipeline and add a task that will call
// ChannelHandler.handlerAdded(...) once the channel is registered.
// true - 当前 pipeline归属的 channel并未注册, 对应的 NioEventLoop还未进行绑定, 即此时 executor = null
// 个人理解:这块主要是来预防空指针异常
if (!registered) { // 对于 ChannelInitializer, 此时 channel尚未注册
newCtx.setAddPending(); // 设置 handler状态为已经入队状态 ADD_PENDING, 但还未去调用 handlerAdd方法
// 这一步来将当前 ctx封装成一个任务然后添加到一个全局单向链表中去
callHandlerCallbackLater(newCtx, true); // 待 channel注册完之后, 对应 executor会去执行(详见 invokeHandlerAddedIfNeeded())
return this; // so, 此时便可以放心返回了!
}
// 来到这时, channel已经进行了注册
EventExecutor executor = newCtx.executor();
if (!executor.inEventLoop()) {
callHandlerAddedInEventLoop(newCtx, executor);
return this;
}
}
// 调用 ctx中 handler的 handleAdded方法
callHandlerAdded0(newCtx);
return this;
}
可以看到,在 newContex(filterName中为 ctx指定生成了一个 name,生成名字:handlerType + "#" + 编号(这里的编号会来防止命名的重复 - 自增))中初始化了 newCtx
// 参数一:事件执行器组, 一般如果未指定, 来到这时会是 null
// 参数二:filterName(name, handler) 如果 name == null, 此时为 ctx指定一个名字
// 生成的名字:handlerType + "#" + 编号(这里的编号会来防止命名的重复 - 自增)
// 参数三:当前 handler
private AbstractChannelHandlerContext newContext(EventExecutorGroup group, String name, ChannelHandler handler) {
return new DefaultChannelHandlerContext(this, childExecutor(group), name, handler);
}
然后就是添加到 pipeline中,对应 addLast0()
// 不难看出, 所谓的 addLast其实是去将 handler(可以看到, pipeline中存储着的实际上是封装后的 handler)添加到 tail的前面
private void addLast0(AbstractChannelHandlerContext newCtx) {
AbstractChannelHandlerContext prev = tail.prev;
newCtx.prev = prev;
newCtx.next = tail;
prev.next = newCtx;
tail.prev = newCtx;
}
添加完后,对应的便是状态的修改, 以及 handlerAdd()方法的触发,但此时可能 channel还未注册,so,这里使用到了一个全局维护的单向链表,其实这块在 ChannelInitializer添加时已经涉及到了
对应:
// If the registered is false it means that the channel was not registered on an eventLoop yet.
// In this case we add the context to the pipeline and add a task that will call
// ChannelHandler.handlerAdded(...) once the channel is registered.
// true - 当前 pipeline归属的 channel并未注册, 对应的 NioEventLoop还未进行绑定, 即此时 executor = null
// 个人理解:这块主要是来预防空指针异常
if (!registered) { // 对于 ChannelInitializer, 此时 channel尚未注册
newCtx.setAddPending(); // 设置 handler状态为已经入队状态 ADD_PENDING, 但还未去调用 handlerAdd方法
// 这一步来将当前 ctx封装成一个任务然后添加到一个全局单向链表中去
callHandlerCallbackLater(newCtx, true); // 待 channel注册完之后, 对应 executor会去执行(详见 invokeHandlerAddedIfNeeded())
return this; // so, 此时便可以放心返回了!
}
若 channel未注册,则先去设置 ctx状态为 ADD_PENDING,然后将当前 ctx封装成一个任务,添加到单向链表中去了,对应方法 callHandlerCallbackLater
private void callHandlerCallbackLater(AbstractChannelHandlerContext ctx, boolean added) {
assert !registered;
// 这一步将 ChannelInitializer封装后的 ChannelHandlerContext再进行了一次封装 - PendingHandlerAddedTask
PendingHandlerCallback task = added ? new PendingHandlerAddedTask(ctx) : new PendingHandlerRemovedTask(ctx);
PendingHandlerCallback pending = pendingHandlerCallbackHead;
// 这里来执行的便是单向链表的入队操作
if (pending == null) {
pendingHandlerCallbackHead = task; // 可以看到, 这一步 pendingHandlerCallbackHead中存储的最里层所封装着的实际上就是 ChannelInitializer
} else {
// Find the tail of the linked-list.
while (pending.next != null) {
pending = pending.next;
}
pending.next = task;
}
}
目的,其实也是当 channel注册完后,获取取出该链表中任务逐一执行对应的 handlerAdded方法,并去修改下其对应状态为 handlerAdded
对应执行的方法便是 # DefaultChanelPipeline.callHandlerAddedForAllHandlers,当 channel注册完后,提交一个异步任务,在异步任务中会去遍历该单向链表
如:
// 需清楚, 此方法的执行是由 EventLoop线程来做的
private void callHandlerAddedForAllHandlers() {
final PendingHandlerCallback pendingHandlerCallbackHead;
synchronized (this) {
assert !registered;
// This Channel itself was registered.
registered = true;
// 这里就是来获取存储 task的单向链表 - 以服务端启动分析, 此时获取的便是对 ChannelInitializer最外层包装后的 task
pendingHandlerCallbackHead = this.pendingHandlerCallbackHead;
// Null out so it can be GC'ed.
this.pendingHandlerCallbackHead = null;
}
// This must happen outside of the synchronized(...) block as otherwise handlerAdded(...) may be called while
// holding the lock and so produce a deadlock if handlerAdded(...) will try to add another handler from outside
// the EventLoop.
PendingHandlerCallback task = pendingHandlerCallbackHead;
// 遍历单向链表, 执行结果
while (task != null) { // 单向链表中存储的便是一个个待执行的任务, 如:channel尚未注册时, 此时便需要将 ctx封装成任务添加到此全局的单向链表中去, 待 exector执行
task.execute();
task = task.next;
}
}
so,当 ctx被封装成任务之后,存放到全局的单向链表中,就可以方向的 return了,因为对应 channel会在注册完之后,会注意遍历该单向链表,执行对应的任务
具体的任务实际上是和其对应实例有关,这里我们来考虑 PendingHandlerAddedTask的情况
private final class PendingHandlerAddedTask extends PendingHandlerCallback {
PendingHandlerAddedTask(AbstractChannelHandlerContext ctx) {
super(ctx);
}
@Override
public void run() {
callHandlerAdded0(ctx);
}
@Override
void execute() {
// 这一步, 来获取到的便是 EventLoop
EventExecutor executor = ctx.executor();
if (executor.inEventLoop()) {
callHandlerAdded0(ctx);
} else {
try {
executor.execute(this);
} catch (RejectedExecutionException e) {
if (logger.isWarnEnabled()) {
logger.warn(
"Can't invoke handlerAdded() as the EventExecutor {} rejected it, removing handler {}.",
executor, ctx.name(), e);
}
atomicRemoveFromHandlerList(ctx);
ctx.setRemoved();
}
}
}
}
显然,此时会来执行 callHandlerAdded0()
private void callHandlerAdded0(final AbstractChannelHandlerContext ctx) {
try {
ctx.callHandlerAdded(); // 执行这
} catch (Throwable t) {
boolean removed = false;
try {
atomicRemoveFromHandlerList(ctx);
ctx.callHandlerRemoved();
removed = true;
} catch (Throwable t2) {
if (logger.isWarnEnabled()) {
logger.warn("Failed to remove a handler: " + ctx.name(), t2);
}
}
if (removed) {
fireExceptionCaught(new ChannelPipelineException(
ctx.handler().getClass().getName() +
".handlerAdded() has thrown an exception; removed.", t));
} else {
fireExceptionCaught(new ChannelPipelineException(
ctx.handler().getClass().getName() +
".handlerAdded() has thrown an exception; also failed to remove.", t));
}
}
}
callHandlerAdded
final void callHandlerAdded() throws Exception {
// We must call setAddComplete before calling handlerAdded. Otherwise if the handlerAdded method generates
// any pipeline events ctx.handler() will miss them because the state will not allow it.
// 更新当前 ctx状态为 ADD_COMPLETE
if (setAddComplete()) {
// 以服务端启动视角来看, handler()来获取的便是 ChannelInitializer
// 执行 handlerAdded方法
handler().handlerAdded(this);
}
}
// 修改状态
final boolean setAddComplete() {
for (;;) {
int oldState = handlerState;
if (oldState == REMOVE_COMPLETE) {
return false;
}
// Ensure we never update when the handlerState is REMOVE_COMPLETE already.
// oldState is usually ADD_PENDING but can also be REMOVE_COMPLETE when an EventExecutor is used that is not
// exposing ordering guarantees.
if (HANDLER_STATE_UPDATER.compareAndSet(this, oldState, ADD_COMPLETE)) {
return true;
}
}
}
可见,当 executor去执行任务时,通过 CAS将 ctx状态修改为 ADD_COMPLETE,然后去执行其内部封装着的 handler对应的 HandlerAdded()
so,至此,ctx的添加逻辑阶段完毕!
接着我们来看事件究竟是如何在 pipeline中传播的吧!
由于 pipeline中多数是模板代码,这里就以 fireChannelRegistered()为例
fireChannelRegistered
需要明晰一点,调用 pipeline的对应 fire系列方法,比如说是对应入站事件的话,是会从 HeadContext开始传递,这是区别于调用 ctx的 fireXXX,因此,我们才总说通过调用 ctx的 fireXXX方法可以类似于做些许短路操作,过滤掉一些不感兴趣的 ctx
@Override
public ChannelHandlerContext fireChannelRegistered() {
// findContextInbound(MASK_CHANNEL_REGISTERED) 参数:与 ChannelRegister()相关的掩码
// 不难想象, 其实就是通过这个掩码值找到合适的即感兴趣的 ctx去进行处理
// fireContextInbound - 找到实现了目标方法的 ctx(后面的 ctx, 进行迭代判断查找)
invokeChannelRegistered(findContextInbound(MASK_CHANNEL_REGISTERED));
return this;
}
static void invokeChannelRegistered(final AbstractChannelHandlerContext next) {
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeChannelRegistered(); // 第一次来执行的实际上便是 HeadContext.invokeChannelRegistered
} else {
executor.execute(new Runnable() {
@Override
public void run() {
next.invokeChannelRegistered();
}
});
}
}
private void invokeChannelRegistered() {
// 这里来判断 handler的状态
if (invokeHandler()) {
try {
// 可以看到, 这一步来调用了 handle.channelRegistered()
((ChannelInboundHandler) handler()).channelRegistered(this); // 第一次来执行的实际上就是 HeadContext.channelRegistered
} catch (Throwable t) {
invokeExceptionCaught(t);
}
} else {
fireChannelRegistered();
}
}
当判定 handler的状态合法后,便会去执行 handler对应的方法了
状态合法其实也就是判定 ctx是不是 ADD_COMPLETE状态,这实际上也挺符合逻辑的,因为只有完全添加成功的 ctx(channel已经注册完毕),其内部方法才会被调用嘛
private boolean invokeHandler() {
// Store in local variable to reduce volatile reads.
int handlerState = this.handlerState;
// 这里来判断 handler是不是已经入队状态了 ADD_COMPLETE
return handlerState == ADD_COMPLETE || (!ordered && handlerState == ADD_PENDING);
}
so,此时我们来到 HeadContext.channelRegistered()
@Override
public void channelRegistered(ChannelHandlerContext ctx) {
// 由于 firstRegistration == false, 实际上这一步啥也没干
invokeHandlerAddedIfNeeded();
ctx.fireChannelRegistered(); // 以服务端启动 demo来看, 这里实际上就是传递到了 LoggingHandler
}
接着,来到 AbstractChannelHandlerContext.fireChannelRegistered
@Override
public ChannelHandlerContext fireChannelRegistered() {
// findContextInbound(MASK_CHANNEL_REGISTERED) 参数:与 ChannelRegister()相关的掩码
// 不难想象, 其实就是通过这个掩码值找到合适的即感兴趣的 ctx去进行处理
// fireContextInbound - 找到实现了目标方法的 ctx(后面的 ctx, 进行迭代判断查找)
invokeChannelRegistered(findContextInbound(MASK_CHANNEL_REGISTERED));
return this;
}
findContextInbound()
// 参数:与 目标方法相关的掩码
private AbstractChannelHandlerContext findContextInbound(int mask) {
AbstractChannelHandlerContext ctx = this;
EventExecutor currentExecutor = executor();
// 寻找合适的 ctx
do {
ctx = ctx.next; // 不断迭代下一个 ctx
} while (skipContext(ctx, currentExecutor, mask, MASK_ONLY_INBOUND)); // 返回 true - 表明当前 ctx不是我们想要找的那个 ctx, 需要 skip
return ctx;
}
通过循环遍历管道中 ctx后续的 ctx,条件是什么呢?
skipContext()
// 参数一:当前 ctx
// 参数二:执行 ctx的事件轮询器
// 参数三:目标方法相关的掩码值
// 参数四:MASK_ONLY_INBOUND
private static boolean skipContext(
AbstractChannelHandlerContext ctx, EventExecutor currentExecutor, int mask, int onlyMask) {
// Ensure we correctly handle MASK_EXCEPTION_CAUGHT which is not included in the MASK_EXCEPTION_CAUGHT
// 可以理解为 大判断(起到筛选作用)|| 真正逻辑判断 ctx.executionMask & Mask == 0
// 返回 true - 表明当前 ctx不是我们想要找的那个 ctx, 需要 skip
return (ctx.executionMask & (onlyMask | mask)) == 0 ||
// We can only skip if the EventExecutor is the same as otherwise we need to ensure we offload
// everything to preserve ordering.
//
// See https://github.com/netty/netty/issues/10067
(ctx.executor() == currentExecutor && (ctx.executionMask & mask) == 0);
}
so,此时我们便清楚了,当迭代到的 ctx对应的 executionMask属性值(前面讲过,其唯一标识了 ctx内部的 handler所去重写的方法)中包含,此时考虑的便是 MASK_CHANNEL_REGISTERED值,如果不包含,返回 true,包含则返回 false,结束循环,接着来到外层方法:invokeChannelRegistered
so,这个方法前面讲过了,其实就是去执行 ctx内封装着的 handler对应方法
这不就实现了事件的传递了吗?
so,这里需要来关注一个点:我们所添加的 ctx内封装的 handler,若其重写方法中未去显式调用 super.channelXXX
或者是 ctx.fireChannelXXX
方法,实则事件就会再此 handler被隔断,即不会再继续往下传播
尤其,这对于 write事件十分重要,这可能涉及到数据最终能够出站,发送到客户端!
so,至此,pipeline的解读完毕!