线程池模式(Thread Pool Pattern)


目的
通常情况下,要执行的任务是短期的,并且任务数量很大。为每个任务创建一个新线程会使系统花费更多时间来创建和销毁线程,而不是执行实际任务。线程池通过重用现有线程并消除创建新线程的延迟来解决此问题。

说明
 线程池模式是指创建多个线程来执行多个任务,这些任务通常组织在一个队列中。正在执行的任务的结果也可能放在队列中,或者任务可能没有返回任何结果。通常,任务比线程多。一旦一个线程完成了它的任务,它就会从队列中请求下一个任务,直到所有任务都完成为止。然后线程可以终止或休眠,直到有新任务可用。

性能
线程池的大小是为执行任务而保留的线程数。它通常是应用程序的可调参数,经过调整以优化程序性能。
线程池为每个任务创建新线程的主要好处是线程创建和销毁开销仅限于池的初始创建,这可能会带来更好的性能和更好的系统稳定性。创建和销毁线程及其相关资源在时间上是一个昂贵的过程。但是,保留的过多线程会浪费内存,并且可运行线程之间的上下文切换会产生调用时的性能损失。 

用于确定何时创建或销毁线程的算法会影响总体性能:

  • 创建太多线程会浪费资源与时间。

  • 在再次创建它们时,销毁太多线程需要更多时间。

  • 创建线程太慢可能会导致客户端性能不佳(等待时间过长)。

  • 破坏线程太慢可能会使其他资源进程匮乏。

源代码
在这个例子中,我们创建了一个任务列表,列出了要完成的工作。然后将每个任务包装到实现Runnable接口的Worker对象中。我们使用固定数量的线程(线程池)创建一个ExecutorService,并使用它们来执行工作线程。

类图

让我们实现上面类图的源代码。
第1步:为任务创建一个抽象基类。

public abstract class Task {

  private static final AtomicInteger ID_GENERATOR = new AtomicInteger();

  private final int id;
  private final int timeMs;

  public Task(final int timeMs) {
    this.id = ID_GENERATOR.incrementAndGet();
    this.timeMs = timeMs;
  }

  public int getId() {
    return id;
  }

  public int getTimeMs() {
    return timeMs;
  }

  @Override
  public String toString() {
    return String.format("id=%d timeMs=%d", id, timeMs);
  }
}

第2步:CoffeeMakingTask是一项具体任务。

public class CoffeeMakingTask extends Task {

  private static final int TIME_PER_CUP = 100;

  public CoffeeMakingTask(int numCups) {
    super(numCups * TIME_PER_CUP);
  }

  @Override
  public String toString() {
    return String.format("%s %s", this.getClass().getSimpleName(), super.toString());
  }
}

第3步:PotatoPeelingTask是一项具体任务。

public class PotatoPeelingTask extends Task {

  private static final int TIME_PER_POTATO = 200;

  public PotatoPeelingTask(int numPotatoes) {
    super(numPotatoes * TIME_PER_POTATO);
  }

  @Override
  public String toString() {
    return String.format("%s %s", this.getClass().getSimpleName(), super.toString());
  }
}

第4步:Worker实现Runnable,因此可以由ExecutorService执行。

public class Worker implements Runnable {

  private final Task task;

  public Worker(final Task task) {
    this.task = task;
  }

  @Override
  public void run() {
    System.out.println(String.format("%s processing %s", Thread.currentThread().getName(),
        task.toString()));
    try {
      Thread.sleep(task.getTimeMs());
    } catch (InterruptedException e) {
      e.printStackTrace();
    }

第5步:让我们通过main方法测试这个模式:程序入口点。

public class App {

  /**
   * Program entry point
   * 
   * @param args command line args
   */
  public static void main(String[] args) {

    System.out.println(
"Program started");

   
// Create a list of tasks to be executed
    List<Task> tasks = new ArrayList<>();
    tasks.add(new PotatoPeelingTask(3));
    tasks.add(new PotatoPeelingTask(6));
    tasks.add(new CoffeeMakingTask(2));
    tasks.add(new CoffeeMakingTask(6));
    tasks.add(new PotatoPeelingTask(4));
    tasks.add(new CoffeeMakingTask(2));
    tasks.add(new PotatoPeelingTask(4));
    tasks.add(new CoffeeMakingTask(9));
    tasks.add(new PotatoPeelingTask(3));
    tasks.add(new CoffeeMakingTask(2));
    tasks.add(new PotatoPeelingTask(4));
    tasks.add(new CoffeeMakingTask(2));
    tasks.add(new CoffeeMakingTask(7));
    tasks.add(new PotatoPeelingTask(4));
    tasks.add(new PotatoPeelingTask(5));

   
// Creates a thread pool that reuses a fixed number of threads operating off a shared
   
// unbounded queue. At any point, at most nThreads threads will be active processing
   
// tasks. If additional tasks are submitted when all threads are active, they will wait
   
// in the queue until a thread is available.
    ExecutorService executor = Executors.newFixedThreadPool(3);

   
// Allocate new worker for each task
   
// The worker is executed when a thread becomes
   
// available in the thread pool
    for (int i = 0; i < tasks.size(); i++) {
      Runnable worker = new Worker(tasks.get(i));
      executor.execute(worker);
    }
   
// All tasks were executed, now shutdown
    executor.shutdown();
    while (!executor.isTerminated()) {
      Thread.yield();
    }
    System.out.println(
"Program finished");
  }
}

适用性
使用线程池模式时

  • 你有大量短期任务要并行执行。