每个锁创建多个条件队列以避免虚假唤醒


多个条件队列以实现更好的并发性。每个锁使用单独的条件队列的优点。

  • 它避免了虚假的唤醒和上下文切换。例如,如果您使用notifyAll进行传统等待,则最终会唤醒正在等待不同条件的线程。
  • 当您在单独的条件队列上等待时,您可以使用signal 而不是signalAll来进一步提高性能。

以下是在无界队列之上的有界BlockingQueue的两个经典实现。

每个锁具有单独的等待集

 public class BlockingQueue<T> {

    private final Queue<T> queue;
    private final Lock lockObj = new ReentrantLock();
    private final Condition empty = lockObj.newCondition();
    private final Condition full = lockObj.newCondition();
    private int maxLength;
    private int currentSize = 0;

    public BlockingQueue(int maxLength) {
      this.queue = new ArrayDeque<T>();
      this.maxLength = maxLength;
    }

    public void offer(T elem) throws InterruptedException {
      lockObj.lock();
      try {
        while (currentSize == maxLength) {
          full.await();
        }
        queue.offer(elem);
        currentSize++;
        empty.signal();
      } finally {
        lockObj.unlock();
      }
    }

    public T poll() throws InterruptedException {
      lockObj.lock();
      try {
        while (currentSize == 0) {
          empty.await();
        }
        T elem = queue.poll();
        currentSize--;
        full.signal();
        return elem;
      } finally {
        lockObj.unlock();
      }
    }
  }

使用JMH测试吞吐量:

Benchmark                           Mode  Cnt         Score        Error  Units
BenchmarkBlockingDequeu.testProduceAndConsume   thrpt   25     12500542.933      ± 374127.076 ops/s

旧的方式(单锁和等待)

 public class BlockingQueueWait<T> {


    private final Queue<T> queue;
    private final Object lockObj = new Object();
    private int maxLength;
    private int currentSize = 0;

    public BlockingQueueWait(int maxLength) {
      this.queue = new ArrayDeque<T>();
      this.maxLength = maxLength;
    }

    public void offer(T elem) throws InterruptedException {
      synchronized (lockObj) {
        while (currentSize == maxLength) {
          lockObj.wait();
        }
        queue.offer(elem);
        currentSize++;
        lockObj.notifyAll();
      }
    }

    public T poll() throws InterruptedException {
      synchronized (lockObj) {
        while (currentSize == 0) {
          lockObj.wait();
        }
        T elem = queue.poll();
        currentSize--;
        lockObj.notifyAll();
        return elem;
      }
    }
  }

使用JMH测试吞吐量:
Benchmark                         Mode  Cnt        Score       Error  Units
BenchMarkBlockingWait.testProduceAndConsume  thrpt   25     2702842.067    ± 24534.073  ops/s

如果你仔细看看上面的实现,ops /s的差异是巨大的,其中大部分是由虚假的唤醒引起的,并且没有使用显式条件队列和每个锁的等待集,你最终会浪费宝贵的cpu周期。因此,如果您正在编写并发库实现,请记住您有更好的并发支持,并且您可以为每个锁创建多个条件队列以避免虚假唤醒。