并发主题

semaphore使用案例源码

Java提供了下面几种机制防止多线程同时操作一个资源:

  1. 使用 volatile 
  2. 在类内部使用 java.util.concurrent and java.util.concurrent.atomic
  3. 同步锁Synchronized
  4. 使用Semaphore

什么是semaphore?

可以这样描述:允许N个单元被得到,提供得到和释放的机制,它允许我们确保在某个时间只有N个处理线程访问同一个资源。是一种限制连接数的锁。

比如我们可以用来作为连接池限制:

public class ConnectionLimiter {
   private final Semaphore semaphore;

   private ConnectionLimiter(int maxConcurrentRequests) {
       semaphore = new Semaphore(maxConcurrentRequests);
   }

   public URLConnection acquire(URL url) throws InterruptedException,
                                                IOException {
       semaphore.acquire();
       return url.openConnection();
   }

   public void release(URLConnection conn) {
       try {
           /*
           * ... clean up here
           */
       } finally {
           semaphore.release();
       }
   }
}

相比同步锁,必须记住:必须在 try..finally 中释放你得到的锁。

可以使用在如下场景:

  • 限制并发访问硬盘
  • 创建线程的限制
  • JDBC连接池/限制
  • 网络连接节流
  • 节流CPU或内存密集型任务

 

假设一个场景:有一个内存缓存in-memory cache 保存有 ‘Person’类型对象. 用户能够插入和查询记录,展示如何使用semaphores在缓存中控制并发。

import java.util.concurrent.Semaphore;

/**
 * This class will allow thread to acquire and release locks as required
 *
 * @author dinuka.arseculeratne
 *
 */
public class PersonLock {

 /**
  * We do not want multiple lock objects lying around so we make ths class
  * singleton
  */
 private PersonLock() {

 }

 /**
  * Bill Pugh's way of lazy initializing the singleton instance
  *
  * @author dinuka.arseculeratne
  *
  */
 private static class SingletonHolder {
  public static final PersonLock INSTANCE = new PersonLock();
 }

 /**
  * Use this method to get a reference to the singleton instance of
  * {@link PersonLock}
  *
  * @return the singleton instance
  */
 public static PersonLock getInstance() {
  return SingletonHolder.INSTANCE;
 }

 /**
  * In this sample, we allow only one thread at at time to update the cache
  * in order to maintain consistency
  */
 private Semaphore writeLock = new Semaphore(1);

 /**
  * We allow 10 concurrent threads to access the cache at any given time
  */
 private Semaphore readLock = new Semaphore(10);

 public void getWriteLock() throws InterruptedException {
  writeLock.acquire();
 }

 public void releaseWriteLock() {
  writeLock.release();
 }

 public void getReadLock() throws InterruptedException {
  readLock.acquire();
 }

 public void releaseReadLock() {
  readLock.release();
 }
}

这个类将处理的过程中获取和释放锁,使我们的高速缓存线程安全的。我已经使用了两个单独的锁在这里读和写。这样做的理由是为了让用户读取数据,尽管可能公司脏数据。

假设在使用10个线程可同时获得锁,用来访问的缓存读取的数据。接下来,用了一个写锁:'这意味着只有一个线程可以访问缓存把记录放入。为了保持一致性,这在缓存中是很重要的。也就是说,我们不希望多个线程试图插入条目到Map中,这将导致不可预知的行为(至少在某些情况下)。,通过使用的信号量它获取一个锁有两种方法。

1. acquire() : 是一个阻塞呼叫等待,直到锁被释放或线程被中断

2. tryAcquire() : 是一个非阻塞调用将立即返回,返回真或假标志意味着锁是否得到。

这里使用堵塞锁,因为我想调用线程等待,直到锁可用。当然,这将取决于你的使用情况。您也可以定义一个超时周期tryAcquire()方法,使线程不会无限期地等待一个锁

下面是缓存实现 :

import java.util.HashMap;
import java.util.Map;

/**
 * A mock storage to hold the person objects in a map
 *
 * @author dinuka.arseculeratne
 *
 */
public class PersonStorage {

 private Map<Integer, Person> personCache = new HashMap<Integer, Person>();

 private int counter = 0;

 /**
  * This class is made singleton and hence the constructor is made private
  */
 private PersonStorage() {

 }

 /**
  * Bill Pugh's way of lazy initializing the singleton instance
  *
  * @author dinuka.arseculeratne
  *
  */
 private static final class SingletonHolder {
  public static final PersonStorage INSTANCE = new PersonStorage();
 }
 
 /**
  * Use this method to get a reference to the singleton instance of
  * {@link PersonStorage}
  *
  * @return the singleton instance
  */
 public static PersonStorage getInstance()
 {
  return SingletonHolder.INSTANCE;
 }

 /**
  * Inserts the person into the map. Note that we use defensive copying so
  * that even if the client changes the object later on, those changes will
  * not be reflected in the object within the map
  *
  * @param person
  *            the instance of {@link Person} to be inserted
  * @return the key which signifies the location of the person object
  * @throws InterruptedException
  */
 public int putPerson(Person person) throws InterruptedException {
 
  Person copyPerson = person.copyPerson();
  personCache.put(++counter, copyPerson);
 
  return counter;
 }

 /**
  * Here as well we use defensive copying so that the value of the object
  * reference within the map is not passed in to the calling party.
  *
  * @param id
  *            the id representing the location of the object within the map
  * @return the instance of the {@link Person} represented by the key passed
  *         in
  * @throws InterruptedException
  */
 public Person retrievePerson(int id) throws InterruptedException {
  PersonLock.getInstance().getReadLock();
  if (!personCache.containsKey(id)) {
   throw new RuntimeException('Key is not found');
  }
  PersonLock.getInstance().releaseReadLock();
  return personCache.get(id).copyPerson();
 }

}

测试调用客户端:

/**
 * A test class to demonstrate the locking at work
 *
 * @author dinuka.arseculeratne
 *
 */
public class TestLock {

 public static void main(String[] args) throws InterruptedException {

  Thread t1 = new Thread(new Runnable() {

   @Override
   public void run() {
   
    Person p1 = new Person(1L, 'Test1', 'XYZ');
    try {
PersonLock.getInstance().getWriteLock();
PersonStorage.getInstance().putPerson(p1);
    } catch (InterruptedException e) {
     // Exception handling need to be done
     e.printStackTrace();
    }
   finally{
          PersonLock.getInstance().releaseWriteLock();
    }
   }
  });

  Thread t2 = new Thread(new Runnable() {

   @Override
   public void run() {
   
    Person p2 = new Person(2L, 'Test123', 'ABC');

    try {
PersonLock.getInstance().getWriteLock();

     PersonStorage.getInstance().putPerson(p2);
    } catch (InterruptedException e) {
     // Exception handling need to be done
    }
 finally{
          PersonLock.getInstance().releaseWriteLock();
    }
   
   }
  });

  t1.start();
  t2.start();

  System.out.println(PersonStorage.getInstance().retrievePerson(2));
 }
}

 

java多线程

Java同步或锁

Java性能调优