大数据领域精选

【Spark 精选】源码阅读 — Scala 高级语法

2023-10-09  本文已影响0人  熊本极客

1.case 模式匹配

case 模式匹配的使用样例

# 1. 匹配特定的数据类型
def processValue(value: Any): String = value match {
  case s: String => s"String: $s"
  case i: Int => s"Int: $i"
  case d: Double => s"Double: $d"
  case _ => "Other"
}

val result1 = processValue("Hello") // 输出 "String: Hello"
val result2 = processValue(123) // 输出 "Int: 123"
val result3 = processValue(3.14) // 输出 "Double: 3.14"
val result4 = processValue(true) // 输出 "Other"


# 2.根据不同的输入执行不同的逻辑
def processInput(input: Any): String = input match {
  case 1 => "One"
  case "two" => "Two"
  case _: Double => "A Double"
  case _ => "Other"
}

val result1 = processInput(1) // 输出 "One"
val result2 = processInput("two") // 输出 "Two"
val result3 = processInput(3.14) // 输出 "A Double"
val result4 = processInput(true) // 输出 "Other"


# 3.解构数据结构
val tuple = (1, "two", 3.14)
val (a, b, c) = tuple // 解构元组
println(a) // 输出 1
println(b) // 输出 "two"
println(c) // 输出 3.14

val list = List(1, 2, 3, 4, 5)
val result = list.map {
  case x if x % 2 == 0 => "Even"
  case _ => "Odd"
}
println(result) // 输出 List("Odd", "Even", "Odd", "Even", "Odd")

spark-sql 源码中的 case 模式匹配AnalyzerResolveRelations

  object ResolveRelations extends Rule[LogicalPlan] {
    def resolveViews(plan: LogicalPlan): LogicalPlan = plan match {
      case view @ View(desc, isTempView, _, child) if !child.resolved =>
          // 省略 ...
      case p @ SubqueryAlias(_, view: View) =>
        p.copy(child = resolveViews(view))
      case _ => plan
    }
}

2.case 类

case 类的使用场景

// 定义一个 case class
case class Person(name: String, age: Int)

// 创建一个 Person 对象
val person1 = Person("Alice", 25)

// 复制一个 Person 对象,并修改部分属性
val person2 = person1.copy(age = 30)

// 打印 person1 和 person2
println(person1) // 输出 Person(Alice,25)
println(person2) // 输出 Person(Alice,30)

// 模式匹配
def processPerson(person: Person): String = person match {
  case Person(name, age) if age < 30 => s"$name is young"
  case Person(name, age) if age >= 30 => s"$name is old"
}

val result1 = processPerson(person1) // 输出 "Alice is young"
val result2 = processPerson(person2) // 输出 "Alice is old"

spark-sql 源码中的 case 类LogicalPlan 的子类 Filter

// case 类 Filter
case class Filter(condition: Expression, child: LogicalPlan)
  extends OrderPreservingUnaryNode with PredicateHelper {
  override def output: Seq[Attribute] = child.output

  override def maxRows: Option[Long] = child.maxRows

  override protected lazy val validConstraints: ExpressionSet = {
    val predicates = splitConjunctivePredicates(condition)
      .filterNot(SubqueryExpression.hasCorrelatedSubquery)
    child.constraints.union(ExpressionSet(predicates))
  }
}

// case 模式匹配
// EliminateOuterJoin的apply
  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    case f @ Filter(condition, j @ Join(_, _, RightOuter | LeftOuter | FullOuter, _, _)) =>
      val newJoinType = buildNewJoinType(f, j)
      if (j.joinType == newJoinType) f else Filter(condition, j.copy(joinType = newJoinType))
  }

3.嵌套函数

spark-sql 源码中的嵌套函数QueryPlan 的嵌套函数 mapExpressions

  def mapExpressions(f: Expression => Expression): this.type = {
    var changed = false

     // 嵌套函数A
    @inline def transformExpression(e: Expression): Expression = {
      val newE = CurrentOrigin.withOrigin(e.origin) {
        f(e)
      }
      if (newE.fastEquals(e)) {
        e
      } else {
        changed = true
        newE
      }
    }
    
    // 嵌套函数B
    def recursiveTransform(arg: Any): AnyRef = arg match {
      case e: Expression => transformExpression(e)    // 执行嵌套函数B
      case Some(value) => Some(recursiveTransform(value))
      case m: Map[_, _] => m
      case d: DataType => d // Avoid unpacking Structs
      case stream: Stream[_] => stream.map(recursiveTransform).force
      case seq: Iterable[_] => seq.map(recursiveTransform)
      case other: AnyRef => other
      case null => null
    }
    // 执行嵌套函数A
    val newArgs = mapProductIterator(recursiveTransform)

    if (changed) makeCopy(newArgs).asInstanceOf[this.type] else this
  }

4.偏函数

偏函数的使用样例

// 定义一个偏函数,只处理正整数和字符串类型的输入
val partialFunc: PartialFunction[Any, String] = {
  case i: Int if i > 0 => s"Positive integer: $i"
  case s: String => s"String: $s"
}

// applyOrElse接口接受两个参数:输入值和默认值。如果偏函数对输入值进行定义,则返回偏函数的结果;如果偏函数没有对输入值进行定义,则返回默认值。
println(partialFunc.applyOrElse(10, (x: Any) => "Not defined")) // 输出:Positive integer: 10
println(partialFunc.applyOrElse(-5, (x: Any) => "Not defined")) // 输出:Not defined
println(partialFunc.applyOrElse("hello", (x: Any) => "Not defined")) // 输出:String: hello
println(partialFunc.applyOrElse(3.14, (x: Any) => "Not defined")) // 输出:Not define

spark-sql 源码中的偏函数
AnalysisHelper 的函数 resolveOperatorsUp

  // 跳过已经分析过的rule,并递归获取子节点
  def resolveOperatorsUp(rule: PartialFunction[LogicalPlan, LogicalPlan]): LogicalPlan = {
    if (!analyzed) {
      AnalysisHelper.allowInvokingTransformsInAnalyzer {
        val afterRuleOnChildren = mapChildren(_.resolveOperatorsUp(rule))
        if (self fastEquals afterRuleOnChildren) {
          CurrentOrigin.withOrigin(origin) {
            // rule是偏函数,applyOrElse会执行这个函数
           // 如果偏函数对输入值进行定义,则返回偏函数的结果;如果偏函数没有对输入值进行定义,则返回默认值。
            rule.applyOrElse(self, identity[LogicalPlan])
          }
        } else {
          CurrentOrigin.withOrigin(origin) {
            val afterRule = rule.applyOrElse(afterRuleOnChildren, identity[LogicalPlan])
            afterRule.copyTagsFrom(self)
            afterRule
          }
        }
      }
    } else {
      self
    }
  }

Optimizer 的函数 ConvertToLocalRelation

// Optimizer 
object ConvertToLocalRelation extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan transform {   // 下面函数的内容整体是偏函数,作为transform的入参
    case Project(projectList, LocalRelation(output, data, isStreaming))
        if !projectList.exists(hasUnevaluableExpr) =>
      val projection = new InterpretedMutableProjection(projectList, output)
      projection.initialize(0)
      LocalRelation(projectList.map(_.toAttribute), data.map(projection(_).copy()), isStreaming)

    case Limit(IntegerLiteral(limit), LocalRelation(output, data, isStreaming)) =>
      LocalRelation(output, data.take(limit), isStreaming)

    case Filter(condition, LocalRelation(output, data, isStreaming))
        if !hasUnevaluableExpr(condition) =>
      val predicate = Predicate.create(condition, output)
      predicate.initialize(0)
      LocalRelation(output, data.filter(row => predicate.eval(row)), isStreaming)
  }
 // 省略...
}

  // TreeNode
  def transform(rule: PartialFunction[BaseType, BaseType]): BaseType = {
    transformDown(rule)
  }

  def transformDown(rule: PartialFunction[BaseType, BaseType]): BaseType = {
    val afterRule = CurrentOrigin.withOrigin(origin) {
      // rule是偏函数,applyOrElse会执行这个函数
      // 如果偏函数对输入值进行定义,则返回偏函数的结果;如果偏函数没有对输入值进行定义,则返回默认值。
      rule.applyOrElse(this, identity[BaseType])
    }

    if (this fastEquals afterRule) {
      // 获取子节点,递归执行transformDown
      mapChildren(_.transformDown(rule))
    } else {
      afterRule.copyTagsFrom(this)
      afterRule.mapChildren(_.transformDown(rule))
    }
  }

5.柯里化函数

柯里化函数的使用样例

如下案例中,add是一个柯里化函数,它接受两个参数 xy。通过部分应用的方式,我们先给 add 函数提供一个参数 1,然后返回一个新的函数 addOne,这个新函数只接受一个参数 y。最后,我们调用 addOne 函数,传递剩余参数 2,得到结果 3

def add(x:Int)(y:Int): Int = x + y

val addOne = add(1) _ // 部分应用,返回一个接受一个参数的新函数

val result = addOne(2) // 调用新函数,传递剩余参数

println(result) // 输出 3

spark-sql 源码中的柯里化函数ParseDriver 的方法 parse

  // 参数1是command,参数2是toResult
  protected def parse[T](command: String)(toResult: SqlBaseParser => T): T = {
    logDebug(s"Parsing command: $command")
   
    // 使用参数command
    val lexer = new SqlBaseLexer(new UpperCaseCharStream(CharStreams.fromString(command)))
    lexer.removeErrorListeners()
    lexer.addErrorListener(ParseErrorListener)

    val tokenStream = new CommonTokenStream(lexer)
    val parser = new SqlBaseParser(tokenStream)
    parser.addParseListener(PostProcessor)
    // 使用参数command 
    parser.addParseListener(UnclosedCommentProcessor(command, tokenStream))
    // 省略 ...
    try {
      try {
        // first, try parsing with potentially faster SLL mode
        parser.getInterpreter.setPredictionMode(PredictionMode.SLL)
       // 使用参数toResult
       // parser里面包含参数command,parser再作为toResult函数的入参
        toResult(parser)
      }
      catch {
        case e: ParseCancellationException =>
          // 省略...
          // 使用参数toResult
          // parser里面包含参数command,parser再作为toResult函数的入参
          toResult(parser)
      }
    }
    // 省略 ...
  }

6.基于 Product 实现的 TreeNode

Product 的使用样例

// 导入Product接口
import scala.Product

// 定义一个元组类,继承自Product接口
class MyTuple(val first: Int, val second: String) extends Product {
  // 实现Product接口的抽象方法
  def productElement(n: Int): Any = n match {
    case 0 => first
    case 1 => second
    case _ => throw new IndexOutOfBoundsException(s"Tuple index out of range: $n")
  }

  // 实现Product接口的抽象方法
  def productArity: Int = 2

  // 重写toString方法
  override def toString: String = s"MyTuple($first, $second)"
}

// 创建一个MyTuple对象
val tuple = new MyTuple(42, "Hello")

// 获取元素值
val firstElement = tuple.productElement(0)
val secondElement = tuple.productElement(1)

// 获取元素数量
val arity = tuple.productArity

// 打印结果
println(firstElement)  // 输出 42
println(secondElement) // 输出 "Hello"
println(arity)         // 输出 2
println(tuple)         // 输出 "MyTuple(42, Hello)"

spark-sql 源码中的 Product:基于 Product 实现的 TreeNode

abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product {
  // 省略 ...
  protected def mapProductIterator[B: ClassTag](f: Any => B): Array[B] = {    // 入参是参数类型为B的scala function,返回值是Array[B]
    val arr = Array.ofDim[B](productArity)
    var i = 0
    while (i < arr.length) {
      arr(i) = f(productElement(i))    // productElement会执行传入的函数mp,然后f会执行apply
      i += 1
    }
    arr
  }  
  // 省略 ...
}

  private def mapChildren(
      f: BaseType => BaseType,
      forceCopy: Boolean): BaseType = {
    // 省略...

    val newArgs = mapProductIterator {    // mapProductIterator 的入参是下面的函数mp
      case arg: TreeNode[_] if containsChild(arg) =>
        // 省略...
      case Some(arg: TreeNode[_]) if containsChild(arg) =>
        // 省略...
      case m: Map[_, _] => m.mapValues {
        // 省略...
      }.view.force.toMap // `mapValues` is lazy and we need to force it to materialize
      case d: DataType => d // Avoid unpacking Structs
      case args: Stream[_] => args.map(mapChild).force // Force materialization on stream
      case args: Iterable[_] => args.map(mapChild)
      case nonChild: AnyRef => nonChild
      case null => null
    }
    // 省略...
  }
上一篇下一篇

猜你喜欢

热点阅读