随笔分类
FastThreadLocal
JDK中已经引入了 ThreadLocal,为什么 netty还要基于此进行增强造出一个 FastThreadLocal(FTL),对比之下又解决了什么痛点?
在 JDK中每个线程都会有一个 ThreadLocalMap,其采用了懒式创建法,当线程第一次访问此变量时会去创建此 map
JDK中此 map基于探针碰撞
的方式,如果未找到空闲的 slot,会继续往后找,而在 hash冲突严重的情况下,这种方式效率较低
而 ftl通过了分配 index的方式解决了此问题,其基于 AtomicInteger实现,因此每个 ftl创建时都会分配一次唯一的 index
而 ftl.get()获取值时则是直接根据 index获取 ftl绑定的值即可,而 ftl则是搭配 ftlf、ftlr进行使用的,或者说这么去使用才是正确用法
源码分析:
FastThreadLocal的实现涉及到了 InternalThreadMap,这也是其专享存储的 map
/**
* The internal data structure that stores the thread-local variables for Netty and all {@link FastThreadLocal}s.
* Note that this class is for internal use only and is subject to change at any time. Use {@link FastThreadLocal}
* unless you know what you are doing.
*/
class UnpaddedInternalThreadLocalMap {
// 不是 ftlt时使用, 由其名字可看出来
static final ThreadLocal<InternalThreadLocalMap> slowThreadLocalMap = new ThreadLocal<InternalThreadLocalMap>();
// 为 ftl分配唯一索引
static final AtomicInteger nextIndex = new AtomicInteger();
/** Used by {@link FastThreadLocal} */
// 存储每一个 ftl对应值
Object[] indexedVariables;
可以看到这块里头一个专门存储 ftl绑定值的数组,再结合其对应的 index便可快速获取到其对应值
其子类的成员:
public final class InternalThreadLocalMap extends UnpaddedInternalThreadLocalMap {
// 标识数组的槽位暂未被使用
public static final Object UNSET = new Object();
// 使用 bit的方式标识 ftl是否启动了 cleaner 清理线程
// Long[] long 64 bit
// 扩容增加 Long元素即可
// bit == 1说明对应 ftl已经启动了清理线程
private BitSet cleanerFlags;
可知,这块通过一个对象 UNSET来去判断数组中对应 slot是否空闲,其本身便是一个标识位,只需要在创建 map时将 UNSET填充进去即可
要发挥出 ftl的性能优势,需要结合 ftlt一块使用,其继承了 Thread,其内部继承了 InternalThreadMap,如果当前线程是 ftlt的话,则会去使用 InternalThreadMap
接下来分析下 ftl:
private final int index;
// 创建的 ftl于此获取对应 index
public FastThreadLocal() {
index = InternalThreadLocalMap.nextVariableIndex();
}
ftl创建时为其分配唯一的 index
map的创建也是采用了懒汉式方式,第一次接触变量时触发 map的创建
/**
* Returns the current value for the current thread
*/
@SuppressWarnings("unchecked")
public final V get() {
// 创建对应的 map
InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get(); // ①
// 获取数组下表对应的值
Object v = threadLocalMap.indexedVariable(index); // ②
// 若获取到了有效的值
if (v != InternalThreadLocalMap.UNSET) { // ③
// 直接返回对应的值
return (V) v;
}
// 我们来考虑这种情况 - 获取不到有效值进行初始化
return initialize(threadLocalMap); // ④
}
一步步进行分析,① 首先去获取 map
public static InternalThreadLocalMap get() {
Thread thread = Thread.currentThread();
// 根据当前线程类型创建 快、慢 map
if (thread instanceof FastThreadLocalThread) {
return fastGet((FastThreadLocalThread) thread);
} else {
return slowGet();
}
}
根据当前线程类型去选择创建何种 map,此出承接上述所说的 ftl与 ftlt搭配使用才是最佳方式
private static InternalThreadLocalMap fastGet(FastThreadLocalThread thread) {
InternalThreadLocalMap threadLocalMap = thread.threadLocalMap();
if (threadLocalMap == null) {
thread.setThreadLocalMap(threadLocalMap = new InternalThreadLocalMap());
}
return threadLocalMap;
}
private static InternalThreadLocalMap slowGet() {
// 父类类型为 jdk ThreadLocal,并且从 threadLocal中获取 InternalThreadLocalMap
ThreadLocal<InternalThreadLocalMap> slowThreadLocalMap = UnpaddedInternalThreadLocalMap.slowThreadLocalMap;
InternalThreadLocalMap ret = slowThreadLocalMap.get();
if (ret == null) {
ret = new InternalThreadLocalMap();
// 这块去创建普通的 map
slowThreadLocalMap.set(ret);
}
return ret;
}
创建出对应的 map,然后绑定到当前线程中去
② 根据 ftl绑定 index从数组中获取其 value
public Object indexedVariable(int index) {
Object[] lookup = indexedVariables;
return index < lookup.length? lookup[index] : UNSET;
}
③ 如果获取到的值有效而非 UNSET,则直接返回
④ 获取到无效的值,则进行初始化操作 initialize操作
private V initialize(InternalThreadLocalMap threadLocalMap) {
V v = null;
try {
// 这里便去调用了重写后的方法(PoolThreadLocalCache.initialValue())
v = initialValue();
} catch (Exception e) {
PlatformDependent.throwException(e);
}
// 参数一:ftl对应下标
// 参数二:ftl所绑定值
threadLocalMap.setIndexedVariable(index, v);
addToVariablesToRemove(threadLocalMap, this);
return v;
}
这块首先是去获取 ftl对应默认值,这是一个扩展点
/**
* Returns the initial value for this thread-local variable.
*/
// 扩展点, 可以单独为 ftl设置默认的值
protected V initialValue() throws Exception {
return null;
}
然后便是将默认值塞到 map中去,必要时刻会去进行数组的扩容操作
/**
* @return {@code true} if and only if a new thread-local variable has been created
*/
public boolean setIndexedVariable(int index, Object value) {
Object[] lookup = indexedVariables;
if (index < lookup.length) {
Object oldValue = lookup[index];
lookup[index] = value;
return oldValue == UNSET;
} else { // 对数组进行扩容
expandIndexedVariableTableAndSet(index, value);
return true;
}
}
private void expandIndexedVariableTableAndSet(int index, Object value) {
Object[] oldArray = indexedVariables;
final int oldCapacity = oldArray.length;
int newCapacity = index;
newCapacity |= newCapacity >>> 1;
newCapacity |= newCapacity >>> 2;
newCapacity |= newCapacity >>> 4;
newCapacity |= newCapacity >>> 8;
newCapacity |= newCapacity >>> 16;
newCapacity ++;
Object[] newArray = Arrays.copyOf(oldArray, newCapacity);
Arrays.fill(newArray, oldCapacity, newArray.length, UNSET);
newArray[index] = value;
indexedVariables = newArray;
}
这块其实也体现了一种玩法,便是类成员和类构造初始化优先级的次序,因此这块 variablesToRemoveIndex值是 0,即 map中存放 ftl对应值是从 index == 1开始存放的
/**
* 这是全局静态共享的 index
*/
private static final int variablesToRemoveIndex = InternalThreadLocalMap.nextVariableIndex();
@SuppressWarnings("unchecked")
private static void addToVariablesToRemove(InternalThreadLocalMap threadLocalMap, FastThreadLocal<?> variable) {
// 获取第一次槽位中对应元素
Object v = threadLocalMap.indexedVariable(variablesToRemoveIndex);
Set<FastThreadLocal<?>> variablesToRemove;
if (v == InternalThreadLocalMap.UNSET || v == null) {
// 创建了一个集合,集合中存放 ftl
variablesToRemove = Collections.newSetFromMap(new IdentityHashMap<FastThreadLocal<?>, Boolean>());
// 将 set集合存放到 map第一个槽位
threadLocalMap.setIndexedVariable(variablesToRemoveIndex, variablesToRemove);
} else {
variablesToRemove = (Set<FastThreadLocal<?>>) v;
}
// 将 ftl添加到 set集合中去
variablesToRemove.add(variable);
}
这块则是将 ftl存放到 map第一个 slot中去,对应类型便是 set集合
还有一点就是 ftlt本身对 thread进行了增强,体现在 thlr上:
如果有机会去使用 wrap后的 runnable,cleanupFastThreadLocals
标志位会 true
public FastThreadLocalThread() {
cleanupFastThreadLocals = false;
}
public FastThreadLocalThread(Runnable target) {
super(FastThreadLocalRunnable.wrap(target));
cleanupFastThreadLocals = true;
}
执行了 wrap后的 runnable,会对 ftlt进行清理操作
@Override
public void run() {
try {
runnable.run();
} finally {
// 线程对应任务执行完, 此时可以去清除 ftl
FastThreadLocal.removeAll();
}
}
// 对 runnable进行封装
static Runnable wrap(Runnable runnable) {
return runnable instanceof FastThreadLocalRunnable ? runnable : new FastThreadLocalRunnable(runnable);
}
ftl中提供资源回收的几种方式:
-
自动:
如果有去使用基于 FastThreadLocalRunnable包装后的 runnable,在任务执行完后会去进行 ftl的清理
-
手动
ftl、InternalThreadLocalMap都有去提供 remove()方法,用户可在适当时机对 ftl进行清除
-
另外一种自动
这种方式在新版本的 netty已经去除了,就是为当前线程的每一个 ftl注册一个 cleaner,当线程对象不可达时会对 ftl进行回收 (Netty同样推荐去使用上述两种方式,第三种需要另起线程,相对来说比较消耗资源)