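Intent
Suppose we have a shared memory area subject to the basic constraints detailed above. The shared data could be protected behind a mutex, in which case no two threads can access the data at the same time. That solution is suboptimal, however: a reader R1 might hold the lock when another reader R2 requests access. It would be wasteful for R2 to wait until R1 finishes before starting its own read; R2 should start right away. This is the motivation for the Reader Writer Lock pattern.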
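The difference can be observed directly. The following is a minimal sketch, not part of the original example: it assumes the JDK's ReentrantLock as the plain mutex and the read lock of ReentrantReadWriteLock as the shared lock. The names SharedReadSketch and twoThreadsOverlap are hypothetical and were chosen only for this illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

class SharedReadSketch {

  /** Returns true only if two threads were inside the locked section at the same time. */
  static boolean twoThreadsOverlap(Lock lock) {
    CountDownLatch bothInside = new CountDownLatch(2);
    boolean[] sawOther = new boolean[2];
    Thread[] threads = new Thread[2];
    for (int i = 0; i < threads.length; i++) {
      final int id = i;
      threads[i] = new Thread(() -> {
        lock.lock();
        try {
          bothInside.countDown();
          // Reaches zero in time only if the other thread also holds the lock right now.
          sawOther[id] = bothInside.await(500, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        } finally {
          lock.unlock();
        }
      });
      threads[i].start();
    }
    for (Thread t : threads) {
      try {
        t.join();
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new IllegalStateException(e);
      }
    }
    return sawOther[0] && sawOther[1];
  }

  public static void main(String[] args) {
    System.out.println("mutex overlap: " + twoThreadsOverlap(new ReentrantLock()));
    System.out.println("read lock overlap: "
        + twoThreadsOverlap(new ReentrantReadWriteLock().readLock()));
  }
}
```

With the mutex, the first thread times out waiting for a partner that cannot enter, so the method reports no overlap. With the read lock, both threads are normally inside together, which is exactly the behavior R2 wants.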
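Explanation
Wikipedia says
In computer science, a readers-writer (RW) or shared-exclusive lock (also known as a multiple readers/single-writer lock, a multi-reader lock, or a push lock) is a synchronization primitive that solves one of the readers-writers problems. An RW lock allows concurrent access for read-only operations, while write operations require exclusive access. This means that multiple threads can read the data in parallel, but an exclusive lock is needed for writing or modifying data. While a writer is writing, all other writers and readers are blocked until it has finished. A common use is to control access to an in-memory data structure that cannot be updated atomically and is invalid (and should not be read by another thread) until the update is complete.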
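As an illustration of the "data structure that cannot be updated atomically" case, here is a small sketch built on the JDK's ReentrantReadWriteLock. The class GuardedRegistry and its methods are hypothetical; the point is only that a two-step update (remove, then put) runs under the write lock, so a reader holding the read lock never observes the intermediate state.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

class GuardedRegistry {

  private final Map<String, String> entries = new HashMap<>();
  private final ReadWriteLock lock = new ReentrantReadWriteLock();

  /** Read-only access: many threads may be in here at once. */
  public String get(String key) {
    lock.readLock().lock();
    try {
      return entries.get(key);
    } finally {
      lock.readLock().unlock();
    }
  }

  /** Read-only access as well. */
  public int size() {
    lock.readLock().lock();
    try {
      return entries.size();
    } finally {
      lock.readLock().unlock();
    }
  }

  /** Modification: requires exclusive access. */
  public void put(String key, String value) {
    lock.writeLock().lock();
    try {
      entries.put(key, value);
    } finally {
      lock.writeLock().unlock();
    }
  }

  /** Two-step update; between remove and put the map is in an intermediate state. */
  public void rename(String oldKey, String newKey) {
    lock.writeLock().lock();
    try {
      String value = entries.remove(oldKey);
      if (value != null) {
        entries.put(newKey, value);
      }
    } finally {
      lock.writeLock().unlock();
    }
  }

  public static void main(String[] args) {
    GuardedRegistry registry = new GuardedRegistry();
    registry.put("draft", "chapter 1");
    registry.rename("draft", "final");
    System.out.println(registry.get("final"));
  }
}
```

Always releasing in a finally block mirrors what the Reader and Writer classes in this article do in their run() methods.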
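Source code
This example uses two mutexes to demonstrate concurrent access by multiple readers and writers.
Class diagram
Step 1: Create the Reader class, which reads once it has acquired the read lock.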
import java.util.concurrent.locks.Lock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Reader implements Runnable {

  private static final Logger LOGGER = LoggerFactory.getLogger(Reader.class);

  private final Lock readLock;
  private final String name;
  private final long readingTime;

  /**
   * Create new Reader.
   *
   * @param name - Name of the thread owning the reader
   * @param readLock - Lock for this reader
   * @param readingTime - amount of time (in milliseconds) for this reader to engage reading
   */
  public Reader(String name, Lock readLock, long readingTime) {
    this.name = name;
    this.readLock = readLock;
    this.readingTime = readingTime;
  }

  /**
   * Create new Reader who reads for 250ms.
   *
   * @param name - Name of the thread owning the reader
   * @param readLock - Lock for this reader
   */
  public Reader(String name, Lock readLock) {
    this(name, readLock, 250L);
  }

  @Override
  public void run() {
    readLock.lock();
    try {
      read();
    } catch (InterruptedException e) {
      LOGGER.info("InterruptedException when reading", e);
      Thread.currentThread().interrupt();
    } finally {
      // Always release the lock, even if the read was interrupted.
      readLock.unlock();
    }
  }

  /**
   * Simulate the read operation.
   */
  public void read() throws InterruptedException {
    LOGGER.info("{} begin", name);
    Thread.sleep(readingTime);
    LOGGER.info("{} finish after reading {}ms", name, readingTime);
  }
}
Step 2: Create the Writer class, which writes once it has acquired the write lock.
import java.util.concurrent.locks.Lock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Writer implements Runnable {

  private static final Logger LOGGER = LoggerFactory.getLogger(Writer.class);

  private final Lock writeLock;
  private final String name;
  private final long writingTime;

  /**
   * Create new Writer who writes for 250ms.
   *
   * @param name - Name of the thread owning the writer
   * @param writeLock - Lock for this writer
   */
  public Writer(String name, Lock writeLock) {
    this(name, writeLock, 250L);
  }

  /**
   * Create new Writer.
   *
   * @param name - Name of the thread owning the writer
   * @param writeLock - Lock for this writer
   * @param writingTime - amount of time (in milliseconds) for this writer to engage writing
   */
  public Writer(String name, Lock writeLock, long writingTime) {
    this.name = name;
    this.writeLock = writeLock;
    this.writingTime = writingTime;
  }

  @Override
  public void run() {
    writeLock.lock();
    try {
      write();
    } catch (InterruptedException e) {
      LOGGER.info("InterruptedException when writing", e);
      Thread.currentThread().interrupt();
    } finally {
      // Always release the lock, even if the write was interrupted.
      writeLock.unlock();
    }
  }

  /**
   * Simulate the write operation.
   */
  public void write() throws InterruptedException {
    LOGGER.info("{} begin", name);
    Thread.sleep(writingTime);
    LOGGER.info("{} finished after writing {}ms", name, writingTime);
  }
}
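Step 3: Now it is time to create the ReaderWriterLock class, which controls access for readers and writers.
Multiple readers may hold the lock at the same time, but readers wait while any writer holds it. A writer waits while any reader, or another writer, holds it. This lock is not fair.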
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ReaderWriterLock implements ReadWriteLock {

  private static final Logger LOGGER = LoggerFactory.getLogger(ReaderWriterLock.class);

  // Guards currentReaderCount.
  private final Object readerMutex = new Object();

  private int currentReaderCount;

  /**
   * Global mutex is used to indicate whether a reader or a writer holds the lock at the moment.
   * <p>
   * 1. When it contains the reference of {@link #readerLock}, the lock is held by readers;
   * another reader can also do the read operation concurrently. <br>
   * 2. When it contains the reference of {@link #writerLock}, the lock is held by a writer
   * exclusively; no other reader or writer can get the lock.
   * <p>
   * This is the most important field in this class to control the access for reader/writer.
   */
  private final Set<Object> globalMutex = new HashSet<>();

  private final ReadLock readerLock = new ReadLock();
  private final WriteLock writerLock = new WriteLock();

  @Override
  public Lock readLock() {
    return readerLock;
  }

  @Override
  public Lock writeLock() {
    return writerLock;
  }

  /**
   * Return true when globalMutex holds the reference of writerLock.
   */
  private boolean doesWriterOwnThisLock() {
    return globalMutex.contains(writerLock);
  }

  /**
   * Nobody holds the lock when globalMutex contains nothing.
   */
  private boolean isLockFree() {
    return globalMutex.isEmpty();
  }

  /**
   * Reader lock; can be held by more than one reader concurrently if no writer holds the lock.
   */
  private class ReadLock implements Lock {

    @Override
    public void lock() {
      synchronized (readerMutex) {
        currentReaderCount++;
        // Only the first reader has to acquire the global lock.
        if (currentReaderCount == 1) {
          acquireForReaders();
        }
      }
    }

    /**
     * Acquire the globalMutex lock on behalf of current and future concurrent readers.
     * Make sure no writer currently owns the lock.
     */
    private void acquireForReaders() {
      // Try to get the globalMutex lock for the first reader.
      synchronized (globalMutex) {
        // Wait while a writer owns the lock. Once it is free or held by readers,
        // add the reference to globalMutex to indicate that readers hold the lock.
        boolean interrupted = false;
        while (doesWriterOwnThisLock()) {
          try {
            globalMutex.wait();
          } catch (InterruptedException e) {
            LOGGER.info("InterruptedException while waiting for globalMutex in acquireForReaders", e);
            // Restoring the flag here would make the next wait() throw at once,
            // so remember it and restore it after the lock has been acquired.
            interrupted = true;
          }
        }
        globalMutex.add(this);
        if (interrupted) {
          Thread.currentThread().interrupt();
        }
      }
    }

    @Override
    public void unlock() {
      synchronized (readerMutex) {
        currentReaderCount--;
        // Release the lock only when the last reader leaves, which ensures that
        // the lock is released once all readers have finished.
        if (currentReaderCount == 0) {
          synchronized (globalMutex) {
            // Notify the waiters, mostly writers.
            globalMutex.remove(this);
            globalMutex.notifyAll();
          }
        }
      }
    }

    @Override
    public void lockInterruptibly() throws InterruptedException {
      throw new UnsupportedOperationException();
    }

    @Override
    public boolean tryLock() {
      throw new UnsupportedOperationException();
    }

    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
      throw new UnsupportedOperationException();
    }

    @Override
    public Condition newCondition() {
      throw new UnsupportedOperationException();
    }
  }

  /**
   * Writer lock; can be held by only one writer at a time.
   */
  private class WriteLock implements Lock {

    @Override
    public void lock() {
      synchronized (globalMutex) {
        // Wait until the lock is free.
        boolean interrupted = false;
        while (!isLockFree()) {
          try {
            globalMutex.wait();
          } catch (InterruptedException e) {
            // Restoring the flag here would make the next wait() throw at once,
            // so remember it and restore it after the lock has been acquired.
            interrupted = true;
          }
        }
        // When the lock is free, acquire it by placing an entry in globalMutex.
        globalMutex.add(this);
        if (interrupted) {
          Thread.currentThread().interrupt();
        }
      }
    }

    @Override
    public void unlock() {
      synchronized (globalMutex) {
        globalMutex.remove(this);
        // Notify the waiters, other writers or readers.
        globalMutex.notifyAll();
      }
    }

    @Override
    public void lockInterruptibly() throws InterruptedException {
      throw new UnsupportedOperationException();
    }

    @Override
    public boolean tryLock() {
      throw new UnsupportedOperationException();
    }

    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
      throw new UnsupportedOperationException();
    }

    @Override
    public Condition newCondition() {
      throw new UnsupportedOperationException();
    }
  }
}
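The class above leaves tryLock() unsupported, so its rules cannot be probed without blocking. For comparison only, the sketch below uses the JDK's ReentrantReadWriteLock, which does implement tryLock() and is also non-fair by default, to observe the same rules: readers share, writers exclude. LockRulesProbe and its methods are hypothetical names, and this is not a test of the ReaderWriterLock class itself.

```java
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

class LockRulesProbe {

  /** Attempts a non-blocking acquire on a separate thread and reports whether it succeeded. */
  static boolean tryOnOtherThread(Lock lock) {
    boolean[] result = new boolean[1];
    Thread probe = new Thread(() -> {
      result[0] = lock.tryLock();
      if (result[0]) {
        lock.unlock();
      }
    });
    probe.start();
    try {
      probe.join();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
    return result[0];
  }

  /** Returns {otherReaderGotIn, writerGotIn} while the calling thread holds the read lock. */
  static boolean[] probeWhileReading() {
    ReentrantReadWriteLock rw = new ReentrantReadWriteLock(); // non-fair by default
    rw.readLock().lock();
    try {
      return new boolean[] {tryOnOtherThread(rw.readLock()), tryOnOtherThread(rw.writeLock())};
    } finally {
      rw.readLock().unlock();
    }
  }

  /** Returns {readerGotIn, otherWriterGotIn} while the calling thread holds the write lock. */
  static boolean[] probeWhileWriting() {
    ReentrantReadWriteLock rw = new ReentrantReadWriteLock();
    rw.writeLock().lock();
    try {
      return new boolean[] {tryOnOtherThread(rw.readLock()), tryOnOtherThread(rw.writeLock())};
    } finally {
      rw.writeLock().unlock();
    }
  }

  public static void main(String[] args) {
    boolean[] reading = probeWhileReading();
    boolean[] writing = probeWhileWriting();
    System.out.println("while reading: reader=" + reading[0] + ", writer=" + reading[1]);
    System.out.println("while writing: reader=" + writing[0] + ", writer=" + writing[1]);
  }
}
```

While a reader holds the lock, a second reader gets in and a writer does not; while a writer holds it, nobody else gets in. If fairness matters, the JDK class accepts new ReentrantReadWriteLock(true); the hand-written lock in this article has no such option.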
Step 4: Let's test this design pattern.
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ReaderWriterLockDemo {

  private static final Logger LOGGER = LoggerFactory.getLogger(ReaderWriterLockDemo.class);

  /**
   * Program entry point.
   *
   * @param args command line args
   */
  public static void main(String[] args) {

    ExecutorService executeService = Executors.newFixedThreadPool(10);
    ReaderWriterLock lock = new ReaderWriterLock();

    // Start writers
    IntStream.range(0, 5)
        .forEach(i -> executeService.submit(new Writer("Writer " + i, lock.writeLock(),
            ThreadLocalRandom.current().nextLong(5000))));
    LOGGER.info("Writers added...");

    // Start readers
    IntStream.range(0, 5)
        .forEach(i -> executeService.submit(new Reader("Reader " + i, lock.readLock(),
            ThreadLocalRandom.current().nextLong(10))));
    LOGGER.info("Readers added...");

    try {
      Thread.sleep(5000L);
    } catch (InterruptedException e) {
      LOGGER.error("Error sleeping before adding more readers", e);
      Thread.currentThread().interrupt();
    }

    // Start more readers
    IntStream.range(6, 10)
        .forEach(i -> executeService.submit(new Reader("Reader " + i, lock.readLock(),
            ThreadLocalRandom.current().nextLong(10))));
    LOGGER.info("More readers added...");

    // Write operations are exclusive. Stop accepting tasks and wait for the rest to finish.
    executeService.shutdown();
    try {
      executeService.awaitTermination(5, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      LOGGER.error("Error waiting for ExecutorService shutdown", e);
      Thread.currentThread().interrupt();
    }
  }
}
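The demo above relies on reading the log to see that writes are exclusive. As a rough alternative, the sketch below counts violations programmatically. It accepts any ReadWriteLock, so in principle the ReaderWriterLock from step 3 could be passed in; here it is exercised with the JDK's ReentrantReadWriteLock to stay self-contained. ExclusionCheck, countViolations and pause are hypothetical names introduced for this illustration.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

class ExclusionCheck {

  /** Sleeps briefly to simulate work inside the critical section. */
  private static void pause(long millis) {
    try {
      Thread.sleep(millis);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }

  /** Runs readers and writers against the lock and counts broken exclusion rules. */
  static int countViolations(ReadWriteLock lock, int readers, int writers) {
    AtomicInteger activeReaders = new AtomicInteger();
    AtomicInteger activeWriters = new AtomicInteger();
    AtomicInteger violations = new AtomicInteger();
    ExecutorService pool = Executors.newFixedThreadPool(readers + writers);

    for (int i = 0; i < writers; i++) {
      pool.submit(() -> {
        lock.writeLock().lock();
        try {
          // A writer must be alone: no other writer and no reader.
          if (activeWriters.incrementAndGet() != 1 || activeReaders.get() != 0) {
            violations.incrementAndGet();
          }
          pause(5);
          activeWriters.decrementAndGet();
        } finally {
          lock.writeLock().unlock();
        }
      });
    }
    for (int i = 0; i < readers; i++) {
      pool.submit(() -> {
        lock.readLock().lock();
        try {
          activeReaders.incrementAndGet();
          // Readers may overlap each other, but never a writer.
          if (activeWriters.get() != 0) {
            violations.incrementAndGet();
          }
          pause(5);
          activeReaders.decrementAndGet();
        } finally {
          lock.readLock().unlock();
        }
      });
    }

    pool.shutdown();
    try {
      if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
        throw new IllegalStateException("tasks did not finish in time");
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
    return violations.get();
  }

  public static void main(String[] args) {
    System.out.println("violations: " + countViolations(new ReentrantReadWriteLock(), 5, 5));
  }
}
```

A correct reader-writer lock should yield zero violations. The counters are decremented before the lock is released, so a thread entering next never sees a stale count.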
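Applicability
Use this pattern when an application needs to improve the performance of resource synchronization across multiple threads, particularly when read and write operations are mixed.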