首先,你永远不应该直接使用wait, notify, 或notifyAll(除非你有充分的理由)。如果您需要执行多线程操作,请使用并发库。
使用 wait/notify/notifyAll 构建生产者/消费者是在谈论 Java 中的多线程时可能会遇到的面试问题之一。
public class Main { public static void main(String args) { Executor executor = Executors.newFixedThreadPool(2);
Queue<String> broker = new LinkedList<>();
Producer producer = new Producer(broker); Consumer consumer = new Consumer(broker);
executor.execute(consumer); executor.execute(producer); } }
public class Producer implements Runnable{ private final int MAX_SIZE = 2; private final Queue<String> broker;
public Producer(Queue<String> broker) { this.broker = broker; }
@Override public void run() { while(true){ System.out.println("Producer thread waiting for lock..."); synchronized (broker) { System.out.println("Producer acquired lock..."); while(broker.size() == MAX_SIZE) { try { System.out.println("Producer waiting"); broker.wait(); } catch (InterruptedException e) { throw new RuntimeException(e); } }
String uuid = UUID.randomUUID().toString(); broker.add(uuid); System.out.println("Producer produced string" + uuid); try { Thread.sleep(500); } catch (InterruptedException e) { throw new RuntimeException(e); } broker.notifyAll(); } } } }
public class Consumer implements Runnable { private final Queue<String> broker;
public Consumer(Queue<String> broker) { this.broker = broker; }
@Override public void run() { while(true){ System.out.println("Consumer thread waiting for lock..."); synchronized (broker) { System.out.println("Consumer acquired lock..."); while(broker.isEmpty()) { try { System.out.println("Consumer waiting"); broker.wait(); } catch (InterruptedException e) { throw new RuntimeException(e); } }
String took = broker.remove(); System.out.println("Consumer got string " + took); try { Thread.sleep(500); } catch (InterruptedException e) { throw new RuntimeException(e); } broker.notifyAll(); } } } }
|
如果你要运行这个小程序,你最终会得到这样的输出:
Consumer thread waiting for lock... Consumer acquired lock... Producer thread waiting for lock... Consumer waiting Producer acquired lock... Producer produced stringa2975d33-a6dd-4692-9988-3868df02daa1 Producer thread waiting for lock... Consumer got string a2975d33-a6dd-4692-9988-3868df02daa1 Consumer thread waiting for lock... Producer acquired lock... Producer produced stringac36bede-779a-42bb-b98d-35e8b99a4035
|
除非停止,否则它将永远持续下去。
让我们再次运行它,输出却不同的:
Producer thread waiting for lock... Producer acquired lock... Consumer thread waiting for lock... Producer produced string574b4875-0dc8-4f51-b2b4-e80ffbd31bec Producer thread waiting for lock... Consumer acquired lock... Consumer got string 574b4875-0dc8-4f51-b2b4-e80ffbd31bec Consumer thread waiting for lock... Producer acquired lock... Producer produced string6a1e3ac7-f606-4520-9afe-9805091a535d Producer thread waiting for lock... Consumer acquired lock...
|
代码是一样的,输出是不同的。
这引出了下一点:Java Thread Scheduler。
我们首先讨论不同类型的线程。
线程的类型本身不应该对线程调度Thread Scheduler的执行产生任何影响。
- 平台线程:JVM 将每个 java 线程与唯一的平台线程(也称为本机线程)相关联。java 线程与本机线程的关联在 java 线程的生命周期内是持久且稳定的。平台线程更加资源密集,更适合长时间运行的进程。这就是我在这里使用的,也是 java 开箱即用的,除非另有说明。
- 虚拟线程(或绿色线程)完全由 JVM 管理,虽然它们实际上使用平台线程来运行,但很少是 1:1 的比例。理论上,单个 JVM 可以支持数百万个虚拟线程。它们适合短时间运行,而不是 CPU 密集型任务。它们使用的资源比平台线程少得多,并且创建开销也更少。
当谈到线程调度Scheduler时,有两个决定因素:优先级和到达时间。
优先级是一个范围从 1 到 10 的整数值。数字越大,优先级越高。该类Thread还定义了 3 个常量,其值为 1、5 和 10,分别与MIN_PRIORITY, NORM_PRIORITY、 和关联MAX_PRIORITY。默认情况下,除非另有指定,否则每个新线程的优先级均为 5。在我们的例子中,我们没有指定值,因此默认优先级是NORM_PRIORITY。
到达时间几乎与线程调度Thread Scheduler有关。
如果两个线程具有相同的优先级,则到底为哪个线程首先进入该"到达时间"。这可以是这两个线程中的任何一个。你可能会认为因为我按照这个顺序编写了代码:
executor.execute(producer); executor.execute(consumer);
|
上述代码线程producer将首先运行。
execute使用参数调用的代码producer将首先运行,是的,但这并不意味着Producer线程将立即运行。
当Runnable实例被传递给execute任务的方法时,Executor任务将在将来的某个时候运行。“有时”故意含糊其辞,因为您无法保证Runnable状态时间。
使用Thread类也不会给你带来更好的结果。线程调度程序很少关心线程到达的形式。
您可以通过多次运行代码来亲自尝试,您会发现任何人都可以猜测哪个线程将首先启动。
如果您想保证一个线程在另一个线程之前运行,您需要使用同步器,在这种情况下,CountDownLatch会很好地工作。对于我们的代码来说,哪个线程先启动并不重要,因为我们有条件使该线程先启动,所以如果消费者先启动,消费者将等待生产者。
再次查看代码,您可能会注意到我们使用 aQueue作为代理,并且还使用它作为监视器对象。请记住,锁定是在您共享的对象上完成的,而不是在线程本身上完成的。
只能保证锁定当前类,但会让
承受多线程带来的各种痛苦。事实上,你很快就会遇到 IllegalMonitorStateException 异常,因为你试图在线程不拥有的监视器上等待wait 或通知所有notifyAll 监视器。
简而言之,我们需要锁定我们共享的对象。你可以把它想象成两个(或更多)人共享一个房间的一把钥匙,如果你有钥匙,你进入房间,锁上门,做任何事,开锁,离开房间,锁上门,然后把钥匙递给前台(线程调度员),由前台决定下一个拿到钥匙的人。
监控器并不一定充当锁中介。任何共享对象都可以充当监视器(只是让监视器充当代理非常方便)。
让我们修改代码,锁定另一个对象。为了清晰起见,我删除了部分代码。
public class Main { public static void main(String args) throws InterruptedException { ... Object lock = new Object(); // new monitor here
Producer producer = new Producer(broker, lock); Consumer consumer = new Consumer(broker, lock); ... } } public class Producer implements Runnable{ @Override public void run() { while(true){ synchronized (lock) { // synchronized on the new monitor while(broker.size() == MAX_SIZE) { lock.wait(); // wait on the new monitor } broker.add(uuid); lock.notifyAll(); // don't forget to notify too } } } }
public class Consumer implements Runnable { @Override public void run() { while(true){ synchronized (lock) { while(broker.isEmpty()) { lock.wait(); // wait on the new monitor }
broker.remove(); lock.notifyAll(); // use the new monitor } } } }
运行输出: Producer thread waiting for lock... Producer acquired lock... Consumer thread waiting for lock... Producer produced string2e63f16b-af44-46b3-8733-32c6ae886547 Producer thread waiting for lock... Consumer acquired lock... ...
|
反复运行因此,几乎没有什么变化。
你只需记住在新锁上调用 wait 和 notifyAll,而不是中介。
你也可以只使用普通的锁,但这样就失去了使用 wait 和 notifyAll 的意义。
虚假唤醒
接下来你会注意到这段有趣的代码Consumer
while ( broker.isEmpty ( ))
这一个是为了Producer:
while ( broker.size () == MAX_SIZE ) // MAX_SIZE = 1
为什么不直接在这里使用if?
原因是虚假唤醒。
这实际上在wait文档中也提到了。
虚假唤醒:
这是因为在一个条件发出信号到线程最终运行之间,条件可能会因为其他线程的运行而改变。另一个原因是所谓的 "虚假唤醒",即在该线程唤醒后,另一个线程开始运行,并拿走了该线程正在等待的东西,因此该线程必须回到等待状态。
虚假唤醒不仅与 Java 相关,Linuxpthread_cond_wait功能有时也会导致虚假唤醒。
Windows 也不是更好,所以不要以为自己是安全的。Windows 上的并发编程
要点是:循环检查
notifyAll
notify 和 notifyAll 的区别在于,notify 会唤醒监视器上等待的随机线程,而 notifyAll 会唤醒监视器上等待的所有线程。
在这个特殊的例子中,notify 可以正常工作,因为只有一个其他线程在监视器上等待,所以该线程不可能因为是唯一等待通知的线程而不被唤醒。
但在不确定的情况下,最好还是使用 notifyAll。
最后,让我们看看代码实际运行的一步步流程,这样会更容易理解。
假设我们运行代码,而生产者赢得了先运行的比赛。
我将再次复制粘贴代码,但只保留重要部分。