聊聊flink TableEnvironment的scan操作

2019-01-22  本文已影响19人  go4it

本文主要研究一下 Flink TableEnvironment 的 scan 操作。

实例

// Scan a table that was registered directly in the TableEnvironment:
// the path consists of the table name only.
val tab: Table = tableEnv.scan("tableName")

// Scan a table from a registered catalog: the path is the catalog name,
// the database name, and finally the table name.
// Note: this is an alternative to the call above; both snippets bind `tab`,
// so use one or the other within a given scope.
val tab: Table = tableEnv.scan("catalogName", "dbName", "tableName")

TableEnvironment.scan

flink-table_2.11-1.7.0-sources.jar!/org/apache/flink/table/api/TableEnvironment.scala

/**
  * Excerpt of `TableEnvironment` reduced to the members involved in `scan`.
  * Members not relevant to table lookup are elided (marked with `//......`).
  *
  * Lookup works on a tree of Calcite schemas: every element of a table path
  * except the last names a nested sub-schema, and the last element names the
  * table inside the schema reached that way.
  */
abstract class TableEnvironment(val config: TableConfig) {

  // Root Calcite schema owned by this environment.
  // NOTE(review): the two `false` arguments are presumably Calcite's
  // `addMetadataSchema` and `cache` flags -- confirm against the Calcite API.
  private val internalSchema: CalciteSchema = CalciteSchema.createRootSchema(false, false)
  // `SchemaPlus` view of the root schema; starting point for `getSchema`.
  private val rootSchema: SchemaPlus = internalSchema.plus()

  //......

  /**
    * Looks up a table by its path and returns it.
    *
    * Delegates to `scanInternal` and converts a missing table into an exception.
    * An empty path does not reach the `TableException` branch: it fails the
    * `require` check in `scanInternal` first.
    *
    * @param tablePath path elements; all but the last are sub-schema names,
    *                  the last is the table name
    * @return the table found at the given path
    * @throws TableException if `scanInternal` finds no table; the message
    *                        contains the path elements joined with "."
    */
  @throws[TableException]
  @varargs
  def scan(tablePath: String*): Table = {
    scanInternal(tablePath.toArray) match {
      case Some(table) => table
      case None => throw new TableException(s"Table '${tablePath.mkString(".")}' was not found.")
    }
  }

  /**
    * Resolves a table path to a `Table`, without throwing when nothing is found.
    *
    * @param tablePath non-null, non-empty path (enforced with `require`)
    * @return `Some(table)` if both the schema and the table exist,
    *         `None` if either lookup yields `null`
    */
  private[flink] def scanInternal(tablePath: Array[String]): Option[Table] = {
    require(tablePath != null && !tablePath.isEmpty, "tablePath must not be null or empty.")
    // Everything except the last element is the schema path;
    // for a single-element path this is an empty array.
    val schemaPaths = tablePath.slice(0, tablePath.length - 1)
    val schema = getSchema(schemaPaths)
    if (schema != null) {
      // The last path element is the table name within the resolved schema.
      val tableName = tablePath(tablePath.length - 1)
      val table = schema.getTable(tableName)
      if (table != null) {
        // Wrap the table in a CatalogNode carrying the full path and the row type
        // obtained via `typeFactory` (a member defined in the elided part of the class).
        return Some(new Table(this, CatalogNode(tablePath, table.getRowType(typeFactory))))
      }
    }
    // Reached when either the schema or the table was not found.
    None
  }

  /**
    * Walks from `rootSchema` down the given chain of sub-schema names.
    *
    * @param schemaPath sub-schema names, outermost first; may be empty
    * @return the schema at the end of the path (`rootSchema` itself for an
    *         empty path), or `null` as soon as one sub-schema lookup returns `null`
    */
  private def getSchema(schemaPath: Array[String]): SchemaPlus = {
    var schema = rootSchema
    for (schemaName <- schemaPath) {
      schema = schema.getSubSchema(schemaName)
      if (schema == null) {
        // Stop at the first missing level; callers treat null as "not found".
        return schema
      }
    }
    schema
  }

  //......
}

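小结

- scan 方法接收可变长度的 String 参数 tablePath，内部调用 scanInternal；如果 scanInternal 返回 None，则抛出 TableException（提示 Table '...' was not found.）
- scanInternal 先用 require 校验 tablePath 不为 null 且不为空，然后把除最后一个元素之外的部分作为 schema 路径交给 getSchema，最后一个元素则作为 tableName
- getSchema 从 rootSchema 开始，按路径逐级调用 getSubSchema；任意一级返回 null 时直接返回 null，路径为空时返回 rootSchema 本身
- 取得 schema 之后调用 schema.getTable(tableName)；table 不为 null 时用 CatalogNode 构造 Table 并以 Some 返回，否则返回 None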

doc

上一篇下一篇

猜你喜欢

热点阅读