使用SparkSQL操作Elasticsearch - Spar
Hadoop允许Elasticsearch在Spark中以两种方式使用:通过自2.1以来的专用支持,或者通过自2.0以来的Map/Reduce桥接器。从5.0版本开始,elasticsearch-hadoop就支持Spark 2.0。
为Spark添加ES支持
1、引入Maven
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_${scala.version}</artifactId>
<version>${spark.version}</version>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch-hadoop</artifactId>
<version>7.2.0</version>
<scope>test</scope>
</dependency>
2、添加基础配置
更多配置详情请参考:点我
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
/**
* @Auther: majx2
* @Date: 2019-8-2 09:41
* @Description:
*/
public class SparkHelper {
private static SparkSession session = SparkSession.builder().config(getConf()).getOrCreate();
public static JavaSparkContext getContext(){
return JavaSparkContext.fromSparkContext(session.sparkContext());
}
public static SparkSession getSession() {
return session;
}
private static SparkConf getConf(){
final SparkConf conf = new SparkConf().setAppName("SparkDemo").setMaster("local[4]");
conf.set("es.nodes", "127.0.0.1");
conf.set("es.port", "9200");
conf.set("es.net.http.auth.user", "elastic");
conf.set("es.net.http.auth.pass", "elastic");
conf.set("es.scroll.size", "10000");
return conf;
}
public static StructType getStructType() {
List<StructField> fields = new ArrayList<>();
StructField field;
field = DataTypes.createStructField("id", DataTypes.StringType, true);
fields.add(field);
field = DataTypes.createStructField("entity", DataTypes.StringType, true);
fields.add(field);
return DataTypes.createStructType(fields);
}
}
3、搭建一个运行结构
import cn.hutool.core.bean.BeanUtil;
import cn.hutool.core.map.MapUtil;
import com.alibaba.fastjson.JSON;
import com.midea.ec.fc.datacenter.common.spark.dto.CustomOrderReport;
import lombok.extern.slf4j.Slf4j;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.*;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.elasticsearch.spark.rdd.Metadata;
import org.elasticsearch.spark.rdd.api.java.JavaEsSpark;
import org.elasticsearch.spark.sql.api.java.JavaEsSparkSQL;
import org.spark_project.guava.collect.ImmutableList;
import org.spark_project.guava.collect.ImmutableMap;
import scala.Tuple2;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Map;
/**
* @Auther: majx2
* @Date: 2019-8-2 09:38
* @Description:
*/
@Slf4j
public class SparkEsTest {
final static JavaSparkContext jsc = SparkHelper.getContext();
final static SparkSession session = SparkHelper.getSession();
final static SQLContext sql = new SQLContext(SparkHelper.getSession());
public static void main(String[] args) throws SQLException {
// 写入数据
simpleWrite();
simpleWrite2();
jsonWrite();
dynamicIndexWrite();
saveWithMeta();
readByRDD();
saveBySQL();
// 读取数据
readByRDD();
readBySQL();
}
}
写入数据到ES
1、通过rdd写入两条记录
并通过es.mapping.id
设置元数据_id
值为id
字段。
private static void simpleWrite(){
Map<String, ?> numbers = ImmutableMap.of("id",1,"one", 1, "two", 2);
Map<String, ?> airports = ImmutableMap.of("id",2,"OTP", "Otopeni", "SFO", "San Fran");
// 创建一个简单的RDD
JavaRDD<Map<String, ?>> javaRDD = SparkHelper.getContext().parallelize(ImmutableList.of(numbers, airports));
JavaEsSpark.saveToEs(javaRDD, "spark/demo",ImmutableMap.of("es.mapping.id", "id"));
}
效果如下:
rdd写入es2、通过对象的方式写入
准备对象,并实现序列化。如果不序列化会异常
import lombok.Data;
import java.io.Serializable;
import java.sql.Timestamp;
/**
* @Auther: majx2
* @Date: 2019-8-5 11:04
* @Description:
*/
@Data
public class CustomOrderReport implements Serializable {
private static final long serialVersionUID = 4858959432062088728L;
private Long id;
private Timestamp shipedReturnTime;
private String customerCode;
}
通过rdd的方式写入
private static void simpleWrite2() throws SQLException {
List<CustomOrderReport> reportList = Lists.newArrayList();
List<Entity> list = Db.use().query("select * from f_order_report_701 where id < ?", 30);
list.forEach(p->{
CustomOrderReport report = p.toBean(CustomOrderReport.class);
reportList.add(report);
});
JavaRDD<CustomOrderReport> javaRDD = jsc.parallelize(reportList);
JavaEsSpark.saveToEs(javaRDD, "spark-simple2/_doc", ImmutableMap.of("es.mapping.id", "id"));
}
效果如下:
把对象写入ES3、通过JSON写入ES
public static void jsonWrite(){
String json1 = "{\"reason\" : \"business\",\"airport\" : \"SFO\"}";
String json2 = "{\"participants\" : 5,\"airport\" : \"OTP\"}";
JavaRDD<String> stringRDD = jsc.parallelize(ImmutableList.of(json1, json2));
JavaEsSpark.saveJsonToEs(stringRDD, "spark-json/_doc");
}
效果如下:
将JSON数据写入ES4、动态索引写入数据
使用占位符:{media_type}
,media_type对应数据中的字段key
public static void dynamicIndexWrite(){
Map<String, ?> game = ImmutableMap.of("media_type", "game", "title", "FF VI",
"year", "1994");
JavaRDD<Map<String, ?>> dynamicRDD = jsc.parallelize(ImmutableList.of(game));
JavaEsSpark.saveToEs(dynamicRDD, "spark-collection-{media_type}/_doc");
}
效果如下:
动态索引5、带上元数据写入
public static void saveWithMeta(){
// 保存元数据
Map<String, ?> otp = ImmutableMap.of("iata", "OTP", "name", "Otopeni");
Map<String, ?> sfo = ImmutableMap.of("iata", "SFO", "name", "San Fran");
// 文档对应的元数据
Map<Metadata, Object> otpMeta = ImmutableMap.of(Metadata.ID, 1, Metadata.VERSION, "1");
Map<Metadata, Object> sfoMeta = ImmutableMap.of(Metadata.ID, "2", Metadata.VERSION, "23");
// create a pair RDD between the id and the docs
JavaPairRDD<?, ?> pairRDD = jsc.parallelizePairs(
ImmutableList.of(new Tuple2(otpMeta, otp), new Tuple2(sfoMeta, sfo))
);
JavaEsSpark.saveToEsWithMeta(pairRDD, "spark-with-meta-data/_doc");
}
效果如下:
添加元数据6、通过SparkSQL保存结果
private static void saveBySQL(){
final Dataset<Row> dataset = JavaEsSparkSQL.esDF(sql, "spark-simple/_doc", "?q=Otopeni");
JavaEsSparkSQL.saveToEs(dataset, "spark-sql/_doc",ImmutableMap.of("es.mapping.id", "id"));
}
效果如下:
SparkSQL写入数据从ES读取数据
1、通过rdd读取ES数据
并转换为自定义的Dataset对象。
private static void readByRDD() {
JavaPairRDD<String, Map<String, Object>> esPairRDD = JavaEsSpark.esRDD(SparkHelper.getContext(), "f_order_report_701/_doc");
JavaRDD<Row> esRDD = esPairRDD.map(entity -> {
Map<String, Object> source = entity._2();
source.remove("esCreateTime");
source.remove("esUpdateTime");
source = MapUtil.sort(source);
return RowFactory.create(entity._1(), JSON.toJSONString(source));
});
Dataset<Row> dataset = SparkHelper.getSession().createDataFrame(esRDD, SparkHelper.getStructType());
dataset.show(3);
log.info(dataset.head().toString());
}
返回结果:
+---+--------------------+
| id| entity|
+---+--------------------+
| 69|{"accountTime":15...|
| 70|{"accountTime":15...|
| 72|{"accountTime":15...|
+---+--------------------+
only showing top 3 rows
2019-08-06 14:41:22.460 [main] [INFO ] [c.m.e.f.d.common.spark.SparkEsTest] - [69,{"accountTime":1563379200000,"applyType":5,"cateId":"1","createTime":1563420070000,"customerCode":"C0010850","franchiser":"0","id":69,"isAdjust":0,"itemId":"31031050000042","orderAttribute":2,"ouId":701,"outerOrderId":"920190718004","outerStoreCode":"SHC44F4140","salesCenterCode":"3608380682-1","salesChannel":2,"salesOrderId":"OM19071800000003","settlementAmount":699.0,"shipedReturnTime":1563420061000,"shopId":2,"status":5,"updateTime":1563421756000,"writeOffAmount":0.0,"writeOffStatus":0}]
2、使用SparkSQL读取ES数据
private static void readBySQL() {
final Dataset<Row> dataset = JavaEsSparkSQL.esDF(sql, "spark-simple/_doc");
dataset.createOrReplaceTempView("simple");
final Dataset<Row> result = session.sql("SELECT * FROM simple WHERE id =1");
result.show();
}
返回结果如下:
+----+----+---+---+---+
| OTP| SFO| id|one|two|
+----+----+---+---+---+
|null|null| 1| 1| 2|
+----+----+---+---+---+
3、提高性能的写法
使用elasticsearch-hadoop作为Spark源的一个重要隐藏特性是,连接器理解在DataFrame/SQL中执行的操作,并且在默认情况下,将这些操作转换为适当的QueryDSL。换句话说,连接器直接下推源上的操作,在源上有效地过滤数据,以便只将所需的数据流回Spark。这极大地提高了查询性能,并将Spark和Elasticsearch集群上的CPU、内存和I/O降到最低,因为只返回所需的数据(而不是只返回批量数据,由Spark处理和丢弃)。注意,下推操作即使在指定查询时也适用——连接器将根据指定的SQL增强查询。
private static void readByPushDown(){
Dataset<Row> dataset;
dataset = sql.read().format("org.elasticsearch.spark.sql").load("spark-simple/_doc");
dataset = dataset.filter(dataset.col("id").equalTo(1)).select("id","one","two");
dataset.show();
session.sql("CREATE TEMPORARY TABLE simple USING org.elasticsearch.spark.sql OPTIONS (path 'spark-simple/_doc',scroll_size '20')");
dataset = session.sql("SELECT id,one,two FROM simple WHERE id = 1");
dataset.show();
}
返回结果:
+---+---+---+
| id|one|two|
+---+---+---+
| 1| 1| 2|
+---+---+---+
+---+---+---+
| id|one|two|
+---+---+---+
| 1| 1| 2|
+---+---+---+
The End !
参考资料:https://www.elastic.co/guide/en/elasticsearch/hadoop/7.3/spark.html