随笔分类
解读 Netty资源泄漏追踪
Netty有办法去知道目前系统正在发生内存资源泄漏吗?
答案是:有的!
前置知识:引用,Netty依赖于引用队列技术实现资源泄漏追踪;在此略微科普下,大概引用实例关联对象被 gc时,引用实例会先被加入到 pending队列中,而当 Reference类加载时,会默认启动一个线程去消费 pending队列中元素,根据元素类型去做些响应处理,其中有一种处理逻辑便是将元素存入到引用队列中去
前置知识,对 DirectByteBuffer解读 - 良夜的博客 (liangye-xo.xyz)
代码分析入口,仍然是:PooledByteBufAllocator.newDirectBuffer()
申请内存是从这个方法进去,但是基于资源泄漏的追踪,Netty并不是简单地便让从池子中申请地这块内存直接就返回去,在返回去之前,做了些相关的处理
// 参数一:指定想要去分配的内存
// 参数二:允许分配的最大内存:Integer.MAX_VALUE
@Override
protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
// 每个线程都会去 threadCache(PoolThreadLocalCache类型, 继承自FastThreadCache)中去获取线程相关的 PoolThreadCache(起到延迟释放 ByteBuf的作用)
// 这里便是去获取 | 创建线程独占的 PoolThreadCache, 并进行相应的初始化操作
// 每个 PoolThreadCache中会有两个区域:HeapArena、DirectArena
PoolThreadCache cache = threadCache.get();
// 获取分配给当前线程的 directArena区域, 后面会在这个区域进行内存的申请操作
PoolArena<ByteBuffer> directArena = cache.directArena;
final ByteBuf buf;
// true
if (directArena != null) {
// 参数一:当前线程相关的 PoolThreadCache
// 参数二:指定想要去分配的内存容量
// 参数三:允许分配的最大内存:Integer.MAX_VALUE
buf = directArena.allocate(cache, initialCapacity, maxCapacity);
} else {
buf = PlatformDependent.hasUnsafe() ?
UnsafeByteBufUtil.newUnsafeDirectByteBuf(this, initialCapacity, maxCapacity) :
new UnpooledDirectByteBuf(this, initialCapacity, maxCapacity);
}
// 可以看到, 这里并没去将获取的 ByteBuf直接返回了, 而是去做了些处理
// 资源泄漏追踪的入口
// 该方法会根据检测级别进行追踪 byteBuf资源对象, 默认检测级别是 simple.
// simple级别 - 会抽样一小部分 byteBuf进行资源追踪, 当被追踪的资源检测到发生资源泄漏后, 资源泄漏检测器会去打印 error日志
return toLeakAwareBuffer(buf);
}
对应的便是方法 toLeakAwareBuffer(buf)
这块引入了两个东西,资源泄漏探测器 ResourceLeakDetector、资源泄漏追踪器 DefaultResourceLeak,以及对应的资源监控级别 Level
对于前者,我们发现是 AbstractByteBuf创建的一个全局范围内的 ResourceLeakDetector的一个实例
// 可以看到, 这里来创建了一个全局范围内的 ResourceLeakDetector的一个实例
static final ResourceLeakDetector<ByteBuf> leakDetector =
ResourceLeakDetectorFactory.instance().newResourceLeakDetector(ByteBuf.class);
ResourceLeakDetector
字段
// 资源泄漏检测器
public class ResourceLeakDetector<T> {
// jvm参数 - 可以通过 key去修改资源的监控级别
private static final String PROP_LEVEL_OLD = "io.netty.leakDetectionLevel";
private static final String PROP_LEVEL = "io.netty.leakDetection.level";
// 表示默认的监控级别:SIMPLE
private static final Level DEFAULT_LEVEL = Level.SIMPLE;
// 可以通过 key去修改资源追踪器(leak)的记录数
private static final String PROP_TARGET_RECORDS = "io.netty.leakDetection.targetRecords";
// 资源追踪器默认的记录数为:4, 这个值并不是说不能超过它, 而是达到它后会有限制
private static final int DEFAULT_TARGET_RECORDS = 4;
// 可以通过 key去修改采样阈值
private static final String PROP_SAMPLING_INTERVAL = "io.netty.leakDetection.samplingInterval";
// There is a minor performance benefit in TLR if this is a power of 2.
// 默认的采样阈值:128
private static final int DEFAULT_SAMPLING_INTERVAL = 128;
// 下面这两个就是对应的保存值
// 表示资源追踪器(leak)的记录数
private static final int TARGET_RECORDS;
// 表示采样阈值
static final int SAMPLING_INTERVAL;
静态代码块
// 其实就是去设置一些字段而已
static {
final boolean disabled;
// 判断是否有去配置不去使用资源泄漏检测
if (SystemPropertyUtil.get("io.netty.noResourceLeakDetection") != null) {
disabled = SystemPropertyUtil.getBoolean("io.netty.noResourceLeakDetection", false);
logger.debug("-Dio.netty.noResourceLeakDetection: {}", disabled);
logger.warn(
"-Dio.netty.noResourceLeakDetection is deprecated. Use '-D{}={}' instead.",
PROP_LEVEL, DEFAULT_LEVEL.name().toLowerCase());
} else { // 默认情况会走这条分支
disabled = false;
}
Level defaultLevel = disabled? Level.DISABLED : DEFAULT_LEVEL;
// First read old property name
String levelStr = SystemPropertyUtil.get(PROP_LEVEL_OLD, defaultLevel.name());
// If new property name is present, use it
levelStr = SystemPropertyUtil.get(PROP_LEVEL, levelStr);
Level level = Level.parseLevel(levelStr);
TARGET_RECORDS = SystemPropertyUtil.getInt(PROP_TARGET_RECORDS, DEFAULT_TARGET_RECORDS);
SAMPLING_INTERVAL = SystemPropertyUtil.getInt(PROP_SAMPLING_INTERVAL, DEFAULT_SAMPLING_INTERVAL);
ResourceLeakDetector.level = level;
if (logger.isDebugEnabled()) {
logger.debug("-D{}: {}", PROP_LEVEL, level.name().toLowerCase());
logger.debug("-D{}: {}", PROP_TARGET_RECORDS, TARGET_RECORDS);
}
}
默认的资源泄漏监控级别是 Simple,Level是个枚举,对应的还有哪些可去设置的监控级别呢?
Level
/**
* Represents the level of resource leak detection.
*/
public enum Level {
/**
* Disables resource leak detection.
*/
// 禁用资源泄漏检测
DISABLED,
/**
* Enables simplistic sampling resource leak detection which reports there is a leak or not,
* at the cost of small overhead (default).
*/
// 实现简单的资源泄漏检测, 报告是否存在泄漏, 以最小开销为代价(默认)
SIMPLE, // 这是默认的资源检测级别
/**
* Enables advanced sampling resource leak detection which reports where the leaked object was accessed
* recently at the cost of high overhead.
*/
// 启用高级的资源泄漏检测, 其会去报告泄漏对象最近访问位置, 以昂贵的开销为代价
ADVANCED,
/**
* Enables paranoid resource leak detection which reports where the leaked object was accessed recently,
* at the cost of the highest possible overhead (for testing purposes only).
*/
// 启用偏执资源泄漏检测级别, 报告最近访问泄漏对象的位置, 以尽可能高的开销为代价(仅用于测试目的)
// 实际上就是会每个资源都去进行追踪检测
PARANOID;
以及,最为关键的一个:内存资源泄漏追踪器 DefaultResourceLeak
DefaultResourceLeak
字段
// 可以看到, 内存泄漏追踪器去继承了弱引用
@SuppressWarnings("deprecation")
private static final class DefaultResourceLeak<T>
extends WeakReference<Object> implements ResourceLeakTracker<T>, ResourceLeak {
@SuppressWarnings("unchecked") // generics and updaters do not mix.
private static final AtomicReferenceFieldUpdater<DefaultResourceLeak<?>, Record> headUpdater =
(AtomicReferenceFieldUpdater)
AtomicReferenceFieldUpdater.newUpdater(DefaultResourceLeak.class, Record.class, "head");
@SuppressWarnings("unchecked") // generics and updaters do not mix.
private static final AtomicIntegerFieldUpdater<DefaultResourceLeak<?>> droppedRecordsUpdater =
(AtomicIntegerFieldUpdater)
AtomicIntegerFieldUpdater.newUpdater(DefaultResourceLeak.class, "droppedRecords");
// 说明这里会是一个保存 Record的链表 - 老套路了
// Record - 资源使用的记录信息, 其继承自 Throwable, 天生就带着创建时的 线程堆栈信息
@SuppressWarnings("unused")
private volatile Record head;
// 删除的 record记录数
// 为什么要去删除呢 ?
// 因为 Record链表是有长度限制的, 当超过长度阈值后, 会去选择性地删除一些 record记录
@SuppressWarnings("unused")
private volatile int droppedRecords;
// 保存当前资源泄漏追踪器的 set集合
private final Set<DefaultResourceLeak<?>> allLeaks;
// 被追踪对象的哈希值
// 这块确实是想得周到, 为什么不去保存被追踪对象本身呢 ?
// 如果去保存了被追踪对象自身的话, 那么被追踪对象相当于就多了一个强引用, 后面进行资源泄漏追踪时就会有问题
private final int trackedHash;
这里可以看到一个关键信息,Leak继承了 WeakReference,so,它是拥有着我们之前将的那些引用特性的
这里包含了一个链表,链表中记录的是 Record,Record本身继承了 Throwable,so通过这个我们可以通过 Record去打印一些资源创建时的线程堆栈信息
以及一个字段 allLeaks,此字段对应的便是当前资源泄漏检测器的 set集合
仔细一看,这块还有一个较为巧妙地设计,其去保存的了被追踪对象的哈希值!这块确实是想得周到, 为什么不去保存被追踪对象本身呢 ? 如果去保存了被追踪对象自身的话, 那么被追踪对象相当于就多了一个强引用, 后面进行资源泄漏追踪时就会有问题
so,主类介绍了完,继续回到主逻辑
正题
toLeakAwareBuffer()是资源泄漏追踪的入口
// 资源泄漏追踪的入口
// 该方法会根据检测级别进行追踪 byteBuf资源对象, 默认检测级别是 simple.
// simple级别 - 会抽样一小部分 byteBuf进行资源追踪, 当被追踪的资源检测到发生资源泄漏后, 资源泄漏检测器会去打印 error日志
protected static ByteBuf toLeakAwareBuffer(ByteBuf buf) {
ResourceLeakTracker<ByteBuf> leak;
switch (ResourceLeakDetector.getLevel()) {
// 默认级别便是 SIMPLE
case SIMPLE:
// 去调用了 ResourceLeakDetector.trace()方法
// 参数:从内存池获取的 ByteBuf对象
leak = AbstractByteBuf.leakDetector.track(buf);
// 条件成立 - 说明当前 buf被追踪了
if (leak != null) {
// 这块便是将 buf、leak封装成 SimpleLeakAwareByteBuf对象, 即返回给上层业务的是对应的封装(通过方法重写实现无差别使用)
buf = new SimpleLeakAwareByteBuf(buf, leak);
}
break;
case ADVANCED:
case PARANOID:
leak = AbstractByteBuf.leakDetector.track(buf);
if (leak != null) {
buf = new AdvancedLeakAwareByteBuf(buf, leak);
}
break;
default:
break;
}
return buf;
}
因为默认的内存泄漏追踪级别是 SIMPLE,so,这块会去执行 leakDetector.track(buf),接着来到:
ResourceLeakDetector.track()
// 参数:从内存池获取的 ByteBuf对象(尚未判断是否需要对该资源进行追踪)
@SuppressWarnings("unchecked")
public final ResourceLeakTracker<T> track(T obj) {
// 参数:从内存池获取的 ByteBuf对象
return track0(obj);
}
// 参数:从内存池获取的 ByteBuf对象
@SuppressWarnings("unchecked")
private DefaultResourceLeak track0(T obj) {
// 获取资源泄漏监控级别
Level level = ResourceLeakDetector.level;
// 条件成立, 说明资源泄漏机制处于关闭状态
if (level == Level.DISABLED) {
// 这里直接返回了 null, 对应的便是返回给业务使用的便是原生的从内存池中获取的 ByteBuf对象
return null;
}
// 条件成立 - 说明 level处于 simple或 advanced级别
if (level.ordinal() < Level.PARANOID.ordinal()) {
// 这里来计算出一个随机数, 随机数范围 samplingInterval(采样阈值), 若该随机数等于 0,
// 那么就对该资源进行追踪
if ((PlatformDependent.threadLocalRandom().nextInt(samplingInterval)) == 0) {
// 上报之前发生的泄漏情况
reportLeak();
// 创建资源泄漏追踪器(继承了 weakReference)
// 参数一:从内存池获取的 ByteBuf对象, 其实就是要去追踪的对象, WeakReference会去关联它
// 参数二:资源泄漏检测器范围内的引用队列, 当 obj被回收后, leak会被添加到 refQueue中去(前提:leak关联着 obj)
// 参数三:allLeaks, 资源泄漏追踪器范围内的 集合 leaks, 每创建一个 leak, 会将该 leak加入到 leaks集合内
// 后续判断是否发生资源泄漏都与该集合有关系
return new DefaultResourceLeak(obj, refQueue, allLeaks);
}
// 否则, 返回 null, 表示不追踪
return null;
}
// 执行到这, level便是 paranoid(偏执)级别
reportLeak();
return new DefaultResourceLeak(obj, refQueue, allLeaks);
}
在这里可以看到,Netty虽然是启用了内存资源泄漏检测机制,但也不是会去对所有的资源进行检测,对应 SIMPLE、ADVANCE这两种检测级别,实际上默认都会是去基于 "抽样检测"机制去判断是否需要对资源进行追踪,这块也比较容易理解,若是对所有资源的都去进行追踪的话,而 Netty服务器可能每秒分配的内存数不计数,这将会是一笔巨大的开销,so,这里会基于一定策略进行资源的检测
实际上,在对资源进行追踪前,还会去上报之前发生的内存泄漏情况,对应的便是方法 reportLeak(),这个方法是个关键,这个方法我们先留在后面讲
之后,便是去创建出了内存资源追踪器 DefaultResourceLeak
这里需要注意的便是,这里传递的 refQueue,allLeaks都是当前内存泄漏检测器中默认创建的,即 leak间会去共享
// 这里默认来创建出了一个保存 资源泄漏追踪器的 set集合
// 后续判断是否发生资源泄漏都与该集合有关系
private final Set<DefaultResourceLeak<?>> allLeaks =
Collections.newSetFromMap(new ConcurrentHashMap<DefaultResourceLeak<?>, Boolean>());
// 可以看到, 这里默认来创建出了一个引用队列(Leak会使用到)
// 在当前资源泄漏检测器范围内创建的 leak都会使用到, 即会去共享该队列
private final ReferenceQueue<Object> refQueue = new ReferenceQueue<Object>();
即,判断出该资源需要进行追踪时,会去为该资源创建一个内存资源追踪器
// 创建资源泄漏追踪器(继承了 weakReference)
// 参数一:从内存池获取的 ByteBuf对象, 其实就是要去追踪的对象, WeakReference会去关联它
// 参数二:资源泄漏检测器范围内的引用队列, 当 obj被回收后, leak会被添加到 refQueue中去(前提:leak关联着 obj)
// 参数三:allLeaks, 资源泄漏追踪器范围内的 集合 leaks, 每创建一个 leak, 会将该 leak加入到 leaks集合内
// 后续判断是否发生资源泄漏都与该集合有关系
DefaultResourceLeak(
Object referent,
ReferenceQueue<Object> refQueue,
Set<DefaultResourceLeak<?>> allLeaks) {
// 这块就是去调用了 WeakReference的构造嘛
super(referent, refQueue);
assert referent != null;
// Store the hash of the tracked object to later assert it in the close(...) method.
// It's important that we not store a reference to the referent as this would disallow it from
// be collected via the WeakReference.
// 存储被追踪对象的哈希, 以便后续在 close(...)中断言它
// 重要的是, 我们不要去存储 referent的引用, 因为这将禁止它从 WeakReference中进行收集
trackedHash = System.identityHashCode(referent);
// 将当前创建出来的内存资源追踪器添加到资源泄漏检测器范围内的 leaks集合中去
allLeaks.add(this);
// Create a new Record so we always have the creation stacktrace included.
// 记录一条 record信息, 通过这一条 record可以知道当前被追踪对象的创建时的线程堆栈信息 - Record继承了 Throwable
headUpdater.set(this, new Record(Record.BOTTOM));
this.allLeaks = allLeaks;
}
可以看到,创建出来的 leak会被添加到资源泄漏检测器范围内的 leaks集合中去
以及 Ref特性嘛,也去调用了 WeakReference的构造,去保存了下被追踪对象以及引用队列
然后,再回到 toLeakAwareBuffer()中:由于这块去创建了 leak,表示当前资源需要被追踪,因此,这块会去将 leak与 buf封装成一个 SimpleLeakAwareByteBuf对象,然后再返回给业务使用
而 SimpleLeakByteBuf是什么?其实际上就是去对 buf的封装。
class SimpleLeakAwareByteBuf extends WrappedByteBuf {
SimpleLeakAwareByteBuf(ByteBuf wrapped, ResourceLeakTracker<ByteBuf> leak) {
this(wrapped, wrapped, leak);
}
SimpleLeakAwareByteBuf(ByteBuf wrapped, ByteBuf trackedByteBuf, ResourceLeakTracker<ByteBuf> leak) {
super(wrapped);
this.trackedByteBuf = ObjectUtil.checkNotNull(trackedByteBuf, "trackedByteBuf");
this.leak = ObjectUtil.checkNotNull(leak, "leak");
}
因此,这块来到:WrappedByteBuf的构造
protected WrappedByteBuf(ByteBuf buf) {
this.buf = ObjectUtil.checkNotNull(buf, "buf");
}
这块实际上就是去重写一些方法,底层调用的实际上还是被封装对象里头的方法
如,调用 capacity()
@Override
public final int capacity() {
return buf.capacity();
}
而我们主要关心于其去重写的方法:release(),这才是核心方法
@Override
public boolean release() {
// 说明 byteBuf已经释放内存到池子中去了
if (super.release()) {
// 这块去关闭内存泄漏追踪器
closeLeak();
return true;
}
return false;
}
@Override
public boolean release() {
return buf.release();
}
接着来到:AbstractReferenceCountedByteBuf.release()
@Override
public boolean release() {
return handleRelease(updater.release(this));
}
private boolean handleRelease(boolean result) {
// true - 这块会去释放内存(此时引用计数为 0)
if (result) {
deallocate();
}
return result;
}
接着去执行了 deallocate()
PooledByteBuf.deallocate()
// 回收内存的入口
@Override
protected final void deallocate() {
if (handle >= 0) {
// 申请内存时 表示内存位置信息的 handle
final long handle = this.handle;
// 设置为 -1, 表示当前 buf不再管理内存
this.handle = -1;
memory = null;
// 参数一:表示当前 byteBuf管理内存归属的 chunk
// 参数二:不关心, 可能是 null
// 参数三:申请内存时 表示内存位置信息的 handle
// 参数四:当前 byteBuf的最大可用大小
// 参数五:与当前线程相关的本地缓存 PoolThreadCache, 释放内存时优先将内存位置信息缓存到线程的本地缓存中去
chunk.arena.free(chunk, tmpNioBuf, handle, maxLength, cache);
tmpNioBuf = null;
chunk = null;
// 将 PooledByteBuf归还到对象池
recycle();
}
}
可以看到,当 release()后判断出引用计数为 0时,去执行了 deallocate()
该方法主要便是去将当前 PooledByteBuf归还到对象池中去,其管理的内存归还到内存池中去
接着回到上层,当 byteBuf已经释放内存到池子中去了,接着去执行了 closeLeak(),这块便失去关闭内存泄漏追踪器
private void closeLeak() {
// Close the ResourceLeakTracker with the tracked ByteBuf as argument. This must be the same that was used when
// calling DefaultResourceLeak.track(...).
// 参数:trackedByteBuf - 被追踪对象
boolean closed = leak.close(trackedByteBuf);
assert closed;
}
// 参数:trackedByteBuf - 被追踪对象
@Override
public boolean close(T trackedObject) {
// Ensure that the object that was tracked is the same as the one that was passed to close(...).
// 可以看到, 这一步去进行了断言 - trackedHash起作用了
// 这一步去对应创建 leak时所计算出来的被追踪对象的 hash与此时传递进来的对象的 hash是否一致(其实就是去确保二者是一致的, 以来执行正确关闭逻辑)
assert trackedHash == System.identityHashCode(trackedObject);
try {
// 执行 close()
return close();
} finally {
// This method will do `synchronized(trackedObject)` and we should be sure this will not cause deadlock.
// It should not, because somewhere up the callstack should be a (successful) `trackedObject.release`,
// therefore it is unreasonable that anyone else, anywhere, is holding a lock on the trackedObject.
// (Unreasonable but possible, unfortunately.)
reachabilityFence0(trackedObject);
}
}
@Override
public boolean close() {
// 正常逻辑:
// 1.将当前内存泄漏追踪器从内存泄漏检测器范围内的 allLeaks集合中移出去
if (allLeaks.remove(this)) {
// Call clear so the reference is not even enqueued.
// 2.断开 ref与 referent之间的关联关系断开
// leak是弱引用, 这里将引用字段设置为 null, 后续被管理的对象被 gc时, 就不会去进行通知了(即不会再去将 leak添加到引用队列中去了)
clear();
// 3.将调用记录清除掉(头插法的 record单向链表嘛)
headUpdater.set(this, null);
return true;
}
return false;
}
可以看到,正常逻辑下去执行的便是 close()方法,对应的便是,将当前内存泄漏追踪器从内存泄漏检测器范围内的 allLeaks集合中移出去,断开 ref与 referent之间的关联关系断开,最后便是将调用记录清除掉(头插法的 record单向链表嘛)
这是正常逻辑会去执行的 close(),而当 cnt不等于 0时,那就不会来到此方法中去
那么,前面埋得坑 reportLeak()起作用了,便是去上报之前的内存泄漏情况
ResourceLeakDetector.reportLeak()
private void reportLeak() {
// 条件成立, 说明当前项目并没有开启 error日志级别...(直接去清空引用即可, 无需进行上报了)
if (!needReport()) {
// 清空引用队列
clearRefQueue();
return;
}
// Detect and report previous leaks.
// 这里其实就是通过引用队列去消费引用队列
for (;;) {
DefaultResourceLeak ref = (DefaultResourceLeak) refQueue.poll();
// true - 说明当前引用队列中元素已经全部消费完了
if (ref == null) {
break;
}
// ref.dispose()如果返回了 true, 说明资源发生泄漏了
// 条件成立,
if (!ref.dispose()) {
continue;
}
// 执行到这, 说明当前 leak关联对象发生资源泄漏了, 需要进行报告逻辑
// 获取异常信息
String records = ref.toString();
// 这块就是去进行异常信息的报告逻辑了
if (reportedLeaks.add(records)) {
if (records.isEmpty()) {
reportUntracedLeak(resourceType);
} else {
reportTracedLeak(resourceType, records);
}
}
}
}
可以看到,这块主要去消费 leak引用队列中元素,队列什么时候中会有元素?即 leak关联着的被追踪对象被 gc时,leak会存放到 RefQueue中去
这块去执行了 ref.dispose()
// 来到这, 说明 leak已经进入到了队列中去
boolean dispose() {
// 清空 leak所关联的对象
clear();
// 尝试将当前内存泄漏追踪器从内存泄漏检测器范围内的 allLeaks集合中移出去
// 正常情况下, close时就已经将当前 leak从 allLeaks集合中移出去了
// 如果这里返回了 true, 说明没能正常去关闭资源, 即资源发生泄漏了
return allLeaks.remove(this);
}
clear()中清除 leak与被追踪资源之间的关联
之后便是尝试从 allLeaks中去移除当前 leak对象,而我们知道正常闭环逻辑下,我们会在 close()方法中去进行移除 leak,而执行到了这,被追踪对象已经被 gc了,若此时 allLeaks中还包含该对象的话,那说明只有一种情况,便是 byteBuf已经没有强引用关联,但其引用计数还不等于 0的情况,这便判断出了 Netty内存池的资源泄漏
判断出来以后,会干什么?
若是程序有去开启 ERROR级别的日志打印的话,便会去执行:
// 执行到这, 说明当前 leak关联对象发生资源泄漏了, 需要进行报告逻辑
// 获取异常信息
String records = ref.toString();
// 这块就是去进行异常信息的报告逻辑了
if (reportedLeaks.add(records)) {
if (records.isEmpty()) {
reportUntracedLeak(resourceType);
} else {
reportTracedLeak(resourceType, records);
}
}
至此,Netty资源泄漏追踪解读完毕!