Aggregating MongoDB data by hour and by day with Morphia

  • 2020-12-21 18:13:59
  • OfStack

Aggregating MongoDB data by day or by hour

Requirement

We recently received a requirement to aggregate the device status under each user account by day and by hour, so that a trend chart of the device status can be drawn from the results.
The approach is to run a scheduled task that aggregates each user's device status data by hour and by day, and stores the results in the database for users to query later.
The technology stack involved is Spring Boot, MongoDB, and Morphia.
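The scheduled task can be driven by anything that fires once per hour. Below is a minimal sketch using only the JDK's ScheduledExecutorService; the project itself uses Spring Boot, whose scheduling support would serve the same purpose. The class and method names are hypothetical and are not taken from the original project.

```java
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical scheduler sketch; not part of the original project.
class AggregationScheduler {

  /** Time remaining from {@code now} until the start of the next full hour (UTC). */
  static Duration delayUntilNextHour(Instant now) {
    Instant nextHour = now.truncatedTo(ChronoUnit.HOURS).plus(1, ChronoUnit.HOURS);
    return Duration.between(now, nextHour);
  }

  /** Runs {@code hourlyJob} at the start of every hour until the executor is shut down. */
  static ScheduledExecutorService start(Runnable hourlyJob) {
    ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
    long initialDelayMs = delayUntilNextHour(Instant.now()).toMillis();
    executor.scheduleAtFixedRate(
        hourlyJob, initialDelayMs, TimeUnit.HOURS.toMillis(1), TimeUnit.MILLISECONDS);
    return executor;
  }
}
```

Aligning the first run to the hour boundary means each run sees a complete hour of raw data; a daily job could be started the same way with a one-day period.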

The data model


@Data
@Builder
@Entity(value = "rawDevStatus", noClassnameStored = true)
// Device status indexes
@Indexes({
    // TTL index: MongoDB deletes documents in the background once "time" is older than expireAfterSeconds (72 days here)
    @Index(fields = @Field("time"), options = @IndexOptions(expireAfterSeconds = 3600 * 24 * 72)),
    @Index(fields = {@Field("userId"), @Field(value = "time", type = IndexType.DESC)})
})
public class RawDevStatus {
  @Id
  @JsonProperty(access = JsonProperty.Access.WRITE_ONLY)
  private ObjectId objectId;
  private String userId;
  private Instant time;
  @Embedded("points")
  List<Point> protocolPoints;
  @Data
  @AllArgsConstructor
  public static class Point {
    /**
     *  Protocol type 
     */
    private Protocol protocol;
    /**
     *  The total number of devices 
     */
    private Integer total;
    /**
     *  Number of online devices
     */
    private Integer onlineNum;
    /**
     *  Number of devices in the enabled state 
     */
    private Integer enableNum;
  }
}

The code above is the device status entity class. The status data is split by the protocol that the devices use.


@Data
@Builder
@Entity(value = "aggregationDevStatus", noClassnameStored = true)
@Indexes({
    @Index(fields = @Field("expireAt"), options = @IndexOptions(expireAfterSeconds = 0)),
    @Index(fields = {@Field("userId"), @Field(value = "time", type = IndexType.DESC)})
})
public class AggregationDevStatus {
  @Id
  @JsonProperty(access = JsonProperty.Access.WRITE_ONLY)
  private ObjectId objectId;
  /**
   *  The user ID
   */
  private String userId;
  /**
   *  The total number of devices 
   */
  private Double total;
  /**
   *  Number of online devices
   */
  private Double onlineNum;
  /**
   *  Number of devices in the enabled state 
   */
  private Double enableNum;
  /**
   *  Aggregation type (by hour or by day)
   */
  @Property("aggDuration")
  private AggregationDuration aggregationDuration;
  private Instant time;
  /**
   *  Dynamically set the document expiration time 
   */
  private Instant expireAt;
}

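The code above is the entity for the expected aggregation result. It defines two indexes: (1) a TTL index on expireAt and (2) a compound index on userId and time. With expireAfterSeconds = 0, each document expires at the time stored in its own expireAt field. The program queries the device status by user ID and time, then aggregates the results.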
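Because the TTL index on expireAt uses expireAfterSeconds = 0, the retention period can differ per document. The sketch below shows one way the expireAt value might be computed. The enum is a hypothetical stand-in for AggregationDuration, whose definition is not shown in this article, and the retention periods are assumed values chosen only for illustration.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical sketch of computing a per-document expiration time.
class ExpirySketch {

  // Stand-in for the article's AggregationDuration enum; constants and retention values are assumptions.
  enum AggregationDuration {
    HOUR(Duration.ofDays(2)),  // assumed: keep hourly results for 2 days
    DAY(Duration.ofDays(30));  // assumed: keep daily results for 30 days

    final Duration retention;

    AggregationDuration(Duration retention) {
      this.retention = retention;
    }
  }

  /** The instant to store in expireAt: the aggregated time plus the retention period. */
  static Instant expireAt(Instant aggregatedTime, AggregationDuration duration) {
    return aggregatedTime.plus(duration.retention);
  }
}
```

MongoDB's background TTL task removes a document once the current time passes the stored expireAt value, so the deletion is not instantaneous.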

Introduction to the aggregation operators

An aggregation works like a pipeline: the intermediate result produced by each stage is the input of the next stage, and the output of the last stage is the aggregation result.

This aggregation mainly uses the following stages:

  • $project: specifies the fields in the output documents;
  • $unwind: splits an array field, producing one document per array element;
  • $match: selects the documents to process;
  • $group: groups the documents by a key and computes the aggregated values.

The raw aggregation statement


db.getCollection('rawDevStatus').aggregate([
  {$match:
    {
      time:{$gte: ISODate("2019-06-27T00:00:00Z")},
    }
  },
  {$unwind: "$points"},
  {$project:
    {
      userId:1,points:1,
      tmp: {$dateToString: { format: "%Y:%m:%dT%H:00:00Z", date: "$time" } }
    }
  },
  {$project:
    {
      userId:1,points:1,
      groupTime: {$dateFromString: { dateString: "$tmp", format: "%Y:%m:%dT%H:%M:%SZ", } }
    }
  },
  {$group:
    {
      _id:{user_id:'$userId', cal_time:'$groupTime'},
      devTotal:{'$avg':'$points.total'},
      onlineTotal:{'$avg':'$points.onlineNum'},
      enableTotal:{'$avg':'$points.enableNum'}
    }
  },
])

The statement above aggregates the data by hour. The steps are explained one by one below:

(1) $match

For the hourly aggregation, only the results for roughly the last 24 hours are needed, so the data is first filtered by time.

(2) $unwind

The device status in each raw document is an array split by protocol (the points field), so it has to be unwound into one document per array element before the following stages can process it.
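The effect of $unwind can be pictured in plain Java as a flatMap over the array. The sketch below is an in-memory illustration only; the classes are simplified, hypothetical stand-ins for the article's entities, and each point is reduced to a single number.

```java
import java.util.List;
import java.util.stream.Collectors;

// In-memory illustration of $unwind; simplified stand-ins, not the article's entities.
class UnwindSketch {

  /** A raw document: one user with an array of per-protocol values. */
  static final class RawDoc {
    final String userId;
    final List<Integer> points;

    RawDoc(String userId, List<Integer> points) {
      this.userId = userId;
      this.points = points;
    }
  }

  /** One output document: the parent's fields plus a single array element. */
  static final class UnwoundDoc {
    final String userId;
    final int point;

    UnwoundDoc(String userId, int point) {
      this.userId = userId;
      this.point = point;
    }
  }

  /** Emits one document per array element; documents with an empty array produce nothing. */
  static List<UnwoundDoc> unwind(List<RawDoc> docs) {
    return docs.stream()
        .flatMap(doc -> doc.points.stream().map(p -> new UnwoundDoc(doc.userId, p)))
        .collect(Collectors.toList());
  }
}
```

Dropping documents whose array is empty matches the default behavior of $unwind, which is also what preserveNullAndEmptyArrays(false) requests in the Morphia code later in this article.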

(3) $project


  {$project:
    {
      userId:1,points:1,
      tmp: {$dateToString: { format: "%Y:%m:%dT%H:00:00Z", date: "$time" } }
    }
  }

This stage selects the fields to output: userId, points, and tmp.

Note that to aggregate by time, the time field is formatted with $dateToString so that only the information down to the hour (%Y:%m:%dT%H) is kept. The result is stored in tmp and is the basis for the next step.

To aggregate by day, change the format to %Y:%m:%dT00:00:00Z.

(4) $project


  {$project:
    {
      userId:1,points:1,
      groupTime: {$dateFromString: { dateString: "$tmp", format: "%Y:%m:%dT%H:%M:%SZ", } }
    }
  }

The tmp field produced by the previous $project stage is a string, but the final aggregation result needs a timestamp (mostly out of laziness: this avoids doing the conversion in the program).
Therefore tmp is converted back to a date value here, named groupTime.
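Taken together, the two $project stages round each timestamp down to the start of its hour (or day) in UTC. The sketch below reproduces both views in plain Java to make that concrete: the string keys mirror the $dateToString formats used above, and truncatedTo gives the date value that the round trip ends up with. The class and method names are illustrative only.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.time.temporal.ChronoUnit;

// Plain-Java illustration of the hour/day bucketing; names are hypothetical.
class TimeBucket {

  // Mirrors "%Y:%m:%dT%H:00:00Z": minutes and seconds are fixed to zero.
  private static final DateTimeFormatter HOUR_KEY =
      DateTimeFormatter.ofPattern("uuuu:MM:dd'T'HH':00:00Z'").withZone(ZoneOffset.UTC);

  // Mirrors "%Y:%m:%dT00:00:00Z": the whole time part is fixed to zero.
  private static final DateTimeFormatter DAY_KEY =
      DateTimeFormatter.ofPattern("uuuu:MM:dd'T00:00:00Z'").withZone(ZoneOffset.UTC);

  /** String key equivalent to the hourly $dateToString output. */
  static String hourKey(Instant time) {
    return HOUR_KEY.format(time);
  }

  /** String key equivalent to the daily $dateToString output. */
  static String dayKey(Instant time) {
    return DAY_KEY.format(time);
  }

  /** Start of the hour containing {@code time}, in UTC. */
  static Instant hourStart(Instant time) {
    return time.truncatedTo(ChronoUnit.HOURS);
  }

  /** Start of the UTC day containing {@code time}. */
  static Instant dayStart(Instant time) {
    return time.truncatedTo(ChronoUnit.DAYS);
  }
}
```

For example, a status recorded at 2019-06-27T10:15:30Z falls into the hourly bucket starting at 2019-06-27T10:00:00Z and the daily bucket starting at 2019-06-27T00:00:00Z.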

(5) $group

Group the documents and generate the final output.


  {$group:
    {
      // Group by _id, which is composed of userId and groupTime
      _id:{user_id:'$userId', cal_time:'$groupTime'},
      // Average of the total number of devices
      devTotal:{'$avg':'$points.total'},
      // Average of the number of online devices
      onlineTotal:{'$avg':'$points.onlineNum'},
      // Average of the number of enabled devices
      enableTotal:{'$avg':'$points.enableNum'}
    }
  }
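The grouping can also be pictured as an in-memory computation: documents that share the same user and hour are collected together and their values are averaged. The sketch below does this with Java streams for onlineNum only. The Sample class is a simplified, hypothetical stand-in for an unwound document, and the result is a nested map rather than the compound _id used in the statement above.

```java
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// In-memory illustration of $group with $avg; simplified stand-in, not the article's entity.
class GroupSketch {

  /** One unwound document reduced to the fields needed for the example. */
  static final class Sample {
    final String userId;
    final Instant time;
    final int onlineNum;

    Sample(String userId, Instant time, int onlineNum) {
      this.userId = userId;
      this.time = time;
      this.onlineNum = onlineNum;
    }
  }

  /** Average onlineNum per user and per hour: userId -> (start of hour -> average). */
  static Map<String, Map<Instant, Double>> averageOnlinePerHour(List<Sample> samples) {
    return samples.stream().collect(
        Collectors.groupingBy(
            (Sample s) -> s.userId,
            Collectors.groupingBy(
                (Sample s) -> s.time.truncatedTo(ChronoUnit.HOURS),
                Collectors.averagingInt((Sample s) -> s.onlineNum))));
  }
}
```

Doing this in the database instead of in the application avoids transferring every raw document to the program, which is why the article pushes the grouping into the pipeline.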

The code

Morphia is used as the ODM here. MongoTemplate could be used as well; the principle is similar.


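  /**
   * Create the aggregation pipeline
   *
   * @param pastTime     start of the time range to aggregate
   * @param dateToString format passed to $dateToString (%Y:%m:%dT%H:00:00Z or %Y:%m:%dT00:00:00Z)
   * @param stringToDate format passed to $dateFromString to parse the string back into a date
   * @return the aggregation pipeline
   */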
  private AggregationPipeline createAggregationPipeline(Instant pastTime, String dateToString, String stringToDate) {
    Query<RawDevStatus> query = datastore.createQuery(RawDevStatus.class);
    return datastore.createAggregation(RawDevStatus.class)
        .match(query.field("time").greaterThanOrEq(pastTime))
        .unwind("points", new UnwindOptions().preserveNullAndEmptyArrays(false))
        .match(query.field("points.protocol").equal("ALL"))
        .project(Projection.projection("userId"),
            Projection.projection("points"),
            Projection.projection("convertTime",
                Projection.expression("$dateToString",
                    new BasicDBObject("format", dateToString)
                        .append("date", "$time"))
            )
        )
        .project(Projection.projection("userId"),
            Projection.projection("points"),
            Projection.projection("convertTime",
                Projection.expression("$dateFromString",
                    new BasicDBObject("format", stringToDate)
                        .append("dateString", "$convertTime"))
            )
        )
        .group(
            Group.id(Group.grouping("userId"), Group.grouping("convertTime")),
            Group.grouping("total", Group.average("points.total")),
            Group.grouping("onlineNum", Group.average("points.onlineNum")),
            Group.grouping("enableNum", Group.average("points.enableNum"))
        );
  }
  /**
   * Get the aggregation results
   *
   * @param pipeline the aggregation pipeline
   * @return the aggregation results
   */
  private List<AggregationMidDevStatus> getAggregationResult(AggregationPipeline pipeline) {
    List<AggregationMidDevStatus> statuses = new ArrayList<>();
    Iterator<AggregationMidDevStatus> resultIterator = pipeline.aggregate(
        AggregationMidDevStatus.class, AggregationOptions.builder().allowDiskUse(true).build());
    while (resultIterator.hasNext()) {
      statuses.add(resultIterator.next());
    }
    return statuses;
  }
  //......................................................................................
  // Get the aggregation results (some code omitted)
  AggregationPipeline pipeline = createAggregationPipeline(pastTime, dateToString, stringToDate);
  List<AggregationMidDevStatus> midStatuses = getAggregationResult(pipeline);
  if (CollectionUtils.isEmpty(midStatuses)) {
    log.warn("Can not get dev status aggregation result.");
    return;
  }
