使用Kafka和ZeroMQ实现分布式Quasar Actor

  Quasar 是一个类似Scala的Akka的Java Actor模型实现框架,它使用 fiber(纤程)绿色线程实现类似Erlang的Actor模型,本站介绍结合Apache Kafka和ZeroMQ实现Quasar的分布式Actor模型。

当然,Galaxy也是一种非常好的选择,它是一个快速in-memory数据网格,专门为数据本地化复制,可选的持久和分布式actor注册,甚至能够在节点服务器之间迁移actor。

Kafka

  Apache Kafka是目前流行的提供事件日志流的项目,它的API包括两种生产者:同步和异步,消费者只有一种:同步。一个Kafka生产者处理是线程安全的,易于使用。

Comsat项目包括一个纤程友好fiber-friendly的 Kafka producer 集成,.我们使用它在actor内实现kafka的生产者,我们案例是演示从生产者发送几千个序列化的消息到消费者。

生产者代码如下:

final Properties producerConfig = new Properties();
producerConfig.put("bootstrap.servers", "localhost:9092");
producerConfig.put("client.id", "DemoProducer");
producerConfig.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");
producerConfig.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");

try (final FiberKafkaProducer<Integer, byte[]> producer = new FiberKafkaProducer<>(new KafkaProducer<>(producerConfig))) {
  final byte[] myBytes = getMyBytes(); // ...
  final Future<RecordMetaData> res = producer.send(new ProducerRecord<>("MyTopic", i,                     myBytes));
  res.get(); // 可选,堵塞绿色线程直到记录被持久化,也可以 `producer.flush()`
}

我们使用Comsat的FiberKafkaProducer 包装了KafkaProducer 对象,这是为了获得fiber-blocking的future。

Kafka的消费处理不是线程安全的,且只有线程堵塞的同步方式:

final Properties producerConfig = new Properties();
consumerConfig = new Properties();
consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP);
consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, "DemoConsumer");
consumerConfig.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
consumerConfig.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
consumerConfig.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.IntegerDeserializer");
consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");

try (final Consumer<Integer, byte[]> consumer = new KafkaConsumer<>(consumerConfig)) {
  consumer.subscribe(Collections.singletonList(TOPIC));
  final ConsumerRecords<Integer, byte[]> records = consumer.poll(1000L);
  for (final ConsumerRecord<Integer, byte[]> record : records) {
    final byte[] v = record.value();
    useMyBytes(v); // ...
  }
}

因为我们并不想堵塞fiber的底层线程池,这里就不能使用FiberAsync.runBlocking来将数据喂给固定大小的线程池了,而是在我们的actor的doRun中,我们使用一个异步任务,它会堵塞fiber会等到poll(会在指定池中执行)返回:

final ExecutorService e = Executors.newFixedThreadPool(2);

try (final Consumer<Integer, byte[]> consumer = new KafkaConsumer<>(consumerConfig)) {
  consumer.subscribe(Collections.singletonList(TOPIC));
  final ConsumerRecords<Integer, byte[]> records = call(e, () -> consumer.poll(1000L));
  for (final ConsumerRecord<Integer, byte[]> record : records) {
    final byte[] v = record.value();
    useMyBytes(v); // ...
  }
}

这里的call是一个工具方法,如下定义,如果没有 这个Java编译bug是不必要的。

@Suspendable
public static <V> V call(ExecutorService es, Callable<V> c) throws InterruptedException, SuspendExecution {
  try {
    return runBlocking(es, (CheckedCallable<V, Exception>) c::call);
  } catch (final InterruptedException | SuspendExecution e) {
    throw e;
  } catch (final Exception e) {
    throw new RuntimeException(e);
  }
}

完整案例代码见:complete example,这是一个从生产者发送几千个序列化的消息到消费者。

 

ØMQ

  ØMQ (或 ZeroMQ)是一个非集中的broker解决方案,有各种socket适合不同通讯模式(请求/应答或pub/sub发布/订阅),在我们的案例中我们使用最简单的请求应答模式,也是演示从生产者发送消息到消费者,生产者代码如下:

try (final ZMQ.Context zmq = ZMQ.context(1 /* IO threads */);
  final ZMQ.Socket trgt = zmq.socket(ZMQ.REQ)) {
    trgt.connect("tcp://localhost:8000");
    final byte[] myBytes = getMyBytes(); // ...
    trgt.send(baos.toByteArray(), 0 /* flags */)
    trgt.recv(); // Reply, e.g. ACK
}

这里context实际作为一个socket工厂,其中有I/O线程数量作为context参数,这是因为ZeroMQ的socket不是基于connection-bound OS处理,而是一个简单前端,能够进行连接重试处理,或多个连接处理,或有效率的并发I/O和提供队列。者就是为什么send方法调用几乎从来不会堵塞,而recv方法不是基于连接的I/O调用,而是一个基于你的线程和指定I/O任务之间的同步调用,所谓指定I/O任务是指从一个或多个连接中接受进来的字节数据。

在Actor我们就不使用堵塞fiber的方式了,因为这里send不会堵塞,因此使用FiberAsync.runBlocking堵塞住read调用。

因此改写上面生产者代码如下,这是Actor的代码:

final ExecutorService ep = Executors.newFixedThreadPool(2);

try (final ZMQ.Context zmq = ZMQ.context(1 /* IO threads */);
final ZMQ.Socket trgt = zmq.socket(ZMQ.REQ)) {
  exec(e, () -> trgt.connect("tcp://localhost:8000"));
  final byte[] myBytes = getMyBytes(); // ...
  call(e, trgt.send(myBytes, 0 /* flags */));
  call(e, trgt::recv); // Reply, e.g. ACK
}

下面是消费者代码:

try (final ZMQ.Context zmq = ZMQ.context(1 /* IO threads */);
final ZMQ.Socket src = zmq.socket(ZMQ.REP)) {
  exec(e, () -> src.bind("tcp://*:8000"));
  final byte[] v = call(e, src::recv);
  exec(e, () -> src.send("ACK"));
  useMyBytes(v); // ...
}

这里exec是一个工具函数,类似之前的call,代码如下:

@Suspendable
public static void exec(ExecutorService es, Runnable r) throws InterruptedException, SuspendExecution {
  try {
    runBlocking(es, (CheckedCallable<Void, Exception>) () -> { r.run(); return null; });
  } catch (final InterruptedException | SuspendExecution e) {
    throw e;
  } catch (final Exception e) {
    throw new RuntimeException(e);
  }
}

完整代码见:这里example

更深入应用见:Distributed Quasar Actors with Kafka and ZeroMQ

源码项目: on GitHub

 

Quasar与Akka比较

Quasar专题

Actor专题

Reactive专题