随笔分类
Channel解读
最近,读了秋大的些许文章,收获颇多,至此些许分享,不足之处耐指出.
我们知道,Channel其实就是对 Socket的 包装增强,Channel功能实现最终还是会围绕着 Socket做文章
赋予 Channel可异步中断处理的能力
顶层接口:
// Channel是对 Socket的包装增强, Channel功能的实现会围绕着 Socket做文章.
public interface Channel extends Closeable {
/**
* Tells whether or not this channel is open.
*
* @return {@code true} if, and only if, this channel is open
*/
public boolean isOpen();
/**
* Closes this channel.
*
* <p> After a channel is closed, any further attempt to invoke I/O
* operations upon it will cause a {@link ClosedChannelException} to be
* thrown.
*
* <p> If this channel is already closed then invoking this method has no
* effect.
*
* <p> This method may be invoked at any time. If some other thread has
* already invoked it, however, then another invocation will block until
* the first invocation is complete, after which it will return without
* effect. </p>
*
* @throws IOException If an I/O error occurs
*/
public void close() throws IOException;
}
可以看到,Channel中设定简单明了,无外乎判断 Channel状态,以及去关闭 Channel
我们主要来探讨下关闭 Channel这一动作,关心于 ClosedChannelException是如何在代码中发生的
有时候,程序中可能需要去异步关闭 Channel,这恰恰也是我们需要的:若有一个线程正在阻塞在 Channel IO上,此时别的线程去调用了 Channel.close(),那么阻塞的线程将会接收到一个 AsynchronousException异常
同样,我们也需要考虑另一种情况,如果一个线程正阻塞在一个 Channel的 IO上,此时别的线程去调用了该阻塞线程的 intrrrupt(),从而使 Channel关闭,那么此阻塞线程应该要收到 ClosedByInterruptException异常,同时会去将中断状态设置到该线程上去
如果 Channel已经被关闭了,又有别的线程去调用了 IO操作,此时该线程会收到 ClosedChannelException异常
上述所讲的,发现在顶层接口 Channel中并没有体现出来,因此想要实现 异步关闭 Channel的功能,就需要有基于该功能实现的接口,对应的就是对 Channel的扩展,不难发现,这个接口就是 Interruptible,也就是说实现了该接口的 Channel就将具有异步关闭的功能!
InterruptibleChannel定义如下:
public interface InterruptibleChannel
extends Channel
{
/**
* 当 Channel被异步关闭时, 任何阻塞在当前 Channel执行 IO操作的线程, 都会收到一个 AsynchronousCloseException异常
* @throws IOException If an I/O error occurs
*/
public void close() throws IOException;
}
而具体实现是在:AbstractInterruptibleChannel
AbstractInterruptibleChannel是对 InterruptibelChannel的扩展,其提供了对 异步中断的支持:begin()、end()
首先我们需要知道,对于支持 Channel中断的代码是如何来进行编写的:注释中提供了一个 范式
boolean completed = false;
try {
begin();
completed = ...; // Perform blocking I/O operation 执行阻塞 IO操作
return ...; // Return result
} finally {
end(completed);
}
NIO中规定了,在执行阻塞 IO操作前,必须要去调用 begin(),IO ops执行完后,须确保 end()方法会被执行,因此放在 finally代码块中
begin()
// begin()负责去添加 Channel的中断处理器到当前线程
protected final void begin() {
// 初始化中断处理对象, 中断处理对象提供了中断处理的一个回调
if (interruptor == null) {
interruptor = new Interruptible() {
// 中断处理回调中登记了被中断的线程, 然后调用了 implCloseChannel()去关闭 Channel
public void interrupt(Thread target) {
synchronized (closeLock) {
// 这里来保证只会被关闭一次
if (closed)
return;
closed = true;
// 登记被中断的线程
interrupted = target;
try {
// 关闭当前 channel
AbstractInterruptibleChannel.this.implCloseChannel();
} catch (IOException x) { }
}
}};
}
// 将 interruptor 中断处理对象注册到当前线程中去
blockedOn(interruptor);
// 判断当前线程是否被中断, 如果已经被中断, 可能中断处理对象没有被执行, 这里手动执行以下
Thread me = Thread.currentThread();
if (me.isInterrupted())
interruptor.interrupt(me);
}
begin()中负责去将 中断处理对象 Interruptor注册到当前线程中去,并且其本身提供了一个回调方法:interrupt()
我们来看看是如何进行 Interruptor注册的:blockedOn()
// jdk.internal.access.SharedSecrets
static void blockedOn(Interruptible intr) { // package-private
// getJavaLangAccess()返回 JavaLangAccess对象, 顾名思义, 该对象提供了 java.lang包下一些非公开方法的访问.
// 这里实质上会去调用 Thread.blockedOn()
SharedSecrets.getJavaLangAccess().blockedOn(intr);
}
SharedSecrets是什么?
SharedSecrets是一个神奇而糟糕的类, 为什么说是糟糕呢 ?
这个类的存在就是为了访问 JDK库中一些由于 类作用域限制而外部无法访问的类或者方法,JDK中很多类或方法是私有或者包级别私有的, 外部是无法访问的, 但是 JDK在本身设计实现时又存在相互依赖的情况,因此, 为了外部在不依赖与反射访问这些类或者方法情况下, 提供了这么一个类 SharedSecrets, 提供了各种超越限制的方法
可以看到,这里调用了 getJavaLangAccess()
那么其对应的初始化是在哪里呢?
在 System的 initPhase1()
这个方法会在 System初始化后会被调用:
/**
* Initialize the system class. Called after thread initialization.
*/
// 在线程初始化后调用 - 初始化系统类
private static void initPhase1() {
// VM might invoke JNU_NewStringPlatform() to set those encoding
// sensitive properties (user.home, user.name, boot.class.path, etc.)
// during "props" initialization, in which it may need access, via
// System.getProperty(), to the related system encoding property that
// have been initialized (put into "props") at early stage of the
// initialization. So make sure the "props" is available at the
// very beginning of the initialization and all system properties to
// be put into it directly.
props = new Properties(84);
initProperties(props); // initialized by the VM
// There are certain system configurations that may be controlled by
// VM options such as the maximum amount of direct memory and
// Integer cache size used to support the object identity semantics
// of autoboxing. Typically, the library will obtain these values
// from the properties set by the VM. If the properties are for
// internal implementation use only, these properties should be
// removed from the system properties.
//
// See java.lang.Integer.IntegerCache and the
// VM.saveAndRemoveProperties method for example.
//
// Save a private copy of the system properties object that
// can only be accessed by the internal implementation. Remove
// certain system properties that are not intended for public access.
VM.saveAndRemoveProperties(props);
lineSeparator = props.getProperty("line.separator");
StaticProperty.javaHome(); // Load StaticProperty to cache the property values
VersionProps.init();
FileInputStream fdIn = new FileInputStream(FileDescriptor.in);
FileOutputStream fdOut = new FileOutputStream(FileDescriptor.out);
FileOutputStream fdErr = new FileOutputStream(FileDescriptor.err);
setIn0(new BufferedInputStream(fdIn));
setOut0(newPrintStream(fdOut, props.getProperty("sun.stdout.encoding")));
setErr0(newPrintStream(fdErr, props.getProperty("sun.stderr.encoding")));
// Setup Java signal handlers for HUP, TERM, and INT (where available).
Terminator.setup();
// Initialize any miscellaneous operating system settings that need to be
// set for the class libraries. Currently this is no-op everywhere except
// for Windows where the process-wide error mode is set before the java.io
// classes are used.
VM.initializeOSEnvironment();
// The main thread is not added to its thread group in the same
// way as other threads; we must do it ourselves here.
Thread current = Thread.currentThread();
current.getThreadGroup().add(current);
// register shared secrets
// 这是个比较核心的方法
setJavaLangAccess();
// Subsystems that are invoked during initialization can invoke
// VM.isBooted() in order to avoid doing things that should
// wait until the VM is fully initialized. The initialization level
// is incremented from 0 to 1 here to indicate the first phase of
// initialization has completed.
// IMPORTANT: Ensure that this remains the last initialization action!
VM.initLevel(1);
}
里面有去调用 setJavaLangAccess,里面有着对应的 blockedOn()
public void blockedOn(Interruptible b) {
// 可以看到, 这里最终去调用了 Thread.blockedOn(b)
Thread.blockedOn(b);
}
即,对应到 Thread.blockedOn()
/* Set the blocker field; invoked via jdk.internal.access.SharedSecrets
* from java.nio code
*/
// 这个方法就是为 NIO Coding而存在的, 通过 jdk.internal.access.SharedSecretx
static void blockedOn(Interruptible b) {
Thread me = Thread.currentThread();
// 串行化 blocker相关操作
// blocker - 就是中断处理对象 Interruptor
synchronized (me.blockerLock) {
me.blocker = b;
}
}
private volatile Interruptible blocker;
private final Object blockerLock = new Object();
现在来到重头戏了,当我们对线程中断后,又是如何去调用 NIO中注册的中断处理对象?
public void interrupt() {
if (this != Thread.currentThread()) {
checkAccess();
// thread may be blocked in an I/O operation
synchronized (blockerLock) {
Interruptible b = blocker;
if (b != null) {
interrupt0(); // set interrupt status 这一步会设置 interrupt标志位
// 这里来执行了下 Interruptor 中断处理对象的回调方法
b.interrupt(this);
return;
}
}
}
// set interrupt status
// 如果没有绑定中断处理对象的话, 就走正常逻辑 - 设置下标志位即可
interrupt0();
}
可以看到,这里判断了下当前中断线程是否有中断处理对象,没有的话就简单地设置了下标志位,有的话进行了下回调:Interruptor.interrupt()
在回调方法中关闭了 Channel,这就达到了异步关闭 Channel的效果,但现在还不完整,被中断的线程应当接收到 Exception
end()
// end()是在 IO操作完或者中断完后的操作, 负责判断中断是否发生 ? 是否存在 IO操作未结束但 Channel被异步关闭的情况 ?
// 根据不同的逻辑去做不同的判断操作, 抛异常
// completed - 当前 IO ops是否完成 ?
protected final void end(boolean completed)
throws AsynchronousCloseException
{
// 清空线程的中断处理器引用, 避免线程一致存活导致中断处理器无法被回收.
blockedOn(null);
Thread interrupted = this.interrupted;
// 表明当前线程被中断, 而将 Channel关闭的情况 (Interruptor.interrupt()), IO操作也随之中断了, 此时需要抛 ClosedByInterruptException异常
if (interrupted != null && interrupted == Thread.currentThread()) {
this.interrupted = null;
// 抛中断关闭异常
throw new ClosedByInterruptException();
}
// IO未完但 Channel被关闭了, 则需要抛 AsynchronousClose异常, 表示 Channel被异步关闭了
if (!completed && closed)
// 抛异步关闭异常
throw new AsynchronousCloseException();
}
显然易见,在这块代码里头根据 IO操作的结果以及 Channel的状态,来抛出的对应的 Exception
但需明确一点:当线程在 IO操作时,若对应的 Channel被中断了,那么 IO操作将无法进行下去,即被打断了,抛异常,再去执行 end()
这块在高并发下的场景比较多,此处稍列几种:
- A线程在读,B线程中断了 A线程,Channel被关闭了,A线程将收到 ClosedByInterruptException
- A、B线程进行 read,C线程中断了 A线程
- A线程被中断时,B线程刚进入 read,A线程将抛出 ClosedByInterruptException,B线程也将在 ensureOpen()抛出 ClosedChannelException
- A线程被中断时,B线程正阻塞在 read IO上,A线程将抛出 ClosedByInterruptException,B线程也将抛出 AsynchronousClosedException
- A线程被中断时,B线程已经读取数据完毕,此时 B线程正常返回,A线程返回 ClosedByInterruptException
begin()、end()的体现很多处,如:ServerSocketChannelImpl,FileChannelImpl
总结
在 Java IO时期,人们为了中断 IO操作想了不少方法,但核心操作是 关闭流,促使 IO操作抛出异常,达到中断 IO操作的效果;
NIO中,将这个操作植入了 Thread.interrupt()中去,这就避免用户去编码特定代码的麻烦;这就使得 IO操作可像其它可中断方法一样,在中断时抛出 ClosedByInterruptException异常,业务层面去捕获异常再去执行对应的响应逻辑即可
后续有空的话,会来补充系列:赋予 Channel可多路复用、网络 Socket能力,mark