Connecting SparkSQL to MySQL with JDBC
2021-07-22
RoyTien
Abstracting Data with Dataset
Spark DataFrame has been removed from the Java API in Spark 2.0 (in the Scala API it is just a type alias for Dataset[Row]). We should replace it with Dataset<Row>. This requires a SparkSession, and you may also need to convert an RDD into a Dataset<Row>.
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

SparkConf conf = new SparkConf().setMaster("local").setAppName("sample");
SparkSession spark = SparkSession.builder().config(conf).getOrCreate();
JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
// Sample records: each line is "time||name"; a default id of 0 is prepended below.
JavaRDD<String> rdd = sc.parallelize(Arrays.asList(
        "1111111||asdf",
        "222222||zxcv"
));
rdd = rdd.map(line -> "0||" + line);

// Define the schema: an id, time, and name column.
List<StructField> structFields = new ArrayList<>();
structFields.add(DataTypes.createStructField("id", DataTypes.IntegerType, true));
structFields.add(DataTypes.createStructField("time", DataTypes.IntegerType, true));
structFields.add(DataTypes.createStructField("name", DataTypes.StringType, true));
StructType schema = DataTypes.createStructType(structFields);
// Parse each "id||time||name" line into a Row matching the schema.
JavaRDD<Row> rowRDD = rdd.map(new Function<String, Row>() {
    @Override
    public Row call(String record) throws Exception {
        String[] attributes = record.split("\\|\\|");
        return RowFactory.create(
                Integer.parseInt(attributes[0]),
                Integer.parseInt(attributes[1]),
                attributes[2]
        );
    }
});
// Apply the schema to the RDD
Dataset<Row> dataset = spark.createDataFrame(rowRDD, schema);
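As a quick sanity check, you can print the resulting schema and preview the rows of the new Dataset<Row>:

// Should print: id (integer), time (integer), name (string)
dataset.printSchema();
// Displays the two sample rows in tabular form
dataset.show();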
Reading and Writing MySQL Data with SparkSQL via JDBC
import java.util.Properties;

Properties prop = new Properties();
prop.put("user", "root");
prop.put("password", "123456");
// For mysql-connector-java 8.x (used below) the driver class is
// com.mysql.cj.jdbc.Driver; the old com.mysql.jdbc.Driver is deprecated.
prop.put("driver", "com.mysql.cj.jdbc.Driver");

dataset.write().mode("append").jdbc("jdbc:mysql://localhost:3306/DATABASE_NAME?useSSL=false", "TABLE_NAME", prop);
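Reading works the same way: the URL and connection properties can be reused to load a MySQL table back into a Dataset<Row> via the standard spark.read().jdbc API (a minimal sketch; DATABASE_NAME and TABLE_NAME are the same placeholders as above):

// Load the MySQL table back into a Dataset<Row> with the same credentials
Dataset<Row> fromDb = spark.read().jdbc(
        "jdbc:mysql://localhost:3306/DATABASE_NAME?useSSL=false",
        "TABLE_NAME",
        prop);
fromDb.show();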
Submitting the Job with spark-submit
When submitting a job to the cluster with spark-submit, you must specify the MySQL connector JAR via --driver-class-path. The connector version needs to match your MySQL server version; compatible versions can be looked up in the Maven Central repository.
spark-submit \
--name applicationName \
--master yarn --deploy-mode cluster \
--jars /xxx/libs/spark-sql_2.11-2.4.0.jar,/xxx/libs/mysql-connector-java-8.0.19.jar \
--driver-class-path /xxx/libs/mysql-connector-java-8.0.19.jar \
--class packageName.fileName.className \
/xxx/yyy/ZZZ.jar arg1 arg2
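Alternatively, if the cluster nodes have network access to Maven Central, spark-submit can resolve the connector automatically with --packages, which adds the JAR to both the driver and executor classpaths (a sketch assuming the same connector version as above, in place of the --jars/--driver-class-path approach):
spark-submit \
--name applicationName \
--master yarn --deploy-mode cluster \
--packages mysql:mysql-connector-java:8.0.19 \
--class packageName.fileName.className \
/xxx/yyy/ZZZ.jar arg1 arg2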