大数据

使用SparkSQL操作Elasticsearch - Spar

2019-08-06  本文已影响62人  DreamsonMa

elasticsearch-hadoop 允许在 Spark 中以两种方式使用 Elasticsearch:通过自 2.1 版本起提供的原生(专用)支持,或者通过自 2.0 版本起提供的 Map/Reduce 桥接器。从 5.0 版本开始,elasticsearch-hadoop 支持 Spark 2.0。

为Spark添加ES支持

1、引入Maven

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_${scala.version}</artifactId>
    <version>${spark.version}</version>
    <scope>test</scope>
    <exclusions>
        <exclusion>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
        </exclusion>
    </exclusions>
</dependency>

<dependency>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch-hadoop</artifactId>
    <version>7.2.0</version>
    <scope>test</scope>
</dependency>

2、添加基础配置

更多配置详情请参考:点我

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

/**
 * Shared Spark bootstrap for the demos: owns one {@link SparkSession} that is
 * configured with the Elasticsearch connection settings and exposes it both as
 * a session and as a {@link JavaSparkContext}.
 *
 * <p>Created 2019-8-2 09:41.
 *
 * @author majx2
 */
public class SparkHelper {

    /** Session built once during class initialization and handed out to every caller. */
    private static final SparkSession SESSION = SparkSession.builder().config(getConf()).getOrCreate();

    /** Static holder only; there is nothing to instantiate. */
    private SparkHelper() {
    }

    /**
     * Wraps the shared session's SparkContext for use with the Java RDD API.
     *
     * @return a Java-friendly view over the shared session's SparkContext
     */
    public static JavaSparkContext getContext(){
        return JavaSparkContext.fromSparkContext(SESSION.sparkContext());
    }

    /**
     * @return the shared session
     */
    public static SparkSession getSession() {
        return SESSION;
    }

    /**
     * Builds the Spark configuration: application name, local master with four
     * threads, and the {@code es.*} connector settings.
     *
     * <p>NOTE(review): host, port and credentials are hard-coded demo values;
     * externalize them before using this outside a local sandbox.
     *
     * @return the configuration used to create the shared session
     */
    private static SparkConf getConf(){
        final SparkConf conf = new SparkConf().setAppName("SparkDemo").setMaster("local[4]");
        conf.set("es.nodes", "127.0.0.1");
        conf.set("es.port", "9200");
        conf.set("es.net.http.auth.user", "elastic");
        conf.set("es.net.http.auth.pass", "elastic");
        conf.set("es.scroll.size", "10000");
        return conf;
    }

    /**
     * Describes the two-column row layout used by the demos: a nullable string
     * {@code id} and a nullable string {@code entity}.
     *
     * @return the schema with the {@code id} and {@code entity} columns, in that order
     */
    public static StructType getStructType() {
        List<StructField> fields = new ArrayList<>();
        fields.add(DataTypes.createStructField("id", DataTypes.StringType, true));
        fields.add(DataTypes.createStructField("entity", DataTypes.StringType, true));
        return DataTypes.createStructType(fields);
    }
}

3、搭建一个运行结构

import cn.hutool.core.bean.BeanUtil;
import cn.hutool.core.map.MapUtil;
import com.alibaba.fastjson.JSON;
import com.midea.ec.fc.datacenter.common.spark.dto.CustomOrderReport;
import lombok.extern.slf4j.Slf4j;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.*;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.elasticsearch.spark.rdd.Metadata;
import org.elasticsearch.spark.rdd.api.java.JavaEsSpark;
import org.elasticsearch.spark.sql.api.java.JavaEsSparkSQL;
import org.spark_project.guava.collect.ImmutableList;
import org.spark_project.guava.collect.ImmutableMap;
import scala.Tuple2;

import java.sql.SQLException;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Map;

/**
 * Demo driver that runs the Elasticsearch write examples first and the read
 * examples afterwards, all against the session provided by {@code SparkHelper}.
 *
 * <p>Created 2019-8-2 09:38.
 *
 * @author majx2
 */
@Slf4j
public class SparkEsTest {

    /** Java RDD entry point backed by the shared session. */
    static final JavaSparkContext jsc = SparkHelper.getContext();
    /** Shared session used for SQL statements. */
    static final SparkSession session = SparkHelper.getSession();
    /** SQLContext view of the same session, as required by the JavaEsSparkSQL helpers. */
    static final SQLContext sql = session.sqlContext();

    /**
     * Runs every write example, then every read example.
     *
     * @param args unused
     * @throws SQLException if the database query behind {@code simpleWrite2} fails
     */
    public static void main(String[] args) throws SQLException {
        // Write data
        simpleWrite();
        simpleWrite2();
        jsonWrite();
        dynamicIndexWrite();
        saveWithMeta();
        saveBySQL();
        // Read data (readByRDD used to be invoked a second time in the write phase)
        readByRDD();
        readBySQL();
    }
}

写入数据到ES

1、通过rdd写入两条记录

并通过es.mapping.id设置元数据_id值为id字段。

/**
 * Indexes two small map documents through the RDD API, using each document's
 * {@code id} field as the Elasticsearch {@code _id} (via {@code es.mapping.id}).
 *
 * <p>The target is {@code spark-simple/_doc}, the same resource that the SQL
 * read examples query, so those examples find the data written here.
 */
private static void simpleWrite(){
    Map<String, ?> numbers = ImmutableMap.of("id",1,"one", 1, "two", 2);
    Map<String, ?> airports = ImmutableMap.of("id",2,"OTP", "Otopeni", "SFO", "San Fran");
    // Build a simple RDD holding both documents
    JavaRDD<Map<String, ?>> javaRDD = SparkHelper.getContext().parallelize(ImmutableList.of(numbers, airports));
    JavaEsSpark.saveToEs(javaRDD, "spark-simple/_doc",ImmutableMap.of("es.mapping.id", "id"));
}

效果如下:

rdd写入es

2、通过对象的方式写入

准备实体对象,并实现 Serializable 接口;如果不实现序列化,运行时会抛出序列化异常。

import lombok.Data;

import java.io.Serializable;
import java.sql.Timestamp;

/**
 * Order-report bean written to Elasticsearch by the object-based write example.
 * Implements {@link Serializable}; accessors and the other boilerplate come from
 * Lombok's {@code @Data}.
 *
 * <p>Created 2019-8-5 11:04.
 *
 * @author majx2
 */
@Data
public class CustomOrderReport implements Serializable {
    // Fixed serialization version identifier for this class.
    private static final long serialVersionUID = 4858959432062088728L;
    // Record identifier.
    private Long id;
    // NOTE(review): presumably the shipment/return timestamp; the "shiped" spelling
    // is part of the property name and must stay as-is for existing data to map.
    private Timestamp shipedReturnTime;
    // Customer code of the order.
    private String customerCode;
}

通过rdd的方式写入

/**
 * Loads order-report rows from the database, converts each one to a
 * {@code CustomOrderReport} bean and indexes the beans into
 * {@code spark-simple2/_doc}, using the bean's {@code id} as the document id.
 *
 * @throws SQLException if the database query fails
 */
private static void simpleWrite2() throws SQLException {
    List<Entity> rows = Db.use().query("select * from f_order_report_701 where id < ?", 30);
    // Plain java.util.ArrayList (already imported), sized to the query result.
    List<CustomOrderReport> reportList = new ArrayList<>(rows.size());
    for (Entity row : rows) {
        reportList.add(row.toBean(CustomOrderReport.class));
    }
    JavaRDD<CustomOrderReport> javaRDD = jsc.parallelize(reportList);
    JavaEsSpark.saveToEs(javaRDD, "spark-simple2/_doc", ImmutableMap.of("es.mapping.id", "id"));
}

效果如下:

把对象写入ES

3、通过JSON写入ES

/**
 * Indexes two pre-serialized JSON documents into {@code spark-json/_doc}.
 * The strings are handed to the connector as JSON rather than as objects.
 */
public static void jsonWrite(){
    final String businessTrip = "{\"reason\" : \"business\",\"airport\" : \"SFO\"}";
    final String groupTrip = "{\"participants\" : 5,\"airport\" : \"OTP\"}";
    final JavaRDD<String> jsonDocuments = jsc.parallelize(ImmutableList.of(businessTrip, groupTrip));
    JavaEsSpark.saveJsonToEs(jsonDocuments, "spark-json/_doc");
}

效果如下:

将JSON数据写入ES

4、动态索引写入数据

使用占位符:{media_type},media_type对应数据中的字段key

/**
 * Indexes one document into a resource whose name contains the
 * {@code {media_type}} placeholder; the document carries a {@code media_type}
 * field that supplies the value for it.
 */
public static void dynamicIndexWrite(){
    Map<String, ?> game = ImmutableMap.<String, String>builder()
            .put("media_type", "game")
            .put("title", "FF VI")
            .put("year", "1994")
            .build();
    JavaRDD<Map<String, ?>> documents = jsc.parallelize(ImmutableList.of(game));
    JavaEsSpark.saveToEs(documents, "spark-collection-{media_type}/_doc");
}

效果如下:

动态索引

5、带上元数据写入

/**
 * Indexes two airport documents into {@code spark-with-meta-data/_doc}, each
 * paired with its own metadata map (document id and version).
 */
public static void saveWithMeta(){
    // Documents to be indexed
    Map<String, ?> otp = ImmutableMap.of("iata", "OTP", "name", "Otopeni");
    Map<String, ?> sfo = ImmutableMap.of("iata", "SFO", "name", "San Fran");

    // Metadata (id and version) belonging to each document
    Map<Metadata, Object> otpMeta = ImmutableMap.of(Metadata.ID, 1, Metadata.VERSION, "1");
    Map<Metadata, Object> sfoMeta = ImmutableMap.of(Metadata.ID, "2", Metadata.VERSION, "23");

    // Pair every metadata map with its document; explicit type arguments
    // replace the raw Tuple2 construction and its unchecked warnings.
    List<Tuple2<Map<Metadata, Object>, Map<String, ?>>> pairs = ImmutableList.of(
            new Tuple2<Map<Metadata, Object>, Map<String, ?>>(otpMeta, otp),
            new Tuple2<Map<Metadata, Object>, Map<String, ?>>(sfoMeta, sfo));
    JavaPairRDD<Map<Metadata, Object>, Map<String, ?>> pairRDD = jsc.parallelizePairs(pairs);
    JavaEsSpark.saveToEsWithMeta(pairRDD, "spark-with-meta-data/_doc");
}

效果如下:

添加元数据

6、通过SparkSQL保存结果

/**
 * Loads the documents of {@code spark-simple/_doc} that match the query
 * {@code ?q=Otopeni} as a DataFrame and writes them to {@code spark-sql/_doc},
 * using the {@code id} column as the document id.
 */
private static void saveBySQL(){
    final Map<String, String> writeOptions = ImmutableMap.of("es.mapping.id", "id");
    final Dataset<Row> matches = JavaEsSparkSQL.esDF(sql, "spark-simple/_doc", "?q=Otopeni");
    JavaEsSparkSQL.saveToEs(matches, "spark-sql/_doc", writeOptions);
}

效果如下:

SparkSQL写入数据

从ES读取数据

1、通过rdd读取ES数据

并转换为自定义的Dataset对象。

/**
 * Reads {@code f_order_report_701/_doc} as a pair RDD of (document id, source
 * map) and turns it into a two-column DataFrame: the id and the source rendered
 * as JSON with the {@code esCreateTime}/{@code esUpdateTime} entries removed and
 * the remaining keys sorted. Shows the first three rows and logs the first row.
 */
private static void readByRDD() {
    final JavaSparkContext context = SparkHelper.getContext();
    final JavaPairRDD<String, Map<String, Object>> documents =
            JavaEsSpark.esRDD(context, "f_order_report_701/_doc");

    final JavaRDD<Row> rows = documents.map(doc -> {
        final Map<String, Object> fields = doc._2();
        // Drop the two ES bookkeeping timestamps before serializing.
        fields.remove("esUpdateTime");
        fields.remove("esCreateTime");
        final String json = JSON.toJSONString(MapUtil.sort(fields));
        return RowFactory.create(doc._1(), json);
    });

    final SparkSession spark = SparkHelper.getSession();
    final Dataset<Row> dataset = spark.createDataFrame(rows, SparkHelper.getStructType());
    dataset.show(3);
    log.info(dataset.head().toString());
}

返回结果:

+---+--------------------+
| id|              entity|
+---+--------------------+
| 69|{"accountTime":15...|
| 70|{"accountTime":15...|
| 72|{"accountTime":15...|
+---+--------------------+
only showing top 3 rows

2019-08-06 14:41:22.460 [main]  [INFO ] [c.m.e.f.d.common.spark.SparkEsTest] - [69,{"accountTime":1563379200000,"applyType":5,"cateId":"1","createTime":1563420070000,"customerCode":"C0010850","franchiser":"0","id":69,"isAdjust":0,"itemId":"31031050000042","orderAttribute":2,"ouId":701,"outerOrderId":"920190718004","outerStoreCode":"SHC44F4140","salesCenterCode":"3608380682-1","salesChannel":2,"salesOrderId":"OM19071800000003","settlementAmount":699.0,"shipedReturnTime":1563420061000,"shopId":2,"status":5,"updateTime":1563421756000,"writeOffAmount":0.0,"writeOffStatus":0}]

2、使用SparkSQL读取ES数据

/**
 * Registers {@code spark-simple/_doc} as the temporary view {@code simple} and
 * prints the rows whose {@code id} equals 1.
 */
private static void readBySQL() {
    JavaEsSparkSQL.esDF(sql, "spark-simple/_doc").createOrReplaceTempView("simple");
    session.sql("SELECT * FROM simple WHERE id =1").show();
}

返回结果如下:

+----+----+---+---+---+
| OTP| SFO| id|one|two|
+----+----+---+---+---+
|null|null|  1|  1|  2|
+----+----+---+---+---+

3、提高性能的写法

使用 elasticsearch-hadoop 作为 Spark 数据源有一个重要但不太显眼的特性:连接器能够理解在 DataFrame/SQL 上执行的操作,并且默认会把这些操作转换为对应的 Query DSL。换句话说,连接器会把操作直接下推到数据源,在 Elasticsearch 端就完成过滤,只把真正需要的数据传回 Spark。这样可以显著提升查询性能,并把 Spark 和 Elasticsearch 集群上的 CPU、内存和 I/O 开销降到最低——因为返回的只是所需的数据,而不是先把数据整批取回,再交给 Spark 处理后丢弃。注意,即使已经显式指定了查询,下推依然生效——连接器会根据所执行的 SQL 对该查询做进一步增强。

/**
 * Demonstrates two reads of {@code spark-simple/_doc} that filter on
 * {@code id = 1} and project {@code id}, {@code one}, {@code two}: once through
 * the DataFrame reader API and once through a SQL-declared temporary view.
 *
 * <p>The view is declared with {@code CREATE OR REPLACE TEMPORARY VIEW} so the
 * statement also succeeds when a temporary view named {@code simple} has
 * already been registered in the session.
 */
private static void readByPushDown(){
    // DataFrame API: filter and projection are expressed on the Dataset.
    final Dataset<Row> source = sql.read().format("org.elasticsearch.spark.sql").load("spark-simple/_doc");
    source.filter(source.col("id").equalTo(1)).select("id","one","two").show();

    // SQL: declare the Elasticsearch resource as a temporary view, then query it.
    session.sql("CREATE OR REPLACE TEMPORARY VIEW simple USING org.elasticsearch.spark.sql OPTIONS (path 'spark-simple/_doc',scroll_size '20')");
    session.sql("SELECT id,one,two FROM simple WHERE id = 1").show();
}

返回结果:

+---+---+---+
| id|one|two|
+---+---+---+
|  1|  1|  2|
+---+---+---+

+---+---+---+
| id|one|two|
+---+---+---+
|  1|  1|  2|
+---+---+---+

The End !

参考资料:https://www.elastic.co/guide/en/elasticsearch/hadoop/7.3/spark.html

上一篇下一篇

猜你喜欢

热点阅读