Spray + Akka高性能异步IO并发

  如何使用Java建立像Node.js那样非堵塞异步事件并发IO服务器呢?Spray是基于NIO2高并发框架,虽然Tomcat 8也是基于NIO2,但是Spary的线程数要低得到,降低CPU上下文切换的负载;Akka和其Mysql库包都是相同线程执行上下文( execution context),因为在非堵塞前提下,性能拼的就不是线程数目越多越好,正好相反,线程数目越低,越接近理论理论最佳点。

启动设置

  为了启动Spary和Akka,需要一个main启动函数,在main函数中,我们创建一个Actor系统:

    ActorSystem system = ActorSystem.create("system");

    ActorRef listener = system.actorOf(Props.create(HttpActor.class), "httpActor");

  listener是使用Actor用来处理Http请求。然后要设定监听Http的端口:

    InetSocketAddress endpoint = new InetSocketAddress(3000);
    int backlog = 100;
    List<Inet.SocketOption> options = JavaConversions.asScalaBuffer(new ArrayList<Inet.SocketOption>()).toList();
    Option<ServerSettings> settings = scala.Option.empty();

  最后,绑定Actor到监听的Http端口:

    Bind bind = new Http.Bind(listener, endpoint, backlog, options, settings, sslEngineProvider);

    IO.apply(spray.can.Http$.MODULE$, system).tell(bind, ActorRef.noSender());

 

  整个main函数主要代码如下:

public static final ActorSystem system = ActorSystem.create("system");
 
public static void main(String[] args) {
    ... 
    ActorRef listener = system.actorOf(Props.create(HttpActor.class), "httpActor"); 
    
    InetSocketAddress endpoint = new InetSocketAddress(3000);
    int backlog = 100;
    List<Inet.SocketOption> options = JavaConversions.asScalaBuffer(new ArrayList<Inet.SocketOption>()).toList();
    Option<ServerSettings> settings = scala.Option.empty();
    ServerSSLEngineProvider sslEngineProvider = null;
    Bind bind = new Http.Bind(listener, endpoint, backlog, options, settings, sslEngineProvider);
    IO.apply(spray.can.Http$.MODULE$, system).tell(bind, ActorRef.noSender());
        ... 
}

  设置好启动函数以后,下面是真正开始在Actor里处理进来Http请求了。

请求处理器

  首先,我们因为使用的是原生Java代码,不是Scala,因此需要将Scala集成到Java中,可能比较丑陋,可以用专门类包装一下,引入Scala的Http协议:

HttpProtocol HTTP_1_1 = HttpProtocols.HTTP$div1$u002E1();

  Http Actor为了响应Http请求做三件事,第一件是创建一个路由router,这样能够根据请求URL:http://xxx/path中不同的/path分别处理:

Router router = partitionAndCreateRouter();

  第二件是处理新的连接,告诉Spray这个actor不仅接受Http连接,也处理实际http连接:

       }).match(Tcp.Connected.class, r ->{
                sender().tell(new Http.Register(self(), Http.EmptyFastPath$.MODULE$), self());
 //tell that connection will be handled here! 
         })

  第三件事就是处理实际的Http连接,将http请求委托给另外一个actor处理。

            .match(HttpRequest.class, r -> {
                int id = Constants.ID.getAndIncrement();
                String path = String.valueOf(r.uri().path());
                if("/sell".equals(path)){
                    ... //逻辑处理
                }else if("/buy".equals(path)){
                    ... //逻辑处理
                }else{
                    handleUnexpected(r);
                }
            })

 

  整个HttpActor代码如下,业务逻辑以买卖为模型:

private static class HttpActor extends AbstractActor {
 
    private static final HttpProtocol HTTP_1_1 = HttpProtocols.HTTP$div1$u002E1();
 
    public HttpActor() {
        final Router router = partitionAndCreateRouter();
        
        receive(ReceiveBuilder
            .match(HttpRequest.class, r -> {
                int id = Constants.ID.getAndIncrement();
                String path = String.valueOf(r.uri().path());
                if("/sell".equals(path)){
                    String productId = r.uri().query().get("productId").get();
                    ... 
                    SalesOrder so = new SalesOrder(price, productId, quantity, id);
                    so.setSeller(new Seller(who));
                    router.route(so, self());
                    replyOK(id);
                }else if("/buy".equals(path)){
                    ... 
                }else{
                    handleUnexpected(r);
                }
            }).match(Tcp.Connected.class, r ->{
                sender().tell(new Http.Register(self(), Http.EmptyFastPath$.MODULE$), self());
 //tell that connection will be handled here! 
            }).build());
    }

 

  该案例使用的驱动包有: AkkaSpray, and this Mysql async driver., 整个源码下载:GitHub

Actor模型教程

NIO原理与应用

 

猜你喜欢