使用分布式Actor实现微服务

juptr.io是一个内容个性化、分享和创作平台。通过抓取成千上万的博客和媒体网站(德语与英语),对这些内容进行分类,更易于个性化的内容管理、消费与讨论。

这就要求:
1.抓取,存储,查询,分析和分类数以百万计的文件
2.横向扩展
3.故障容错的分布式服务架构(微)
4.java,JavaScript的互操作性

使用以下基础栈:
1.java 8为后台,分析和自然语言处理
2.JavaScript polymer.js在前端
3.NoSQL水平可伸缩的DataGrid用来去实现持久性和大数据分析/ NLP

当实现一个远程服务,即使是简单的应用程序逻辑也变得复杂。

构建分布式系统需要一些样板构建等工作,比如定义通信消息和不断更新的数据格式。

服务实现主要集中在代码预处理,以及从通信消息复制到内部数据结构的转换上。通信最频繁的类型诸如httpx,TCP/IP,WebSockets会根据操作系统不同有所限制。

分布式Actor
为了实现分布式编程的生产化,我们需要:
1.抽象出网络传输边界的基础框架
2.完全自动化的通信消息编码/解码。
3.在编译时间捕捉通信协议不兼容的改变
4.透明的流stream
5.Lambda表达式的远程执行。

好处:
1.简单事情简单解决,ServiceRegistry注册服务只用不到100行的代码含括服务健康检查,可用状态改变广播和中央配置管理,见下面代码。
2.无缝衔接Java和Javascript
3.消息只需很少努力就能临时持久化,测试时或失败或复制时时重放。
4.sharding分片简单通过消息路由逻辑实现
5. 远程 lambda执行能够方便且高性能分布数据处理。
6. 高开发生产性。

下面是注册服务的代码:


public class Gravity extends Actor<Gravity> {
HashMap<String, List<ServiceDescription>> services = new HashMap<>();
List<Callback> listeners = new ArrayList<>();
JuptrCfg config;

@Local public void init() {
config = JuptrCfg.read();
checkTimeout(); // start cycle
}
public void registerService( ServiceDescription desc ) {
List<ServiceDescription> serviceList = getServiceList(desc.getName());
serviceList.add(desc);
desc.receiveHeartbeat();
if (serviceList.size()==1)
broadcastAvailable(desc);
}
public IPromise<Map<String,ServiceDescription>> getServiceMap() {
HashMap<String,ServiceDescription> servMap = new HashMap<>();
services.forEach((name, list) -> {
if (list.size() > 0)
servMap.put(name, list.get(0));
});
return resolve(servMap);
}
public void subscribe( Callback<Pair<String,ServiceDescription>> cb ) {
listeners.add(cb);
}
protected void broadcastAvailable(ServiceDescription desc) {
Pair msg = new Pair(AVAILABLE,desc);
listeners.forEach( cb -> {
try {
cb.stream(msg);
} catch (Throwable th) {
Log.Info(this, th);
}
});
}
protected void broadCastTimeOut(ServiceDescription desc) {
Pair msg = new Pair(TIMEOUT,desc);
for (int i = 0; i < listeners.size(); i++) {
Callback cb = listeners.get(i);
try {
cb.stream(msg);
} catch (Throwable th) {
Log.Info(this, th);
listeners.remove(i);
i--;
}
}
}
public IPromise<JuptrCfg> getConfig() {
return resolve(config);
}
public void receiveHeartbeat( String serviceName, String uniqueKey ) {
getServiceList(serviceName).forEach(sdesc -> {
if (sdesc.getUniqueKey().equals(uniqueKey)) {
sdesc.receiveHeartbeat();
}
});
}
@Local public void checkTimeout() {
services.values().forEach( list -> {
int prevsiz = list.size();
for (int i = 0; i < list.size(); i++) {
ServiceDescription serviceDescription = list.get(i);
if ( serviceDescription.hasTimedOut() ) {
list.remove(i);
i--;
broadCastTimeOut(serviceDescription);
}
}
// if a service timed out, but there is a replacement,
// broadcast availability
if ( prevsiz != list.size() && list.size() > 0 ) {
broadcastAvailable(list.get(0));
}
});
if ( ! isStopped() ) {
delayed(1000, () -> checkTimeout());
}
}
protected List<ServiceDescription> getServiceList(String serviceName) {
List<ServiceDescription> slist = services.get(serviceName);
if ( slist == null ) {
slist = new ArrayList<>();
services.put(serviceName, slist);
}
return slist;
}
public static void main(String[] args) {
Gravity gravity = Actors.AsActor(Gravity.class);
gravity.init();
// publish
new TCPNIOPublisher(gravity,options.getGravityPort()).publish(actor -> {
Log.Info(null, actor +
" has disconnected");
});
// log service activity
gravity.subscribe((pair, err) -> {
Log.Info(gravity.getClass(), pair.car() +
" " + pair.cdr());
});
}
}

服务连接到这个注册器并订阅的代码:


ConnectableActor gravity = new TCPConnectable(Gravity.class, host, port );
gravity = (Gravity) gravityConnectable.connect((conn, err) -> {
gravityDisconnected(conn,err);
}).await();
// make this service available to all cluster members
gravity.registerService(getServiceDescription());
// listen to events emitted by gravity
gravity.subscribe( (pair, err) -> serviceEvent(pair.car(), pair.cdr(), err) );

Make distributed programming great again: Microser