使用Confluent Kafka,KSQL,Spring Boot和分布式SQL开发物联网应用程序


展示了如何集成Confluent Kafka,KSQL,Spring Boot和YugaByte DB来开发用于管理物联网(IoT)传感器数据的应用程序。

场景 - 支持物联网的车队管理
一家货运公司希望跟踪其在全国范围内运送货物的物联网车辆。车辆属于不同类型(例如18轮车,公共汽车,大型卡车),并遵循3条交付路线(Route-37,Route-82,Route-43)。特别是,该公司希望跟踪:

  • 每条交付路线的车辆类型的总体分布。
  • 最近(例如,在过去30秒内)每个货物交付路线的这些车辆类型的子集。
  • 道路封闭附近的车辆清单,以便他们可以预测交货延误。

应用架构
除了Confluent Kafka作为流媒体平台,该应用程序还具有以下组件:

  • 数据存储: YugaByte DB用于存储来自Kafka流的原始事件以及来自KSQL数据处理器的聚合。
  • 数据生成器:用于模拟写入Kafka流的车辆事件的程序。
  • 数据处理器: 从Data Producer读取KSQL,计算聚合并将结果存储在Data Store中。
  • Data Dashboard: Spring Boot应用程序,使用Web套接字,jQuery和Bootstrap显示数据处理器的输出。

下面是显示这些组件如何组合在一起的架构图。我们称之为Confluent Kafka,KSQL和YugaByte DB堆栈或CKY堆栈。
我们现在将详细介绍这些组件中的每一个。
数据存储
该层存储所有用户数据。YugaByte DB用作数据库,YugaByte云查询语言(YCQL)用作数据库API。所有数据都存储在密钥空间TrafficKeySpace中。有一个Origin_Table用于存储原始事件的表。

CREATE TABLE TrafficKeySpace.Origin_Table (
   vehicleId text, 
   routeId text, 
   vehicleType text, 
   longitude text, 
   latitude text, 
   timeStamp timestamp, 
   speed double
   fuelLevel double
   PRIMARY KEY ((vehicleId), timeStamp)
) WITH default_time_to_live = 3600;

请注意default_time_to_live设置为3600秒的值,以确保原始事件在1小时后自动删除。这是为了确保原始事件不会消耗数据库中的所有存储,并且在计算聚合后不久就会有效地从数据库中删除。

有三个表用于保存用于面向用户的显示的数据:

  • Total_Traffic 交通信息
  • Window_Traffic 最后30秒的流量和
  • poi_traffic 对于兴趣点附近的交通(道路封闭)。

数据处理器不断更新这些表,仪表板从中读取。
以下是这些表:
CREATE TABLE TrafficKeySpace.Total_Traffic (
   routeId text, 
   vehicleType text, 
   totalCount bigint, 
   timeStamp timestamp, 
   recordDate text, 
   PRIMARY KEY (routeId, recordDate, vehicleType)
);

CREATE TABLE TrafficKeySpace.Window_Traffic (
   routeId text, 
   vehicleType text, 
   totalCount bigint, 
   timeStamp timestamp, 
   recordDate text, 
   PRIMARY KEY (routeId, recordDate, vehicleType)
);

CREATE TABLE TrafficKeySpace.Poi_Traffic(
   vehicleid text, 
   vehicletype text, 
   distance bigint, 
   timeStamp timestamp, 
   PRIMARY KEY (vehicleid)
);

数据生产者
这包含生成模拟测试数据并将其发布到Kafka主题iot-data-event的程序。这模拟了使用现实世界中的消息代理从连接的车辆接收的数据。
单个数据点是JSON有效负载,如下所示:

{
  "vehicleId":"0bf45cac-d1b8-4364-a906-980e1c2bdbcb",
 
"vehicleType":"Taxi",
 
"routeId":"Route-37",
 
"longitude":"-95.255615",
 
"latitude":"33.49808",
 
"timestamp":"2017-10-16 12:31:03",
 
"speed":49.0,
 
"fuelLevel":38.0
}

消费者读取上面的iot-data-event主题,将每个这样的事件转换为YCQL INSERT语句,然后调用YugaByte DB持久化到事件表TrafficKeySpace.Origin_Table。

数据处理器
KSQL是Apache Kafka的流式SQL引擎。它为Kafka上的流处理提供了一个易于使用但功能强大的交互式SQL接口,无需使用Java或Python等编程语言编写代码。它支持广泛的流操作,包括数据过滤,转换,聚合,连接,窗口和会话。
使用KSQL的第一步是STREAM从原始事件创建一个iot-data-event如下所示。

CREATE STREAM traffic_stream (
           vehicleId varchar,
           vehicleType varchar,
           routeId varchar,
           timeStamp varchar,
           latitude varchar,
           longitude varchar)
    WITH (
           KAFKA_TOPIC='iot-data-event',
           VALUE_FORMAT='json',
           TIMESTAMP='timeStamp',
           TIMESTAMP_FORMAT='yyyy-MM-dd HH:mm:ss');

现在可以在上面的流上运行各种聚合/查询,每种类型的查询的结果存储在它自己的新Kafka主题中。此应用程序使用3个此类查询/主题。此后,Kafka Connect YugaByte DB Sink Connector读取这3个主题,并将结果保存到YugaByte DB中的3个相应表中。

CREATE TABLE total_traffic
     WITH ( PARTITIONS=1,
            KAFKA_TOPIC='total_traffic',
            TIMESTAMP='timeStamp',
            TIMESTAMP_FORMAT='yyyy-MM-dd HH:mm:ss') AS
     SELECT routeId,
            vehicleType,
            count(vehicleId) AS totalCount,
            max(rowtime) AS timeStamp,
            TIMESTAMPTOSTRING(max(rowtime), 'yyyy-MM-dd') AS recordDate
     FROM traffic_stream
     GROUP BY routeId, vehicleType;

CREATE TABLE window_traffic
     WITH ( TIMESTAMP='timeStamp',
            KAFKA_TOPIC='window_traffic',
            TIMESTAMP_FORMAT='yyyy-MM-dd HH:mm:ss',
            PARTITIONS=1) AS
     SELECT routeId,
            vehicleType,
            count(vehicleId) AS totalCount,
            max(rowtime) AS timeStamp,
          TIMESTAMPTOSTRING(max(rowtime), 'yyyy-MM-dd') AS recordDate
     FROM traffic_stream
     WINDOW HOPPING (SIZE 30 SECONDS, ADVANCE BY 10 SECONDS)
     GROUP BY routeId, vehicleType;

CREATE STREAM poi_traffic
      WITH ( PARTITIONS=1,
             KAFKA_TOPIC='poi_traffic',
             TIMESTAMP='timeStamp',
             TIMESTAMP_FORMAT='yyyy-MM-dd HH:mm:ss') AS
      SELECT vehicleId,
             vehicleType,
             cast(GEO_DISTANCE(cast(latitude AS double),cast(longitude AS double),33.877495,-95.50238,'KM') AS bigint) AS distance,
             timeStamp
      FROM traffic_stream
      WHERE GEO_DISTANCE(cast(latitude AS double),cast(longitude AS double),33.877495,-95.50238,'KM') < 30;

所有Kafka Connect YugaByte DB Sink Connector用于存储两个原始事件以及集合数据(这是使用KSQL生成)。它计算如下:

  • 到目前为止所有车辆和货物的车辆类型和装运路线细分。
  • 仅针对有效货件计算上述细分。这是通过计算最近30秒的车辆类型和装运路线的细分来完成的。
  • 检测距离给定兴趣点(POI)20英里范围内的车辆,这表示道路封闭。

数据仪表板
这是一个Spring Boot应用程序,它从YugaByte DB查询数据,并使用Web SocketsjQuery将数据推送到网页。数据以固定间隔推送到网页,因此数据将自动刷新。主UI页面使用bootstrap.js显示包含图表和表格的仪表板。
我们为三个表创建实体类Total_Traffic,Window_Traffic并poi_traffic为所有实体扩展创建数据访问对象(DAO)接口CassandraRepository。例如,我们为TotalTrafficData实体创建DAO类,如下所示。

@Repository
public interface TotalTrafficDataRepository extends CassandraRepository<TotalTrafficData>{

     @Query("SELECT * FROM traffickeyspace.total_traffic WHERE recorddate = ?0 ALLOW FILTERING")
     Iterable<TotalTrafficData> findTrafficDataByDate(String date);

为了连接到YugaByte数据库集群并获得数据库操作的连接,我们还编写了一个DatabaseConfig类。
请注意,目前仪表板不访问原始事件表,仅依赖于存储在聚合表中的数据。

总结
此应用程序是使用Confluent Kafka,KSQL,Spring Boot和YugaByte DB构建IoT应用程序的蓝图。虽然这篇文章专注于本地集群部署,但Kafka代理和YugaByte数据库节点可以在真正的集群部署中进行水平扩展,以获得更多的应用程序吞吐量和容错能力。可以在yb-iot-fleet-managementGitHub仓库中找到构建和运行应用程序的说明以及源代码。​​​​​​​