Reader Writer Lock

19-04-24 jdon
                   

Intent

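Suppose we have a shared memory area subject to the basic constraints of the readers-writers problem: readers only look at the data, writers modify it. The shared data can be protected behind a mutex, in which case no two threads can access the data at the same time. This solution is suboptimal, however, because a reader R1 may hold the lock when another reader R2 requests access. It would be foolish for R2 to wait until R1 finishes before starting its own read operation; R2 should start right away. This is the motivation for the Reader Writer Lock pattern.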
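The R1/R2 scenario can be probed directly. The sketch below is not part of the original example: it uses the JDK's ReentrantLock and ReentrantReadWriteLock as stand-ins, and the class name SharedReadSketch and the method secondThreadGetsIn are hypothetical names chosen for illustration. R1 holds the lock on the calling thread while R2, on a second thread, tries the lock once without blocking.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

class SharedReadSketch {

  /**
   * Holds the lock on the calling thread (R1) and reports whether a second
   * thread (R2) could take the same lock without waiting.
   */
  static boolean secondThreadGetsIn(Lock lock) {
    AtomicBoolean acquired = new AtomicBoolean(false);
    lock.lock();
    try {
      Thread r2 = new Thread(() -> {
        // tryLock() returns immediately instead of blocking.
        if (lock.tryLock()) {
          acquired.set(true);
          lock.unlock();
        }
      });
      r2.start();
      r2.join();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    } finally {
      lock.unlock();
    }
    return acquired.get();
  }

  public static void main(String[] args) {
    // Plain mutex: R2 is turned away while R1 holds the lock.
    System.out.println("mutex:     " + secondThreadGetsIn(new ReentrantLock()));
    // Read half of a read-write lock: R2 gets in alongside R1.
    System.out.println("read lock: "
        + secondThreadGetsIn(new ReentrantReadWriteLock().readLock()));
  }
}
```

With the mutex the probe reports false, with the read lock it reports true, which is exactly the waiting that the pattern is meant to remove.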

Explanation

Wikipedia says

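In computer science, a readers-writer (RW) or shared-exclusive lock (also known as a multiple readers/single-writer lock, a multi-reader lock, or a push lock) is a synchronization primitive that solves one of the readers-writers problems. An RW lock allows concurrent access for read-only operations, while write operations require exclusive access. This means that multiple threads can read the data in parallel, but an exclusive lock is needed for writing or modifying data. When a writer is writing the data, all other writers and readers are blocked until the writer has finished. A common use might be to control access to a data structure in memory that cannot be updated atomically and is invalid (and should not be read by another thread) until the update is complete.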
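The last sentence of the quote, about a structure that is invalid in the middle of an update, can be made concrete with a small sketch. It is an illustration only, assuming the JDK's ReentrantReadWriteLock; the class GuardedPair, its invariant (the two fields always sum to zero) and the helper countInconsistentReads are hypothetical and do not come from the example that follows.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

class GuardedPair {

  private final ReadWriteLock lock = new ReentrantReadWriteLock();
  // Invariant: first + second == 0. Two separate field writes cannot be atomic.
  private long first;
  private long second;

  /** Writer: exclusive access hides the moment in which the invariant is broken. */
  void set(long value) {
    lock.writeLock().lock();
    try {
      first = value;   // the pair is invalid from here ...
      second = -value; // ... until here
    } finally {
      lock.writeLock().unlock();
    }
  }

  /** Reader: shared access, several threads may run this at the same time. */
  boolean isConsistent() {
    lock.readLock().lock();
    try {
      return first + second == 0;
    } finally {
      lock.readLock().unlock();
    }
  }

  /** Runs one writer against several readers and counts invalid observations. */
  static int countInconsistentReads(int readers, int iterations) {
    GuardedPair pair = new GuardedPair();
    AtomicInteger inconsistent = new AtomicInteger();
    Thread[] threads = new Thread[readers + 1];
    threads[0] = new Thread(() -> {
      for (int i = 1; i <= iterations; i++) {
        pair.set(i);
      }
    });
    for (int r = 1; r <= readers; r++) {
      threads[r] = new Thread(() -> {
        for (int i = 0; i < iterations; i++) {
          if (!pair.isConsistent()) {
            inconsistent.incrementAndGet();
          }
        }
      });
    }
    for (Thread t : threads) {
      t.start();
    }
    for (Thread t : threads) {
      try {
        t.join();
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    }
    return inconsistent.get();
  }

  public static void main(String[] args) {
    // Prints "inconsistent reads: 0": readers never see the half-updated pair.
    System.out.println("inconsistent reads: " + countInconsistentReads(4, 10_000));
  }
}
```

Readers run in parallel with each other, yet none of them can slip in between the two assignments in set, because the write lock is exclusive.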

Source code

This example uses two mutexes to demonstrate concurrent access by multiple readers and writers.

[Class diagram]

Step 1: Create the Reader class, which reads once it has acquired the read lock.

import java.util.concurrent.locks.Lock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Reader implements Runnable {

  private static final Logger LOGGER = LoggerFactory.getLogger(Reader.class);

  private Lock readLock;

  private String name;
  
  private long readingTime;

  /**
   * Create new Reader
   * 
   * @param name - Name of the thread owning the reader
   * @param readLock - Lock for this reader
   * @param readingTime - amount of time (in milliseconds) for this reader to engage reading
   */
  public Reader(String name, Lock readLock, long readingTime) {
    this.name = name;
    this.readLock = readLock;
    this.readingTime = readingTime;
  }
  
  /**
   * Create new Reader who reads for 250ms
   * 
   * @param name - Name of the thread owning the reader
   * @param readLock - Lock for this reader
   */
  public Reader(String name, Lock readLock) {
    this(name, readLock, 250L);
  }

  @Override
  public void run() {
    readLock.lock();
    try {
      read();
    } catch (InterruptedException e) {
      LOGGER.info("InterruptedException when reading", e);
      Thread.currentThread().interrupt();
    } finally {
      readLock.unlock();
    }
  }

  /**
   * Simulate the read operation
   * 
   */
  public void read() throws InterruptedException {
    LOGGER.info("{} begin", name);
    Thread.sleep(readingTime);
    LOGGER.info("{} finish after reading {}ms", name, readingTime);
  }
}

Step 2: Create the Writer class, which writes once it has acquired the write lock.

import java.util.concurrent.locks.Lock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Writer implements Runnable {

  private static final Logger LOGGER = LoggerFactory.getLogger(Writer.class);

  private Lock writeLock;

  private String name;
  
  private long writingTime;

  /**
   * Create new Writer who writes for 250ms
   * 
   * @param name - Name of the thread owning the writer
   * @param writeLock - Lock for this writer
   */
  public Writer(String name, Lock writeLock) {
    this(name, writeLock, 250L);
  }
  
  /**
   * Create new Writer
   * 
   * @param name - Name of the thread owning the writer
   * @param writeLock - Lock for this writer
   * @param writingTime - amount of time (in milliseconds) for this writer to engage writing
   */
  public Writer(String name, Lock writeLock, long writingTime) {
    this.name = name;
    this.writeLock = writeLock;
    this.writingTime = writingTime;
  }


  @Override
  public void run() {
    writeLock.lock();
    try {
      write();
    } catch (InterruptedException e) {
      LOGGER.info("InterruptedException when writing", e);
      Thread.currentThread().interrupt();
    } finally {
      writeLock.unlock();
    }
  }
  
  /**
   * Simulate the write operation
   */
  public void write() throws InterruptedException {
    LOGGER.info("{} begin", name);
    Thread.sleep(writingTime);
    LOGGER.info("{} finished after writing {}ms", name, writingTime);
  }
}

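Step 3: Now create the ReaderWriterLock class, which controls access for readers and writers.

Multiple readers may hold the lock at the same time, but if any writer holds the lock, readers wait. If any reader holds the lock, writers wait. This lock is not fair.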
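The three rules can be sketched as a tiny state machine before looking at the full class. This is a simplified illustration, not the implementation used in this article: it keeps a reader count and a writer flag behind a single monitor and offers only non-blocking "try" operations. The class name RwRules and its method names are hypothetical.

```java
class RwRules {

  private int activeReaders;    // readers currently holding the lock
  private boolean writerActive; // true while a writer holds the lock

  /** A reader gets in unless a writer holds the lock. */
  synchronized boolean tryAcquireRead() {
    if (writerActive) {
      return false;
    }
    activeReaders++;
    return true;
  }

  synchronized void releaseRead() {
    if (activeReaders == 0) {
      throw new IllegalStateException("no reader holds the lock");
    }
    activeReaders--;
  }

  /** A writer gets in only when nobody, reader or writer, holds the lock. */
  synchronized boolean tryAcquireWrite() {
    if (writerActive || activeReaders > 0) {
      return false;
    }
    writerActive = true;
    return true;
  }

  synchronized void releaseWrite() {
    if (!writerActive) {
      throw new IllegalStateException("no writer holds the lock");
    }
    writerActive = false;
  }

  public static void main(String[] args) {
    RwRules lock = new RwRules();
    System.out.println(lock.tryAcquireRead());  // true: the lock was free
    System.out.println(lock.tryAcquireWrite()); // false: a reader holds the lock
    System.out.println(lock.tryAcquireRead());  // true: readers share the lock
    lock.releaseRead();
    lock.releaseRead();
    System.out.println(lock.tryAcquireWrite()); // true: the last reader has left
    System.out.println(lock.tryAcquireRead());  // false: a writer holds the lock
    lock.releaseWrite();
  }
}
```

The third call also shows why such a lock is not fair: a second reader is admitted although a writer has already been refused, so a steady stream of readers can keep a writer waiting indefinitely.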

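import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ReaderWriterLock implements ReadWriteLock {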
  
  private static final Logger LOGGER = LoggerFactory.getLogger(ReaderWriterLock.class);


  private Object readerMutex = new Object();

  private int currentReaderCount;

  /**
   * Global mutex that records whether readers or a writer currently hold the lock.
   * <p>
   * 1. When it contains the reference of {@link #readerLock}, the lock is held by readers,
   * and further readers may read concurrently. <br>
   * 2. When it contains the reference of {@link #writerLock}, the lock is held exclusively
   * by a writer, and no other reader or writer can get the lock.
   * <p>
   * This is the most important field in this class: it controls access for readers and writers.
   */
  private Set<Object> globalMutex = new HashSet<>();

  private ReadLock readerLock = new ReadLock();
  private WriteLock writerLock = new WriteLock();

  @Override
  public Lock readLock() {
    return readerLock;
  }

  @Override
  public Lock writeLock() {
    return writerLock;
  }

  /**
   * Returns true when globalMutex holds the reference of writerLock.
   */
  private boolean doesWriterOwnThisLock() {
    return globalMutex.contains(writerLock);
  }

  /**
   * Nobody holds the lock when globalMutex contains nothing.
   */
  private boolean isLockFree() {
    return globalMutex.isEmpty();
  }

  /**
   * Reader lock. It can be held by more than one reader concurrently as long as no writer holds the lock.
   */
  private class ReadLock implements Lock {

    @Override
    public void lock() {
      synchronized (readerMutex) {
        currentReaderCount++;
        if (currentReaderCount == 1) {
          acquireForReaders();
        }
      }
    }

    /**
     * Acquire the globalMutex lock on behalf of the current and future concurrent readers.
     * Makes sure no writer currently owns the lock.
     */
    private void acquireForReaders() {
      // Try to get the globalMutex lock for the first reader
      synchronized (globalMutex) {
        boolean interrupted = false;
        // Wait while a writer owns the lock. Once the lock is free or held by readers,
        // add this ReadLock to globalMutex to mark the lock as held by readers.
        while (doesWriterOwnThisLock()) {
          try {
            globalMutex.wait();
          } catch (InterruptedException e) {
            LOGGER.info("InterruptedException while waiting for globalMutex in acquireForReaders", e);
            // lock() is not interruptible: remember the interrupt and keep waiting.
            // Re-setting the flag inside the loop would make wait() throw again at once.
            interrupted = true;
          }
        }
        globalMutex.add(this);
        if (interrupted) {
          // Restore the interrupt status for the caller.
          Thread.currentThread().interrupt();
        }
      }
    }

    @Override
    public void unlock() {

      synchronized (readerMutex) {
        currentReaderCount--;
        // Release the lock only when the last reader leaves, so that it stays held
        // until every reader has finished.
        if (currentReaderCount == 0) {
          synchronized (globalMutex) {
            // Notify the waiter, mostly the writer
            globalMutex.remove(this);
            globalMutex.notifyAll();
          }
        }
      }

    }

    @Override
    public void lockInterruptibly() throws InterruptedException {
      throw new UnsupportedOperationException();
    }

    @Override
    public boolean tryLock() {
      throw new UnsupportedOperationException();
    }

    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
      throw new UnsupportedOperationException();
    }

    @Override
    public Condition newCondition() {
      throw new UnsupportedOperationException();
    }
  }

  /**
   * Writer lock. It can be held by only one writer at a time.
   */
  private class WriteLock implements Lock {

    @Override
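    public void lock() {

      synchronized (globalMutex) {
        boolean interrupted = false;

        // Wait until the lock is free.
        while (!isLockFree()) {
          try {
            globalMutex.wait();
          } catch (InterruptedException e) {
            // lock() is not interruptible: remember the interrupt and keep waiting.
            // Re-setting the flag inside the loop would make wait() throw again at once.
            interrupted = true;
          }
        }
        // When the lock is free, acquire it by placing an entry in globalMutex
        globalMutex.add(this);
        if (interrupted) {
          // Restore the interrupt status for the caller.
          Thread.currentThread().interrupt();
        }
      }
    }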

    @Override
    public void unlock() {

      synchronized (globalMutex) {
        globalMutex.remove(this);
        // Notify the waiter, other writer or reader
        globalMutex.notifyAll();
      }
    }

    @Override
    public void lockInterruptibly() throws InterruptedException {
      throw new UnsupportedOperationException();
    }

    @Override
    public boolean tryLock() {
      throw new UnsupportedOperationException();
    }

    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
      throw new UnsupportedOperationException();
    }

    @Override
    public Condition newCondition() {
      throw new UnsupportedOperationException();
    }
  }

}

Step 4: Let's test this design pattern.

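import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ReaderWriterLockDemo {

  private static final Logger LOGGER = LoggerFactory.getLogger(ReaderWriterLockDemo.class);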

  /**
   * Program entry point
   * 
   * @param args command line args
   */
  public static void main(String[] args) {

    ExecutorService executeService = Executors.newFixedThreadPool(10);
    ReaderWriterLock lock = new ReaderWriterLock();
    
    // Start writers
    IntStream.range(0, 5)
        .forEach(i -> executeService.submit(new Writer("Writer " + i, lock.writeLock(), 
            ThreadLocalRandom.current().nextLong(5000))));
    LOGGER.info("Writers added...");

    // Start readers
    IntStream.range(0, 5)
        .forEach(i -> executeService.submit(new Reader("Reader " + i, lock.readLock(), 
            ThreadLocalRandom.current().nextLong(10))));
    LOGGER.info("Readers added...");
    
    try {
      Thread.sleep(5000L);
    } catch (InterruptedException e) {
      LOGGER.error("Error sleeping before adding more readers", e);
      Thread.currentThread().interrupt();
    }

    // Start more readers after the pause
    IntStream.range(6, 10)
        .forEach(i -> executeService.submit(new Reader("Reader " + i, lock.readLock(), 
            ThreadLocalRandom.current().nextLong(10))));
    LOGGER.info("More readers added...");   

    // In the log output, read operations overlap while write operations are exclusive.
    executeService.shutdown();
    try {
      executeService.awaitTermination(5, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      LOGGER.error("Error waiting for ExecutorService shutdown", e);
      Thread.currentThread().interrupt();
    }

  }

}
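The demo relies on reading the log to see that reads overlap while writes do not. As a hedged alternative, the overlap can be measured. The sketch below is an assumption-laden illustration rather than part of the original example: the class PeakHolders and its method peak are hypothetical, and the JDK's ReentrantReadWriteLock stands in for the lock under test. Because peak accepts any Lock, the readLock() or writeLock() of the ReaderWriterLock from step 3 could be passed in instead.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

class PeakHolders {

  /**
   * Runs the given number of tasks, each holding the lock for about 50 ms,
   * and returns the highest number of tasks that held it at the same time.
   */
  static int peak(Lock lock, int tasks) {
    AtomicInteger current = new AtomicInteger();
    AtomicInteger peak = new AtomicInteger();
    ExecutorService pool = Executors.newFixedThreadPool(tasks);
    for (int i = 0; i < tasks; i++) {
      pool.submit(() -> {
        lock.lock();
        try {
          // Count ourselves in and record the new maximum if there is one.
          peak.accumulateAndGet(current.incrementAndGet(), Math::max);
          try {
            Thread.sleep(50);
          } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
          } finally {
            current.decrementAndGet();
          }
        } finally {
          lock.unlock();
        }
      });
    }
    pool.shutdown();
    try {
      pool.awaitTermination(10, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    return peak.get();
  }

  public static void main(String[] args) {
    ReadWriteLock lock = new ReentrantReadWriteLock();
    // Always 1: a write lock never has more than one holder.
    System.out.println("writers peak: " + peak(lock.writeLock(), 4));
    // Typically greater than 1, depending on thread scheduling.
    System.out.println("readers peak: " + peak(lock.readLock(), 4));
  }
}
```

Only the writer figure is deterministic. The reader figure depends on how the threads are scheduled, so it is best read as evidence of overlap, not as an exact count.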

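Applicability

Use this pattern when an application needs better synchronization performance for a resource shared by multiple threads, especially when read and write operations are mixed.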