半同步/半异步模式

19-04-11 jdon
                   

目的

半同步/半异步模式将同步I/O与系统中的异步I/O分离,以简化并发编程工作,且不会降低执行效率。

将并发软件的服务分解为同步和异步两个独立的层,并添加一个排队层来协调它们之间的通信。

在单独的线程或进程中,同步处理更高级别的服务,如域功能、数据库查询或文件传输。相反,异步处理较低级别的系统服务,例如由网络硬件中断驱动的短期协议处理程序。如果同步层中的服务必须与异步层中的服务通信,则让它们通过队列层交换消息。

注意:并发软件通常同时执行异步和同步服务处理。异步是用来高效地处理低级系统服务,同步是为了简化应用程序服务处理。然而,要从这两种编程模型中获益,必须有效地协调异步和同步服务处理。

结构

执行

在下面的示例中,main方法创建一个异步服务,该任务在执行任务时不会阻塞主线程。主线程继续其工作,类似于异步方法调用模式。它们之间的区别在于异步层和同步层之间存在排队层,这允许两层之间的不同通信模式。例如,Priority Queue可用作排队层,以优先处理任务的执行方式。我们的实现只是实现此模式的一种简单方法,它的应用程序中描述了许多可能的变体。

步骤1:创建一个接口,表示异步执行的某些计算及其结果。计算通常是后台线程完成的,结果以回调的形式发回。回调没有实现isComplete,取消因为它超出了这个模式的范围。 注意这里是这个接口扩展 Callable 接口。

public interface AsyncTask<O> extends Callable<O> {
  /**
   * Is called in context of caller thread before call to {@link #call()}. Large tasks should not be
   * performed in this method as it will block the caller thread. Small tasks such as validations
   * can be performed here so that the performance penalty of context switching is not incurred in
   * case of invalid requests.
   */
  void onPreCall();

  /**
   * A callback called after the result is successfully computed by {@link #call()}. In our
   * implementation this method is called in context of background thread but in some variants, such
   * as Android where only UI thread can change the state of UI widgets, this method is called in
   * context of UI thread.
   */
  void onPostCall(O result);

  /**
   * A callback called if computing the task resulted in some exception. This method is called when
   * either of {@link #call()} or {@link #onPreCall()} throw any exception.
   * 
   * @param throwable error cause
   */
  void onError(Throwable throwable);

  /**
   * This is where the computation of task should reside. This method is called in context of
   * background thread.
   */
  @Override
  O call() throws Exception;
}

步骤2:  这是异步层,在新请求到达时不会阻塞。它只是将请求传递给同步层,该同步层由一个队列组成,该同步层由一个队列组成,即一个BlockingQueue和一个线程池ThreadPoolExecutor。在这个工作线程池中,其中一个线程获取任务并在后台同步执行它,结果通过回调发回给调用者。

public class AsynchronousService {

 /*
  * This represents the queuing layer as well as synchronous layer of the
  * pattern. The thread pool contains worker threads which execute the tasks
  * in blocking/synchronous manner. Long running tasks should be performed in
  * the background which does not affect the performance of main thread.
  */
 private ExecutorService service;

 /**
  * Creates an asynchronous service using {@code workQueue} as communication
  * channel between asynchronous layer and synchronous layer. Different types
  * of queues such as Priority queue, can be used to control the pattern of
  * communication between the layers.
  */
 public AsynchronousService(BlockingQueue<Runnable> workQueue) {
  service = new ThreadPoolExecutor(10, 10, 10, TimeUnit.SECONDS, workQueue);
 }

 /**
  * A non-blocking method which performs the task provided in background and
  * returns immediately.
  * <p>
  * On successful completion of task the result is posted back using callback
  * method {@link AsyncTask#onPostCall(Object)}, if task execution is unable
  * to complete normally due to some exception then the reason for error is
  * posted back using callback method {@link AsyncTask#onError(Throwable)}.
  * <p>
  * NOTE: The results are posted back in the context of background thread in
  * this implementation.
  */
 public <T> void execute(final AsyncTask<T> task) {
  try {
   // some small tasks such as validation can be performed here.
   task.onPreCall();
  } catch (Exception e) {
   task.onError(e);
   return;
  }

  service.submit(new FutureTask<T>(task) {
   @Override
   protected void done() {
    super.done();
    try {
     /*
      * called in context of background thread. There is other
      * variant possible where result is posted back and sits in
      * the queue of caller thread which then picks it up for
      * processing. An example of such a system is Android OS,
      * where the UI elements can only be updated using UI
      * thread. So result must be posted back in UI thread.
      */
     task.onPostCall(get());
    } catch (InterruptedException e) {
     // should not occur
    } catch (ExecutionException e) {
     task.onError(e.getCause());
    }
   }
  });
 }
}

步骤3:让我们测试一下这个设计模式 

public class Client{

 /**
  * Program entry point
  * 
  * @param args
  *            command line args
  */
 public static void main(String args) {

  BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>();
  AsynchronousService service = new AsynchronousService(workQueue);
  /*
   * A new task to calculate sum is received but as this is main thread,
   * it should not block. So it passes it to the asynchronous task layer
   * to compute and proceeds with handling other incoming requests. This
   * is particularly useful when main thread is waiting on Socket to
   * receive new incoming requests and does not wait for particular
   * request to be completed before responding to new request.
   */
  service.execute(new ArithmeticSumTask(1000));

  /*
   * New task received, lets pass that to async layer for computation. So
   * both requests will be executed in parallel.
   */
  service.execute(new ArithmeticSumTask(500));
  service.execute(new ArithmeticSumTask(2000));
  service.execute(new ArithmeticSumTask(1));
 }

 /**
  * 
  * ArithmeticSumTask
  *
  */
 static class ArithmeticSumTask implements AsyncTask<Long> {
  private long n;

  public ArithmeticSumTask(long n) {
   this.n = n;
  }

  /*
   * This is the long running task that is performed in background. In our
   * example the long running task is calculating arithmetic sum with
   * artificial delay.
   */
  @Override
  public Long call() throws Exception {
   return ap(n);
  }

  /*
   * This will be called in context of the main thread where some
   * validations can be done regarding the inputs. Such as it must be
   * greater than 0. It's a small computation which can be performed in
   * main thread. If we did validated the input in background thread then
   * we pay the cost of context switching which is much more than
   * validating it in main thread.
   */
  @Override
  public void onPreCall() {
   if (n < 0) {
    throw new IllegalArgumentException("n is less than 0");
   }
  }

  @Override
  public void onPostCall(Long result) {
   // Handle the result of computation
   System.out.println(result.toString());
  }

  @Override
  public void onError(Throwable throwable) {
   throw new IllegalStateException("Should not occur");
  }
 }

 private static long ap(long i) {
  try {
   Thread.sleep(i);
  } catch (InterruptedException e) {
   System.out.println("Exception caught." + e);
  }
  return i * (i + 1) / 2;
 }
}

适用性

当系统具有以下特征时,请使用半同步/半异步模式:

  • 系统必须执行响应异步发生的外部事件的任务,如OS中的硬件中断。

  • 为每个外部事件源指定单独的控制线程来执行同步I/O是低效的。

  • 如果同步执行I / O,则可以显著简化系统中的更高级别任务。

  • 系统中的一个或多个任务必须在单个控制线程中运行,而其他任务可能受益于多线程。