java中的锁--ReentrantLock可重入锁

Posted by 张伟真 on 2024-11-12
Estimated Reading Time 6 Minutes
Words 1.4k In Total
Viewed Times

java中的锁–ReentrantLock可重入锁

java中有各种锁,开个锁的专题,讲讲java中的各种锁

一、ReentrantLock可重入锁

ReentrantLock,也是大家常听到的可重入锁. 是java.util.concurrent.locks包下的同步工具类. 该锁的特点跟名称一样,允许一个线程多次获取同一个锁而不产生死锁.这与synchronized关键字提供的锁机制有点像,但是更灵活,拓展性更好.
核心内容

  1. 三个内部类 Sync,FairSync,NonFairSync. FairSync和NonFairSync继承Sync, Sync继承AQS(AbstractQueuedSynchronizer)
  2. 默认构建ReentrantLock,创建的是非公平锁NonFairSync,可根据入参指定公平锁还是非公平锁
  3. AQS 有个state和队列
    reentrantLock

公平锁

new ReentrantLock传入true, 内部构建一个公平锁.lock时先initialTryLock,抢占state判断队列是否有等待线程,没有则CAS进行抢占.否则调用acquire,最终调用公平锁或者非公平锁里的tryAcquire具体方法
子类重写父类方法,这里有个经典的设计模式: 模版设计模式. 父类只做里基本实现,不做业务,具体各个具体实现去做逻辑处理,调用也是调用实现类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}

final boolean initialTryLock() {
Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedThreads() && compareAndSetState(0, 1)) {
setExclusiveOwnerThread(current);
return true;
}
} else if (getExclusiveOwnerThread() == current) {
if (++c < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(c);
return true;
}
return false;
}
// 公平锁的tryAcquire
protected final boolean tryAcquire(int acquires) {
if (getState() == 0 && !hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}

具体逻辑tryAcquire抢占锁, 公平锁的原因,会先判断队列是否有人等待,当前没人的话就CAS直接获取锁.如果有人等待返回false,进入队尾
这里如果state不是0,判断是否当前线程,如果是则重入,状态次数加1,返回true
如果state不是0.并且不是当前线程则返回false,获取锁失败,进入队列
入队有个自旋保证能入队成功

非公平锁

非公平锁和公平锁逻辑差不多,只是非公平锁有个抢占的动作,新来的线程一上来就会CAS尝试抢占锁,而不会去先去判断是否队列还有线程等待.
获取失败则走公平锁的逻辑,非公平锁效率上相对公平锁高一点,看场景选择,可能出现线程饥饿问题,队列中的线程一直获取不到锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
final boolean initialTryLock() {
Thread current = Thread.currentThread();
if (compareAndSetState(0, 1)) { // first attempt is unguarded
setExclusiveOwnerThread(current);
return true;
} else if (getExclusiveOwnerThread() == current) {
int c = getState() + 1;
if (c < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(c);
return true;
} else
return false;
}

/**
* Acquire for non-reentrant cases after initialTryLock prescreen
*/
protected final boolean tryAcquire(int acquires) {
if (getState() == 0 && compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}

JDK17版本将acquire操作融合来一起调整了优化了性能,放弃了CAS进行队列的插入,跟jdk8相比代码可读性变差了

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
final int acquire(Node node, int arg, boolean shared,
boolean interruptible, boolean timed, long time) {
// 获取当前线程
Thread current = Thread.currentThread();
byte spins = 0, postSpins = 0; // retries upon unpark of first thread
boolean interrupted = false, first = false;
Node pred = null; // predecessor of node when enqueued

for (;;) {
// 如果是头节点返回head,否则返回null
// 如果不是头节点,node.pred是当前节点的前驱节点
if (!first && (pred = (node == null) ? null : node.prev) != null &&
!(first = (head == pred))) {
// 重入过多,小于0,清楚队列,cleanQueeu纯纯算法
if (pred.status < 0) {
cleanQueue(); // predecessor cancelled
continue;
} else if (pred.prev == null) {
// 前前节点是空,onSpinWait是Thread的空方法
Thread.onSpinWait(); // ensure serialization
continue;
}
}
if (first || pred == null) {
// 这里才是理想情况应该进来的
boolean acquired;
try {
if (shared)
// 共享锁执行,tryAcquireShared同样是子类实现
acquired = (tryAcquireShared(arg) >= 0);
else
// 非共享锁执行
acquired = tryAcquire(arg);
} catch (Throwable ex) {
// 取消尝试获取锁
cancelAcquire(node, interrupted, false);
throw ex;
}
if (acquired) {
if (first) {
node.prev = null;
head = node;
pred.next = null;
node.waiter = null;
if (shared)
signalNextIfShared(node);
if (interrupted)
current.interrupt();
}
return 1;
}
}
if (node == null) {
// 这里做了共享锁和排他锁的不同实现,只是一个标识而已
// 他们都是继承了Node,没有添加任何东西
if (shared)
node = new SharedNode();
else
node = new ExclusiveNode();
} else if (pred == null) { // try to enqueue
node.waiter = current;
Node t = tail;
node.setPrevRelaxed(t); // avoid unnecessary fence
if (t == null)
tryInitializeHead();
else if (!casTail(t, node))
// Node提供的一系列原子操作
node.setPrevRelaxed(null); // back out
else
t.next = node;
} else if (first && spins != 0) {
--spins; // reduce unfairness on rewaits
Thread.onSpinWait();
} else if (node.status == 0) {
node.status = WAITING; // enable signal and recheck
} else {
long nanos;
spins = postSpins = (byte)((postSpins << 1) | 1);
if (!timed)
// 这里用到了前置知识,给了一个许可
LockSupport.park(this);
else if ((nanos = time - System.nanoTime()) > 0L)
LockSupport.parkNanos(this, nanos);
else
break;
node.clearStatus();
if ((interrupted |= Thread.interrupted()) && interruptible)
break;
}
}
// 取消尝试获取锁
return cancelAcquire(node, interrupted, interruptible);
}

总结

AQS是Java提供的一个线程安全的同步器,用于管理可重入的线程访问共享资源的顺序。支持不同的同步状态:可中断等待、非阻塞等待、无锁等待。具有可扩展性和灵活性,而且能够保证并发访问时的线程安全性。它适用于许多高并发、共享资源的场景,例如计数器、消息队列等。在使用 AQS 队列同步器时,开发者需要在对共享资源的操作上使用相应的同步器来保证线程安全。AQS由state标识,双向链表和CAS组成,调用了LockSupport的park和unpark功能,ReentrantLock是典型实现。


如果您喜欢此博客或发现它对您有用,则欢迎对此发表评论。 也欢迎您共享此博客,以便更多人可以参与。 如果博客中使用的图像侵犯了您的版权,请与作者联系以将其删除。 谢谢 !