juptr.io是一个内容个性化、分享和创作平台。通过抓取成千上万的博客和媒体网站(德语与英语),对这些内容进行分类,更易于个性化的内容管理、消费与讨论。
这就要求:
1.抓取,存储,查询,分析和分类数以百万计的文件
2.横向扩展
3.故障容错的分布式服务架构(微)
4.java,JavaScript的互操作性
使用以下基础栈:
1.java 8为后台,分析和自然语言处理
2.JavaScript polymer.js在前端
3.NoSQL水平可伸缩的DataGrid用来去实现持久性和大数据分析/ NLP
当实现一个远程服务,即使是简单的应用程序逻辑也变得复杂。
构建分布式系统需要一些样板构建等工作,比如定义通信消息和不断更新的数据格式。
服务实现主要集中在代码预处理,以及从通信消息复制到内部数据结构的转换上。通信最频繁的类型诸如httpx,TCP/IP,WebSockets会根据操作系统不同有所限制。
分布式Actor
为了实现分布式编程的生产化,我们需要:
1.抽象出网络传输边界的基础框架
2.完全自动化的通信消息编码/解码。
3.在编译时间捕捉通信协议不兼容的改变
4.透明的流stream
5.Lambda表达式的远程执行。
好处:
1.简单事情简单解决,ServiceRegistry注册服务只用不到100行的代码含括服务健康检查,可用状态改变广播和中央配置管理,见下面代码。
2.无缝衔接Java和Javascript
3.消息只需很少努力就能临时持久化,测试时或失败或复制时时重放。
4.sharding分片简单通过消息路由逻辑实现
5. 远程 lambda执行能够方便且高性能分布数据处理。
6. 高开发生产性。
下面是注册服务的代码:
public class Gravity extends Actor<Gravity> { HashMap<String, List<ServiceDescription>> services = new HashMap<>(); List<Callback> listeners = new ArrayList<>(); JuptrCfg config;
@Local public void init() { config = JuptrCfg.read(); checkTimeout(); // start cycle } public void registerService( ServiceDescription desc ) { List<ServiceDescription> serviceList = getServiceList(desc.getName()); serviceList.add(desc); desc.receiveHeartbeat(); if (serviceList.size()==1) broadcastAvailable(desc); } public IPromise<Map<String,ServiceDescription>> getServiceMap() { HashMap<String,ServiceDescription> servMap = new HashMap<>(); services.forEach((name, list) -> { if (list.size() > 0) servMap.put(name, list.get(0)); }); return resolve(servMap); } public void subscribe( Callback<Pair<String,ServiceDescription>> cb ) { listeners.add(cb); } protected void broadcastAvailable(ServiceDescription desc) { Pair msg = new Pair(AVAILABLE,desc); listeners.forEach( cb -> { try { cb.stream(msg); } catch (Throwable th) { Log.Info(this, th); } }); } protected void broadCastTimeOut(ServiceDescription desc) { Pair msg = new Pair(TIMEOUT,desc); for (int i = 0; i < listeners.size(); i++) { Callback cb = listeners.get(i); try { cb.stream(msg); } catch (Throwable th) { Log.Info(this, th); listeners.remove(i); i--; } } } public IPromise<JuptrCfg> getConfig() { return resolve(config); } public void receiveHeartbeat( String serviceName, String uniqueKey ) { getServiceList(serviceName).forEach(sdesc -> { if (sdesc.getUniqueKey().equals(uniqueKey)) { sdesc.receiveHeartbeat(); } }); } @Local public void checkTimeout() { services.values().forEach( list -> { int prevsiz = list.size(); for (int i = 0; i < list.size(); i++) { ServiceDescription serviceDescription = list.get(i); if ( serviceDescription.hasTimedOut() ) { list.remove(i); i--; broadCastTimeOut(serviceDescription); } } // if a service timed out, but there is a replacement, // broadcast availability if ( prevsiz != list.size() && list.size() > 0 ) { broadcastAvailable(list.get(0)); } }); if ( ! isStopped() ) { delayed(1000, () -> checkTimeout()); } } protected List<ServiceDescription> getServiceList(String serviceName) { List<ServiceDescription> slist = services.get(serviceName); if ( slist == null ) { slist = new ArrayList<>(); services.put(serviceName, slist); } return slist; } public static void main(String[] args) { Gravity gravity = Actors.AsActor(Gravity.class); gravity.init(); // publish new TCPNIOPublisher(gravity,options.getGravityPort()).publish(actor -> { Log.Info(null, actor + " has disconnected"); }); // log service activity gravity.subscribe((pair, err) -> { Log.Info(gravity.getClass(), pair.car() + " " + pair.cdr()); }); } }
|
服务连接到这个注册器并订阅的代码:
ConnectableActor gravity = new TCPConnectable(Gravity.class, host, port ); gravity = (Gravity) gravityConnectable.connect((conn, err) -> { gravityDisconnected(conn,err); }).await(); // make this service available to all cluster members gravity.registerService(getServiceDescription()); // listen to events emitted by gravity gravity.subscribe( (pair, err) -> serviceEvent(pair.car(), pair.cdr(), err) );
|
Make distributed programming great again: Microser