一个利用zio-streams和Redisson库来使用和产生事件到Redis流的示例应用程序,基于Java 11,使用ZIO-Streams的Redis Stream消费者和生产者实现。点击标题见Github。
ZIO/ZIO Streams库在Scala函数式编程世界中非常令人着迷,Monad风格。与Akka流相比,ZIO流更简单,更灵活且更具表现力。
以下是ZIO另外一个使用案例
要求:
- 使用流逐行处理日志
- 过滤掉不是ERROR或WARN的消息
- 将流分成2个流:一个用于ERROR,另一个用于WARN消息
- 根据一定的时间表处理错误消息,例如 每2秒
- 批量处理WARN消息。例如,将10条消息分成一批
ZStream是ZIO Streams中的主要构造。首先,我们将从文件输入流创建ZStream:
ZStream.fromInputStream(fileInputStream)
|
您得到的是流的块[T]。您可以将Chunk [T]视为Array [T]的不变版本,但效率更高。
.chunks .aggregate(ZSink.utf8DecodeChunk) .aggregate(ZSink.splitLines) .mapConcatChunk(identity)
|
然后,我们将Chunk [String]的流转换为String的流,并且每个元素都是日志文件中的一行消息。
ZIO Streams具有.tap函数,您可以使用该函数使流中的消息达到峰值。它是调试流应用的好工具。
.tap(data => putStrLn(s"> $data"))
|
然后我们过滤掉不是ERROR或WARN的消息:
下一行代码基于isError谓词返回true或false ,将当前流分成2个流。数字4是缓冲区大小,因此两个子流可以以不同的速度运行直至缓冲区大小:
.partition(isError, 4) val errorStream = leftStream .mapM(processError(_)) .schedule(Schedule.fixed(2.second))
|
使用ZIO Schedule每2秒处理一次左流中的每个ERROR消息。
对于正确流中的WARN消息,我们将一次将它们分为6条消息并进行批量处理:
val warningStream = rightStream .aggregate(ZSink.collectAllN[String](10)) .mapM(processWarning(_))
|
如果您更容易理解,您可以认为ZSink是消息生产者/使用者上下文中的消息使用者。(并认为ZStream是消息生成器)。ZSink通常用于ZIO Stream中的聚合功能。
最后,我们将2个流合并为一个并收集所有元素:
errorStream.merge(warningStream).runCollect
|
以下是完整代码:
import java.nio.file.{Files, Paths}
import zio._ import zio.console._ import zio.duration._ import zio.stream._
object LogStreamApp extends App {
def isErrorWarning(data: String) = { data.contains("ERROR") || data.contains("WARN") }
def isError(data: String): Boolean = { data.contains("ERROR") }
def processError(data: String) = { putStrLn(s"process error message: ${data}") *> Task.succeed() }
def processWarning(list: List[String]) = { putStrLn(s"process warning messages in batch: ${list.length} => $list") *> Task.succeed() }
def run(args: List[String]): ZIO[ZEnv, Nothing, Int] = { val is = Files.newInputStream(Paths.get(ClassLoader.getSystemResource("prod_log.txt").toURI()))
val theJob = (for { streams <- ZStream .fromInputStream(is) .chunks .aggregate(ZSink.utf8DecodeChunk) .aggregate(ZSink.splitLines) .mapConcatChunk(identity) .tap(data => putStrLn(s"> $data")) .filter(isErrorWarning) .partition(isError, 4) } yield streams).use { case (leftStream, rightStream) => { val errorStream = leftStream .mapM(processError(_)) .schedule(Schedule.fixed(2.second))
val warningStream = rightStream .aggregate(ZSink.collectAllN[String](10)) .mapM(processWarning(_))
errorStream.merge(warningStream).runCollect } }
theJob.fold(_ => 1, _ => 0) } }
|
the code at github.