并发主题

使用Guava Striped类实现高并发

使用Guava Striped类实现细粒度的并发开发,ConcurrentHashMap 使用了striped锁来提高并发性,而Guava的 Striped类就是继承这个原理,为我们提供了striped锁 ReadWriteLocks 和 Semaphores.。当访问Array or HashMap 的对象或数据结构时,我们通常是同步整个对象,但是在有些情况下是没有必要的。

Striped 锁Locking/信号Semaphores使用案例

尽可能缩小使用同步的代码范围,比如有一个 URL的ArrayList,代表远程资源,我们要在给定的时间限制访问这个资源的线程总数,限制线程的访问数适合用java.util.concurrent.Semaphore,但是ArrayList本身会成为瓶颈,我们使用Striped来限制在某个时间内访问集合中每个URL资源的线程数目。

从Striped获得一个读写锁:

Striped<ReadWriteLock> rwLockStripes = Striped.readWriteLock(10);

上面是得到一个强引用,垃圾回收不会回收,除非Striped类自己被清除。下面是得到一个弱引用:

int permits = 5;
int numberStripes = 10;
Striped<Semaphore> lazyWeakSemaphore = Striped.lazyWeakSemaphore(numberStripes,permits);

下面展示访问和使用Striped锁,类似JDK的读写锁使用:

String key = "taskA";
ReadWriteLock rwLock = rwLockStripes.get(key);
try{
rwLock.lock();
.....
}finally{
rwLock.unLock();
}

下面是两种对资源访问的比较,一个使用了Striped,一个未使用,直接使用Semaphore:

public class ConcurrentWorker {

private Striped<Semaphore> stripedSemaphores = Striped.semaphore(10,3);
private Semaphore semaphore = new Semaphore(3);

public void stripedConcurrentAccess(String url) throws Exception{
Semaphore stripedSemaphore = stripedSemaphores.get(url);
stripedSemaphore.acquire();
try{
//Access restricted resource here
Thread.sleep(25);
}finally{
stripedSemaphore.release();
}
}

public void nonStripedConcurrentAccess(String url) throws Exception{
semaphore.acquire();
try{
//Access restricted resource here
Thread.sleep(25);
}finally{
semaphore.release();
}
}
}

使用下面代码进行测试:

public class StripedExampleDriver {

private ExecutorService executorService = Executors.newCachedThreadPool();
private int numberThreads = 300;
private CountDownLatch startSignal = new CountDownLatch(1);
private CountDownLatch endSignal = new CountDownLatch(numberThreads);
private Stopwatch stopwatch = Stopwatch.createUnstarted();
private ConcurrentWorker worker = new ConcurrentWorker();
private static final boolean USE_STRIPES = true;
private static final boolean NO_STRIPES = false;
private static final int POSSIBLE_TASKS_PER_THREAD = 10;
private List<String> data = Lists.newArrayList();

public static void main(String[] args) throws Exception {
StripedExampleDriver driver = new StripedExampleDriver();
driver.createData();
driver.runStripedExample();
driver.reset();
driver.runNonStripedExample();
driver.shutdown();
}

private void runStripedExample() throws InterruptedException {
runExample(worker, USE_STRIPES, "Striped work");
}

private void runNonStripedExample() throws InterruptedException {
runExample(worker, NO_STRIPES, "Non-Striped work");
}

private void runExample(final ConcurrentWorker worker, final boolean isStriped, String type) throws InterruptedException {
for (int i = 0; i < numberThreads; i++) {
final String value = getValue(i % POSSIBLE_TASKS_PER_THREAD);
executorService.submit(new Callable<Void>() {
@Override
public Void call() throws Exception {
startSignal.await();
if (isStriped) {
worker.stripedConcurrentAccess(value);
} else {
worker.nonStripedConcurrentAccess(value);
}
endSignal.countDown();
return null;
}
});
}
stopwatch.start();
startSignal.countDown();
endSignal.await();
stopwatch.stop();
System.out.println("Time for" + type + " work [" + stopwatch.elapsed(TimeUnit.MILLISECONDS) + "] millis");
}
//details left out for clarity

结果显示:

Time forStriped work work [261] millis
Time forNon-Striped work work [2596] millis