实战:并发转账业务中避免死锁的各种方法

Posted by Gevin on Category: 开发 Tags: 并发

enter image description here

我在锁的使用与死锁的避免一文中,介绍了需要持有多把锁时,如何使用和释放锁,当时出于文章篇幅和文章结构限制,重点说明了一种方法。本文再次结合转账业务,把持有多把锁的业务场景下,锁使用的几种方法,做一个汇总说明。

1. 转账业务分析

“转账业务”本身容易理解,不用文字赘述,直接看类的代码即可:


public class Account {
    private final long id;
    private int balance;


    public Account(int balance) {
        this.balance = balance;
    }

    public int getBalance() {
        return this.balance;
    }

    public void transfer(Account target, int amount){
        if (this.balance > amount) {
            this.balance -= amount;

            target.balance += amount;
        }
    }

如上面代码,若两个Account对象(account1, account2)进行转账,调用account1.transfer(account2, 100)方法可以实现。但在并发场景下,若同时存在多笔转账交易,由于数据竞争,需要上锁来保证线程安全。

如果是新手,首先想到的上锁方式可能是下面这样的:

public void transfer(Account target, int amount) {
        synchronized (this) {
            if (this.balance > amount) {
                this.balance -= amount;
                target.balance += amount;
            }
        }
    }

但这样做是错误的,this 这把锁可以保护自己的余额 this.balance,却保护不了别人的余额 target.balance,如果同时发生A向B转账,B向C转账,对B余额的读写,还是存在数据竞争的。

138ec3ba149e21009828fef3342b3216.png

用锁 this 保护 this.balancetarget.balance 的如上面示意图,对于A、B、C多人并发转账时B的数据竞争,再详细说明一下:

假设有 A、B、C 三个账户,余额都是 200 元,我们用两个线程分别执行两个转账操作:账户 A 转给账户 B 100 元,账户 B 转给账户 C 100 元,最后我们期望的结果应该是账户 A 的余额是 100 元,账户 B 的余额是 200 元, 账户 C 的余额是 300 元。

假设线程 1 执行账户 A 转账户 B 的操作,线程 2 执行账户 B 转账户 C 的操作,这两个线程分别在两颗 CPU 上同时执行,this锁并不能做到两个操作的互斥。

因为线程 1 锁定的是账户 A 的实例(A.this),而线程 2 锁定的是账户 B 的实例(B.this),所以这两个线程可以同时进入临界区 transfer()。这时,线程 1 和线程 2 都会读到账户 B 的余额为 200,导致最终账户 B 的余额可能是 300,也可能是100,就是不可能是 200

  1. 线程 1 后于线程 2 写 B.balance,线程 2 写的 B.balance 值被线程 1 覆盖,则300
  2. 线程 1 先于线程 2 写 B.balance,线程 1 写的 B.balance 值被线程 2 覆盖,则100

上面这段文字,对于并发编程的新手同学,可能需要多看几遍,理解后就会发现,上面场景中,B之所以存在数据竞争,是因为this锁是锁在对象上,A向B转账时,在B上加了A锁,B向C转账时,在B上加了B锁,所以两个线程不存在抢同一把锁的情况,不构成互斥关系,均能进入临界区;要解决这个问题,就需要A、B共享同一个锁,如:

class Account {
  private Object lock;
  private int balance;
  private Account();

  public Account(Object lock) {
    this.lock = lock;
  } 

  void transfer(Account target, int amount){
    synchronized(lock) {
        if (this.balance > amount) {
            this.balance -= amount;
            target.balance += amount;
        }
      }
  }
}

这样确实能解决问题,但是有瑕疵:它要求在创建 A, B两个对象时传入同一个锁对象,否则还是会存在上文分析到的数据竞争。在真实的项目场景中,创建 Account 对象的代码很可能分散在多个工程中,传入共享的 lock 真的很难,故这个方式尽显帮我们理解锁的使用,并不实用。

因此在实践中,以上述分析为依托,需要进一步设计可行的解决方案。

2. 实战可行的各种死锁避免方法

2.1 锁在类上

虽然上一节末尾提到的让A、B共享同一把锁的方案不实用,但在实践中确实存在一把锁,可以保证被A、B两个对象共享,即用类作为锁:


class Account {
  private int balance;
  // 转账
  void transfer(Account target, int amount){
    synchronized(Account.class) {
        if (this.balance > amount) {
            this.balance -= amount;
            target.balance += amount;
        }
    }
  } 
}

Account.class 是所有 Account 对象共享的,而且它是 Java 虚拟机在加载 Account 类的时候创建的,所以我们不用担心它的唯一性。这个实践可行,不过问题在于锁的颗粒度太大了,即便两个并发转账并不相关,如线程1执行A向B转账,线程2执行C向D转账,也会互斥,变为串行,严重影响系统性能。

2.2 锁在对象上

为避免在“锁在类上”带来的巨大性能损耗,还是要采用在对象上加锁的方案,在转账业务中,既然一个对象锁锁不住A、B两个对象,那就每个对象用各自的锁来锁定,即:

public void transfer(Account target, int amount) {
        synchronized (this.getId()) {
            synchronized (target.getId()) {
                // 为了让代码更清晰,转账业务逻辑代码独立到另一个方法中了
                transferAccount(target, amount);
            }
        }
    }

protected void transferAccount(Account target, int amount) {
        if (this.balance > amount) {
            this.balance -= amount;
            target.balance += amount;
        }
    }

这样,A向B转账时,B上加的是B锁,B向C转账时,B上加的也是B锁,从而避免了B上的数据竞争。 不过这段代码时有bug的,存在死锁隐患。即,若两个线程同时发起A向B的转账(线程1),和B向A的转账(线程2),线程1获取A锁时,线程2获取了B锁,此时,线程1由于获取不到B锁,会进入等待,而线程2由于获取不到A锁,也会进入等待,于是两个线程都会无限等待下去,也就是死锁。

这段代码,在A、B并发互相转账时,我们可以用jvm工具查看死锁状态:

命令行可以用jstack查看,结果如下:


jstack -l 6003

Java stack information for the threads listed above:
===================================================

"pool-1-thread-11":
        at org.gevinzone.threadsafe.DeadLockAccount.transfer(DeadLockAccount.java:14)
        - waiting to lock <0x00000007156c1078> (a java.lang.Long)
        - locked <0x00000007156c1060> (a java.lang.Long)
        at org.gevinzone.Main.lambda$concurrentAccountTransfer$0(Main.java:46)
        at org.gevinzone.Main$$Lambda$1/81628611.run(Unknown Source)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
"pool-1-thread-8":
        at org.gevinzone.threadsafe.DeadLockAccount.transfer(DeadLockAccount.java:14)
        - waiting to lock <0x00000007156c1060> (a java.lang.Long)
        - locked <0x00000007156c1078> (a java.lang.Long)
        at org.gevinzone.Main.lambda$concurrentAccountTransfer$1(Main.java:51)
        at org.gevinzone.Main$$Lambda$2/931919113.run(Unknown Source)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

Found 1 deadlock.

也可以用jconsole在UI中查看:

6f591d1b1b591a3eb7eab42971e045ee.png

所以,锁在对象上时,重点是避免死锁问题,下面将介绍比较常用的几种持有多个锁时,死锁的避免方法。

2.2.1 注意加锁顺序

如上面死锁的分析,都是先对自己上锁,然后对target上锁,即account1 先对account1 上锁,然后在对account2 上锁,而account2 是先对account2上锁,再对account1上锁,这造成了循环等待,从而死锁;若两个对象上锁顺序一致,如都先对account1 上锁,再对account2 上锁,就对循环解套了,可以避免死锁。

public void transfer(Account target, int amount) {
        Account left = this.getId() < target.getId() ? this : target;
        Account right = this.getId() < target.getId() ? target : this;
        synchronized (left.getId()) {
            synchronized (right.getId()) {
                transferAccount(target, amount);
            }
        }
    }

2.2.2 加锁超时释放

如上面死锁的分析,两个线程都是在获取第二个对象锁时,由于获取不到,进入等待,而自己持有的锁在等待也无法释放,从而永久的等待下去,产生了死锁;所以,可以对等待加一个超时时间,超时后不在继续等待,而是释放自己持有的锁,就可以避免死锁了。

public void transferWithTryLock(TryLockAccount target, int amount) {
        while (true) {
            if (lock.tryLock()) {
                try {
                    if (target.lock.tryLock()) {
                        try {
                            transferAccount(target, amount);
                            break;
                        } finally {
                            target.lock.unlock();
                        }
                    }
                } finally {
                    lock.unlock();
                }
            }
            // 防止活锁
            randomSleepForLiveLock();
        }
    }

2.2.3 加锁中断释放

死锁时,为避免线程处于等待状态无法响应,可以使用lock.lockInterruptibly(),这样,线程处于等待状态时,依然可以响应中断,抛出异常,这样我们可以通过从外部中断线程,打破死锁。

如,把死锁的代码改成使用lock.lockInterruptibly(),当死锁后,调用thread.interrupt(),可以打破死锁。

public void transfer(LockInterruptableAccount target, int amount)  {
    try {
            lock.lockInterruptibly();
            try {
                target.lock.lockInterruptibly();
                transferAccount(target, amount);
            } finally {
                target.lock.unlock();
            }
        } catch (InterruptedException ignored) {
        } finally {
            lock.unlock();
        }
}

用下面代码起2个线程测试死锁和死锁的打破,若注释掉thread1.interrupt();,下面代码会死锁,但如果中断thread1thread2,死锁立刻被打破

private void lockInterruptableTest() throws InterruptedException {
        LockInterruptableAccount a = new LockInterruptableAccount(1, 1000);
        LockInterruptableAccount b = new LockInterruptableAccount(2, 1000);
        System.out.println("before...");
        System.out.printf("A: %d, B: %d\n", a.getBalance(), b.getBalance());

        BiConsumer<LockInterruptableAccount, LockInterruptableAccount> consumer =
                (o1, o2) -> o1.transfer(o2, 1);

        Thread thread1 = new Thread(() -> consumer.accept(a, b));
        Thread thread2 = new Thread(() -> consumer.accept(b, a));
        thread1.start();
        thread2.start();
        thread1.interrupt();
        thread1.join();
        thread2.join();

        System.out.println("after...");
        System.out.printf("A: %d, B: %d\n", a.getBalance(), b.getBalance());
    }

2.2.4 多把锁转换为一把锁进行加锁

这里并非用类锁这种大颗粒度的锁。而是我们在上面死锁分析中,发现对于A、B两个对象的上锁,不是原子级的,正是由于线程先锁定一个对象,再去锁定另一个对象,带来了死锁隐患。如果可以同时锁定两个对象,同时释放两个对象,也能避免死锁。这里,我们可以用一个锁来实现原子级锁定2个对象的操作,在转账线程中,如果能同时锁定2个对象,则进行转账,否则等待;当两个对象能被同时锁定时,再唤醒线程。即:

先设计一个Allocator类用于锁定和释放对象:

public class Allocator {
    private final HashSet<Object> container;

    public void apply(Object from, Object to) throws InterruptedException {
        synchronized (this) {
            while (container.contains(from) || container.contains(to)) {
                wait();
            }
            container.add(from);
            container.add(to);
        }
    }

    public void free(Object from, Object to) {
        synchronized (this) {
            container.remove(from);
            container.remove(to);
            notifyAll();
        }
    }

在转账线程中使用:

public void transfer(Account target, int amount) {
        try {
            allocator.apply(this, target);
            transferAccount(target, amount);
            allocator.free(this, target);
        } catch (InterruptedException exception) {
            exception.printStackTrace();
        }
    }

3. 其他上锁的方式

除了synchronized和并发包中的lock外,信号量也可以用做锁,避免死锁的方式,与上文相同。

(1)注意上锁顺序

public void transferWithSemaphore(Account target, int amount) {
        SemaphoreAccount left = this.getId() < target.getId() ? this : (SemaphoreAccount)target;
        SemaphoreAccount right = this.getId() < target.getId() ? (SemaphoreAccount)target : this;
        try {
            left.semaphore.acquire();
            try {
                right.semaphore.acquire();
                transferAccount(target, amount);
            } finally {
                right.semaphore.release();
            }
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        } finally {
            left.semaphore.release();
        }
    }

(2)加锁超时释放

public void transferWithSemaphore(TrySemaphoreAccount target, int amount) {
        while (true) {
            if (semaphore.tryAcquire()) {
                try {
                    if (target.semaphore.tryAcquire()) {
                        try {
                            transferAccount(target, amount);
                            break;
                        } finally {
                            target.semaphore.release();
                        }
                    }
                } finally {
                    semaphore.release();
                }
            }
            // 防止活锁
            randomSleepForLiveLock();
        }
    }

What's More

本文代码,可以在GitHub上查看。

本文同步发表于我的微信公众号,欢迎关注。


注:转载本文,请与Gevin联系




如果您觉得Gevin的文章有价值,就请Gevin喝杯茶吧!

|

欢迎关注我的微信公众账号



Comments