并发主题

ExecutorService - 10个技巧和窍门

 ExecutorService已经成为Java并发编程中常用的基础库,几乎所有到线程 任务等执行都要委托ExecutorService。下面是使用过程中10个技巧和窍门。

1.为线程池和线程取名

  当我们在调试程序将JVM中线程导出Dump时,会出现pool-N-thread-M这样的提示,这是缺省的线程池名称,其中N代表池的序列号,每次你创建一个新的线程池,这个N数字就增加1;而M是这个线程池中的线程顺序。举例, pool-2-thread-3 表示在第二个线程池中的第三个显式,JDK将这种命名策略封装在ThreadFactory,Guava可以帮助你方便命名:

import com.google.common.util.concurrent.ThreadFactoryBuilder;

 

final ThreadFactory threadFactory = new ThreadFactoryBuilder()

        .setNameFormat("Orders-%d")

        .setDaemon(true)

        .build();

final ExecutorService executorService = Executors.newFixedThreadPool(10, threadFactory);

 

2. 按照上下文切换名称

  一旦我们记住线程的名称,我们就可以在运行时改变它们,这是有意义的,因为线程dump值显示类和方法名称,没有参数和本地变量。通过调整线程名称能实现事务标识,以便我们方便跟踪具体的消息 记录或查询,找出哪个是引起死锁的原因,如下:

private void process(String messageId) {

    executorService.submit(() -> {

        final Thread currentThread = Thread.currentThread();

        final String oldName = currentThread.getName();

        currentThread.setName("Processing-" + messageId);

        try {

            //real logic here...

        } finally {

            currentThread.setName(oldName);

        }

    });

}

  在try-finally中线程被取名为Processing-消息ID,这样我们就能跟踪是哪个消息流经系统。

3.明确的和安全的关闭

  客户端线程和线程池之间有一个任务队列。 当应用程序关闭时,你必须注意两件事:在排队任务发生了什么?以及已经运行的任务具体行为如何(稍后详细介绍)。 令人惊讶的是许多开发人员都不能或有意识地正常关闭线程池。 有两种方法:要么让所有队列任务执行( shutdown() )或使用(shutdownNow()立即关闭 )——这完全取决于你的用例。 例如如果我们提交了一堆任务,希望尽快返回全部完成后使用 shutdown() :

private void sendAllEmails(List<String> emails) throws InterruptedException {

    emails.forEach(email ->

            executorService.submit(() ->

                    sendEmail(email)));

    executorService.shutdown();

    final boolean done = executorService.awaitTermination(1, TimeUnit.MINUTES);

    log.debug("All e-mails were sent so far? {}", done);

}

  这种情况下我们发送大量电子邮件,每个作为在线程池中一个单独的任务。 这些任务提交后,我们关闭池不再接受任何新的任务。 然后我们等待最多一分钟,直到完成所有这些任务。 然而,如果某些任务仍悬而未决,,awaitTermination() 只会返回 假 。 ,等待任务将继续进行处理。

final List<Runnable> rejected = executorService.shutdownNow();
log.debug("Rejected tasks: {}", rejected.size());

  这次所有队列中任务将被取消返回,而已经运行的任务将允许继续。

4. 小心处理中断

  • 当线程堵塞在某个带有InterruptedException的方法时,你可以在这个线程中调用Thread.interrupt(),大多数堵塞的方法将立即抛出InterruptedException.
  • 如果你将任务提交给线程池(ExecutorService.submit()),当认为已经在执行时,你能调用Future.cancel(true),在这种情况下线程池将试图中断线程运行你的任务,你能很有效率的中断你的任务。

 

5. 监控队列和保持长度有限

  不正确大小的线程池可能会导致缓慢、不稳定和内存泄漏。 如果您配置线程太少,将建立队列消耗大量的内存。 另一方面,太多的线程由于过度上下文切换会减慢整个系统,导致相同的症状。 查看队列深度很重要,保持它有界,以便超负荷的线程池可以暂时拒绝新任务:

final BlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(100);

executorService = new ThreadPoolExecutor(n, n,

        0L, TimeUnit.MILLISECONDS,

        queue);

 

  上面代码等同于Executors.newFixedThreadPool(n),但是我们使用有固定容量100的ArrayBlockQueue替代了缺省是无限的LinkedBlockingQueue,这意味着如果有100个任务在队列中,n代表被执行,新的任务将被拒绝加入抛出RejectedExecutionException, 因为队列现在置于外部,我们可以定期地调用其size()放入logs/JMX等监控机制中。

6. 记得Exception处理

  下面这段代码会是什么结果?

executorService.submit(() -> {
System.out.println(1 / 0);
});

  它不会打印任何东西。也没有抛出除0的错误,线程池自己吞进了这个exception,好像从来没有发生一样,这是线程池与普通线程的区别,如果你提交一个Runnable,你必须自己使用try-catch环抱方法体,至少记录日志,如果你提交Callable确保你总是能解除引用,使用堵塞get()到re-throw exception:

final Future<Integer> division = executorService.submit(() -> 1 / 0);
//下面将抛出由除零ArithmeticException引起的ExecutionException
division.get();

  有趣的是Spring框架的@Async也有这个bug.

7.监控队列中等待时间

  监控工作队列深度是一方面。 然而,当单一事务/任务发生故障排除时,能看到提交任务和实际执行之间有多少时间是值得。 这个时间最好是应该接近0(当有空闲线程池),、然而队列中噢乖有任务必须排队时。 而且如果没有固定数量的线程池,运行新任务可能需要生成线程,这也会消耗些短的时间。 为了清晰地监控这一指标,包装原始 ExecutorService 类似这样:

public class WaitTimeMonitoringExecutorService implements ExecutorService {

 

    private final ExecutorService target;

 

    public WaitTimeMonitoringExecutorService(ExecutorService target) {

        this.target = target;

    }

 

    @Override

    public <T> Future<T> submit(Callable<T> task) {

        final long startTime = System.currentTimeMillis();

        return target.submit(() -> {

                    final long queueDuration = System.currentTimeMillis() - startTime;

                    log.debug("Task {} spent {}ms in queue", task, queueDuration);

                    return task.call();

                }

        );

    }

 

    @Override

    public <T> Future<T> submit(Runnable task, T result) {

        return submit(() -> {

            task.run();

            return result;

        });

    }

 

    @Override

    public Future<?> submit(Runnable task) {

        return submit(new Callable<Void>() {

            @Override

            public Void call() throws Exception {

                task.run();

                return null;

            }

        });

    }

 

    //...

 

}

  这不是一个完整的实现,但可以体现基本的概念。 当我们提交一个任务给线程池,我们立即开始测量时间。 我们的任务是尽快被线程池捡起并开始执行。 不要以为startTime 和 queueDuration不重要,事实上这两行是在不同的线程不同的,可能毫秒甚至秒开或分开,例如:

Task com.nurkiewicz.MyTask@7c7f3894 spent 9883ms in queue

8. 保存客户端跟踪堆栈

  反应性编程这些天似乎得到了大量的关注,. Reactive manifestoreactive streamsRxJava (just released 1.0!),Clojure agentsscala.rx… 他们都工作的很好,但堆栈跟踪不再是你的朋友,他们是最无用的。 比如一个例外发生在任务提交给线程池:

java.lang.NullPointerException: null

    at com.nurkiewicz.MyTask.call(Main.java:76) ~[classes/:na]

    at com.nurkiewicz.MyTask.call(Main.java:72) ~[classes/:na]

    at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[na:1.8.0]

    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) ~[na:1.8.0]

    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) ~[na:1.8.0]

    at java.lang.Thread.run(Thread.java:744) ~[na:1.8.0]

  我们可以很容易发现 mytask 在第76行抛出NPE错误。 但是我们不知道谁提交了这个任务,因为堆栈跟踪只显示Thread 和ThreadPoolExecutor 。 在技术上我们可以浏览源代码的希望能找到 mytask 被创建的地方。 但是我们不能立即看到线程全貌(更不用说event-drivent事件驱动或 actor-ninja-编程了)。 如果我们可以保留客户端代码(提交的任务)的堆栈跟踪并显示它,例如在失败时显示,那就是一个好办法,当然这并不是一个新想法, Hazelcast  从所有者节点到客户机传播异常代码 中也是这么做的。自然地保存客户端堆栈可以跟踪错误:

public class ExecutorServiceWithClientTrace implements ExecutorService {

 

    protected final ExecutorService target;

 

    public ExecutorServiceWithClientTrace(ExecutorService target) {

        this.target = target;

    }

 

    @Override

    public <T> Future<T> submit(Callable<T> task) {

        return target.submit(wrap(task, clientTrace(), Thread.currentThread().getName()));

    }

 

    private <T> Callable<T> wrap(final Callable<T> task, final Exception clientStack, String clientThreadName) {

        return () -> {

            try {

                return task.call();

            } catch (Exception e) {

                log.error("Exception {} in task submitted from thrad {} here:", e, clientThreadName, clientStack);

                throw e;

            }

        };

    }

 

    private Exception clientTrace() {

        return new Exception("Client stack trace");

    }

 

    @Override

    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException {

        return tasks.stream().map(this::submit).collect(toList());

    }

 

    //...

 

}

这次失败的情况下我们将检索完整的堆栈跟踪和线程的名字提交任务的地方:

Exception java.lang.NullPointerException in task submitted from thrad main here:

java.lang.Exception: Client stack trace

    at com.nurkiewicz.ExecutorServiceWithClientTrace.clientTrace(ExecutorServiceWithClientTrace.java:43) ~[classes/:na]

    at com.nurkiewicz.ExecutorServiceWithClientTrace.submit(ExecutorServiceWithClientTrace.java:28) ~[classes/:na]

    at com.nurkiewicz.Main.main(Main.java:31) ~[classes/:na]

    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0]

    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0]

    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0]

    at java.lang.reflect.Method.invoke(Method.java:483) ~[na:1.8.0]

    at com.intellij.rt.execution.application.AppMain.main(AppMain.java:134) ~[idea_rt.jar:na]

 

9.倾向使用CompletableFuture

  在Java 8引入了更强大 CompletableFuture 。 请尽可能使用它。 ExecutorService 没有扩展到支持这个增强的抽象,所以你必须自己使用。 下面是没有使用:

final Future<BigDecimal> future = executorService.submit(this::calculate);

使用后效果:

final CompletableFuture<BigDecimal> future = CompletableFuture.supplyAsync(this::calculate, executorService);

  虽然CompletableFuture拓展了Future,但更高级的API的消费者会真正欣赏这个扩展功能。

 

10. 同步队列

  SynchronousQueue 是一个有趣的 BlockingQueue,但不是真正的队列。 它甚至不是一个数据结构 。 最好是解释为一个容量0的队列。 引用JavaDoc:

每一个 插入 操作必须等待由另一个线程 删除,反之亦然。 同步队列没有任何内部容量,甚至一个也没有。 你不能插入任何一个元素(使用任何方法),除非有另一个线程正在试图删除它,你不能没有迭代遍历。 […]

  同步队列类似CSP和Ada的通道。

  这和线程池是什么关系,看看下面使用同步队列实现的线程池:

BlockingQueue<Runnable> queue = new SynchronousQueue<>();
ExecutorService executorService = new ThreadPoolExecutor(n, n,
  0L, TimeUnit.MILLISECONDS,
  queue);

  当创建带有两个线程的线程池,因为SynchronousQueue 是一个容量为0的队列,因此线程池只有在有空闲线程的情况下接受新任务,如果所有线程很忙,将拒绝新任务,不会有任何等待,这种模式适合后台启动任务需要要么立即执行要么取消不能执行。

Java8的CompletableFuture

fork/join

性能专题

 

猜你喜欢