我在锁的使用与死锁的避免一文中,介绍了需要持有多把锁时,如何使用和释放锁,当时出于文章篇幅和文章结构限制,重点说明了一种方法。本文再次结合转账业务,把持有多把锁的业务场景下,锁使用的几种方法,做一个汇总说明。
1. 转账业务分析
“转账业务”本身容易理解,不用文字赘述,直接看类的代码即可:
public class Account {
private final long id;
private int balance;
public Account(int balance) {
this.balance = balance;
}
public int getBalance() {
return this.balance;
}
public void transfer(Account target, int amount){
if (this.balance > amount) {
this.balance -= amount;
target.balance += amount;
}
}
如上面代码,若两个Account
对象(account1, account2)进行转账,调用account1.transfer(account2, 100)方法可以实现。但在并发场景下,若同时存在多笔转账交易,由于数据竞争,需要上锁来保证线程安全。
如果是新手,首先想到的上锁方式可能是下面这样的:
public void transfer(Account target, int amount) {
synchronized (this) {
if (this.balance > amount) {
this.balance -= amount;
target.balance += amount;
}
}
}
但这样做是错误的,this
这把锁可以保护自己的余额 this.balance
,却保护不了别人的余额 target.balance
,如果同时发生A向B转账,B向C转账,对B余额的读写,还是存在数据竞争的。
用锁 this
保护 this.balance
和 target.balance
的如上面示意图,对于A、B、C多人并发转账时B的数据竞争,再详细说明一下:
假设有 A、B、C 三个账户,余额都是 200 元,我们用两个线程分别执行两个转账操作:账户 A 转给账户 B 100 元,账户 B 转给账户 C 100 元,最后我们期望的结果应该是账户 A 的余额是 100 元,账户 B 的余额是 200 元, 账户 C 的余额是 300 元。
假设线程 1 执行账户 A 转账户 B 的操作,线程 2 执行账户 B 转账户 C 的操作,这两个线程分别在两颗 CPU 上同时执行,
this
锁并不能做到两个操作的互斥。因为线程 1 锁定的是账户 A 的实例
(A.this)
,而线程 2 锁定的是账户 B 的实例(B.this
),所以这两个线程可以同时进入临界区transfer()
。这时,线程 1 和线程 2 都会读到账户 B 的余额为 200,导致最终账户 B 的余额可能是 300,也可能是100,就是不可能是 200:
- 线程 1 后于线程 2 写 B.balance,线程 2 写的 B.balance 值被线程 1 覆盖,则300
- 线程 1 先于线程 2 写 B.balance,线程 1 写的 B.balance 值被线程 2 覆盖,则100
上面这段文字,对于并发编程的新手同学,可能需要多看几遍,理解后就会发现,上面场景中,B之所以存在数据竞争,是因为this
锁是锁在对象上,A向B转账时,在B上加了A锁,B向C转账时,在B上加了B锁,所以两个线程不存在抢同一把锁的情况,不构成互斥关系,均能进入临界区;要解决这个问题,就需要A、B共享同一个锁,如:
class Account {
private Object lock;
private int balance;
private Account();
public Account(Object lock) {
this.lock = lock;
}
void transfer(Account target, int amount){
synchronized(lock) {
if (this.balance > amount) {
this.balance -= amount;
target.balance += amount;
}
}
}
}
这样确实能解决问题,但是有瑕疵:它要求在创建 A, B两个对象时传入同一个锁对象,否则还是会存在上文分析到的数据竞争。在真实的项目场景中,创建 Account 对象的代码很可能分散在多个工程中,传入共享的 lock 真的很难,故这个方式尽显帮我们理解锁的使用,并不实用。
因此在实践中,以上述分析为依托,需要进一步设计可行的解决方案。
2. 实战可行的各种死锁避免方法
2.1 锁在类上
虽然上一节末尾提到的让A、B共享同一把锁的方案不实用,但在实践中确实存在一把锁,可以保证被A、B两个对象共享,即用类作为锁:
class Account {
private int balance;
// 转账
void transfer(Account target, int amount){
synchronized(Account.class) {
if (this.balance > amount) {
this.balance -= amount;
target.balance += amount;
}
}
}
}
Account.class
是所有 Account 对象共享的,而且它是 Java 虚拟机在加载 Account 类的时候创建的,所以我们不用担心它的唯一性。这个实践可行,不过问题在于锁的颗粒度太大了,即便两个并发转账并不相关,如线程1执行A向B转账,线程2执行C向D转账,也会互斥,变为串行,严重影响系统性能。
2.2 锁在对象上
为避免在“锁在类上”带来的巨大性能损耗,还是要采用在对象上加锁的方案,在转账业务中,既然一个对象锁锁不住A、B两个对象,那就每个对象用各自的锁来锁定,即:
public void transfer(Account target, int amount) {
synchronized (this.getId()) {
synchronized (target.getId()) {
// 为了让代码更清晰,转账业务逻辑代码独立到另一个方法中了
transferAccount(target, amount);
}
}
}
protected void transferAccount(Account target, int amount) {
if (this.balance > amount) {
this.balance -= amount;
target.balance += amount;
}
}
这样,A向B转账时,B上加的是B锁,B向C转账时,B上加的也是B锁,从而避免了B上的数据竞争。 不过这段代码时有bug的,存在死锁隐患。即,若两个线程同时发起A向B的转账(线程1),和B向A的转账(线程2),线程1获取A锁时,线程2获取了B锁,此时,线程1由于获取不到B锁,会进入等待,而线程2由于获取不到A锁,也会进入等待,于是两个线程都会无限等待下去,也就是死锁。
这段代码,在A、B并发互相转账时,我们可以用jvm工具查看死锁状态:
命令行可以用jstack查看,结果如下:
jstack -l 6003
Java stack information for the threads listed above:
===================================================
"pool-1-thread-11":
at org.gevinzone.threadsafe.DeadLockAccount.transfer(DeadLockAccount.java:14)
- waiting to lock <0x00000007156c1078> (a java.lang.Long)
- locked <0x00000007156c1060> (a java.lang.Long)
at org.gevinzone.Main.lambda$concurrentAccountTransfer$0(Main.java:46)
at org.gevinzone.Main$$Lambda$1/81628611.run(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
"pool-1-thread-8":
at org.gevinzone.threadsafe.DeadLockAccount.transfer(DeadLockAccount.java:14)
- waiting to lock <0x00000007156c1060> (a java.lang.Long)
- locked <0x00000007156c1078> (a java.lang.Long)
at org.gevinzone.Main.lambda$concurrentAccountTransfer$1(Main.java:51)
at org.gevinzone.Main$$Lambda$2/931919113.run(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Found 1 deadlock.
也可以用jconsole在UI中查看:
所以,锁在对象上时,重点是避免死锁问题,下面将介绍比较常用的几种持有多个锁时,死锁的避免方法。
2.2.1 注意加锁顺序
如上面死锁的分析,都是先对自己上锁,然后对target上锁,即account1 先对account1 上锁,然后在对account2 上锁,而account2 是先对account2上锁,再对account1上锁,这造成了循环等待,从而死锁;若两个对象上锁顺序一致,如都先对account1 上锁,再对account2 上锁,就对循环解套了,可以避免死锁。
public void transfer(Account target, int amount) {
Account left = this.getId() < target.getId() ? this : target;
Account right = this.getId() < target.getId() ? target : this;
synchronized (left.getId()) {
synchronized (right.getId()) {
transferAccount(target, amount);
}
}
}
2.2.2 加锁超时释放
如上面死锁的分析,两个线程都是在获取第二个对象锁时,由于获取不到,进入等待,而自己持有的锁在等待也无法释放,从而永久的等待下去,产生了死锁;所以,可以对等待加一个超时时间,超时后不在继续等待,而是释放自己持有的锁,就可以避免死锁了。
public void transferWithTryLock(TryLockAccount target, int amount) {
while (true) {
if (lock.tryLock()) {
try {
if (target.lock.tryLock()) {
try {
transferAccount(target, amount);
break;
} finally {
target.lock.unlock();
}
}
} finally {
lock.unlock();
}
}
// 防止活锁
randomSleepForLiveLock();
}
}
2.2.3 加锁中断释放
死锁时,为避免线程处于等待状态无法响应,可以使用lock.lockInterruptibly()
,这样,线程处于等待状态时,依然可以响应中断,抛出异常,这样我们可以通过从外部中断线程,打破死锁。
如,把死锁的代码改成使用lock.lockInterruptibly()
,当死锁后,调用thread.interrupt()
,可以打破死锁。
public void transfer(LockInterruptableAccount target, int amount) {
try {
lock.lockInterruptibly();
try {
target.lock.lockInterruptibly();
transferAccount(target, amount);
} finally {
target.lock.unlock();
}
} catch (InterruptedException ignored) {
} finally {
lock.unlock();
}
}
用下面代码起2个线程测试死锁和死锁的打破,若注释掉thread1.interrupt();
,下面代码会死锁,但如果中断thread1
或thread2
,死锁立刻被打破
private void lockInterruptableTest() throws InterruptedException {
LockInterruptableAccount a = new LockInterruptableAccount(1, 1000);
LockInterruptableAccount b = new LockInterruptableAccount(2, 1000);
System.out.println("before...");
System.out.printf("A: %d, B: %d\n", a.getBalance(), b.getBalance());
BiConsumer<LockInterruptableAccount, LockInterruptableAccount> consumer =
(o1, o2) -> o1.transfer(o2, 1);
Thread thread1 = new Thread(() -> consumer.accept(a, b));
Thread thread2 = new Thread(() -> consumer.accept(b, a));
thread1.start();
thread2.start();
thread1.interrupt();
thread1.join();
thread2.join();
System.out.println("after...");
System.out.printf("A: %d, B: %d\n", a.getBalance(), b.getBalance());
}
2.2.4 多把锁转换为一把锁进行加锁
这里并非用类锁这种大颗粒度的锁。而是我们在上面死锁分析中,发现对于A、B两个对象的上锁,不是原子级的,正是由于线程先锁定一个对象,再去锁定另一个对象,带来了死锁隐患。如果可以同时锁定两个对象,同时释放两个对象,也能避免死锁。这里,我们可以用一个锁来实现原子级锁定2个对象的操作,在转账线程中,如果能同时锁定2个对象,则进行转账,否则等待;当两个对象能被同时锁定时,再唤醒线程。即:
先设计一个Allocator类用于锁定和释放对象:
public class Allocator {
private final HashSet<Object> container;
public void apply(Object from, Object to) throws InterruptedException {
synchronized (this) {
while (container.contains(from) || container.contains(to)) {
wait();
}
container.add(from);
container.add(to);
}
}
public void free(Object from, Object to) {
synchronized (this) {
container.remove(from);
container.remove(to);
notifyAll();
}
}
在转账线程中使用:
public void transfer(Account target, int amount) {
try {
allocator.apply(this, target);
transferAccount(target, amount);
allocator.free(this, target);
} catch (InterruptedException exception) {
exception.printStackTrace();
}
}
3. 其他上锁的方式
除了synchronized
和并发包中的lock外,信号量也可以用做锁,避免死锁的方式,与上文相同。
(1)注意上锁顺序
public void transferWithSemaphore(Account target, int amount) {
SemaphoreAccount left = this.getId() < target.getId() ? this : (SemaphoreAccount)target;
SemaphoreAccount right = this.getId() < target.getId() ? (SemaphoreAccount)target : this;
try {
left.semaphore.acquire();
try {
right.semaphore.acquire();
transferAccount(target, amount);
} finally {
right.semaphore.release();
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
left.semaphore.release();
}
}
(2)加锁超时释放
public void transferWithSemaphore(TrySemaphoreAccount target, int amount) {
while (true) {
if (semaphore.tryAcquire()) {
try {
if (target.semaphore.tryAcquire()) {
try {
transferAccount(target, amount);
break;
} finally {
target.semaphore.release();
}
}
} finally {
semaphore.release();
}
}
// 防止活锁
randomSleepForLiveLock();
}
}
What's More
本文代码,可以在GitHub上查看。
本文同步发表于我的微信公众号,欢迎关注。
注:转载本文,请与Gevin联系
欢迎关注我的微信公众账号
