Understanding Neo4j Pipes

2020-04-27  本文已影响0人  中科院_白乔
match (m)-[x]-(n) where m.age>30 return m.name, m.age

如上查询语句产生如下查询计划:


image.png
abstract class PipeWithSource(source: Pipe) extends Pipe {
  override def createResults(state: QueryState): Iterator[ExecutionContext] = {
    val sourceResult = source.createResults(state)

    val decoratedState = state.decorator.decorate(this, state)
    decoratedState.setExecutionContextFactory(executionContextFactory)
    val result = internalCreateResults(sourceResult, decoratedState)
    state.decorator.decorate(this, result)
  }

  protected def internalCreateResults(state: QueryState): Iterator[ExecutionContext] =
    throw new UnsupportedOperationException("This method should never be called on PipeWithSource")

  protected def internalCreateResults(input:Iterator[ExecutionContext], state: QueryState): Iterator[ExecutionContext]
  private[pipes] def testCreateResults(input:Iterator[ExecutionContext], state: QueryState): Iterator[ExecutionContext] =
    internalCreateResults(input, state)
}
case class ProduceResultsPipe(source: Pipe, columns: Seq[String])  (val id: Id = Id.INVALID_ID) extends PipeWithSource(source) 
case class ProjectionPipe(source: Pipe, projection: CommandProjection)
                         (val id: Id = Id.INVALID_ID) extends PipeWithSource(source) {

  projection.registerOwningPipe(this)

  protected def internalCreateResults(input: Iterator[ExecutionContext], state: QueryState): Iterator[ExecutionContext] = {
    if (projection.isEmpty)
      input
    else {
      input.map {
        ctx =>
          projection.project(ctx, state)
          ctx
      }
    }
  }
}
case class AllNodesScanPipe(ident: String)(val id: Id = Id.INVALID_ID) extends Pipe {

  protected def internalCreateResults(state: QueryState): Iterator[ExecutionContext] = {
    val baseContext = state.newExecutionContext(executionContextFactory)
    state.query.nodeOps.all.map(n => executionContextFactory.copyWith(baseContext, ident, n))
  }
}
case class ExpandAllPipe(source: Pipe,
                         fromName: String,
                         relName: String,
                         toName: String,
                         dir: SemanticDirection,
                         types: LazyTypes)
                        (val id: Id = Id.INVALID_ID) extends PipeWithSource(source) {

  protected def internalCreateResults(input: Iterator[ExecutionContext], state: QueryState): Iterator[ExecutionContext] = {
    input.flatMap {
      row =>
        getFromNode(row) match {
          case n: NodeValue =>
            val relationships: Iterator[RelationshipValue] = state.query.getRelationshipsForIds(n.id(), dir, types.types(state.query))
            relationships.map { r =>
                val other = r.otherNode(n)
                executionContextFactory.copyWith(row, relName, r, toName, other)
            }

          case Values.NO_VALUE => None

          case value => throw new InternalException(s"Expected to find a node at '$fromName' but found $value instead")
        }
    }
  }
}
case class FilterPipe(source: Pipe, predicate: Expression)
                     (val id: Id = Id.INVALID_ID) extends PipeWithSource(source) {

  predicate.registerOwningPipe(this)

  protected def internalCreateResults(input: Iterator[ExecutionContext], state: QueryState): Iterator[ExecutionContext] =
    input.filter(ctx => predicate(ctx, state) eq Values.TRUE)
}
上一篇下一篇

猜你喜欢

热点阅读