二十 Gremlin 如何并发执行
Gremlin 多跳的查询效率问题
从上一篇我们知道,gremlin 的执行引擎,是一个 dfs 模式,对于
g.V().out().out() 这样的查询, 会从 图中开始节点, A,B,C
依次调用每个节点的邻居矩阵, getAdj(A|B|C), 获得level1 之后,然后再依次调用, 这样遍历的效率非特别慢;
之前在 paypal 上看到 他们的工程师说道对gremlin 的执行引擎进行了并行化的优化;我之前一直想不通,直到从上一篇梳理了执行模式之后, 目前我想到优化的tip就是在 processNextStart 的时候,做并发;
以 hugegVertexStep 为例
hugeVertexStep 是hugegraph 获取节点的邻居节点的核心类, 关键方法在flatMap, 也就是给定一个节点,会 返回 他的邻居节点; Vertexstep继承自 flatMapStep:
@Override
protected Traverser.Admin<E> processNextStart() {
while (true) {
if (this.iterator.hasNext()) {
return this.head.split(this.iterator.next(), this);
} else {
closeIterator();
this.head = this.starts.next();
this.iterator = this.flatMap(this.head);
}
}
}
他的实现方式就是 start 节点 获取 邻居节点,然后一个个的返回给下一层; 每次调用flatMap, hugegVertex 就是对 存储层的一次访问;
那么这里我们会自然而然的想到一种并发的查询;
第一种思路,就是先收集所有start 节点,然后并发的去查询邻居节点;
private Traverser.Admin<Vertex> head = null;
private Iterator<E> iterator = EmptyIterator.instance();
private ExecutorService executorService = Executors.newFixedThreadPool(
Runtime.getRuntime().availableProcessors() *2);
//可以并发执行
@Override
protected Traverser.Admin<E> processNextStart() {
while (true) {
if (this.iterator.hasNext()) {
return this.head.split(this.iterator.next(), this);
} else {
closeIterator();
// 这里可以并发执行,也可以用inList
this.head = this.starts.next();
this.iterator = this.flatMap(this.head);
List<Future<Iterator<E>>> futures = Lists.newArrayList();
while(this.starts.hasNext()){
Traverser.Admin<Vertex> start = this.starts.next();
Future<Iterator<E>> its = executorService.submit(()->{
Iterator<E> end = this.flatMap(start);
return end;
});
futures.add(its);
}
try {
for(Future<Iterator<E>> its: futures) {
this.iterator = Iterators.concat(iterator, its.get());
}
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
}
}
第二种思路,就是batch模式,依赖与底层的存储层;
也是收集了所有的 开始节点, 我们能不能提供一个 getAdjByBatchStarts(List starts) , 也就是一次拿出来所有的邻居节点,
如果底层的数据结构支持,类似与 id in list 这种功能的化,也是一个很好的优化点,效果可能比多线程并发要好。
但是目前来看,hugegraph,对于 in List 的实现,都是flattern,我估计是为了适配不同的底层存储,如果我们不flattern效果应该不错;
尝试: g.V().has('ticker','000002').out().out().out().out().profile()
1, 直接 one by one 的模式, 11s 左右;
2, inList, 6-7s 左右, 随着 out 展开的数据越多,这个差距会越来越明显; 数据量很少的时候,差距不是很大;
测试需要修改的代码:
HugeVertextStep
private Traverser.Admin<Vertex> head = null;
private Iterator<E> iterator = EmptyIterator.instance();
private Set<Id> sourceIds = Sets.newHashSet();
//可以并发执行
@Override
protected Traverser.Admin<E> processNextStart() {
while (true) {
if (this.iterator.hasNext()) {
return this.head.split(this.iterator.next(), this);
} else {
closeIterator();
// 这里可以并发执行,也可以用inList
this.head = this.starts.next();
sourceIds.add((Id) this.head.get().id());
while(this.starts.hasNext()){
Traverser.Admin<Vertex> start = this.starts.next();
sourceIds.add((Id)start.get().id());
}
//使用sourceIds
this.iterator = this.flatMap(this.head);
}
}
}
public static ConditionQuery stateEdgesQueryWithIDS(Set<Id> sourceIds,
Directions direction,
Id... edgeLabels) {
ConditionQuery query = new ConditionQuery(HugeType.EDGE);
// Edge source vertex
// sourceIds.forEach(sourceVertex->{
// query.eq(HugeKeys.OWNER_VERTEX, sourceVertex);
// });
query.query(Condition.in(HugeKeys.OWNER_VERTEX, Lists.newArrayList(sourceIds)));
// Edge direction
if (direction == Directions.BOTH) {
query.query(Condition.or(
Condition.eq(HugeKeys.DIRECTION, Directions.OUT),
Condition.eq(HugeKeys.DIRECTION, Directions.IN)));
} else {
assert direction == Directions.OUT || direction == Directions.IN;
query.eq(HugeKeys.DIRECTION, direction);
}
// Edge labels
if (edgeLabels.length == 1) {
query.eq(HugeKeys.LABEL, edgeLabels[0]);
} else if (edgeLabels.length > 1) {
query.query(Condition.in(HugeKeys.LABEL,
Arrays.asList(edgeLabels)));
} else {
assert edgeLabels.length == 0;
}
return query;
}
ConditionQueryFlatten
public static List<ConditionQuery> flatten(ConditionQuery query) {
if (query.isFlattened() && !query.mayHasDupKeys(SPECIAL_KEYS)) {
return Arrays.asList(query);
}
List<ConditionQuery> queries = new ArrayList<>();
// Flatten IN/NOT_IN if needed
Set<Condition> conditions = InsertionOrderUtil.newSet();
// for (Condition condition : query.conditions()) {
// Condition cond = flattenIn(condition);
// if (cond == null) {
// // Process 'XX in []'
// return ImmutableList.of();
// }
// conditions.add(cond);
// }
for (Condition condition : query.conditions()) {
// Condition cond = flattenIn(condition);
// if (cond == null) {
// // Process 'XX in []'
// return ImmutableList.of();
// }
conditions.add(condition);
}
GraphTransaction -> optimizeQuery
comment verifyEdgesConditionQuery() method
CassandraSerializer: rewrite query
//查询edge 的时候,增加InList
@Override
protected Query writeQueryEdgeCondition(Query query) {
ConditionQuery result = (ConditionQuery) query;
for (Condition.Relation r : result.relations()) {
Object value = r.value();
if(value instanceof List){
serializeListValue(r,value);
}else{
serializeSingleValue(r,value);
}
}
return null;
}
private void serializeSingleValue(Condition.Relation r, Object value) {
if (value instanceof Id) {
if (r.key() == HugeKeys.OWNER_VERTEX ||
r.key() == HugeKeys.OTHER_VERTEX) {
// Serialize vertex id
r.serialValue(this.writeId((Id) value));
}
else {
// Serialize label id
r.serialValue(((Id) value).asObject());
}
}
else if (value instanceof Directions) {
r.serialValue(((Directions) value).type().code());
}
}
private void serializeListValue(Condition.Relation r, Object value) {
assert value instanceof List;
List<Object> lv = (List)value;
Object singleValue = lv.get(0);
if(singleValue instanceof Id){
if (r.key() == HugeKeys.OWNER_VERTEX ||
r.key() == HugeKeys.OTHER_VERTEX) {
List<Object> ids = Lists.newArrayList();
for(Object val : lv){
ids.add(this.writeId((Id) val));
}
// Serialize vertex id
r.serialValue(ids);
}
else {
// Serialize label id
List<Object> ids = Lists.newArrayList();
for(Object val : lv){
ids.add(((Id) val).asObject());
}
r.serialValue(ids);
}
}else if (singleValue instanceof Directions) {
r.serialValue(((Directions) singleValue).type().code());
}
}
cassandraTable:
case IN:
if(value instanceof List){
return QueryBuilder.in(key, (List)value);
}else{
return QueryBuilder.in(key,value);
}
思路现在有了,可以具体去实现一下,然后测试性能到底差多少。
存在的问题
- return this.head.split(this.iterator.next(), this); head 与 iterator 对应,在path 的时候才能得到正确的信息,需要自己维护,head 与iterator的关系;
- 开启 8个线程 查询 start 之后(BFS),性能不如 in List, 和原来的 one vertex one query 的模式。 有点奇怪;
最终的方案:
- Iterators.concat(iterator, its.get()) 会耗时,用map 存起来;
- map 存起来 head - > adj 的关系,那么路径信息就会保留;
private Traverser.Admin<Vertex> head = null;
private Iterator<E> iterator = EmptyIterator.instance();
private static ExecutorService executorService = Executors.newFixedThreadPool(8);
private Map<Traverser.Admin<Vertex>, Iterator<E>> adj = new HashMap<>();
//可以并发执行
@Override
protected Traverser.Admin<E> processNextStart() {
while (true) {
if(!adj.isEmpty()){
Set<Traverser.Admin<Vertex>> toRemove = Sets.newHashSet();
for(Traverser.Admin<Vertex> head: adj.keySet()){
Iterator<E> iterator = adj.get(head);
if(iterator.hasNext()){
return head.split(iterator.next(), this);
}else{
toRemove.add(head);
}
}
toRemove.forEach(k-> adj.remove(k));
}
else {
closeIterator();
// 这里可以并发执行,也可以用inList
this.head = this.starts.next();
this.iterator = this.flatMap(this.head);
adj.put(head,this.iterator);
List<Future<Pair<Traverser.Admin<Vertex>,Iterator<E>>>> futures = Lists.newArrayList();
while(this.starts.hasNext()){
Traverser.Admin<Vertex> start = this.starts.next();
Future<Pair<Traverser.Admin<Vertex>,Iterator<E>>> its = executorService.submit(()->{
Iterator<E> end = this.flatMap(start);
return new ImmutablePair<>(start, end);
});
futures.add(its);
}
try {
for(Future<Pair<Traverser.Admin<Vertex>,Iterator<E>>> its: futures) {
Pair<Traverser.Admin<Vertex>,Iterator<E>> pair = its.get();
this.adj.put(pair.getKey(), pair.getValue());
}
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
}
}