Flink之ElasticSearch应用
2019-03-22 本文已影响0人
神奇的考拉
准备
一.docker安装es
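1.拉取镜像: 可以指定国内的镜像源，速度会快很多（下文使用的就是国内镜像 registry.docker-cn.com/library/elasticsearch）
sudo docker pull registry.docker-cn.com/library/elasticsearch
若改用官方仓库 docker.elastic.co 拉取，该仓库没有 latest 标签，必须显式指定版本号，例如: sudo docker pull docker.elastic.co/elasticsearch/elasticsearch:<版本号>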
2.查看已安装的es镜像
sudo docker images
本地镜像内容如下
REPOSITORY TAG IMAGE ID CREATED SIZE
wurstmeister/kafka latest eb5fa40d9f7f 9 days ago 420MB
registry.docker-cn.com/library/redis latest 0f88f9be5839 2 weeks ago 95MB
wurstmeister/zookeeper latest 3f43f72cb283 2 months ago 510MB
registry.docker-cn.com/library/elasticsearch latest 5acf0e8da90b 6 months ago 486MB
3.启动es镜像: 将es容器启动并在后台运行
sudo docker run -d --name es -p 9200:9200 -p 9300:9300 \
-e "discovery.type=single-node" \
registry.docker-cn.com/library/elasticsearch
查看es容器是否启动成功
sudo docker container ls
输出如下内容:
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
6cd0610f00d7 registry.docker-cn.com/library/elasticsearch "/docker-entrypoint.…" 3 hours ago Up 3 hours 0.0.0.0:9200->9200/tcp, 0.0.0.0:9300->9300/tcp es
2d5fb0099017 registry.docker-cn.com/library/redis "docker-entrypoint.s…" 7 days ago Up 2 days 0.0.0.0:6379->6379/tcp myredis
779882fc6409 wurstmeister/kafka "start-kafka.sh" 8 days ago Up 2 days 0.0.0.0:9092->9092/tcp kafka
24a169067e20 wurstmeister/zookeeper "/bin/sh -c '/usr/sb…" 8 days ago Up 2 days 22/tcp, 2888/tcp, 3888/tcp, 0.0.0.0:2181->2181/tcp zookeeper
4.配置文件elasticsearch.yml修改
sudo docker exec -it es /bin/bash
cd config
修改内容（注意需要加 -i 参数才会原地修改文件，否则 sed 只把结果输出到终端，elasticsearch.yml 本身不会改变）
sed -i '1a http.cors.enabled: true\nhttp.cors.allow-origin: "*"' elasticsearch.yml
cat elasticsearch.yml
退出容器: exit
5.重启容器
sudo docker restart es
6.验证es容器是否正常
浏览器输入: http://127.0.0.1:9200/_cat/indices?v
查看es内容: http://127.0.0.1:9200/taxi_ride_idx/_search?q=*
二.实现
import com.jdd.streaming.demos.entity.TaxiRide;
import com.jdd.streaming.demos.source.TaxiRideSource;
import com.jdd.streaming.demos.utils.GeoUtils;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink;
import org.apache.flink.util.Collector;
import org.apache.http.HttpHost;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;
import java.util.*;
/**
 * Demo Flink job: replays NYC taxi-ride events, sums passenger counts per grid cell over sliding
 * event-time windows, and optionally writes each window result to Elasticsearch.
 *
 * <p>Optional command-line parameters (parsed with {@link ParameterTool}):
 * <ul>
 *   <li>{@code --data}: taxi-ride data file
 *       (default {@code /home/wmm/go_bench/flink_sources/nycTaxiRides.gz})</li>
 *   <li>{@code --es.write}: attach the Elasticsearch sink (default {@code true})</li>
 *   <li>{@code --es.host}: Elasticsearch host for the HTTP/REST client (default {@code 127.0.0.1})</li>
 *   <li>{@code --es.port}: Elasticsearch HTTP port (default {@code 9200})</li>
 * </ul>
 *
 * @author dalan (created 2019-03-22 15:36)
 */
public class SimpleESStream {

    /**
     * Builds and runs the streaming job.
     *
     * @param args command-line arguments, see the class documentation for supported keys
     * @throws Exception if the job cannot be built or fails during execution
     */
    public static void main(String[] args) throws Exception {
        final ParameterTool params = ParameterTool.fromArgs(args);
        final String data = params.get("data", "/home/wmm/go_bench/flink_sources/nycTaxiRides.gz");

        // Data-generator settings, passed straight to TaxiRideSource.
        // Presumably max event delay (seconds) and replay speed-up factor — TODO confirm.
        final int maxServingDelay = 60;
        final int servingSpeedFactor = 600;

        // Sliding window: 15-minute windows, re-evaluated every 5 minutes.
        final int countWindowLength = 15;
        final int countWindowFrequency = 5;

        // Elasticsearch settings. The sink talks to ES over HTTP (see the "http" scheme in
        // createElasticsearchSink), so the default is the REST port 9200, not transport port 9300.
        final boolean writeToElasticsearch = params.getBoolean("es.write", true);
        final String elasticsearchHost = params.get("es.host", "127.0.0.1");
        final int elasticsearchPort = params.getInt("es.port", 9200);

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        DataStream<TaxiRide> rides =
                env.addSource(new TaxiRideSource(data, maxServingDelay, servingSpeedFactor));

        // Keep events with isStart == false (presumably ride-end events) whose start point is in NYC.
        // NOTE(review): end events are bucketed by their *start* coordinates here and in the map
        // below; if the goal is to count arrivals per cell, endLon/endLat are probably intended.
        DataStream<TaxiRide> cleansedRides = rides
                .filter(r -> !r.isStart)
                .filter(r -> GeoUtils.isInNYC(r.startLon, r.startLat));

        // One (gridCellId, passengerCount) pair per ride.
        DataStream<Tuple2<Integer, Short>> cellIds = cleansedRides.map(
                new MapFunction<TaxiRide, Tuple2<Integer, Short>>() {
                    @Override
                    public Tuple2<Integer, Short> map(TaxiRide r) throws Exception {
                        return new Tuple2<Integer, Short>(
                                GeoUtils.mapToGridCell(r.startLon, r.startLat), r.passengerCnt);
                    }
                });

        // Sum passengers per cell in each sliding window; emits (cellId, windowEnd, passengerSum).
        DataStream<Tuple3<Integer, Long, Integer>> passengerCnts = cellIds
                .keyBy(0)
                .timeWindow(Time.minutes(countWindowLength), Time.minutes(countWindowFrequency))
                .apply(new WindowFunction<Tuple2<Integer, Short>, Tuple3<Integer, Long, Integer>, Tuple, TimeWindow>() {
                    @Override
                    public void apply(Tuple cell, TimeWindow window, Iterable<Tuple2<Integer, Short>> events,
                                      Collector<Tuple3<Integer, Long, Integer>> out) throws Exception {
                        // Primitive accumulator: avoids re-boxing an Integer for every event.
                        int count = 0;
                        for (Tuple2<Integer, Short> event : events) {
                            count += event.f1.intValue();
                        }
                        out.collect(new Tuple3<Integer, Long, Integer>(cell.getField(0), window.getEnd(), count));
                    }
                });

        // Attach the (lon, lat) centre of each cell: (cellId, windowEnd, (lon, lat), passengerSum).
        DataStream<Tuple4<Integer, Long, Tuple2<Double, Double>, Integer>> cntByLocation = passengerCnts.map(
                new MapFunction<Tuple3<Integer, Long, Integer>, Tuple4<Integer, Long, Tuple2<Double, Double>, Integer>>() {
                    @Override
                    public Tuple4<Integer, Long, Tuple2<Double, Double>, Integer> map(Tuple3<Integer, Long, Integer> r)
                            throws Exception {
                        return new Tuple4<Integer, Long, Tuple2<Double, Double>, Integer>(
                                r.f0, r.f1, getGridCellCenter(r.f0), r.f2);
                    }
                });

        if (writeToElasticsearch) {
            cntByLocation.addSink(createElasticsearchSink(elasticsearchHost, elasticsearchPort));
        }

        env.execute("a simple es stream demo ");
    }

    /**
     * Creates an Elasticsearch 6 sink that indexes one document per window result into
     * {@code taxi_ride_idx} (type {@code test_type}).
     *
     * @param host Elasticsearch host reachable over HTTP
     * @param port Elasticsearch HTTP port
     * @return a sink that flushes after every single action
     */
    private static ElasticsearchSink<Tuple4<Integer, Long, Tuple2<Double, Double>, Integer>> createElasticsearchSink(
            String host, int port) {
        List<HttpHost> httpHosts = new ArrayList<>();
        httpHosts.add(new HttpHost(host, port, "http"));

        ElasticsearchSink.Builder<Tuple4<Integer, Long, Tuple2<Double, Double>, Integer>> builder =
                new ElasticsearchSink.Builder<Tuple4<Integer, Long, Tuple2<Double, Double>, Integer>>(
                        httpHosts,
                        new ElasticsearchSinkFunction<Tuple4<Integer, Long, Tuple2<Double, Double>, Integer>>() {
                            /** Builds the index request holding location, window end time and count. */
                            public IndexRequest createIndexRequest(
                                    Tuple4<Integer, Long, Tuple2<Double, Double>, Integer> element) {
                                Map<String, Object> json = new HashMap<>();
                                // NOTE(review): key "localtion" is misspelled and the value is "lon,lat",
                                // the reverse of ES's "lat,lon" geo_point string format. Both are kept
                                // unchanged so existing indices/queries keep working.
                                json.put("localtion", (element.f2.f0 + "," + element.f2.f1));
                                json.put("time", element.f1);
                                json.put("cnt", element.f3);
                                return Requests.indexRequest()
                                        .index("taxi_ride_idx")
                                        .type("test_type")
                                        .source(json);
                            }

                            @Override
                            public void process(Tuple4<Integer, Long, Tuple2<Double, Double>, Integer> element,
                                                RuntimeContext ctx, RequestIndexer indexer) {
                                indexer.add(createIndexRequest(element));
                            }
                        });
        // Flush after every action so results show up in ES immediately (demo-friendly, not efficient).
        builder.setBulkFlushMaxActions(1);
        return builder.build();
    }

    // Grid geometry (degrees). The bounding box is LonWest..LonEast x LatSouth..LatNorth;
    // cells are DeltaLon x DeltaLat, giving 0.35/0.0014 = 250 columns and 0.5/0.00125 = 400 rows.
    // NOTE(review): CellCntY is a double while CellCntX is an int; it is unused here and kept as-is
    // because it is a public field.
    public static double LonEast = -73.7;
    public static double LonWest = -74.05;
    public static double LatNorth = 41.0;
    public static double LatSouth = 40.5;
    public static double LonWidth = 74.05 - 73.7;
    public static double LatHeight = 41.0 - 40.5;
    public static double DeltaLon = 0.0014;
    public static double DeltaLat = 0.00125;
    public static int CellCntX = 250;
    public static double CellCntY = 400;

    /**
     * Converts a grid cell id to the (lon, lat) coordinates of the cell's centre.
     * Cell ids are laid out row-major from the north-west corner: {@code id = y * CellCntX + x}.
     *
     * @param gridCellId id of the cell
     * @return tuple of (longitude, latitude) of the cell centre
     */
    private static Tuple2<Double, Double> getGridCellCenter(int gridCellId) {
        int xIndex = gridCellId % CellCntX;
        // Negating the distance west of the boundary is exactly equivalent to the former "* -1.0f".
        double lon = -(Math.abs(LonWest) - (xIndex * DeltaLon) - (DeltaLon / 2));
        int yIndex = (gridCellId - xIndex) / CellCntX;
        double lat = LatNorth - (yIndex * DeltaLat) - (DeltaLat / 2);
        return new Tuple2<Double, Double>(lon, lat);
    }
}
三.源码
GitHub 上提供了完整示例代码