目的
半同步/半异步模式将同步I/O与系统中的异步I/O分离,以简化并发编程工作,且不会降低执行效率。
将并发软件的服务分解为同步和异步两个独立的层,并添加一个排队层来协调它们之间的通信。
在单独的线程或进程中,同步处理更高级别的服务,如域功能、数据库查询或文件传输。相反,异步处理较低级别的系统服务,例如由网络硬件中断驱动的短期协议处理程序。如果同步层中的服务必须与异步层中的服务通信,则让它们通过队列层交换消息。
注意:并发软件通常同时执行异步和同步服务处理。异步是用来高效地处理低级系统服务,同步是为了简化应用程序服务处理。然而,要从这两种编程模型中获益,必须有效地协调异步和同步服务处理。
结构
执行
在下面的示例中,main方法创建一个异步服务,该任务在执行任务时不会阻塞主线程。主线程继续其工作,类似于异步方法调用模式。它们之间的区别在于异步层和同步层之间存在排队层,这允许两层之间的不同通信模式。例如,Priority Queue可用作排队层,以优先处理任务的执行方式。我们的实现只是实现此模式的一种简单方法,它的应用程序中描述了许多可能的变体。
步骤1:创建一个接口,表示异步执行的某些计算及其结果。计算通常是后台线程完成的,结果以回调的形式发回。回调没有实现isComplete,取消因为它超出了这个模式的范围。 注意这里是这个接口扩展 Callable 接口。
public interface AsyncTask<O> extends Callable<O> { /** * Is called in context of caller thread before call to {@link call()}. Large tasks should not be * performed in this method as it will block the caller thread. Small tasks such as validations * can be performed here so that the performance penalty of context switching is not incurred in * case of invalid requests. */ void onPreCall();
/** * A callback called after the result is successfully computed by {@link call()}. In our * implementation this method is called in context of background thread but in some variants, such * as Android where only UI thread can change the state of UI widgets, this method is called in * context of UI thread. */ void onPostCall(O result);
/** * A callback called if computing the task resulted in some exception. This method is called when * either of {@link call()} or {@link onPreCall()} throw any exception. * * @param throwable error cause */ void onError(Throwable throwable);
/** * This is where the computation of task should reside. This method is called in context of * background thread. */ @Override O call() throws Exception; }
|
步骤2: 这是异步层,在新请求到达时不会阻塞。它只是将请求传递给同步层,该同步层由一个队列组成,该同步层由一个队列组成,即一个BlockingQueue和一个线程池ThreadPoolExecutor。在这个工作线程池中,其中一个线程获取任务并在后台同步执行它,结果通过回调发回给调用者。
public class AsynchronousService {
/* * This represents the queuing layer as well as synchronous layer of the * pattern. The thread pool contains worker threads which execute the tasks * in blocking/synchronous manner. Long running tasks should be performed in * the background which does not affect the performance of main thread. */ private ExecutorService service;
/** * Creates an asynchronous service using {@code workQueue} as communication * channel between asynchronous layer and synchronous layer. Different types * of queues such as Priority queue, can be used to control the pattern of * communication between the layers. */ public AsynchronousService(BlockingQueue<Runnable> workQueue) { service = new ThreadPoolExecutor(10, 10, 10, TimeUnit.SECONDS, workQueue); }
/** * A non-blocking method which performs the task provided in background and * returns immediately. * <p> * On successful completion of task the result is posted back using callback * method {@link AsyncTaskonPostCall(Object)}, if task execution is unable * to complete normally due to some exception then the reason for error is * posted back using callback method {@link AsyncTaskonError(Throwable)}. * <p> * NOTE: The results are posted back in the context of background thread in * this implementation. */ public <T> void execute(final AsyncTask<T> task) { try { // some small tasks such as validation can be performed here. task.onPreCall(); } catch (Exception e) { task.onError(e); return; }
service.submit(new FutureTask<T>(task) { @Override protected void done() { super.done(); try { /* * called in context of background thread. There is other * variant possible where result is posted back and sits in * the queue of caller thread which then picks it up for * processing. An example of such a system is Android OS, * where the UI elements can only be updated using UI * thread. So result must be posted back in UI thread. */ task.onPostCall(get()); } catch (InterruptedException e) { // should not occur } catch (ExecutionException e) { task.onError(e.getCause()); } } }); } }
|
步骤3:让我们测试一下这个设计模式
public class Client{
/** * Program entry point * * @param args * command line args */ public static void main(String args) {
BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>(); AsynchronousService service = new AsynchronousService(workQueue); /* * A new task to calculate sum is received but as this is main thread, * it should not block. So it passes it to the asynchronous task layer * to compute and proceeds with handling other incoming requests. This * is particularly useful when main thread is waiting on Socket to * receive new incoming requests and does not wait for particular * request to be completed before responding to new request. */ service.execute(new ArithmeticSumTask(1000));
/* * New task received, lets pass that to async layer for computation. So * both requests will be executed in parallel. */ service.execute(new ArithmeticSumTask(500)); service.execute(new ArithmeticSumTask(2000)); service.execute(new ArithmeticSumTask(1)); }
/** * * ArithmeticSumTask * */ static class ArithmeticSumTask implements AsyncTask<Long> { private long n;
public ArithmeticSumTask(long n) { this.n = n; }
/* * This is the long running task that is performed in background. In our * example the long running task is calculating arithmetic sum with * artificial delay. */ @Override public Long call() throws Exception { return ap(n); }
/* * This will be called in context of the main thread where some * validations can be done regarding the inputs. Such as it must be * greater than 0. It's a small computation which can be performed in * main thread. If we did validated the input in background thread then * we pay the cost of context switching which is much more than * validating it in main thread. */ @Override public void onPreCall() { if (n < 0) { throw new IllegalArgumentException("n is less than 0"); } }
@Override public void onPostCall(Long result) { // Handle the result of computation System.out.println(result.toString()); }
@Override public void onError(Throwable throwable) { throw new IllegalStateException("Should not occur"); } }
private static long ap(long i) { try { Thread.sleep(i); } catch (InterruptedException e) { System.out.println("Exception caught." + e); } return i * (i + 1) / 2; } }
|
适用性
当系统具有以下特征时,请使用半同步/半异步模式:
- 系统必须执行响应异步发生的外部事件的任务,如OS中的硬件中断。
- 为每个外部事件源指定单独的控制线程来执行同步I/O是低效的。
- 如果同步执行I / O,则可以显著简化系统中的更高级别任务。
- 系统中的一个或多个任务必须在单个控制线程中运行,而其他任务可能受益于多线程。