Flink自定义MySQLSource读取MySQL数据

2020-05-31  本文已影响0人  喵星人ZC

先参考《数据库连接池》一文，实现MySQL连接池。

MySQL表student的实体采用case class定义

/** Record types shared by the Flink examples. */
object Domain {

  /**
   * An access record.
   *
   * @param time    value of the record's `time` field
   * @param domain  value of the record's `domain` field
   * @param traffic value of the record's `traffic` field
   */
  final case class Access(time: Long, domain: String, traffic: Long)

  /**
   * One row of the MySQL `student` table.
   *
   * @param id   value of the `id` column
   * @param name value of the `name` column
   * @param age  value of the `age` column
   */
  final case class Student(id: Int, name: String, age: Int)

}

实现RichSourceFunction来自定义MySQLSource

package com.zc.bigdata.flink02

import java.sql.{Connection, PreparedStatement}

import com.zc.bigdata.bean.Domain.Student
import com.zc.bigdata.utils.JDBCPoolUtils
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.source.{RichSourceFunction, SourceFunction}

/**
 * Flink source that runs `select * from student` once and emits every row
 * as a [[Student]].
 *
 * The connection and the prepared statement are obtained in `open` and handed
 * to `JDBCPoolUtils.close` in `close`. `run` returns when the result set is
 * exhausted or once `cancel` has been called, whichever happens first.
 */
class MySQLSource extends RichSourceFunction[Student] {

  var conn: Connection = _
  var pstmt: PreparedStatement = _

  // Flink's SourceFunction contract allows cancel() to be invoked from a thread
  // other than the one executing run(), so the flag must be volatile for the
  // emitting loop to observe the update.
  @volatile var running = true

  /** Acquires a connection from the pool and prepares the query. */
  override def open(parameters: Configuration): Unit = {
    conn = JDBCPoolUtils.getConnection
    pstmt = conn.prepareStatement("select * from student")
  }

  /**
   * Executes the prepared query and emits one [[Student]] per row.
   *
   * @param ctx context used to emit the records
   */
  override def run(ctx: SourceFunction.SourceContext[Student]): Unit = {
    val rs = pstmt.executeQuery()
    try {
      // Check the flag before advancing the cursor so that a cancelled source
      // stops emitting instead of draining the whole table.
      while (running && rs.next()) {
        val student = Student(rs.getInt("id"), rs.getString("name"), rs.getInt("age"))
        ctx.collect(student)
      }
    } finally {
      // Release the cursor even when collect() or a column read throws.
      rs.close()
    }
  }

  /** Asks the loop in `run` to stop after the row currently being processed. */
  override def cancel(): Unit = {
    running = false
  }

  /** Hands the statement and the connection back to `JDBCPoolUtils`. */
  override def close(): Unit = {
    JDBCPoolUtils.close(pstmt, conn)
  }
}

在StreamApp中使用MySQLSource

package com.zc.bigdata.flink02

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.api.scala._

/** Entry point of a streaming job that prints the records produced by `MySQLSource`. */
object StreamApp {

  /**
   * Builds the job (source followed by a print sink) and submits it.
   *
   * @param args command-line arguments; not used
   */
  def main(args: Array[String]): Unit = {
    val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment

    // Notes from earlier experiments with other sources, kept for reference:
    //   - `new AccessSource` with `.setParallelism(3)` failed with
    //     java.lang.IllegalArgumentException: The maximum parallelism of non
    //     parallel operator must be 1.
    //   - `new AccessSource02` was run with `.setParallelism(3)`.

    val students = streamEnv.addSource(new MySQLSource)
    students.print()

    // The job is named after this object's runtime class.
    val jobName = this.getClass.getSimpleName
    streamEnv.execute(jobName)
  }
}

image.png
上一篇下一篇

猜你喜欢

热点阅读