使用分布式Actor实现微服务

16-11-14 banq
         

juptr.io是一个内容个性化、分享和创作平台。通过抓取成千上万的博客和媒体网站(德语与英语),对这些内容进行分类,更易于个性化的内容管理、消费与讨论。

这就要求:

1.抓取,存储,查询,分析和分类数以百万计的文件

2.横向扩展

3.故障容错的分布式服务架构(微)

4.java,JavaScript的互操作性

使用以下基础栈:

1.java 8为后台,分析和自然语言处理

2.JavaScript polymer.js在前端

3.NoSQL水平可伸缩的DataGrid用来去实现持久性和大数据分析/ NLP

当实现一个远程服务,即使是简单的应用程序逻辑也变得复杂。

构建分布式系统需要一些样板构建等工作,比如定义通信消息和不断更新的数据格式。

服务实现主要集中在代码预处理,以及从通信消息复制到内部数据结构的转换上。通信最频繁的类型诸如httpx,TCP/IP,WebSockets会根据操作系统不同有所限制。

分布式Actor

为了实现分布式编程的生产化,我们需要:

1.抽象出网络传输边界的基础框架

2.完全自动化的通信消息编码/解码。

3.在编译时间捕捉通信协议不兼容的改变

4.透明的流stream

5.Lambda表达式的远程执行。

好处:

1.简单事情简单解决,ServiceRegistry注册服务只用不到100行的代码含括服务健康检查,可用状态改变广播和中央配置管理,见下面代码。

2.无缝衔接Java和Javascript

3.消息只需很少努力就能临时持久化,测试时或失败或复制时时重放。

4.sharding分片简单通过消息路由逻辑实现

5. 远程 lambda执行能够方便且高性能分布数据处理。

6. 高开发生产性。

下面是注册服务的代码:

public class Gravity extends Actor<Gravity> {
    HashMap<String, List<ServiceDescription>> services = new HashMap<>();
    List<Callback> listeners = new ArrayList<>();
    JuptrCfg config;

    @Local public void init() {
        config = JuptrCfg.read();
        checkTimeout(); // start cycle
    }
    public void registerService( ServiceDescription desc ) {
        List<ServiceDescription> serviceList = getServiceList(desc.getName());
        serviceList.add(desc);
        desc.receiveHeartbeat();
        if (serviceList.size()==1)
            broadcastAvailable(desc);
    }
    public IPromise<Map<String,ServiceDescription>> getServiceMap() {
        HashMap<String,ServiceDescription> servMap = new HashMap<>();
        services.forEach((name, list) -> {
            if (list.size() > 0)
                servMap.put(name, list.get(0));
        });
        return resolve(servMap);
    }
    public void subscribe( Callback<Pair<String,ServiceDescription>> cb ) {
        listeners.add(cb);
    }
    protected void broadcastAvailable(ServiceDescription desc) {
        Pair msg = new Pair(AVAILABLE,desc);
        listeners.forEach( cb -> {
            try {
                cb.stream(msg);
            } catch (Throwable th) {
                Log.Info(this, th);
            }
        });
    }
    protected void broadCastTimeOut(ServiceDescription desc) {
        Pair msg = new Pair(TIMEOUT,desc);
        for (int i = 0; i < listeners.size(); i++) {
            Callback cb = listeners.get(i);
            try {
                cb.stream(msg);
            } catch (Throwable th) {
                Log.Info(this, th);
                listeners.remove(i);
                i--;
            }
        }
    }
    public IPromise<JuptrCfg> getConfig() {
        return resolve(config);
    }
    public void receiveHeartbeat( String serviceName, String uniqueKey ) {
        getServiceList(serviceName).forEach(sdesc -> {
            if (sdesc.getUniqueKey().equals(uniqueKey)) {
                sdesc.receiveHeartbeat();
            }
        });
    }
    @Local public void checkTimeout() {
        services.values().forEach( list -> {
            int prevsiz = list.size();
            for (int i = 0; i < list.size(); i++) {
                ServiceDescription serviceDescription = list.get(i);
                if ( serviceDescription.hasTimedOut() ) {
                    list.remove(i);
                    i--;
                    broadCastTimeOut(serviceDescription);
                }
            }
            // if a service timed out, but there is a replacement,
            // broadcast availability
            if ( prevsiz != list.size() && list.size() > 0 ) {
                broadcastAvailable(list.get(0));
            }
        });
        if ( ! isStopped() ) {
            delayed(1000, () -> checkTimeout());
        }
    }
    protected List<ServiceDescription> getServiceList(String serviceName) {
        List<ServiceDescription> slist = services.get(serviceName);
        if ( slist == null ) {
            slist = new ArrayList<>();
            services.put(serviceName, slist);
        }
        return slist;
    }
    public static void main(String[] args) {
        Gravity gravity = Actors.AsActor(Gravity.class);
        gravity.init();
        // publish
        new TCPNIOPublisher(gravity,options.getGravityPort()).publish(actor -> {
            Log.Info(null, actor + " has disconnected");
        });
        // log service activity
        gravity.subscribe((pair, err) -> {
            Log.Info(gravity.getClass(), pair.car() + " " + pair.cdr());
        });
    }
}
<p>

服务连接到这个注册器并订阅的代码:

ConnectableActor gravity = new TCPConnectable(Gravity.class, host, port );
gravity = (Gravity) gravityConnectable.connect((conn, err) -> {
    gravityDisconnected(conn,err);
}).await();
// make this service available to all cluster members
gravity.registerService(getServiceDescription());
// listen to events emitted by gravity
gravity.subscribe( (pair, err) -> serviceEvent(pair.car(), pair.cdr(), err) );

<p>

Make distributed programming great again: Microser