MiniReentrantLock
package aqs;
import sun.misc.Unsafe;
import java.lang.reflect.Field;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.LockSupport;
public class MiniReentrantLock implements Lock{
/**
* 锁的是资源 --> state
* 0 未加锁状态
* > 0 加锁状态
*/
private volatile int state;
/**
* 独占模式
* 同一时刻只会有一个线程持有锁,其它未获取到锁的线程会被阻塞
*/
// 当前独占锁的线程
private volatile Thread exclusiveOwnerThread;
/**
* 需要有两个引用来维护 阻塞队列
* head -> 队列头节点
* tail -> 队列尾结点
*/
// 比较特殊,对应的节点其实是当前独占锁的线程
private volatile Node head;
private volatile Node tail;
/**
* 阻塞的线程保存在哪?
* 封装成 Node节点,保存在 FIFO队列中
*/
static class Node {
// 前置节点
Node prev;
// 后置节点
Node next;
// 封装着的线程
Thread thread;
public Node(Thread thread) {
this.thread = thread;
}
public Node() {
}
}
/**
* 获取锁,锁被占用,线程阻塞直至获取到锁位置
* 模拟公平模式 --> 讲究先来后到
* lock的过程是怎样的?
* 情景1:线程进来发现,state == 0,直接去争抢锁
* 情景2:线程进来发现,state > 0,当前线程入队
*/
@Override
public void lock() {
// 第一次获取锁,state == 1
// 第n次重入锁时,state == n
acquire(1);
}
/**
* 竞争资源
* 抢占锁成功,直接返回
* 抢占锁失败,则阻塞当前线程
* ① 将当前线程封装成 Node,加入到阻塞队列中去
* ② 将当前线程 park,使线程处于 挂起状态
*
* 唤醒后呢?
* ① 检查当前 node节点,是否为 head.next节点
* (head.next 节点是具有争抢锁权限的节点,其它节点没有)
* ② 抢占
* success:将当前线程设置为 head,将老的head进行出队操作,返回到业务层面
* fail:继续 park,等待被唤醒
*
* 其实抽象出来也就两个方法:
* 添加到阻塞队列中的方法:addWaiter()
* 在阻塞队列中争抢锁的方法:acquireQueued()
*
* @param arg
*/
private void acquire(int arg) {
if (!this.tryAcquire(arg)) {
Node node = addWaiter();
acquireQueued(node, arg);
}
}
/**
* 当前线程入队
* 返回当前线程对应的 Node节点
* @return
*/
private Node addWaiter() {
Node newNode = new Node(Thread.currentThread());
/**
* 如何入队
* 1.找到 newNode的前置节点 pred
* 2.更新 newNode.prev = pred
* 3. CAS更新 tail为 newNode
* 4. 更新 pred.next = newNode
*
* addWaiter()执行完毕后,保证当前线程已经入队
*/
// 前置条件:队列已经有等待者 node了,当前node不是第一个入队的node
Node pred = tail;
if (pred != null) {
newNode.prev = pred;
// true -> 当前线程成功入队
if (compareAndSetTail(pred, newNode)) {
pred.next = newNode;
return newNode;
}
}
/**
* 执行到这的情况:
* ① tail == null,队列中并没有等待者线程
* ② cas失败,有竞争
*
* 此时会通过自旋的方式来进行入队 --> 抽象成 enq()
*/
enq(newNode);
return newNode;
}
/**
* 队列中的线程竞争锁的逻辑
* @param node
*/
private void acquireQueued(Node node, int arg) {
// 成功获取锁后才会跳出自旋
for (;;) {
// 只有当 node是 head.next节点时,才有资格去争抢锁
Node pred = node.prev;
if (pred == head && tryAcquire(arg)) {
// 当前线程竞争锁成功了
// 需要做些什么?
// ① 设置当前节点为 head
// ② 协助老 head出队
setHead(node);
// 此时,老 head还引用着新 head
pred.next = null; // help gc
return;
}
// 做些日志操作
System.out.println("线程:" + Thread.currentThread().getName() + "被挂起了");
// 挂起当前线程
LockSupport.park();
/**
* 什么时候唤醒被 park的线程 ?
* 没错,就是在 unLock()中
*/
System.out.println("线程:" + Thread.currentThread().getName() + "被唤醒了");
}
}
/**
* 自旋入队
*
* ① tail == null,队列是空队列的情况
* ② cas失败,有竞争
* @param node
* @return
*/
private void enq(Node node) {
for (;;) {
// 第一种情况:tail == null,队列是空队列的情况
// 当前线程是第一个争抢锁失败的线程..
// 当前持有锁的线程,并没有设置过任何 node,所以作为持有锁线程的直接后继节点,需要做些擦屁股操作
// 给当前持有锁的线程补充一个 node,以此来作为 head节点,head节点在任何时候,都表示着持有着锁的线程
if (tail == null) {
// 只会有一个线程设置成功,期望值为 null
// true -> 当前线程成功给 持有锁的线程补充了一个 node,并设置成了 head
if (compareAndSetHead(new Node())) {
tail = head;
// 注意,这里并没有直接返回,还会继续下一次自旋,不然当前线程咋入队?
}
} else { // 这里才是真正的 入队操作
Node pred = tail;
if (pred != null) { // 这里的判断还是有需要的,高并发下
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
// 入队成功,退出自旋
return;
}
}
}
}
}
public int getState() {
return state;
}
public Thread getExclusiveOwnerThread() {
return exclusiveOwnerThread;
}
public Node getHead() {
return head;
}
public Node getTail() {
return tail;
}
/**
* 尝试获取锁,不会阻塞线程
* @param arg
* @return
*/
public boolean tryAcquire(int arg) {
if (state == 0) {
/**
* 模拟的是公平锁,需检查阻塞队列中是否还有其它线程(当前线程是否还有前置节点)等着去争抢锁
*/
// 条件1:!hasQueuedProcessor() true -> 当前线程前面没有等待者线程
// 条件2:compareAndSetState(0, arg) true -> 争抢锁成功 cas,因为 lock多线程可并发进入
// 当前线程争抢锁成功
if (!hasQueuedProcessor() && compareAndSetState(0, arg)) {
// 设置当前线程为占用锁线程
this.exclusiveOwnerThread = Thread.currentThread();
}
/**
* 来到这的两种情况
* ① 锁已被占用,当前线程不是独占锁的线程
* ② 锁已被占用,当前线程是独占锁的线程
*/
} else if (Thread.currentThread() == this.exclusiveOwnerThread) { // 获取锁的线程重复去获取锁,锁可重入
// 更新 state值,重入锁
// 这里不存在并发
int c = this.getState();
c = c + arg;
// 这里实际上是需要做些越界判断的
this.state = c;
return true;
}
/**
* 哪些情况返回 false
* state == 0:
* 当前线程前面有等待着线程
* cas fail
* state > 0:
* 当前线程并非持有锁的线程
*/
return false;
}
/**
* true -> 当前线程前面有等待者线程
* false -> 当前线程前面没有等待着线程
*
* 调用链:lock -> acquire -> tryAcquire -> hasQueuedProcessor (state == 0时才会调用)
*
* 返回 false:
* ① 当前队列为空
* ② 当前线程是 head.next节点线程,head.next什么时候都有权限去尝试获取一下锁.
* @return
*/
private boolean hasQueuedProcessor() {
Node h = head;
Node t = tail;
Node s;
// 条件1: h != t true -> 队列中已经有了等待着线程
// false: ① h == t == null 还未初始化 ② h == t = head 第一个获取锁失败的线程,会为当前持有锁的线程,补充创建一个 head节点
// 条件2:前置条件:队列不空
// s = h.next) == null || s.thread != Thread.currentThread()
// 这里其实想要做的是排除几种情况
// 2.1 s = h.next) == null
// 极端情况下,第一个获取锁失败的线程,会为持有锁线程创建 head,然后再自旋入队 1.cas tail() 成功了 2.pred[head].next = node
// 想表达的是:已经有 head.next节点了,别的线程再来,需要返回false,表示队列已经有等待者线程了
// 2.2 s.thread != Thread.currentThread() 大部分情况下,会是这个
// true -> 已经有 head.next节点了,且当前线程并非对应线程,此时也需要返回 true
// false -> 说明当前线程正是 head.next.thread,需要返回 false,回头线程再去竞争锁
return h != t && ((s = h.next) == null || s.thread != Thread.currentThread());
}
/**
* 释放锁
*/
@Override
public void unlock() {
release(1);
}
private void release(int arg) {
// true -> 此时释放锁成功,需要去唤醒阻塞队列中的直接后继线程(公平锁的实现)
if (tryRelease(arg)) {
Node head = this.head;
// 判断有没有等待者
if (head.next != null) {
/**
* 公平锁实现
*/
unparkSuccessor(head);
}
}
}
private void unparkSuccessor(Node node) {
Node s = node.next;
if (s != null && s.thread != null) {
LockSupport.unpark(s.thread);
}
}
/**
* 完全释放锁成功,返回 true -> state == 0
* @param arg
* @return
*/
private boolean tryRelease(int arg) {
int c = getState() - arg;
// 检查操作
if (Thread.currentThread() != getExclusiveOwnerThread()) {
throw new RuntimeException("Illegal operation! Must getLock first!");
}
// 来到这,不存在并发
if (c == 0) {
// 说明当前线程持有的锁已经 完全释放
/**
* 此时需要做:
* ① 设置 exclusiveOwnerThread = null
* ② 设置 state = 0
*/
this.exclusiveOwnerThread = null;
this.state = 0;
return true;
}
this.state = c;
return false;
}
private void setHead(Node node) {
this.head = node;
// 当前 node已经是获取锁成功的线程了
node.thread = null;
node.prev = null;
}
private static final Unsafe unsafe;
private static final long stateOffset;
private static final long headOffset;
private static final long tailOffset;
static {
try {
Field f = Unsafe.class.getDeclaredField("theUnsafe");
f.setAccessible(true);
unsafe = (Unsafe) f.get(null);
stateOffset = unsafe.objectFieldOffset
(MiniReentrantLock.class.getDeclaredField("state"));
headOffset = unsafe.objectFieldOffset
(MiniReentrantLock.class.getDeclaredField("head"));
tailOffset = unsafe.objectFieldOffset
(MiniReentrantLock.class.getDeclaredField("tail"));
} catch (Exception ex) { throw new Error(ex); }
}
/**
* CAS head field. Used only by enq.
*/
private final boolean compareAndSetHead(Node update) {
return unsafe.compareAndSwapObject(this, headOffset, null, update);
}
/**
* CAS tail field. Used only by enq.
*/
private final boolean compareAndSetTail(Node expect, Node update) {
return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
}
/**
* CAS state
*/
private final boolean compareAndSetState(int expect, int update) {
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
}
Lock
package aqs;
public interface Lock {
public void lock();
public void unlock();
}
```s