如何避免死锁和活锁? - simar

19-04-03 banq
                   

死锁只能在并发(多线程)程序中发生,其中同步(使用锁)线程访问一个或多个共享资源(变量和对象)或指令集(临界区)。

活锁时当我们试图避免死锁时会使用异步锁定时发生的,其中多个线程对同一组锁的竞争写操作,为了避免获取锁定,允许其他线程第一个到达的获得锁,等待最终释放锁定后再继续,这容易造成等待线程不断重试获取锁造成的CPU循环饥饿。异步锁只是一种避免死锁成为活锁的策略。

下面是一些的理论上解决死锁的方法,并且其中之一(第二个)是主要的原因为活锁。

理论方法

1. 不要使用锁

在两个操作需要同步的情况下是不可能的,例如,简单的银行转帐,您可以借记一个帐户然后可以贷记另一个帐户,并且在当前线程完成之前不允许任何其他线程触及帐户中的余额。

2.不要阻塞锁,如果一个线程无法获取锁,它应该释放以前获取的锁,以便稍后再试

实施起来很麻烦并且可能导致饥饿(活锁),线程总是重试获取锁。此外,这种方法可能在频繁的线程上下文切换中会造成切换开销,从而降低了系统的整体性能。

3. 让线程始终以严格的顺序请求锁定

说起来容易做起来难。如果我们正在写一个函数将账户A的资金转移到B,我们可以写一些类似的东西:

// at compile time, we take lock on first arg then second
public void transfer(Account A, Account B, long money) {
  synchronized (A) {
    synchronized (B) {
      A.add(amount);
      B.subtract(amount);
    }
  }
}
// at runtime we cannot track how our methods will be called
public void run() {
  new Thread(()->this.transfer(X,Y, 10000)).start();
  new Thread(()->this.transfer(Y,X, 10000)).start();
}
// this run() will create a deadlock
// first thread locks on X, waits for Y
// second thread locks on Y, waits for X

现实中的解决方案

我们可以结合锁定顺序和定时锁定的方法来得到真正现实解决方案

1. 通过业务确定锁的顺序

我们可以通过根据帐号大小区分A和B来改进我们的方法。

// at run time, we take lock on account with smaller id first
public void transfer(Account A, Account B, long money) {
  final Account first = A.id < B.id ? A : B;
  final Account second = first == A? B: A;
  synchronized (first) {
    synchronized (second) {
      first.add(amount);
      second.subtract(amount);
    }
  }
}
// at runtime we cannot track how our methods will be called
public void run() {
  new Thread(()->this.transfer(X,Y, 10000)).start();
  new Thread(()->this.transfer(Y,X, 10000)).start();
}

如果X.id = 1111和Y.id = 2222,因为我们采取的第一个帐户为一个较小的账户ID,锁定顺序执行:transfer(Y, X, 10000)和transfer(X,Y, 10000)将是一样的。如果X的帐号小于Y,则两个线程将尝试在Y之前锁定X,并且只有X成功后才继续锁定Y。

2. 业务确定tryLock / async 的时间等待的锁请求

使用上述业务确定性锁顺序的解决方案仅适用于一个地方的逻辑转移(...)的关联关系,例如在我们的方法中确定如何协调资源。

我们最终可能会有其他方法/逻辑,最终使用与之不兼容的排序逻辑transfer(…)。为避免在这种情况下出现死锁,建议使用异步锁定,我们尝试锁定资源的有限/实际时间(最大事务时间)+小随机等待时间,这样所有线程都不会尝试分别获得太早而避免了活锁(由于无法获取锁反复尝试而导致饥饿)

// assume Account#getLock() gives us account's Lock  (java.util.concurrent.locks.Lock)
// Account could encapsulate lock, provide lock() /unlock()
public long getWait() { 
/// returns moving average of transfer times for last n transfers + small-random-salt in millis so all threads waiting to lock do not wake up at the same time.
//////返回最后n次传输的传输时间的移动平均值+ 小随机时间,因此等待锁定的所有线程不会同时唤醒。

}
public void transfer(Lock lockF, Lock lockS, int amount) {
  final Account first = A.id < B.id ? A : B;
  final Account second = first == A? B: A;
  final Lock lockF = first.getLock();
  final Lock lockS = second.getLock();
  boolean done = false;
  do {
    try {
      try {
        if (lockF.tryLock(getWait(), MILLISECONDS)) {
          try {
            if (lockS.tryLock(getWait(), MILLISECONDS)) {
              done = true;
            }
          } finally {
            lockS.unlock();
          }
        }
      } catch (InterruptedException e) {
        throw new RuntimeException("Cancelled");
      }
    } finally {
      lockF.unlock();
    }
  } while (!done);

}
// at runtime we cannot track how our methods will be called
public void run() {
    new Thread(()->this.transfer(X,Y, 10000)).start();
    new Thread(()->this.transfer(Y,X, 10000)).start();
}