展示了如何集成Confluent Kafka,KSQL,Spring Boot和YugaByte DB来开发用于管理物联网(IoT)传感器数据的应用程序。
场景 - 支持物联网的车队管理
一家货运公司希望跟踪其在全国范围内运送货物的物联网车辆。车辆属于不同类型(例如18轮车,公共汽车,大型卡车),并遵循3条交付路线(Route-37,Route-82,Route-43)。特别是,该公司希望跟踪:
- 每条交付路线的车辆类型的总体分布。
- 最近(例如,在过去30秒内)每个货物交付路线的这些车辆类型的子集。
- 道路封闭附近的车辆清单,以便他们可以预测交货延误。
应用架构
除了Confluent Kafka作为流媒体平台,该应用程序还具有以下组件:
- 数据存储: YugaByte DB用于存储来自Kafka流的原始事件以及来自KSQL数据处理器的聚合。
- 数据生成器:用于模拟写入Kafka流的车辆事件的程序。
- 数据处理器: 从Data Producer读取KSQL,计算聚合并将结果存储在Data Store中。
- Data Dashboard: Spring Boot应用程序,使用Web套接字,jQuery和Bootstrap显示数据处理器的输出。
下面是显示这些组件如何组合在一起的架构图。我们称之为Confluent Kafka,KSQL和YugaByte DB堆栈或CKY堆栈。
我们现在将详细介绍这些组件中的每一个。
数据存储
该层存储所有用户数据。YugaByte DB用作数据库,YugaByte云查询语言(YCQL)用作数据库API。所有数据都存储在密钥空间TrafficKeySpace中。有一个Origin_Table用于存储原始事件的表。
CREATE TABLE TrafficKeySpace.Origin_Table ( |
请注意default_time_to_live设置为3600秒的值,以确保原始事件在1小时后自动删除。这是为了确保原始事件不会消耗数据库中的所有存储,并且在计算聚合后不久就会有效地从数据库中删除。
有三个表用于保存用于面向用户的显示的数据:
- Total_Traffic 交通信息
- Window_Traffic 最后30秒的流量和
- poi_traffic 对于兴趣点附近的交通(道路封闭)。
数据处理器不断更新这些表,仪表板从中读取。
以下是这些表:
CREATE TABLE TrafficKeySpace.Total_Traffic ( |
数据生产者
这包含生成模拟测试数据并将其发布到Kafka主题iot-data-event的程序。这模拟了使用现实世界中的消息代理从连接的车辆接收的数据。
单个数据点是JSON有效负载,如下所示:
{ |
消费者读取上面的iot-data-event主题,将每个这样的事件转换为YCQL INSERT语句,然后调用YugaByte DB持久化到事件表TrafficKeySpace.Origin_Table。
数据处理器
KSQL是Apache Kafka的流式SQL引擎。它为Kafka上的流处理提供了一个易于使用但功能强大的交互式SQL接口,无需使用Java或Python等编程语言编写代码。它支持广泛的流操作,包括数据过滤,转换,聚合,连接,窗口和会话。
使用KSQL的第一步是STREAM从原始事件创建一个iot-data-event如下所示。
CREATE STREAM traffic_stream ( |
现在可以在上面的流上运行各种聚合/查询,每种类型的查询的结果存储在它自己的新Kafka主题中。此应用程序使用3个此类查询/主题。此后,Kafka Connect YugaByte DB Sink Connector读取这3个主题,并将结果保存到YugaByte DB中的3个相应表中。
CREATE TABLE total_traffic |
所有Kafka Connect YugaByte DB Sink Connector用于存储两个原始事件以及集合数据(这是使用KSQL生成)。它计算如下:
- 到目前为止所有车辆和货物的车辆类型和装运路线细分。
- 仅针对有效货件计算上述细分。这是通过计算最近30秒的车辆类型和装运路线的细分来完成的。
- 检测距离给定兴趣点(POI)20英里范围内的车辆,这表示道路封闭。
数据仪表板
这是一个Spring Boot应用程序,它从YugaByte DB查询数据,并使用Web Sockets和jQuery将数据推送到网页。数据以固定间隔推送到网页,因此数据将自动刷新。主UI页面使用bootstrap.js显示包含图表和表格的仪表板。
我们为三个表创建实体类Total_Traffic,Window_Traffic并poi_traffic为所有实体扩展创建数据访问对象(DAO)接口CassandraRepository。例如,我们为TotalTrafficData实体创建DAO类,如下所示。
@Repository |
为了连接到YugaByte数据库集群并获得数据库操作的连接,我们还编写了一个DatabaseConfig类。
请注意,目前仪表板不访问原始事件表,仅依赖于存储在聚合表中的数据。
总结
此应用程序是使用Confluent Kafka,KSQL,Spring Boot和YugaByte DB构建IoT应用程序的蓝图。虽然这篇文章专注于本地集群部署,但Kafka代理和YugaByte数据库节点可以在真正的集群部署中进行水平扩展,以获得更多的应用程序吞吐量和容错能力。可以在yb-iot-fleet-managementGitHub仓库中找到构建和运行应用程序的说明以及源代码。