Liiklus [li:klus](爱沙尼亚语中的“流量”) - 基于gRPC的网关,用于基于事件的系统,如果你认为Kafka实现事件系统过于底层,可以使用该系统:
- 水平可扩展的gRPC流媒体网关
- 支持与gRPC一样多的客户端语言(Java,Go,C ++,Python等)
- 响应reactive第一
- 每个分区有背压感知源
- 至少一次/最多一次交付保证
- 可插拔事件存储(Kafka,Pulsar,Kinesis等......)
- 可插拔位置存储(DynamoDB,Cassandra,Redis等...)
- WIP:冷事件存储支持(S3,Minio,SQL,键/值等...)
目前谁在使用?开始使用:
$ docker run \ -e kafka_bootstrapServers=some.kafka.host:9092 \ -e storage_positions_type=MEMORY \ # only for testing, DO NOT use in production -p 6565:6565 \ bsideup/liiklus:$LATEST_VERSION
|
现在可以使用LiiklusService.proto生成您的客户端。
客户端必须实现以下算法:
- 订阅作业:
stub.subscribe(SubscribeRequest( topic="your-topic", group="your-consumer-group", [autoOffsetReset="earliest|latest"]))
|
- 接受:对Subscribe使用相同频道的每个发出的回复:
stub.receive(ReceiveRequest( assignment=reply.getAssignment()))
|
- 确认
stub.ack(AckRequest( assignment=reply.getAssignment(), offset=record.getOffset()))
|
注1:如果在处理之前确认记录是最多一次,在处理后确认记录是至少一次
注意2:建议每隔n个记录确认一次,或者每隔n秒确认一次,以减少位置存储库的负载
使用Project Reactor和reactive-grpc的示例代码:
var stub = ReactorLiiklusServiceGrpc.newReactorStub(channel); stub .subscribe( SubscribeRequest.newBuilder() .setTopic("user-events") .setGroup("analytics") .setAutoOffsetReset(AutoOffsetReset.EARLIEST) .build() ) .flatMap(reply -> stub .receive(ReceiveRequest.newBuilder().setAssignment(reply.getAssignment()).build()) .window(1000) // ACK every 1000th records .concatMap( batch -> batch .map(ReceiveReply::getRecord) // TODO process instead of Mono.delay(), i.e. by indexing to ElasticSearch .concatMap(record -> Mono.delay(Duration.ofMillis(100))) .sample(Duration.ofSeconds(5)) // ACK every 5 seconds .onBackpressureLatest() .delayUntil(record -> stub.ack( AckRequest.newBuilder() .setAssignment(reply.getAssignment()) .setOffset(record.getOffset()) .build() )), 1 ) ) .blockLast()
|