使用ZIO-Streams的Redis Stream消费者和生产者实现源码


一个利用zio-streams和Redisson库来使用和产生事件到Redis流的示例应用程序,基于Java 11,使用ZIO-Streams的Redis Stream消费者和生产者实现。点击标题见Github。
 
ZIO/ZIO Streams库在Scala函数式编程世界中非常令人着迷,Monad风格。与Akka流相比,ZIO流更简单,更灵活且更具表现力。
以下是ZIO另外一个使用案例
要求:

  • 使用流逐行处理日志
  • 过滤掉不是ERROR或WARN的消息
  • 将流分成2个流:一个用于ERROR,另一个用于WARN消息
  • 根据一定的时间表处理错误消息,例如 每2秒
  • 批量处理WARN消息。例如,将10条消息分成一批

ZStream是ZIO Streams中的主要构造。首先,我们将从文件输入流创建ZStream:
ZStream.fromInputStream(fileInputStream)

您得到的是流的块[T]。您可以将Chunk [T]视为Array [T]的不变版本,但效率更高。
.chunks
.aggregate(ZSink.utf8DecodeChunk)
.aggregate(ZSink.splitLines)
.mapConcatChunk(identity)

然后,我们将Chunk [String]的流转换为String的流,并且每个元素都是日志文件中的一行消息。
ZIO Streams具有.tap函数,您可以使用该函数使流中的消息达到峰值。它是调试流应用的好工具。
.tap(data => putStrLn(s"> $data"))

然后我们过滤掉不是ERROR或WARN的消息:
.filter(isErrorWarning)

下一行代码基于isError谓词返回true或false ,将当前流分成2个流。数字4是缓冲区大小,因此两个子流可以以不同的速度运行直至缓冲区大小:

.partition(isError, 4)
val errorStream = leftStream
  .mapM(processError(_))
  .schedule(Schedule.fixed(2.second))

使用ZIO Schedule每2秒处理一次左流中的每个ERROR消息。
对于正确流中的WARN消息,我们将一次将它们分为6条消息并进行批量处理:
val warningStream = rightStream
  .aggregate(ZSink.collectAllN[String](10))
  .mapM(processWarning(_))

如果您更容易理解,您可以认为ZSink是消息生产者/使用者上下文中的消息使用者。(并认为ZStream是消息生成器)。ZSink通常用于ZIO Stream中的聚合功能。
最后,我们将2个流合并为一个并收集所有元素:

errorStream.merge(warningStream).runCollect

以下是完整代码:

import java.nio.file.{Files, Paths}

import zio._
import zio.console._
import zio.duration._
import zio.stream._

object LogStreamApp extends App {

  def isErrorWarning(data: String) = {
    data.contains("ERROR") || data.contains("WARN")
  }

  def isError(data: String): Boolean = {
    data.contains(
"ERROR")
  }

  def processError(data: String) = {
    putStrLn(s
"process error message: ${data}") *>
      Task.succeed()
  }

  def processWarning(list: List[String]) = {
    putStrLn(s
"process warning messages in batch: ${list.length} => $list") *>
      Task.succeed()
  }

  def run(args: List[String]): ZIO[ZEnv, Nothing, Int] = {
    val is = Files.newInputStream(Paths.get(ClassLoader.getSystemResource(
"prod_log.txt").toURI()))

    val theJob = (for {
      streams <- ZStream
        .fromInputStream(is)
        .chunks
        .aggregate(ZSink.utf8DecodeChunk)
        .aggregate(ZSink.splitLines)
        .mapConcatChunk(identity)
        .tap(data => putStrLn(s
"> $data"))
        .filter(isErrorWarning)
        .partition(isError, 4)
    } yield streams).use {
      case (leftStream, rightStream) => {
        val errorStream = leftStream
          .mapM(processError(_))
          .schedule(Schedule.fixed(2.second))

        val warningStream = rightStream
          .aggregate(ZSink.collectAllN[String](10))
          .mapM(processWarning(_))

        errorStream.merge(warningStream).runCollect
      }
    }

    theJob.fold(_ => 1, _ => 0)
  }
}

the code at github.