多线程锁的问题

09-12-11 xysniper
需求:有两个方法:put()与get(),多个线程都会调用put和get,要求put与get同步,put自己也要同步

问题:请问使用ReentrantReadWriteLock可以解决吗?

banq
2009-12-11 13:11
可以,当然前提是put操作一般并发可能性小,主要以读为主。

xysniper
2009-12-11 15:03
其实,我的意思是说方法A和B同步,A还要和C同步,能实现吗?如下的代码是否会有问题?

如果C方法同步对的话,那么B和C相当于用同一个WL锁,岂不是执行B时,C要等,执行C时B要等,如何做到B和C异步?

public class Test extends Thread {
  private static final ReentrantReadWriteLock rrwl = new ReentrantReadWriteLock();
  private final Lock rl = rrwl.readLock();
  private final Lock wl = rrwl.writeLock();

  public static void main(String[] args) {
     new Test().start();
     new Test().start();
     new Test().start();
     new Test().start();
     new Test().start();
     new Test().start();
  }
  public void A() {
    rl.lock();
    ...
    rl.unlock();
  }

  public void B() {
    wl.lock();
     ...
    wl.unlock();
  }

/**
 * 这里的实现对吗?
 */
  public void C() {
    wl.lock();
     ...
    wl.unlock();
  }

}
<p>

[该贴被xysniper于2009-12-11 15:54修改过]

banq
2009-12-11 17:49
如果想实现异步,就是在两者之前引入Queue,如果你想B和C异步,就两者之间引入Queue,如这个帖子代码:

http://www.jdon.com/jivejdon/thread/37820#23126255

xysniper
2009-12-11 21:38
To banq

可能我没有说清楚问题,

需求:有一个队列,里面是一些用户号码信息,其中包含多个相同的号码,但入库时只保留最近一个号码,其余号码不入库,多个线程不断地从此队列中拿出数据,查库后,如果此号码没有在表中,就得插入到insertList中,如果有就插入到updateList,最后判断如果这两个list的size大于一个数,就执行batchInsert and batchUpdate到数据库。

问题:为了访止库中插入重复用户号码,需要在查询库操作和入库操作之间同步;为了访止队列中重复的号码被提交到数据库,需要使用map储存,过滤掉旧的相同号码。

程序清单:见下面

[该贴被xysniper于2009-12-12 11:06修改过]

xysniper
2009-12-11 22:18
程序清单:
/**
 * 该程序测试成功,数据库中没有重复数据,队列中也没有丢数据
 */
public class Processor extends Thread {
	private TestDao testDao;
	private ReceivedQueue receivedQueue;
	private static final ReentrantReadWriteLock rwLockCache = new ReentrantReadWriteLock();
	private static final ReentrantReadWriteLock rwLockDb = new ReentrantReadWriteLock();
	private static Map<String, UserVO> insertMap = new HashMap<String, UserVO>();
        private static Map<String, UserVO> updateMap = new HashMap<String, UserVO>();

	public Processor(ReceivedQueue receivedQueue, int dbTarget) {
		this.receivedQueue = receivedQueue;
		DaoManager daoManager = DaoConfig.getDaoManager(dbTarget);
		testDao = (TestDao)daoManager.getDao(TestDao.class);
	}

	public void run() {
		while(true) {
			UserVO testvo = receivedQueue.dequeue();
			if(testvo != null) {
				addToList(testvo);

				if(insertMap.size() > 5000) {
					insToDb();
				}
				if(updateMap.size() > 5000) {
					updeToDb();
				}
			} else {
				if(insertMap.size() > 0) {
					insToDb();
				}
				if(updateMap.size() > 0) {
					updeToDb();
				}
				if(receivedQueue.isStop() && receivedQueue.isEmpty()) {
					break;
				}
			}
		}
	}

	public void addToList(UserVO uservo) {
		rwLockDb.readLock().lock();
		try	{
			UserVO user = testDao.getUser(uservo.getMsisdn());
			if(user == null) {
				addToInsList(uservo);
			} else {
				addToUpdeList(uservo);
			}
		} finally {
			rwLockDb.readLock().unlock();
		}
	}
	public void insToDb() {
		rwLockDb.writeLock().lock();
		try	{
		  testDao.batchInsert((List<UserVO>)insertMap.values());			
		} finally {
		  insertMap.clear();
		  rwLockDb.writeLock().unlock();
		}
	}
	public void updeToDb() {
		rwLockDb.writeLock().lock();
		try	{
		  testDao.batchUpdate((List<UserVO>)updateMap.values());			
		} finally {
		  updateMap.clear();
		  rwLockDb.writeLock().unlock();
		}
	}
	public void addToInsList(UserVO user) {
		wrLockCache.writeLock().lock();
		try	{
			UserVO uv = insertMap.get(user.getMsisdn());
			if(uv!=null) {
				if(user.getUpdateDate().after(uv.getUpdateDate())) {
					uv.put(user.getMsisdn(), user);
				}
			} else {
				uv.put(user.getMsisdn(), user);
			}
		} finally {
			wrLockCache.writeLock().unlock();
		}
	}

	public void addToUpdeList(UserVO user) {
		wrLockCache.writeLock().lock();
		try	{
			UserVO uv = updateMap.get(user.getMsisdn());
			if(uv!=null) {
				if(user.getUpdateDate().after(uv.getUpdateDate())) {
					uv.put(user.getMsisdn(), user);
				}
			} else {
				uv.put(user.getMsisdn(), user);
			}
		} finally {
			wrLockCache.writeLock().unlock();
		}
	}
}
<p>

[该贴被xysniper于2009-12-12 11:08修改过]

xysniper
2009-12-11 22:24
请banq和各位看看,我这个需求使用读写锁是否合理,其实上我要不断地读取数据库(即代码testDao.getUser(uservo.getMsisdn()))将数据分发到两个List中去,还要不断要将List中的数据插入或更新到数据库中,读数据库相当于read操作,存库相当于write操作,read和write操作都很频繁,但是因为我insertList和updateList中的数只有达到一定数量时才批量入库,所以我想让多个线程同时读,相当于读要并发,这样快些,因此使用ReentrantReadWriteLock的read锁,而入库使用ReentrantReadWriteLock的write锁,防止write和read冲突

 /*
 * 这是启动程序,另外有一个job定时去解决文件,将解析出的数据不断地调用receivedQueue.enqueue(UserVO uservo)压入接受队列
 */
public class StartMainApp {
	private static ThreadPoolManager tpm = new ThreadPoolManager();

	public static void main(String[] args) {
		ReceivedQueue receivedQueue = new UserReceivedQueue();
		for(int i =0;i < 4;i++) {
			tpm.execute(new Processor(receivedQueue, 0));
		}
	}
}

[该贴被xysniper于2009-12-12 10:59修改过]

banq
2009-12-12 11:36
insToDb()似乎应该这样写:

rwLockDb.writeLock().lock();
//将insertMap拷贝出来。
List list = insertMap.values();
insertMap.clear();//然后清除原LIST,对共享资源操作。
rwLockDb.writeLock().unlock();

//数据库操作放到读写锁外面,不是对共享资源操作不要放这里
 testDao.batchInsert((list);			

<p>

为了不影响当前对List操作性能,上面batchInsert可以放到另外一台机器运行,如果是双核4G,可以搞两个JVM,让另外一个JVM运行,分担负载。

[该贴被banq于2009-12-12 11:39修改过]

xysniper
2009-12-12 12:37
嗯,是我发贴发错了,程序原本是这样的:

public void insToDb() {
   rwLockDb.writeLock().lock();
   try {
       testDao.batchInert(new ArrayList(insertMap.values()));
   } finally {
      insertMap.clear();
      rwLockDb.writeLock().unlock();
   }
}
<p>

另外,testDao.batchInsert(list);还真要放在读写锁之内,因为要和前面的testDao.getUser(uservo.getMsisdn())同步,这里数据表是共享资源,否则表中就会插入重复的用户号码

[该贴被xysniper于2009-12-12 12:39修改过]

banq
2009-12-12 12:42
2009年12月12日 12:37 "xysniper"的内容

另外,testDao.batchInsert(list);还真要放在读写锁之内,因为要和前面的testDao.getUser(uservo.getMsisdn())同步,这里数据表是共享资源,否则表中就会插入重复的用户号码

[该贴被xysniper于2009-12-12 12:39修改过]

你这个数据表共享资源和List共享资源是两个不同性质,读写锁只能锁定一个共享资源,就是List这个共享资源,凡是和List共享资源无关的都没用。

象你现在放在里面,只会拖延List的写锁时间,导致读比较忙。

数据表共享资源由数据库锁或事务来实现。

xysniper
2009-12-12 12:46
2009年12月12日 11:36 "banq"的内容
为了不影响当前对List操作性能,上面batchInsert可以放到另外一台机器运行,如果是双核4G,可以搞两个JVM,让另外一个JVM运行,分担负载。[该贴被banq于2009-12-12 11:39修改过]

如果能这样,最好了,但是公司不会采用这么复杂的部署及设计,他们要考虑业务变化性及公司成本,还有将来的维护,部署复杂意味着将来维护难呀

xysniper
2009-12-12 12:55
2009年12月12日 12:42 "banq"的内容
你这个数据表共享资源和List共享资源是两个不同性质,读写锁只能锁定一个共享资源,就是List这个共享资源,凡是和List共享资源无关的都没用。象你现在放在里面,只会拖延List的写锁时间,导致读比较忙。数据表共享资源由数据库锁或事务来实现。

嗯,但是比如线程A调用了testDao.getUser(uservo.getMsisdn())时,得到的UserVO为空,但还没有来得急插入insertList,而此时正好线程B将此用户插入到数据库中,线程A又再将此用户插入insertList中,insertList后面再次入库不就造成重复用户号码吗?所以把addToList()用read lock住,insToDb()用write lock住

banq
2009-12-12 13:13
2009年12月12日 12:55 "xysniper"的内容
insertList后面再次入库不就造成重复用户号码吗?所以把addToList()用read lock住,insToDb()用write lock住

这里锁住没有效果,在你再次入库时还需要检查一下,或者设定唯一键,从数据库方面想办法。

xysniper
2009-12-12 13:32
2009年12月12日 13:13 "banq"的内容
这里锁住没有效果,在你再次入库时还需要检查一下,或者设定唯一键,从数据库方面想办法。

为什么,readLock和writeLock本身是互斥的呀,读的时候不能写,写的时候不能读。如果入库时,再一个个拿出来去查数据库,那就更慢了,因为我发现,testDao.getUser(uservo.getMsisdn())很慢,查询一次比批量插入5000还要慢呀(msisdn已经设了索引),请参见

user表:

user_id 主键,这里没有用

msisdn mobile number

name user name

usertype user type

userscptype user scp type

userstat user state

updatedate profile date

......

如果把msisdn设为唯一键,那么就会有很多无效的数据被提交到数据库,造成无畏的时间浪费和性能浪费,所以我的想法是在前面就把无用数据过滤掉,保证到数据库时都是有效需要入库的

[该贴被xysniper于2009-12-12 13:38修改过]

itian
2009-12-13 12:42
大致看了一下,你应该是想节省数据库操作,想自己实现一个唯一键+定时批量插入,我觉得也许你考虑得太多了,就像ban说的数据库上面弄个唯一就好了,也不需要考虑数据量到达某个数量时才插入,这会带来很多问题,你说的线程是一个,还有如果你的insertList现在size是4000,还没到达5000没有插入到数据库,服务器重启了怎么办,4000个还没插入的数据难道还要自己再实现一个持久化?对于数据库,我的领悟是,宁可执行100条简单语句,也不执行1条复杂语句

猜你喜欢
2Go 1 2 下一页