2019-01-16
2019-01-16 本文已影响0人
早安丶全世界
MongoDB聚合操作
MongoDB版本:1.8.6.RELEASE
需求:有一个会员消费记录表需要查询每个会员最后的消费时间
//会员手机号码列表
List<String> phoneList = memberList.stream().map(item -> MapUtils.getString(item, "phone")).collect(Collectors.toList());
//一次处理10万条
int max = 100000;
//一共需要处理次数
int times = phoneList.size() / max;
times = phoneList.size() % max == 0 ? times : times + 1;
//按照十万一条分页
List<List<String>> splitList = Stream.iterate(0, n -> n + 1).limit(times).parallel()
.map(idx -> phoneList.stream().skip(idx * max).limit(max).parallel().collect(Collectors.toList())).collect(Collectors.toList());
//分页去积分表查询会员最后一条消费记录
List<Map> resultList = splitList.parallelStream().map(item -> {
Criteria criteria = new Criteria();
criteria.and("phone").in(item);
MatchOperation match = Aggregation.match(criteria);
GroupOperation group = Aggregation.group("phone").max("orderTime").as("orderTime");
ProjectionOperation project = Aggregation.project( "phone","orderTime").and("_id").as("phone").andExclude("_id");
Aggregation aggregation = Aggregation.newAggregation(match, group, project);
List<Map> mapList = mongoTemplate.aggregate(aggregation, "member_point_document", Map.class).getMappedResults();
return mapList;
}).flatMap(List::stream).collect(Collectors.toList());
// 按照时间天分组 这样就可以减少插入会员的次数
Map<String, List<String>> orderTimeList = resultList.parallelStream().filter(item -> StringUtils.isNotEmpty(MapUtils.getString(item,"orderTime",""))).collect(Collectors.groupingBy(item -> MapUtils.getString(item, "orderTime"), Collectors.mapping(item -> MapUtils.getString(item, "phone"), Collectors.toList())));
注意:聚合操作的时候如果结果大于16M时需要使用游标的方式返回结果缓存在硬盘中
AggregationOptions.builder().outputMode(AggregationOptions.OutputMode.CURSOR).allowDiskUse(true).build());
如果想使用上面的方式必须要保证MongoDB的版本为1.9.0以上