Java企业教程系列

使用Java8的Lambda简化ReadWriteLock

  Java 8的lambda表达式能够显著提高代码质量和可读性。这里我们看看如何使得ReadWriteLock简化. 假设我们有一个类为Buffer,能够保存队列中最后几个消息,能够计数和去除老的消息,原先Java实现如下:

public class Buffer {

 

    private final int capacity;

    private final Deque<String> recent;

    private int discarded;

 

    public Buffer(int capacity) {

        this.capacity = capacity;

        this.recent = new ArrayDeque<>(capacity);

    }

 

    public void putItem(String item) {

        while (recent.size() >= capacity) {

            recent.removeFirst();

            ++discarded;

        }

        recent.addLast(item);

    }

 

    public List<String> getRecent() {

        final ArrayList<String> result = new ArrayList<>();

        result.addAll(recent);

        return result;

    }

 

    public int getDiscardedCount() {

        return discarded;

    }

 

    public int getTotal() {

        return discarded + recent.size();

    }

 

    public void flush() {

        discarded += recent.size();

        recent.clear();

    }

 

}

现在我们要多次使用putItem(), 但是内部最新的队列有其最大容量,它能够记得为防止内存泄漏去除了多少个条目。这个类在单线程下工作得很好,我们并使用了非线程安全型的ArrayDeque 和非同步 的int. 当对int的读写是原子时,在不同线程中改变值彼此并不能保证相互显现, 即使我们使用线程安全的BlockingDeque和AtomicInteger,我们还是会无法逃避争夺情况,因为这两个变量并不是在一个同步块中同步处理。

一个方式是将所有方法进行同步,方法中加入synchronized, 但是这相当有限制性,而且我们的读情况大于写情况,在这样情况下,读写锁ReadWriteLock是一个吸引人的选择,它其实由两部分锁组成– 一个是供读,一个是供写. 实际上,他们都在竞争同样的锁,这个锁可以由一个写或多个读线程同时获得,这样在没有人写时,我们可以获得并发读能力,偶尔发生的写操作会堵塞所有的读,而使用同步synchronized 会总是堵塞所有其他的读或写。

ReadWriteLock不足之处是引入了太多公式化,你得显式打开一个锁,然后记得关闭它unlock() ,代码变得难于阅读:

public class Buffer {

 

    private final int capacity;

    private final Deque<String> recent;

    private int discarded;

 

    private final Lock readLock;

    private final Lock writeLock;

 

 

    public Buffer(int capacity) {

        this.capacity = capacity;

        recent = new ArrayDeque<>(capacity);

        final ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();

        readLock = rwLock.readLock();

        writeLock = rwLock.writeLock();

    }

 

    public void putItem(String item) {

        writeLock.lock();

        try {

            while (recent.size() >= capacity) {

                recent.removeFirst();

                ++discarded;

            }

            recent.addLast(item);

        } finally {

            writeLock.unlock();

        }

    }

 

    public List<String> getRecent() {

        readLock.lock();

        try {

            final ArrayList<String> result = new ArrayList<>();

            result.addAll(recent);

            return result;

        } finally {

            readLock.unlock();

}

 

    public int getDiscardedCount() {

        readLock.lock();

        try {

            return discarded;

        } finally {

            readLock.unlock();

        }

    }

 

    public int getTotal() {

        readLock.lock();

        try {

            return discarded + recent.size();

        } finally {

            readLock.unlock();

        }

    }

 

    public void flush() {

        writeLock.lock();

        try {

            discarded += recent.size();

            recent.clear();

        } finally {

            writeLock.unlock();

        }

    }

 

}

使用Java 8的Lambda表达式我们可以横切锁这样的关注(类似AOP),将读写锁专门做一个类如下:

public class FunctionalReadWriteLock {

 

    private final Lock readLock;

    private final Lock writeLock;

 

    public FunctionalReadWriteLock() {

        this(new ReentrantReadWriteLock());

    }

 

    public FunctionalReadWriteLock(ReadWriteLock lock) {

        readLock = lock.readLock();

        writeLock = lock.writeLock();

    }

 

    public <T> T read(Supplier<T> block) {

        readLock.lock();

        try {

            return block.get();

        } finally {

            readLock.unlock();

        }

    }

 

    public void read(Runnable block) {

        readLock.lock();

        try {

            block.run();

        } finally {

            readLock.unlock();

        }

    }

 

    public <T> T write(Supplier<T> block) {

        writeLock.lock();

        try {

            return block.get();

        } finally {

            writeLock.unlock();

        }

public void write(Runnable block) {

        writeLock.lock();

        try {

            block.run();

        } finally {

            writeLock.unlock();

        }

    }

 

}

这个类我们实现了ReadWriteLock的包装(适配器模式),提供了一系列实用方法,原理上我们可以传递一个Runnable 或 Supplier<T> (这是一个只有 T get() 方法的接口) ,我们就能确保其运行能够被锁围绕,这样我们能在客户端使用Lambda简化调用代码:

public class Buffer {

 

    private final int capacity;

    private final Deque<String> recent;

    private int discarded;

 

    private final FunctionalReadWriteLock guard;

 

    public Buffer(int capacity) {

        this.capacity = capacity;

        recent = new ArrayDeque<>(capacity);

        guard = new FunctionalReadWriteLock();

    }

 

    public void putItem(String item) {

        guard.write(() -> {

            while (recent.size() >= capacity) {

                recent.removeFirst();

                ++discarded;

            }

            recent.addLast(item);

        });

    }

 

    public List<String> getRecent() {

        return guard.read(() -> {

            return recent.stream().collect(toList());

        });

    }

 

    public int getDiscardedCount() {

        return guard.read(() -> discarded);

    }

 

    public int getTotal() {

        return guard.read(() -> discarded + recent.size());

    }

 

    public void flush() {

        guard.write(() -> {

            discarded += recent.size();

            recent.clear();

        });

    }

 

}

注意到这里putItem方法中我们使用Lambda表达式作为FunctionalReadWriteLock方法write(Supplier<T> block)的参数。

Java8还提供了集合转换为集合的方法,我们使用如下:

public void flush() {

    guard.write(this::unsafeFlush);

}

 

private void unsafeFlush() {

    discarded += recent.size();

    recent.clear();

}

 

public List<String> getRecent() {

    return guard.read(this::defensiveCopyOfRecent);

}

 

private List<String> defensiveCopyOfRecent() {

    return recent.stream().collect(toList());

}

 

Closure Lambda和Monad

Java 8教程

函数编程

并发编程