SparkSQL DataFrame与MySQL增删改查那些事儿
2018-11-28 本文已影响117人
腾飞的大象
在Spark中通过各种算子计算出各项指标后,一般需要将计算好的结果数据存放到关系型数据库(比如MySQL和PostgreSQL等)中,随后在展示平台上配置展现,花花绿绿的图表就生成了。下面讲解一下在Spark中如何通过c3p0连接池对MySQL进行增删改查(CRUD):增加(Create)、读取查询(Retrieve)、更新(Update)和删除(Delete)。
项目github地址:spark-mysql
1.Create(增加)
import java.sql.{Date, Timestamp}

// Example record; Spark derives the DataFrame schema from these fields by reflection.
// NOTE(review): define the case class at top level (not inside a method) so toDF() can find its TypeTag.
case class CardMember(m_id: String, card_type: String, expire: Timestamp, duration: Int, is_sale: Boolean, date: Date, user: Long, salary: Float)

val memberSeq = Seq(
  CardMember("member_2", "月卡", new Timestamp(System.currentTimeMillis()), 31, false, new Date(System.currentTimeMillis()), 123223, 0.32f),
  CardMember("member_1", "季卡", new Timestamp(System.currentTimeMillis()), 93, false, new Date(System.currentTimeMillis()), 124224, 0.362f)
)
// toDF() needs the SQL implicits in scope, e.g. `import hiveContext.implicits._` (not shown here).
val memberDF = memberSeq.toDF()
// Save the DataFrame to MySQL; the table is created from the DataFrame schema if it does not exist yet.
MySQLUtils.saveDFtoDBCreateTableIfNotExist("member_test", memberDF)
2.Retrieve(读取查询)
// Map the MySQL table straight into a DataFrame; null means "no filter condition".
val memberFromMysql = MySQLUtils.getDFFromMysql(hiveContext, "member_test", null)
3.Update(更新)
// Upsert by key: rows whose key already exists get `user` and `salary` updated, other rows are inserted.
// NOTE(review): ON DUPLICATE KEY UPDATE only fires if member_test has a primary/unique key. The table
// auto-created from CardMember has none (only a column named `id` becomes a key), so add one (e.g. on m_id).
MySQLUtils.insertOrUpdateDFtoDBUsePool("member_test", memberDF, Array("user", "salary"))
4.Delete(删除)
// Delete the rows matching a condition (the condition is passed through as raw SQL).
MySQLUtils.deleteMysqlTableData(hiveContext, "member_test", "m_id ='member_1'")
// Drop the whole table.
MySQLUtils.dropMysqlTable(hiveContext, "member_test")
具体操作步骤如下:
在pom.xml中添加MySQL连接器(mysql-connector-java)和c3p0连接池的依赖,然后重新导入Maven依赖(Import Changes):
<!-- MySQL JDBC driver -->
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.38</version>
</dependency>
<!-- c3p0 connection pool -->
<dependency>
    <groupId>com.mchange</groupId>
    <artifactId>c3p0</artifactId>
    <version>0.9.5</version>
</dependency>
数据库连接池的获取以及DDL和DML操作方法都封装在下面3个工具类中:
PropertyUtils:从classpath中读取mysql-user.properties文件的配置信息(示例工程中该文件位于conf/mysql-user.properties,因此需要把conf目录加入classpath)
package utils
import java.util.Properties
/**
 * Created with IntelliJ IDEA.
 * Author: fly_elephant@163.com
 * Description: PropertyUtils helper for reading values from a properties file on the classpath.
 * Date: created 2018-11-17 11:43
 */
object PropertyUtils {

  /**
   * Loads `fileName` from the classpath and returns the value stored under `propertyKey`.
   * The file is re-read on every call.
   *
   * @param fileName    classpath resource name, e.g. "mysql-user.properties"
   * @param propertyKey key to look up
   * @return the value for `propertyKey`, or null if the file does not define it
   * @throws IllegalArgumentException if `fileName` cannot be found on the classpath
   */
  def getFileProperties(fileName: String, propertyKey: String): String = {
    // getResourceAsStream signals a missing resource with null; report that clearly
    // instead of letting Properties.load fail with a NullPointerException.
    val in = Option(this.getClass.getClassLoader.getResourceAsStream(fileName))
      .getOrElse(throw new IllegalArgumentException(s"Properties file '$fileName' not found on the classpath"))
    try {
      val prop = new Properties
      prop.load(in)
      prop.getProperty(propertyKey)
    } finally {
      // Release the stream whether or not loading succeeded.
      in.close()
    }
  }
}
MySQLPoolManager:封装了数据库连接池(c3p0)的获取
package utils
import java.sql.Connection
import com.mchange.v2.c3p0.ComboPooledDataSource
/**
 * Created with IntelliJ IDEA.
 * Author: fly_elephant@163.com
 * Description: MySQL connection-pool manager (one shared c3p0 pool, created on first use).
 * Date: created 2018-11-17 12:43
 */
object MySQLPoolManager {
  import scala.util.control.NonFatal

  /** Classpath properties file holding the JDBC and pool settings. */
  private val ConfigFile = "mysql-user.properties"

  /** The shared pool; public only for source compatibility — obtain it through getMysqlManager. */
  var mysqlManager: MysqlPool = _

  /**
   * Returns the shared pool, creating it on the first call.
   *
   * The null check, the assignment and the returned read all happen under the same lock, so
   * callers always see a fully constructed instance. If construction throws, the field stays
   * null and the next call retries.
   */
  def getMysqlManager: MysqlPool = synchronized {
    if (mysqlManager == null) {
      mysqlManager = new MysqlPool
    }
    mysqlManager
  }

  /**
   * Wraps a c3p0 ComboPooledDataSource configured from `mysql-user.properties`.
   *
   * `mysql.jdbc.url` is required. Every other key is applied only when present; when a key is
   * absent, the corresponding setter is not called and the data source keeps its existing value.
   *
   * @throws IllegalStateException if the configuration cannot be read or applied
   */
  class MysqlPool extends Serializable {
    private val cpds: ComboPooledDataSource = new ComboPooledDataSource(true)

    try {
      cpds.setJdbcUrl(requiredConf("mysql.jdbc.url"))
      optionalConf("mysql.pool.jdbc.driverClass").foreach(v => cpds.setDriverClass(v))
      optionalConf("mysql.jdbc.username").foreach(v => cpds.setUser(v))
      optionalConf("mysql.jdbc.password").foreach(v => cpds.setPassword(v))
      optionalIntConf("mysql.pool.jdbc.minPoolSize").foreach(v => cpds.setMinPoolSize(v))
      optionalIntConf("mysql.pool.jdbc.maxPoolSize").foreach(v => cpds.setMaxPoolSize(v))
      optionalIntConf("mysql.pool.jdbc.acquireIncrement").foreach(v => cpds.setAcquireIncrement(v))
      optionalIntConf("mysql.pool.jdbc.maxStatements").foreach(v => cpds.setMaxStatements(v))
    } catch {
      case NonFatal(e) =>
        // A half-configured pool would only fail later with a less useful error, so fail fast.
        cpds.close()
        throw new IllegalStateException(s"Failed to configure the MySQL connection pool from $ConfigFile", e)
    }

    /** Raw value of `key`, or None when the properties file does not define it. */
    private def optionalConf(key: String): Option[String] =
      Option(PropertyUtils.getFileProperties(ConfigFile, key))

    /** Integer value of `key` (surrounding whitespace ignored), or None when undefined. */
    private def optionalIntConf(key: String): Option[Int] =
      optionalConf(key).map(_.trim.toInt)

    /** Value of `key`; a missing key is a configuration error. */
    private def requiredConf(key: String): String =
      optionalConf(key).getOrElse(
        throw new IllegalStateException(s"Missing required property '$key' in $ConfigFile"))

    /**
     * Borrows a connection from the pool. Callers are expected to close() it when done.
     *
     * @throws java.sql.SQLException if the pool cannot supply a connection
     */
    def getConnection: Connection = cpds.getConnection()

    /** Shuts the pool down. This is best-effort: failures are printed, not rethrown. */
    def close(): Unit =
      try cpds.close()
      catch {
        case NonFatal(ex) => ex.printStackTrace()
      }
  }
}
MySQLUtils:封装了增删改查方法,直接调用即可
package utils
import java.sql.{Date, Timestamp}
import java.util.Properties
import org.apache.log4j.Logger
import org.apache.spark.sql.types._
import org.apache.spark.sql.{DataFrame, SQLContext}
/**
 * Created with IntelliJ IDEA.
 * Author: fly_elephant@163.com
 * Description: MySQL DDL and DML helpers for Spark DataFrames; connections come from MySQLPoolManager.
 * Date: created 2018-11-17 12:43
 */
object MySQLUtils {
  import java.sql.{Connection, PreparedStatement, SQLException, Types}

  import org.apache.spark.sql.Row

  import scala.util.control.NonFatal

  val logger: Logger = Logger.getLogger(getClass.getSimpleName)

  /** Rows sent per executeBatch() call; bounds how many bound rows a partition holds in memory at once. */
  private val BatchSize = 1000

  /**
   * Inserts every row of a DataFrame into `tableName` through the c3p0 pool.
   *
   * Each Spark partition is written in one transaction. Every value is bound with the JDBC
   * setter that matches its column's Spark type, and nulls are bound with setNull. The INSERT
   * has no column list, so the table's columns must line up positionally with the DataFrame's
   * columns; saveDFtoDBCreateTableIfNotExist checks this before calling here.
   *
   * If anything fails inside a partition, that partition is rolled back, the error is logged,
   * and it is rethrown, which fails the Spark job. An unsupported column type is one such
   * failure; it raises a RuntimeException.
   *
   * @param tableName       target table
   * @param resultDateFrame rows to insert
   */
  def saveDFtoDBUsePool(tableName: String, resultDateFrame: DataFrame): Unit = {
    val sql = getInsertSql(tableName, resultDateFrame.columns.length)
    val columnDataTypes = resultDateFrame.schema.fields.map(_.dataType)
    writePartitions(resultDateFrame, sql, "saveDFtoDBUsePool") { (ps, record) =>
      // JDBC parameters are 1-based; Row fields are 0-based.
      columnDataTypes.indices.foreach { i =>
        setParameter(ps, i + 1, record, i, columnDataTypes(i))
      }
    }
  }

  /**
   * Builds a positional INSERT statement with `colNumbers` placeholders.
   *
   * @param tableName  target table (inserted verbatim, so "db.table" also works)
   * @param colNumbers number of `?` placeholders
   * @return e.g. `insert into t values (?,?,?)`
   */
  def getInsertSql(tableName: String, colNumbers: Int): String =
    Seq.fill(colNumbers)("?").mkString(s"insert into $tableName values (", ",", ")")

  /** Returns the MySQL connection settings from mysql-user.properties as (jdbcURL, userName, passWord). */
  def getMySQLInfo: (String, String, String) = {
    val file = "mysql-user.properties"
    val jdbcURL = PropertyUtils.getFileProperties(file, "mysql.jdbc.url")
    val userName = PropertyUtils.getFileProperties(file, "mysql.jdbc.username")
    val passWord = PropertyUtils.getFileProperties(file, "mysql.jdbc.password")
    (jdbcURL, userName, passWord)
  }

  /**
   * Reads a MySQL table into a DataFrame.
   *
   * @param sqlContext     context used for the JDBC read
   * @param mysqlTableName table to read
   * @param queryCondition optional filter expression; null or blank means no filter
   * @return the table contents, optionally filtered
   */
  def getDFFromMysql(sqlContext: SQLContext, mysqlTableName: String, queryCondition: String): DataFrame = {
    val (jdbcURL, userName, passWord) = getMySQLInfo
    val prop = new Properties()
    prop.put("user", userName)
    prop.put("password", passWord)
    val table = sqlContext.read.jdbc(jdbcURL, mysqlTableName, prop)
    if (queryCondition == null || queryCondition.trim.isEmpty) table else table.where(queryCondition)
  }

  /**
   * Drops a table.
   *
   * @param sqlContext     unused; kept for API compatibility
   * @param mysqlTableName table to drop
   * @return true if the statement succeeded, false if it failed (the error is logged)
   */
  def dropMysqlTable(sqlContext: SQLContext, mysqlTableName: String): Boolean =
    executeStatement(s"drop table $mysqlTableName", "mysql dropMysqlTable error")

  /**
   * Deletes the rows of a table that match `condition`.
   *
   * The condition is spliced into the SQL verbatim, so it must never come from untrusted input.
   * A null or blank condition is rejected rather than turned into a whole-table delete.
   *
   * @param sqlContext     unused; kept for API compatibility
   * @param mysqlTableName table to delete from
   * @param condition      SQL WHERE predicate, e.g. "m_id = 'member_1'"
   * @return true if the statement succeeded, false otherwise (the error is logged)
   */
  def deleteMysqlTableData(sqlContext: SQLContext, mysqlTableName: String, condition: String): Boolean =
    if (condition == null || condition.trim.isEmpty) {
      logger.error(s"mysql deleteMysqlTable error:empty condition for table $mysqlTableName")
      false
    } else {
      executeStatement(s"delete from $mysqlTableName where $condition", "mysql deleteMysqlTable error")
    }

  /**
   * Saves a DataFrame to MySQL, creating the table from the DataFrame schema if it does not exist.
   * It also checks that the table's column count, names and order match the DataFrame before writing.
   *
   * @param tableName       target table
   * @param resultDateFrame rows to insert
   */
  def saveDFtoDBCreateTableIfNotExist(tableName: String, resultDateFrame: DataFrame): Unit = {
    // Create the table from the DataFrame schema when it is missing.
    createTableIfNotExist(tableName, resultDateFrame)
    // Positional INSERT requires identical column count, names and order.
    verifyFieldConsistency(tableName, resultDateFrame)
    saveDFtoDBUsePool(tableName, resultDateFrame)
  }

  /**
   * Builds an `INSERT ... ON DUPLICATE KEY UPDATE` statement.
   *
   * The VALUES placeholders come first, one per entry of `cols` and in that order. They are
   * followed by one `col=?` placeholder per entry of `updateColumns`. Column names are
   * backtick-quoted.
   *
   * @param tableName     target table
   * @param cols          all inserted columns, in DataFrame order
   * @param updateColumns columns to overwrite when the key already exists; must be non-empty
   * @return the SQL text
   * @throws IllegalArgumentException if `updateColumns` is empty (the statement would be invalid)
   */
  def getInsertOrUpdateSql(tableName: String, cols: Array[String], updateColumns: Array[String]): String = {
    require(updateColumns.nonEmpty, "updateColumns must not be empty for INSERT ... ON DUPLICATE KEY UPDATE")
    val columnList = cols.map(quoteIdent).mkString("(", ",", ")")
    val placeholders = cols.map(_ => "?").mkString("(", ",", ")")
    val assignments = updateColumns.map(c => s"${quoteIdent(c)}=?").mkString(",")
    s"insert into $tableName $columnList values $placeholders ON DUPLICATE KEY UPDATE $assignments"
  }

  /**
   * Upserts a DataFrame into MySQL with `INSERT ... ON DUPLICATE KEY UPDATE`.
   * The table must have a primary or unique key, otherwise every row is simply inserted.
   *
   * Each partition is one transaction. If a partition fails, it is rolled back, the error is
   * logged and then rethrown, which fails the Spark job.
   *
   * @param tableName       target table
   * @param resultDateFrame rows to upsert
   * @param updateColumns   DataFrame columns to overwrite when the key already exists
   * @throws IllegalArgumentException if `updateColumns` is empty or names a column the DataFrame lacks
   */
  def insertOrUpdateDFtoDBUsePool(tableName: String, resultDateFrame: DataFrame, updateColumns: Array[String]): Unit = {
    val columns = resultDateFrame.columns
    val colNumbers = columns.length
    val sql = getInsertOrUpdateSql(tableName, columns, updateColumns)
    val columnDataTypes = resultDateFrame.schema.fields.map(_.dataType)
    // Resolve update columns once on the driver so a typo fails fast instead of once per row.
    val updateIndices = updateColumns.map { name =>
      val idx = columns.indexOf(name)
      require(idx >= 0, s"update column '$name' is not a column of the DataFrame")
      idx
    }
    logger.info(s"############## sql = $sql")
    writePartitions(resultDateFrame, sql, "insertOrUpdateDFtoDBUsePool") { (ps, record) =>
      columnDataTypes.indices.foreach { i =>
        setParameter(ps, i + 1, record, i, columnDataTypes(i))
      }
      // The UPDATE placeholders follow the colNumbers VALUES placeholders.
      updateIndices.indices.foreach { k =>
        val fieldIndex = updateIndices(k)
        setParameter(ps, colNumbers + k + 1, record, fieldIndex, columnDataTypes(fieldIndex))
      }
    }
  }

  /**
   * Creates `tableName` from the DataFrame schema if no table with that name exists.
   * Columns keep the DataFrame order. A field named `id` (any case) becomes an auto-increment
   * int primary key. Other fields are mapped from their Spark type; see columnDefinition.
   *
   * NOTE(review): DatabaseMetaData.getColumns treats `_` and `%` in `tableName` as wildcards,
   * so a similarly named table can count as "existing".
   *
   * @param tableName table to create
   * @param df        DataFrame whose schema defines the columns
   * @return true if the table was created, false if it already existed
   * @throws IllegalArgumentException if the DataFrame has no columns
   * @throws RuntimeException for an unsupported column type
   */
  def createTableIfNotExist(tableName: String, df: DataFrame): Boolean = withConnection { con =>
    val colResultSet = con.getMetaData.getColumns(null, "%", tableName, "%")
    val tableExists = try colResultSet.next() finally colResultSet.close()
    if (tableExists) {
      false
    } else {
      require(df.schema.fields.nonEmpty, s"cannot create table $tableName from a DataFrame without columns")
      val sql_createTable = df.schema.fields
        .map(columnDefinition)
        .mkString(s"CREATE TABLE `$tableName` (", ",", ") ENGINE = InnoDB DEFAULT CHARSET = utf8")
      logger.info(sql_createTable)
      val statement = con.createStatement()
      try statement.execute(sql_createTable) finally statement.close()
      true
    }
  }

  /**
   * Checks that the table and the DataFrame have the same column count, names and order.
   * Names are compared case-sensitively.
   *
   * @param tableName table to compare against
   * @param df        DataFrame to compare
   * @throws Exception describing the first mismatch found
   */
  def verifyFieldConsistency(tableName: String, df: DataFrame): Unit = {
    val tableColumns = withConnection { con =>
      val colResultSet = con.getMetaData.getColumns(null, "%", tableName, "%")
      try Iterator.continually(colResultSet).takeWhile(_.next()).map(_.getString("COLUMN_NAME")).toVector
      finally colResultSet.close()
    }
    val dfColumns = df.columns
    val tableFiledNum = tableColumns.length
    val dfFiledNum = dfColumns.length
    if (tableFiledNum != dfFiledNum) {
      throw new Exception(s"数据表和DataFrame字段个数不一致!! table - $tableFiledNum but dataFrame - $dfFiledNum")
    }
    tableColumns.zip(dfColumns).foreach { case (tableFileName, dfFiledName) =>
      if (!tableFileName.equals(dfFiledName)) {
        throw new Exception(s"数据表和DataFrame字段名不一致!! table - '$tableFileName' but dataFrame - '$dfFiledName'")
      }
    }
  }

  /** Borrows a pooled connection, runs `f`, and always closes the connection afterwards. */
  private def withConnection[T](f: Connection => T): T = {
    val conn = Option(MySQLPoolManager.getMysqlManager.getConnection)
      .getOrElse(throw new SQLException("Unable to obtain a connection from the MySQL pool"))
    try f(conn) finally conn.close()
  }

  /** Runs one SQL statement; returns true on success, or logs the error and returns false. */
  private def executeStatement(sql: String, errorPrefix: String): Boolean =
    try {
      withConnection { conn =>
        val stmt = conn.createStatement()
        try stmt.execute(sql) finally stmt.close()
      }
      true
    } catch {
      case NonFatal(e) =>
        logger.error(s"$errorPrefix:${e.getMessage}", e)
        false
    }

  /**
   * Shared executor-side write loop. For every partition it does the following:
   * borrows a connection, binds each row with `bind`, runs the batch every BatchSize rows,
   * and commits at the end. On any error it rolls back, logs with `label`, and rethrows.
   */
  private def writePartitions(df: DataFrame, sql: String, label: String)
                             (bind: (PreparedStatement, Row) => Unit): Unit =
    // The explicit Iterator[Row] type selects the Scala overload of foreachPartition.
    df.foreachPartition { (partitionRecords: Iterator[Row]) =>
      withConnection { conn =>
        val ps = conn.prepareStatement(sql)
        try {
          conn.setAutoCommit(false)
          partitionRecords.grouped(BatchSize).foreach { chunk =>
            chunk.foreach { record =>
              bind(ps, record)
              ps.addBatch()
            }
            ps.executeBatch()
          }
          conn.commit()
        } catch {
          case NonFatal(e) =>
            try conn.rollback() catch { case NonFatal(r) => e.addSuppressed(r) }
            logger.error(s"@@ $label ${e.getMessage}", e)
            throw e
        } finally {
          ps.close()
        }
      }
    }

  /**
   * Binds field `fieldIndex` of `record` to the 1-based parameter `paramIndex`, using the setter
   * for `dataType`. A null value is bound with setNull.
   */
  private def setParameter(ps: PreparedStatement, paramIndex: Int, record: Row,
                           fieldIndex: Int, dataType: DataType): Unit =
    if (record.isNullAt(fieldIndex)) {
      ps.setNull(paramIndex, jdbcTypeOf(dataType))
    } else {
      dataType match {
        case _: ByteType      => ps.setByte(paramIndex, record.getByte(fieldIndex))
        case _: ShortType     => ps.setShort(paramIndex, record.getShort(fieldIndex))
        case _: IntegerType   => ps.setInt(paramIndex, record.getInt(fieldIndex))
        case _: LongType      => ps.setLong(paramIndex, record.getLong(fieldIndex))
        case _: BooleanType   => ps.setBoolean(paramIndex, record.getBoolean(fieldIndex))
        case _: FloatType     => ps.setFloat(paramIndex, record.getFloat(fieldIndex))
        case _: DoubleType    => ps.setDouble(paramIndex, record.getDouble(fieldIndex))
        case _: StringType    => ps.setString(paramIndex, record.getString(fieldIndex))
        case _: TimestampType => ps.setTimestamp(paramIndex, record.getAs[Timestamp](fieldIndex))
        case _: DateType      => ps.setDate(paramIndex, record.getAs[Date](fieldIndex))
        case _                => throw new RuntimeException(s"nonsupport $dataType !!!")
      }
    }

  /** java.sql.Types code used with setNull for a Spark type; Types.NULL when there is no mapping. */
  private def jdbcTypeOf(dataType: DataType): Int = dataType match {
    case _: ByteType      => Types.TINYINT
    case _: ShortType     => Types.SMALLINT
    case _: IntegerType   => Types.INTEGER
    case _: LongType      => Types.BIGINT
    case _: BooleanType   => Types.BOOLEAN
    case _: FloatType     => Types.REAL
    case _: DoubleType    => Types.DOUBLE
    case _: StringType    => Types.VARCHAR
    case _: TimestampType => Types.TIMESTAMP
    case _: DateType      => Types.DATE
    case _                => Types.NULL
  }

  /** Backtick-quotes a MySQL identifier, doubling any embedded backticks. */
  private def quoteIdent(name: String): String = "`" + name.replace("`", "``") + "`"

  /** MySQL column definition for one DataFrame field, used by createTableIfNotExist. */
  private def columnDefinition(field: StructField): String = {
    val name = quoteIdent(field.name)
    if (field.name.equalsIgnoreCase("id")) {
      s"$name int(255) NOT NULL AUTO_INCREMENT PRIMARY KEY"
    } else {
      field.dataType match {
        case _: ByteType      => s"$name int(100) DEFAULT NULL"
        case _: ShortType     => s"$name int(100) DEFAULT NULL"
        case _: IntegerType   => s"$name int(100) DEFAULT NULL"
        case _: LongType      => s"$name bigint(100) DEFAULT NULL"
        case _: BooleanType   => s"$name tinyint DEFAULT NULL"
        // MySQL stores FLOAT(p) with 25 <= p <= 53 as DOUBLE; kept so existing schemas do not change.
        case _: FloatType     => s"$name float(50) DEFAULT NULL"
        // DOUBLE(M) without a scale is a syntax error in MySQL, so no precision is given.
        case _: DoubleType    => s"$name double DEFAULT NULL"
        // NOTE(review): values longer than 50 chars are rejected or truncated depending on sql_mode.
        case _: StringType    => s"$name varchar(50) DEFAULT NULL"
        case _: TimestampType => s"$name timestamp DEFAULT current_timestamp"
        case _: DateType      => s"$name date DEFAULT NULL"
        case other            => throw new RuntimeException(s"nonsupport $other !!!")
      }
    }
  }
}