展示了如何集成Confluent Kafka,KSQL,Spring Boot和YugaByte DB来开发用于管理物联网(IoT)传感器数据的应用程序。
场景 - 支持物联网的车队管理
一家货运公司希望跟踪其在全国范围内运送货物的物联网车辆。车辆属于不同类型(例如18轮车,公共汽车,大型卡车),并遵循3条交付路线(Route-37,Route-82,Route-43)。特别是,该公司希望跟踪:
- 每条交付路线的车辆类型的总体分布。
- 最近(例如,在过去30秒内)每个货物交付路线的这些车辆类型的子集。
- 道路封闭附近的车辆清单,以便他们可以预测交货延误。
应用架构
除了Confluent Kafka作为流媒体平台,该应用程序还具有以下组件:
- 数据存储: YugaByte DB用于存储来自Kafka流的原始事件以及来自KSQL数据处理器的聚合。
- 数据生成器:用于模拟写入Kafka流的车辆事件的程序。
- 数据处理器: 从Data Producer读取KSQL,计算聚合并将结果存储在Data Store中。
- Data Dashboard: Spring Boot应用程序,使用Web套接字,jQuery和Bootstrap显示数据处理器的输出。
下面是显示这些组件如何组合在一起的架构图。我们称之为Confluent Kafka,KSQL和YugaByte DB堆栈或CKY堆栈。
我们现在将详细介绍这些组件中的每一个。
数据存储
该层存储所有用户数据。YugaByte DB用作数据库,YugaByte云查询语言(YCQL)用作数据库API。所有数据都存储在密钥空间TrafficKeySpace中。有一个Origin_Table用于存储原始事件的表。
CREATE TABLE TrafficKeySpace.Origin_Table ( vehicleId text, routeId text, vehicleType text, longitude text, latitude text, timeStamp timestamp, speed double, fuelLevel double, PRIMARY KEY ((vehicleId), timeStamp) ) WITH default_time_to_live = 3600; |
请注意default_time_to_live设置为3600秒的值,以确保原始事件在1小时后自动删除。这是为了确保原始事件不会消耗数据库中的所有存储,并且在计算聚合后不久就会有效地从数据库中删除。
有三个表用于保存用于面向用户的显示的数据:
- Total_Traffic 交通信息
- Window_Traffic 最后30秒的流量和
- poi_traffic 对于兴趣点附近的交通(道路封闭)。
数据处理器不断更新这些表,仪表板从中读取。
以下是这些表:
CREATE TABLE TrafficKeySpace.Total_Traffic ( routeId text, vehicleType text, totalCount bigint, timeStamp timestamp, recordDate text, PRIMARY KEY (routeId, recordDate, vehicleType) ); CREATE TABLE TrafficKeySpace.Window_Traffic ( routeId text, vehicleType text, totalCount bigint, timeStamp timestamp, recordDate text, PRIMARY KEY (routeId, recordDate, vehicleType) ); CREATE TABLE TrafficKeySpace.Poi_Traffic( vehicleid text, vehicletype text, distance bigint, timeStamp timestamp, PRIMARY KEY (vehicleid) ); |
数据生产者
这包含生成模拟测试数据并将其发布到Kafka主题iot-data-event的程序。这模拟了使用现实世界中的消息代理从连接的车辆接收的数据。
单个数据点是JSON有效负载,如下所示:
{ "vehicleId":"0bf45cac-d1b8-4364-a906-980e1c2bdbcb", "vehicleType":"Taxi", "routeId":"Route-37", "longitude":"-95.255615", "latitude":"33.49808", "timestamp":"2017-10-16 12:31:03", "speed":49.0, "fuelLevel":38.0 } |
消费者读取上面的iot-data-event主题,将每个这样的事件转换为YCQL INSERT语句,然后调用YugaByte DB持久化到事件表TrafficKeySpace.Origin_Table。
数据处理器
KSQL是Apache Kafka的流式SQL引擎。它为Kafka上的流处理提供了一个易于使用但功能强大的交互式SQL接口,无需使用Java或Python等编程语言编写代码。它支持广泛的流操作,包括数据过滤,转换,聚合,连接,窗口和会话。
使用KSQL的第一步是STREAM从原始事件创建一个iot-data-event如下所示。
CREATE STREAM traffic_stream ( vehicleId varchar, vehicleType varchar, routeId varchar, timeStamp varchar, latitude varchar, longitude varchar) WITH ( KAFKA_TOPIC='iot-data-event', VALUE_FORMAT='json', TIMESTAMP='timeStamp', TIMESTAMP_FORMAT='yyyy-MM-dd HH:mm:ss'); |
现在可以在上面的流上运行各种聚合/查询,每种类型的查询的结果存储在它自己的新Kafka主题中。此应用程序使用3个此类查询/主题。此后,Kafka Connect YugaByte DB Sink Connector读取这3个主题,并将结果保存到YugaByte DB中的3个相应表中。
CREATE TABLE total_traffic WITH ( PARTITIONS=1, KAFKA_TOPIC='total_traffic', TIMESTAMP='timeStamp', TIMESTAMP_FORMAT='yyyy-MM-dd HH:mm:ss') AS SELECT routeId, vehicleType, count(vehicleId) AS totalCount, max(rowtime) AS timeStamp, TIMESTAMPTOSTRING(max(rowtime), 'yyyy-MM-dd') AS recordDate FROM traffic_stream GROUP BY routeId, vehicleType; CREATE TABLE window_traffic WITH ( TIMESTAMP='timeStamp', KAFKA_TOPIC='window_traffic', TIMESTAMP_FORMAT='yyyy-MM-dd HH:mm:ss', PARTITIONS=1) AS SELECT routeId, vehicleType, count(vehicleId) AS totalCount, max(rowtime) AS timeStamp, TIMESTAMPTOSTRING(max(rowtime), 'yyyy-MM-dd') AS recordDate FROM traffic_stream WINDOW HOPPING (SIZE 30 SECONDS, ADVANCE BY 10 SECONDS) GROUP BY routeId, vehicleType; CREATE STREAM poi_traffic WITH ( PARTITIONS=1, KAFKA_TOPIC='poi_traffic', TIMESTAMP='timeStamp', TIMESTAMP_FORMAT='yyyy-MM-dd HH:mm:ss') AS SELECT vehicleId, vehicleType, cast(GEO_DISTANCE(cast(latitude AS double),cast(longitude AS double),33.877495,-95.50238,'KM') AS bigint) AS distance, timeStamp FROM traffic_stream WHERE GEO_DISTANCE(cast(latitude AS double),cast(longitude AS double),33.877495,-95.50238,'KM') < 30; |
所有Kafka Connect YugaByte DB Sink Connector用于存储两个原始事件以及集合数据(这是使用KSQL生成)。它计算如下:
- 到目前为止所有车辆和货物的车辆类型和装运路线细分。
- 仅针对有效货件计算上述细分。这是通过计算最近30秒的车辆类型和装运路线的细分来完成的。
- 检测距离给定兴趣点(POI)20英里范围内的车辆,这表示道路封闭。
数据仪表板
这是一个Spring Boot应用程序,它从YugaByte DB查询数据,并使用Web Sockets和jQuery将数据推送到网页。数据以固定间隔推送到网页,因此数据将自动刷新。主UI页面使用bootstrap.js显示包含图表和表格的仪表板。
我们为三个表创建实体类Total_Traffic,Window_Traffic并poi_traffic为所有实体扩展创建数据访问对象(DAO)接口CassandraRepository。例如,我们为TotalTrafficData实体创建DAO类,如下所示。
@Repository public interface TotalTrafficDataRepository extends CassandraRepository<TotalTrafficData>{ @Query("SELECT * FROM traffickeyspace.total_traffic WHERE recorddate = ?0 ALLOW FILTERING") Iterable<TotalTrafficData> findTrafficDataByDate(String date); |
为了连接到YugaByte数据库集群并获得数据库操作的连接,我们还编写了一个DatabaseConfig类。
请注意,目前仪表板不访问原始事件表,仅依赖于存储在聚合表中的数据。
总结
此应用程序是使用Confluent Kafka,KSQL,Spring Boot和YugaByte DB构建IoT应用程序的蓝图。虽然这篇文章专注于本地集群部署,但Kafka代理和YugaByte数据库节点可以在真正的集群部署中进行水平扩展,以获得更多的应用程序吞吐量和容错能力。可以在yb-iot-fleet-managementGitHub仓库中找到构建和运行应用程序的说明以及源代码。