使用MongoDB存储时间序列数据 - DAC


了解如何使用Java + Spring示例在MongoDB中优化时序数据的存储。

什么是时间序列数据?
引用维基百科,“ 时间序列是按时间顺序索引(或列出或绘制)的一系列数据点 ”。但这实际上是什么意思?让我们举个例子。
考虑交通工程公司的情况。在特定的用例中,该公司需要收集横穿特定路口的确切数量的汽车的数据,以便管理交通灯算法。通过交叉路口的每辆汽车都将被报告回系统。这种事件就是我们所说的数据时间点。经过交叉路口的多辆汽车就是我们所说的数据“系列”。
时间序列数据可能以不规则的速率生成-在我们的示例中,它仅在汽车经过时才出现-或可以以固定的时间间隔捕获-例如每秒的设备测量值。

Mongo数据读取
假设我们要每隔1秒收集一次设备温度。在典型的MongoDB文档中存储该信息看起来像这样:

{
    "_id" : ObjectId(),
   
"temperature" : 90.50,
   
"deviceId" : "1",
   
"date" : ISODate("2019–11–10T00:00:01Z")
},
{
   
"_id" : ObjectId(),
   
"temperature" : 92.50,
   
"deviceId" : "1",
   
"date" : ISODate("2019–11–10T00:00:02Z")
}

每分钟产生60个文档,每小时产生3600个文件,每天产生86400个文档,这还只是一台设备!这种方法有明显的缺陷,并且对性能和磁盘使用率有重大影响。
必须有更好的解决方案…

Mongo时间序列,又称基于大小的存储桶
MongoDB允许我们创建一个文档来存储多个连续的数据读取。在我们设备的温度收集情况下,此类文件要求:

  • id —文档的ID(MongoDB的ObjectId)
  • deviceId —查询样品时文档分组依据的元素
  • bucketSize -文件中的最大样本量
  • date—样品的日期;我们可以将其存储在此处以简化聚合
  • first —在存储桶中读取的最旧数据的时间戳
  • last —存储桶中读取的最新数据的时间戳
  • samples —数据容器

我们的最终时间序列文件将类似于以下文件:
{
    _id: ObjectId(),
    deviceid: 1,
    date: ISODate("2019-11-10"),
    bucketSize: 4,
    first: 1573833152,
    last: 1573833155,
    samples : [
       { val: 10, time: 1573833152},
       { val: 15, time : 1573833153},
       { val: 14, time: 1573833154},
       { val: 20, time : 1573833155}
   ]
}

每个新的从设备读取得数值都将附加到这个文档,直到存储桶大小达到1000个样本的阈值(请参阅:bucketSize值)。upsert:true一旦达到阈值,将通过该子句创建一个新文档。
在upsert过程中,我们将sample对象推送至samples,first将其更新为最小值,last最大值,并bucketSize为每个新条目增加1。

Java和Spring中基于大小的存储
好了,足够多的理论,让我们现在付诸实践。我们将使用Spring的MongoTemplate将入站温度读取插入MongoDB。让我们实现上面给出的示例。
我们的DeviceTemperature模型将如下所示:

@lombok.Data
class DeviceTemperature {
   private Integer deviceId;
   private Integer temperature;
   private Long timestamp;
   private LocalDate date;
}
DeviceTemperature deviceTemperature = new DeviceTemperature();
deviceTemperature.setDeviceId(1);
deviceTemperature.setTemperature(12);
deviceTemperature.setTimestamp(1573833152L);
deviceTemperature.setDate(LocalDate.parse(“2019–11–10”);

建立好模型后,我们需要创建criteria和query:

Criteria criteria = Criteria.where("deviceId").is(deviceTemperature.getDeviceId())
    .and(
"bucketSize").lt(1000)
    .and(
"date").is(deviceTemperature.getDate());
Query query = Query.query(criteria);

上面的代码创建了一个查询,其中包含用于上载特定MongoDB文档的条件。

现在创建一个update命令:

Integer deviceId = deviceTemperature.getDeviceId();
Update update = new Update()
                   .push("samples", deviceTemperature)
                   .inc(
"bucketSize", deviceId)
                   .min(
"first", deviceTemperature.getTimestamp())
                   .max(
"last", deviceTemperature.getTimestamp());

最后调用mongoTemplate 实现upset数据:

mongoTemplate.upsert(query, update, DeviceTemperature.class)

这就是使用Java和Spring以时间序列的方式存储数据所需的一切。