随笔分类
ThreadLocalRandomProbe
有时常会去思考一个问题,传统的 hash值对 key进行哈希而得,然后会对此哈希值进行路由得到目标地址;但有时,我们可能会从另一个维度去思考问题,如果目标数据本身不变 (并非数据本身不发生改变,而是地址),当多个线程想要对该数据进行操作,即多线程针对同一维度的数据进行争抢时,此时便有可能出现占用上的冲突,此时传统的 hash便不管用了,我们需要的便是对线程本身进行哈希,这一实现其实在 LongAdder、ForkJoinPool中也都有体现:Thread#ThreadLocalRandomProbe
/**
* Probe hash value; nonzero if threadLocalRandomSeed initialized
*/
// 线程的探针哈希值
// 直白点, 就是线程的 hash值, 不同的是该哈希值是会发生改变的
// threadLocalRandomProbe存在的意义是什么?
// 当多线程去争抢同一维度数据时, 为避免线程占用上的冲突, 针对线程进行哈希以及使用对应的路由寻址算法来实现冲突的减小
// 当冲突还是存在时, 可以更改线程的 hash值去重新寻址元素, 即 rehash
// 这块在 LongAdder、ForkJoinPool中都有体现
@jdk.internal.vm.annotation.Contended("tlr")
int threadLocalRandomProbe;
此值初始化在:ThreadLocalRandom#localInit()
/**
* Initialize Thread fields for the current thread. Called only
* when Thread.threadLocalRandomProbe is zero, indicating that a
* thread local seed value needs to be generated. Note that even
* though the initialization is purely thread-local, we need to
* rely on (static) atomic generators to initialize the values.
*/
static final void localInit() {
int p = probeGenerator.addAndGet(PROBE_INCREMENT);
int probe = (p == 0) ? 1 : p; // skip 0
long seed = mix64(seeder.getAndAdd(SEEDER_INCREMENT));
Thread t = Thread.currentThread();
U.putLong(t, SEED, seed);
// 可以看到, 由 ThreadLocalRandom类中的 localInit来为 Thread中的字段 threadLocalRandomProbe字段来去初始化
U.putInt(t, PROBE, probe);
}
此值是会发生变化的,而各自变化的实现又有所不同,可参考一下的一种线程探针哈希值更变实现:
/**
* Pseudo-randomly advances and records the given probe value for the
* given thread.
*/
// 更改当前线程的探测哈希值
static final int advanceProbe(int probe) {
probe ^= probe << 13; // xorshift
probe ^= probe >>> 17;
probe ^= probe << 5;
U.putInt(Thread.currentThread(), PROBE, probe);
return probe;
}
之前也有去分析过 LongAdder设计,其里面便会涉及到此技巧,LongAdder其实也是这么玩的:
final void longAccumulate(long x, LongBinaryOperator fn,
boolean wasUncontended) {
int h;
if ((h = getProbe()) == 0) {
ThreadLocalRandom.current(); // force initialization
h = getProbe();
wasUncontended = true;
}
boolean collide = false; // True if last slot nonempty
done: for (;;) {
Cell[] cs; Cell c; int n; long v;
if ((cs = cells) != null && (n = cs.length) > 0) {
if ((c = cs[(n - 1) & h]) == null) {
if (cellsBusy == 0) { // Try to attach new Cell
Cell r = new Cell(x); // Optimistically create
if (cellsBusy == 0 && casCellsBusy()) {
try { // Recheck under lock
Cell[] rs; int m, j;
if ((rs = cells) != null &&
(m = rs.length) > 0 &&
rs[j = (m - 1) & h] == null) {
rs[j] = r;
break done;
}
} finally {
cellsBusy = 0;
}
continue; // Slot is now non-empty
}
}
collide = false;
}
else if (!wasUncontended) // CAS already known to fail
wasUncontended = true; // Continue after rehash
else if (c.cas(v = c.value,
(fn == null) ? v + x : fn.applyAsLong(v, x)))
break;
else if (n >= NCPU || cells != cs)
collide = false; // At max size or stale
else if (!collide)
collide = true;
else if (cellsBusy == 0 && casCellsBusy()) {
try {
if (cells == cs) // Expand table unless stale
cells = Arrays.copyOf(cs, n << 1);
} finally {
cellsBusy = 0;
}
collide = false;
continue; // Retry with expanded table
}
// 可知, 当前线程会在此出更改下其探针哈希值
h = advanceProbe(h);
}
else if (cellsBusy == 0 && cells == cs && casCellsBusy()) {
try { // Initialize table
if (cells == cs) {
Cell[] rs = new Cell[2];
rs[h & 1] = new Cell(x);
cells = rs;
break done;
}
} finally {
cellsBusy = 0;
}
}
// Fall back on using base
else if (casBase(v = base,
(fn == null) ? v + x : fn.applyAsLong(v, x)))
break done;
}
}