多线程锁的问题

需求:有两个方法:put()与get(),多个线程都会调用put和get,要求put与get同步,put自己也要同步
问题:请问使用ReentrantReadWriteLock可以解决吗?

可以,当然前提是put操作一般并发可能性小,主要以读为主。

其实,我的意思是说方法A和B同步,A还要和C同步,能实现吗?如下的代码是否会有问题?
如果C方法同步对的话,那么B和C相当于用同一个WL锁,岂不是执行B时,C要等,执行C时B要等,如何做到B和C异步?


public class Test extends Thread {
private static final ReentrantReadWriteLock rrwl = new ReentrantReadWriteLock();
private final Lock rl = rrwl.readLock();
private final Lock wl = rrwl.writeLock();

public static void main(String[] args) {
new Test().start();
new Test().start();
new Test().start();
new Test().start();
new Test().start();
new Test().start();
}
public void A() {
rl.lock();
...
rl.unlock();
}

public void B() {
wl.lock();
...
wl.unlock();
}

/**
* 这里的实现对吗?
*/

public void C() {
wl.lock();
...
wl.unlock();
}

}


[该贴被xysniper于2009-12-11 15:54修改过]

To banq
可能我没有说清楚问题,
需求:有一个队列,里面是一些用户号码信息,其中包含多个相同的号码,但入库时只保留最近一个号码,其余号码不入库,多个线程不断地从此队列中拿出数据,查库后,如果此号码没有在表中,就得插入到insertList中,如果有就插入到updateList,最后判断如果这两个list的size大于一个数,就执行batchInsert and batchUpdate到数据库。
问题:为了访止库中插入重复用户号码,需要在查询库操作和入库操作之间同步;为了访止队列中重复的号码被提交到数据库,需要使用map储存,过滤掉旧的相同号码。
程序清单:见下面

[该贴被xysniper于2009-12-12 11:06修改过]

程序清单:


/**
* 该程序测试成功,数据库中没有重复数据,队列中也没有丢数据
*/

public class Processor extends Thread {
private TestDao testDao;
private ReceivedQueue receivedQueue;
private static final ReentrantReadWriteLock rwLockCache = new ReentrantReadWriteLock();
private static final ReentrantReadWriteLock rwLockDb = new ReentrantReadWriteLock();
private static Map<String, UserVO> insertMap = new HashMap<String, UserVO>();
private static Map<String, UserVO> updateMap = new HashMap<String, UserVO>();

public Processor(ReceivedQueue receivedQueue, int dbTarget) {
this.receivedQueue = receivedQueue;
DaoManager daoManager = DaoConfig.getDaoManager(dbTarget);
testDao = (TestDao)daoManager.getDao(TestDao.class);
}

public void run() {
while(true) {
UserVO testvo = receivedQueue.dequeue();
if(testvo != null) {
addToList(testvo);

if(insertMap.size() > 5000) {
insToDb();
}
if(updateMap.size() > 5000) {
updeToDb();
}
} else {
if(insertMap.size() > 0) {
insToDb();
}
if(updateMap.size() > 0) {
updeToDb();
}
if(receivedQueue.isStop() && receivedQueue.isEmpty()) {
break;
}
}
}
}

public void addToList(UserVO uservo) {
rwLockDb.readLock().lock();
try {
UserVO user = testDao.getUser(uservo.getMsisdn());
if(user == null) {
addToInsList(uservo);
} else {
addToUpdeList(uservo);
}
} finally {
rwLockDb.readLock().unlock();
}
}
public void insToDb() {
rwLockDb.writeLock().lock();
try {
testDao.batchInsert((List<UserVO>)insertMap.values());
} finally {
insertMap.clear();
rwLockDb.writeLock().unlock();
}
}
public void updeToDb() {
rwLockDb.writeLock().lock();
try {
testDao.batchUpdate((List<UserVO>)updateMap.values());
} finally {
updateMap.clear();
rwLockDb.writeLock().unlock();
}
}
public void addToInsList(UserVO user) {
wrLockCache.writeLock().lock();
try {
UserVO uv = insertMap.get(user.getMsisdn());
if(uv!=null) {
if(user.getUpdateDate().after(uv.getUpdateDate())) {
uv.put(user.getMsisdn(), user);
}
} else {
uv.put(user.getMsisdn(), user);
}
} finally {
wrLockCache.writeLock().unlock();
}
}

public void addToUpdeList(UserVO user) {
wrLockCache.writeLock().lock();
try {
UserVO uv = updateMap.get(user.getMsisdn());
if(uv!=null) {
if(user.getUpdateDate().after(uv.getUpdateDate())) {
uv.put(user.getMsisdn(), user);
}
} else {
uv.put(user.getMsisdn(), user);
}
} finally {
wrLockCache.writeLock().unlock();
}
}
}

[该贴被xysniper于2009-12-12 11:08修改过]

请banq和各位看看,我这个需求使用读写锁是否合理,其实上我要不断地读取数据库(即代码testDao.getUser(uservo.getMsisdn()))将数据分发到两个List中去,还要不断要将List中的数据插入或更新到数据库中,读数据库相当于read操作,存库相当于write操作,read和write操作都很频繁,但是因为我insertList和updateList中的数只有达到一定数量时才批量入库,所以我想让多个线程同时读,相当于读要并发,这样快些,因此使用ReentrantReadWriteLock的read锁,而入库使用ReentrantReadWriteLock的write锁,防止write和read冲突


/*
* 这是启动程序,另外有一个job定时去解决文件,将解析出的数据不断地调用receivedQueue.enqueue(UserVO uservo)压入接受队列
*/

public class StartMainApp {
private static ThreadPoolManager tpm = new ThreadPoolManager();

public static void main(String[] args) {
ReceivedQueue receivedQueue = new UserReceivedQueue();
for(int i =0;i < 4;i++) {
tpm.execute(new Processor(receivedQueue, 0));
}
}
}


[该贴被xysniper于2009-12-12 10:59修改过]

insToDb()似乎应该这样写:


rwLockDb.writeLock().lock();
//将insertMap拷贝出来。
List list = insertMap.values();
insertMap.clear();
//然后清除原LIST,对共享资源操作。
rwLockDb.writeLock().unlock();

//数据库操作放到读写锁外面,不是对共享资源操作不要放这里
testDao.batchInsert((list);

为了不影响当前对List操作性能,上面batchInsert可以放到另外一台机器运行,如果是双核4G,可以搞两个JVM,让另外一个JVM运行,分担负载。

[该贴被banq于2009-12-12 11:39修改过]

嗯,是我发贴发错了,程序原本是这样的:


public void insToDb() {
rwLockDb.writeLock().lock();
try {
testDao.batchInert(new ArrayList(insertMap.values()));
} finally {
insertMap.clear();
rwLockDb.writeLock().unlock();
}
}

另外,testDao.batchInsert(list);还真要放在读写锁之内,因为要和前面的testDao.getUser(uservo.getMsisdn())同步,这里数据表是共享资源,否则表中就会插入重复的用户号码
[该贴被xysniper于2009-12-12 12:39修改过]

2009年12月12日 12:37 "xysniper"的内容

另外,testDao.batchInsert(list);还真要放在读写锁之内,因为要和前面的testDao.getUser(uservo.getMsisdn())同步,这里数据表是共享资源,否则表中就会插入重复的用户号码
[该贴被xysniper于2009-12-12 12:39修改过]

你这个数据表共享资源和List共享资源是两个不同性质,读写锁只能锁定一个共享资源,就是List这个共享资源,凡是和List共享资源无关的都没用。

象你现在放在里面,只会拖延List的写锁时间,导致读比较忙。

数据表共享资源由数据库锁或事务来实现。

2009年12月12日 11:36 "banq"的内容
为了不影响当前对List操作性能,上面batchInsert可以放到另外一台机器运行,如果是双核4G,可以搞两个JVM,让另外一个JVM运行,分担负载。[该贴被banq于2009-12-12 11:39修改过]

如果能这样,最好了,但是公司不会采用这么复杂的部署及设计,他们要考虑业务变化性及公司成本,还有将来的维护,部署复杂意味着将来维护难呀

2009年12月12日 12:42 "banq"的内容
你这个数据表共享资源和List共享资源是两个不同性质,读写锁只能锁定一个共享资源,就是List这个共享资源,凡是和List共享资源无关的都没用。象你现在放在里面,只会拖延List的写锁时间,导致读比较忙。数据表共享资源由数据库锁或事务来实现。

嗯,但是比如线程A调用了testDao.getUser(uservo.getMsisdn())时,得到的UserVO为空,但还没有来得急插入insertList,而此时正好线程B将此用户插入到数据库中,线程A又再将此用户插入insertList中,insertList后面再次入库不就造成重复用户号码吗?所以把addToList()用read lock住,insToDb()用write lock住

2009年12月12日 12:55 "xysniper"的内容
insertList后面再次入库不就造成重复用户号码吗?所以把addToList()用read lock住,insToDb()用write lock住

这里锁住没有效果,在你再次入库时还需要检查一下,或者设定唯一键,从数据库方面想办法。

2009年12月12日 13:13 "banq"的内容
这里锁住没有效果,在你再次入库时还需要检查一下,或者设定唯一键,从数据库方面想办法。

为什么,readLock和writeLock本身是互斥的呀,读的时候不能写,写的时候不能读。如果入库时,再一个个拿出来去查数据库,那就更慢了,因为我发现,testDao.getUser(uservo.getMsisdn())很慢,查询一次比批量插入5000还要慢呀(msisdn已经设了索引),请参见
user表:
user_id 主键,这里没有用
msisdn mobile number
name user name
usertype user type
userscptype user scp type
userstat user state
updatedate profile date
......
如果把msisdn设为唯一键,那么就会有很多无效的数据被提交到数据库,造成无畏的时间浪费和性能浪费,所以我的想法是在前面就把无用数据过滤掉,保证到数据库时都是有效需要入库的

[该贴被xysniper于2009-12-12 13:38修改过]

大致看了一下,你应该是想节省数据库操作,想自己实现一个唯一键+定时批量插入,我觉得也许你考虑得太多了,就像ban说的数据库上面弄个唯一就好了,也不需要考虑数据量到达某个数量时才插入,这会带来很多问题,你说的线程是一个,还有如果你的insertList现在size是4000,还没到达5000没有插入到数据库,服务器重启了怎么办,4000个还没插入的数据难道还要自己再实现一个持久化?对于数据库,我的领悟是,宁可执行100条简单语句,也不执行1条复杂语句