Apache Flink复杂事件处理指南 - softwaremill


如今,流处理是一个非常流行的话题。公司处理成千上万个需要实时或近实时处理的事件。企业需要分析客户的行为,交易,股票价格变化甚至自动驾驶汽车传感器读数。但是,今天,我们要专注于复杂事件处理
 
什么是复杂事件处理(CEP)
事件流可以以不同的方式处理。在最简单的情况下,独立分析单个事件就足够了。如果温度传感器的值高于50°C,则发送警报。在更高级的情况下,将一起分析事件的组或窗口,例如,如果平均5分钟的温度传感器读数高于50°C,则发送警报。这样可以过滤异常值和错误的读数。但是,CEP涵盖了甚至更复杂的情况,其中对流进行分析以寻找特定的数据模式或趋势。例如,我们查看5个温度读数,这些数值的增长值超过了目标限值,同时烟雾传感器开启。
CEP允许我们定义各种规则。我们可能会尝试查找类型A后面类型B的事件。我们可能还想查找没有事件B的事件A。可能与订单,时间,数字有关。意味着它们之间可能还会发生其他类型的事件。对于人类来说,这听起来似乎很简单,但是在处理大数据时,它开始成为主要的计算挑战。而且,为了拯救人们的生命,经常需要即时接收算法的结果。
在工业4.0时代,不需要仅在大数据中心中执行复杂的分析。我们注意到围绕边缘AI和边缘分析的术语的增长趋势。专门的物联网微芯片与传感器一起部署,因此可以在本地执行分析,从而省去了与将数据传输到云相关的延迟。这样,可以更快地采取适当的措施。
 

复杂事件处理用例
哪种用例最适合CEP?好吧,我们在数据流中寻找特定模式的所有内容。为了进一步说明,让我们看几个例子。

  • 算法股票交易

在金融世界中,利用技术分析来预测股票价格的走势。有很多信息可以使用,但是最重要的信息涉及交易价格和交易量。仅基于这两个指标,就有很多模式。各种三角形,楔形,头和肩膀,双顶/底等。其中一些实际上看起来很简单,但是并不是每个人都有时间手动跟踪整个股票市场的图表。还有一些更复杂的方法,例如与Wyckoff方法有关的方法。CEP用于创建支持股票交易的系统,也可用于算法交易系统。
  • 欺诈识别

该领域最常见的主题是信用卡欺诈。银行会尝试检测您的卡详细信息是否可能被盗,以免造成损失。可疑模式中可以包含哪些内容?例如,交易晚点或在世界各地的位置。如有任何疑问,将与客户联系以确认交易。
爱沙尼亚的SEB银行利用Apama Complex Event Processing进行付款欺诈检测。
  • 系统安全

安全是一个困难的话题,涉及组织的各个领域。可用的解决方案主要集中在异常或入侵检测以及网络监控。在信息来源中,最重要的是网络日志,可以对其进行分析以找出可疑模式。
进一步阅读:复杂事件处理系统在异常检测和网络监控中的应用

意外的硬件故障总是代价高昂。它们导致停机时间和生产计划复杂化。如何避免它们?有些技术仅基于特定模型的故障统计信息。但是,这些只是统计信息,永远不会100%准确。更现代的方法是监视设备并从各种传感器收集读数。数据中的模式可能表明设备即将失效。
  • 其他用例

我们仅描述了一些特定的用例领域,但是更多的用例是可能的。在商务智能,天气报告,业务活动监控,点击流分析或自动驾驶工具中也提到了复杂事件处理。范围非常广泛,并且随着工业和5G物联网的增长趋势,对于复杂的场景,将需要分析越来越多的数据。
 
用于复杂事件处理的技术
由于可以使用“复杂事件处理”来分析广泛的领域,因此实际上有大量的产品和工具可以实现此技术。其中一些专门用于特定用例,其中一些仅在付费产品中可用。每个主要参与者都有与CEP相关的产品-例如Azure流分析,Microsoft StreamInsight,SAP ESP,TIBCO BusinessEvents和Streambase。让我们更深入地研究一种有前途的开源解决方案Apache Flink。
 
Apache Flink
Apache Flink是用于批处理和流处理的工具。通常将它与Apache Spark进行比较,因为它提供了非常相似的功能。但是,从CEP角度来看,有一个主要区别:分别用于复杂事件处理的模块和DSL。
假设我们要在至少三个温度高于50°C的事件之后发送警报,然后发出烟雾。
首先,我们需要定义事件类型,以表示系统处理的消息。
 sealed trait Event

  case class TemperatureEvent(value: Double) extends Event
  case class SmokeEvent() extends Event
  case class LightEvent(value: Double) extends Event

上面Scala代码中声明了事件特征和三种可能的事件类型-一种用于温度,一种用于烟雾,另一种用于-LightEvent在事件流中引入一些噪声。
然后,让我们定义最重要的部分-模式:

 val pattern = Pattern.begin[Event]("temperature").subtype(classOf[TemperatureEvent]).where(_.value >= 50.0).timesOrMore(3)
    .followedBy(
"smoke").subtype(classOf[SmokeEvent])

该模式以至少三个类型的事件开头,TemperatureEvent后跟一个SmokeEvent。followedBy表示其他事件可能在两者之间发生。
最后一部分与运行和报告检测到的模式有关:
val env = StreamExecutionEnvironment.createLocalEnvironment()

 val patternStream = CEP.pattern(input, pattern)

  case class Alert(message: String) // just for presenting results

  val result: DataStream[Alert] = patternStream.process(
    new PatternProcessFunction[Event, Alert]() {
      override def processMatch(
                                 `match`: util.Map[String, util.List[Event]],
                                 ctx: PatternProcessFunction.Context,
                                 out: Collector[Alert]): Unit = {
        out.collect(Alert(`match`.toString))
// match map converted to string
      }
    })

  result.print()  
// let’s display the results

  env.execute()
// run the flow

我说这是最后一部分,但是我们已经忘记了input!假设它将是:

 

val input: DataStream[Event] = env.fromElements(
    TemperatureEvent(49.0),
    TemperatureEvent(51.0),
    LightEvent(100),
    TemperatureEvent(52.0),
    TemperatureEvent(53.0),
    LightEvent(125),
    SmokeEvent(),
    LightEvent(135)
  )

结果:
Alert({
    temperature=[TemperatureEvent(51.0), TemperatureEvent(52.0), TemperatureEvent(53.0)],
    smoke=[SmokeEvent()]
})

一切都按预期工作,并报告了警报。但是,提出的示例并不完美。可以通过在实际事件中添加时间戳以及输入序列必须与模式匹配的最大时间间隔来改善此效果。当前代码不受任何时间范围的限制。如果您想了解有关FlinkCEP的更多信息,请查看其文档
 
结论
复杂的事件处理已经存在了很多年。它涉及各个领域,可以应用于许多领域。市场上的CEP工具为标准流处理增加了一层,因此可以更轻松地定义复杂的模式。到目前为止,大多数可用于生产环境的“复杂事件处理”工具都是开源的,并且只能在商业上使用。
Apache Flink是CEP的可靠工具。它已在批处理和流处理领域中得到了一定的采用,因此附加的CEP模块使该技术的访问范围更广,从而使我们能够发现潜在的新用例。