semaphore使用案例源码
Java提供了下面几种机制防止多线程同时操作一个资源:
- 使用 volatile
- 在类内部使用 java.util.concurrent and java.util.concurrent.atomic
- 同步锁Synchronized
- 使用Semaphore
什么是semaphore?
可以这样描述:允许N个单元被得到,提供得到和释放的机制,它允许我们确保在某个时间只有N个处理线程访问同一个资源。是一种限制连接数的锁。
比如我们可以用来作为连接池限制:
public class ConnectionLimiter {
private final Semaphore semaphore;
private ConnectionLimiter(int maxConcurrentRequests) {
semaphore = new Semaphore(maxConcurrentRequests);
}
public URLConnection acquire(URL url) throws InterruptedException,
IOException {
semaphore.acquire();
return url.openConnection();
}
public void release(URLConnection conn) {
try {
/*
* ... clean up here
*/
} finally {
semaphore.release();
}
}
}
相比同步锁,必须记住:必须在 try..finally 中释放你得到的锁。
可以使用在如下场景:
- 限制并发访问硬盘
- 创建线程的限制
- JDBC连接池/限制
- 网络连接节流
- 节流CPU或内存密集型任务
假设一个场景:有一个内存缓存in-memory cache 保存有 ‘Person’类型对象. 用户能够插入和查询记录,展示如何使用semaphores在缓存中控制并发。
import java.util.concurrent.Semaphore;
/**
* This class will allow thread to acquire and release locks as required
*
* @author dinuka.arseculeratne
*
*/
public class PersonLock {
/**
* We do not want multiple lock objects lying around so we make ths class
* singleton
*/
private PersonLock() {
}
/**
* Bill Pugh's way of lazy initializing the singleton instance
*
* @author dinuka.arseculeratne
*
*/
private static class SingletonHolder {
public static final PersonLock INSTANCE = new PersonLock();
}
/**
* Use this method to get a reference to the singleton instance of
* {@link PersonLock}
*
* @return the singleton instance
*/
public static PersonLock getInstance() {
return SingletonHolder.INSTANCE;
}
/**
* In this sample, we allow only one thread at at time to update the cache
* in order to maintain consistency
*/
private Semaphore writeLock = new Semaphore(1);
/**
* We allow 10 concurrent threads to access the cache at any given time
*/
private Semaphore readLock = new Semaphore(10);
public void getWriteLock() throws InterruptedException {
writeLock.acquire();
}
public void releaseWriteLock() {
writeLock.release();
}
public void getReadLock() throws InterruptedException {
readLock.acquire();
}
public void releaseReadLock() {
readLock.release();
}
}
这个类将处理的过程中获取和释放锁,使我们的高速缓存线程安全的。我已经使用了两个单独的锁在这里读和写。这样做的理由是为了让用户读取数据,尽管可能公司脏数据。
假设在使用10个线程可同时获得锁,用来访问的缓存读取的数据。接下来,用了一个写锁:'这意味着只有一个线程可以访问缓存把记录放入。为了保持一致性,这在缓存中是很重要的。也就是说,我们不希望多个线程试图插入条目到Map中,这将导致不可预知的行为(至少在某些情况下)。,通过使用的信号量它获取一个锁有两种方法。
1. acquire() : 是一个阻塞呼叫等待,直到锁被释放或线程被中断
2. tryAcquire() : 是一个非阻塞调用将立即返回,返回真或假标志意味着锁是否得到。
这里使用堵塞锁,因为我想调用线程等待,直到锁可用。当然,这将取决于你的使用情况。您也可以定义一个超时周期tryAcquire()方法,使线程不会无限期地等待一个锁
下面是缓存实现 :
import java.util.HashMap;
import java.util.Map;
/**
* A mock storage to hold the person objects in a map
*
* @author dinuka.arseculeratne
*
*/
public class PersonStorage {
private Map<Integer, Person> personCache = new HashMap<Integer, Person>();
private int counter = 0;
/**
* This class is made singleton and hence the constructor is made private
*/
private PersonStorage() {
}
/**
* Bill Pugh's way of lazy initializing the singleton instance
*
* @author dinuka.arseculeratne
*
*/
private static final class SingletonHolder {
public static final PersonStorage INSTANCE = new PersonStorage();
}
/**
* Use this method to get a reference to the singleton instance of
* {@link PersonStorage}
*
* @return the singleton instance
*/
public static PersonStorage getInstance()
{
return SingletonHolder.INSTANCE;
}
/**
* Inserts the person into the map. Note that we use defensive copying so
* that even if the client changes the object later on, those changes will
* not be reflected in the object within the map
*
* @param person
* the instance of {@link Person} to be inserted
* @return the key which signifies the location of the person object
* @throws InterruptedException
*/
public int putPerson(Person person) throws InterruptedException {
Person copyPerson = person.copyPerson();
personCache.put(++counter, copyPerson);
return counter;
}
/**
* Here as well we use defensive copying so that the value of the object
* reference within the map is not passed in to the calling party.
*
* @param id
* the id representing the location of the object within the map
* @return the instance of the {@link Person} represented by the key passed
* in
* @throws InterruptedException
*/
public Person retrievePerson(int id) throws InterruptedException {
PersonLock.getInstance().getReadLock();
if (!personCache.containsKey(id)) {
throw new RuntimeException('Key is not found');
}
PersonLock.getInstance().releaseReadLock();
return personCache.get(id).copyPerson();
}
}
测试调用客户端:
/**
* A test class to demonstrate the locking at work
*
* @author dinuka.arseculeratne
*
*/
public class TestLock {
public static void main(String[] args) throws InterruptedException {
Thread t1 = new Thread(new Runnable() {
@Override
public void run() {
Person p1 = new Person(1L, 'Test1', 'XYZ');
try {
PersonLock.getInstance().getWriteLock();
PersonStorage.getInstance().putPerson(p1);
} catch (InterruptedException e) {
// Exception handling need to be done
e.printStackTrace();
}
finally{
PersonLock.getInstance().releaseWriteLock();
}
}
});
Thread t2 = new Thread(new Runnable() {
@Override
public void run() {
Person p2 = new Person(2L, 'Test123', 'ABC');
try {
PersonLock.getInstance().getWriteLock();
PersonStorage.getInstance().putPerson(p2);
} catch (InterruptedException e) {
// Exception handling need to be done
}
finally{
PersonLock.getInstance().releaseWriteLock();
}
}
});
t1.start();
t2.start();
System.out.println(PersonStorage.getInstance().retrievePerson(2));
}
}
java多线程
Java同步或锁
Java性能调优