Flink自定义MySQLSource读取MySQL数据
2020-05-31 本文已影响0人
喵星人ZC
先参考《数据库连接池》一文，实现MySQL连接池。
MySQL表student的实体采用case class定义
/** Container for the record types used by the streaming examples. */
object Domain {

  /**
   * An access record.
   *
   * @param time    timestamp of the access (NOTE(review): unit is not shown here — confirm)
   * @param domain  domain name the record refers to
   * @param traffic traffic amount (NOTE(review): unit is not shown here — confirm)
   */
  case class Access(
    time: Long,
    domain: String,
    traffic: Long
  )

  /**
   * One row of the MySQL `student` table.
   *
   * @param id   value of the `id` column
   * @param name value of the `name` column
   * @param age  value of the `age` column
   */
  case class Student(
    id: Int,
    name: String,
    age: Int
  )
}
实现RichSourceFunction来自定义MySQLSource
package com.zc.bigdata.flink02
import java.sql.{Connection, PreparedStatement}
import com.zc.bigdata.bean.Domain.Student
import com.zc.bigdata.utils.JDBCPoolUtils
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.source.{RichSourceFunction, SourceFunction}
/**
 * Flink source that reads every row of the MySQL `student` table once and
 * emits each row as a [[Student]].
 *
 * The connection is obtained from `JDBCPoolUtils` in `open` and handed back
 * in `close`. `run` finishes when the result set is exhausted or when
 * `cancel` has been called.
 */
class MySQLSource extends RichSourceFunction[Student] {

  /** JDBC connection, assigned in `open`. */
  var conn: Connection = _

  /** Prepared `select` statement, assigned in `open`. */
  var pstmt: PreparedStatement = _

  /**
   * Cleared by `cancel` to stop the read loop in `run`.
   * Marked volatile so that a write made by the cancelling thread is
   * visible to the thread executing `run`.
   */
  @volatile var running = true

  /**
   * Acquires a connection from the pool and prepares the query.
   *
   * @param parameters Flink configuration (not used here)
   */
  override def open(parameters: Configuration): Unit = {
    conn = JDBCPoolUtils.getConnection
    pstmt = conn.prepareStatement("select * from student")
  }

  /**
   * Executes the query and emits one [[Student]] per row.
   *
   * @param ctx context used to emit the records
   */
  override def run(ctx: SourceFunction.SourceContext[Student]): Unit = {
    val rs = pstmt.executeQuery()
    try {
      // Check the flag before fetching the next row so a cancel request
      // stops the scan instead of draining the whole table.
      while (running && rs.next()) {
        val student = Student(rs.getInt("id"), rs.getString("name"), rs.getInt("age"))
        ctx.collect(student)
      }
    } finally {
      // Release the cursor even if reading or emitting a row fails.
      rs.close()
    }
  }

  /** Requests `run` to stop after the row currently being processed. */
  override def cancel(): Unit = {
    running = false
  }

  /** Passes the statement and the connection to `JDBCPoolUtils.close`. */
  override def close(): Unit = {
    JDBCPoolUtils.close(pstmt, conn)
  }
}
StreamApp
package com.zc.bigdata.flink02
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.api.scala._
/** Entry point that prints the records produced by [[MySQLSource]]. */
object StreamApp {

  /**
   * Builds the job (MySQL source -> print sink) and runs it.
   *
   * @param args command-line arguments (not used)
   */
  def main(args: Array[String]): Unit = {
    val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment

    // Earlier experiments, kept for reference:
    //
    // env.addSource(new AccessSource)//.setParallelism(3) java.lang.IllegalArgumentException: The maximum parallelism of non parallel operator must be 1.
    //   .print()
    //
    // env.addSource(new AccessSource02).setParallelism(3)
    //   .print()

    val students = streamEnv.addSource(new MySQLSource)
    students.print()

    // The job is named after this object's runtime class.
    val jobName = this.getClass.getSimpleName
    streamEnv.execute(jobName)
  }
}
image.png