Connecting SparkSQL to MySQL with JDBC

2021-07-22  RoyTien

Abstracting data with Dataset

The DataFrame class was removed from the Java API in Spark 2.0 (in the Scala API it is just a type alias for Dataset[Row]). In Java, use Dataset<Row> instead.

This requires a SparkSession, and you may also need to convert an RDD into a Dataset<Row>.


import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

SparkConf conf = new SparkConf().setMaster("local").setAppName("sample");
SparkSession spark = SparkSession.builder().config(conf).getOrCreate();

JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
JavaRDD<String> rdd = sc.parallelize(Arrays.asList(
    "1111111||asdf",
    "222222||zxcv"
));

// Prepend a placeholder id so that each record has three fields: id||time||name
rdd = rdd.map(line -> "0||" + line);

List<StructField> structFields = new ArrayList<>();
structFields.add(DataTypes.createStructField("id", DataTypes.IntegerType, true));
structFields.add(DataTypes.createStructField("time", DataTypes.IntegerType, true));
structFields.add(DataTypes.createStructField("name", DataTypes.StringType, true));
StructType schema = DataTypes.createStructType(structFields);

JavaRDD<Row> rowRDD = rdd.map(new Function<String, Row>() {
    @Override
    public Row call(String record) throws Exception {
        String[] attributes = record.split("\\|\\|");
        return RowFactory.create(
                Integer.parseInt(attributes[0]),
                Integer.parseInt(attributes[1]),
                attributes[2]
        );
    }
});

// Apply the schema to the RDD
Dataset<Row> dataset = spark.createDataFrame(rowRDD, schema);
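
The split on "\\|\\|" in the map function above is easy to get wrong, because "|" is the regex alternation operator and an unescaped "||" would match the empty string instead of the delimiter. The following is a minimal, Spark-free sketch of the same parsing step, so it can be tried on its own. The RecordParser and Record names, the split limit of 3, and the length check are illustrative assumptions and are not part of the original job.

```java
import java.util.Arrays;
import java.util.List;

/** Stdlib-only sketch of the parsing done inside the Spark map function. */
class RecordParser {

    /** Hypothetical holder mirroring the (id, time, name) schema. */
    static final class Record {
        final int id;
        final int time;
        final String name;

        Record(int id, int time, String name) {
            this.id = id;
            this.time = time;
            this.name = name;
        }
    }

    /** Parses one "id||time||name" line into typed fields. */
    static Record parse(String line) {
        // Each bar is escaped because "|" means alternation in a regex.
        // The limit of 3 keeps any later "||" inside the name field intact.
        String[] attributes = line.split("\\|\\|", 3);
        if (attributes.length != 3) {
            throw new IllegalArgumentException(
                    "Expected 3 fields but got " + attributes.length + ": " + line);
        }
        return new Record(
                Integer.parseInt(attributes[0]),
                Integer.parseInt(attributes[1]),
                attributes[2]);
    }

    public static void main(String[] args) {
        // Same sample records as above, after the "0||" prefix has been added.
        List<String> lines = Arrays.asList("0||1111111||asdf", "0||222222||zxcv");
        for (String line : lines) {
            Record r = parse(line);
            System.out.println(r.id + " " + r.time + " " + r.name);
        }
    }
}
```

Failing fast on a malformed line is one possible design choice; inside a real Spark job it may be preferable to filter such lines out or route them elsewhere, since an exception thrown in a task fails the task.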

Reading and writing MySQL data from SparkSQL over JDBC

import java.util.Properties;

Properties prop = new Properties();
prop.put("user", "root");
prop.put("password", "123456");
// Connector/J 8.x driver class; Connector/J 5.x uses com.mysql.jdbc.Driver
prop.put("driver", "com.mysql.cj.jdbc.Driver");

dataset.write().mode("append").jdbc("jdbc:mysql://localhost:3306/DATABASE_NAME?useSSL=false", "TABLE_NAME", prop);
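
The JDBC URL and the Properties object above are plain strings and key/value pairs, so they can be assembled and checked without Spark or a running database. The sketch below shows one way to do that. The MysqlJdbcSettings class and its method names are hypothetical helpers introduced for illustration, and the host, port, database name, and credentials are the same placeholders used above.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Properties;
import java.util.StringJoiner;

/** Hypothetical helper that assembles the MySQL JDBC URL and properties. */
class MysqlJdbcSettings {

    /** Builds jdbc:mysql://host:port/database?key=value&key=value */
    static String buildUrl(String host, int port, String database,
                           Map<String, String> params) {
        StringBuilder url = new StringBuilder("jdbc:mysql://")
                .append(host).append(':').append(port)
                .append('/').append(database);
        if (!params.isEmpty()) {
            // Join the parameters with "&" and prefix the query with "?".
            StringJoiner query = new StringJoiner("&", "?", "");
            params.forEach((key, value) -> query.add(key + "=" + value));
            url.append(query.toString());
        }
        return url.toString();
    }

    /** Collects the user, password, and driver class into a Properties object. */
    static Properties connectionProperties(String user, String password,
                                           String driverClass) {
        Properties prop = new Properties();
        prop.setProperty("user", user);
        prop.setProperty("password", password);
        prop.setProperty("driver", driverClass);
        return prop;
    }

    public static void main(String[] args) {
        // LinkedHashMap keeps the parameters in insertion order.
        Map<String, String> params = new LinkedHashMap<>();
        params.put("useSSL", "false");

        String url = buildUrl("localhost", 3306, "DATABASE_NAME", params);
        Properties prop = connectionProperties("root", "123456", "com.mysql.cj.jdbc.Driver");

        System.out.println(url);
        System.out.println(prop.getProperty("driver"));
    }
}
```

The resulting url and prop values can be passed to the jdbc(...) call in place of the inline literals. This sketch does not URL-encode parameter values, so it assumes simple values such as useSSL=false.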

Submitting the job with spark-submit

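When submitting the job to a cluster with spark-submit, the MySQL connector JAR must be specified with --driver-class-path. The connector JAR version needs to match the MySQL server version; available versions can be looked up in the Maven Central repository.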

spark-submit \
--name applicationName \
--master yarn --deploy-mode cluster \
--jars /xxx/libs/spark-sql_2.11-2.4.0.jar,/xxx/libs/mysql-connector-java-8.0.19.jar \
--driver-class-path /xxx/libs/mysql-connector-java-8.0.19.jar \
--class packageName.fileName.className \
/xxx/yyy/ZZZ.jar arg1 arg2 