filebeat+kafka+Flink+ElasticSearch
Since I have recently been studying ELK and Flink, the newest real-time compute framework, I decided to rebuild my earlier heat-map project (flume + kafka + SparkStreaming + mysql + ssm + AMap heat map) with these components. The result is not better; if anything it is more cumbersome and slower. This is purely an exercise in applying the new components and the new compute framework I have been studying.
Project environment:
filebeat 6.2.0
kafka 0.8.2
Flink 1.6.1
ElasticSearch 6.4.0
springboot 2.1.5
scala 2.11
Project steps:
1. Write a Python script to simulate the data. In reality, everyone's phone constantly reports its longitude and latitude; that is why, when you take a train into another province, you receive a text message like "Welcome to Jinan, Shandong". Same principle here.
import random
import time

phone = [
    "13869555210",
    "18542360152",
    "15422556663",
    "18852487210",
    "13993584664",
    "18754366522",
    "15222436542",
    "13369568452",
    "13893556666",
    "15366698558"
]
location = [
    "123.449169, 41.740567",
    "123.450169, 41.740705",
    "123.451169, 41.741405",
    "123.452169, 41.741805",
    "123.453654, 41.742405",
    "123.454654, 41.742805",
    "123.455654, 41.743405",
    "123.458654, 41.743705"
]

def sample_phone():
    return random.sample(phone, 1)[0]

def sample_location():
    return random.sample(location, 1)[0]

def generator_log(count=10):
    time_str = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
    # open in append mode and close the file after each batch
    with open("/var/log/lyh.log", "a+") as f:
        while count >= 1:
            query_log = "{phone}\t{location}\t{date}".format(phone=sample_phone(),
                                                             location=sample_location(),
                                                             date=time_str)
            f.write(query_log + "\n")
            count -= 1

if __name__ == '__main__':
    while True:
        generator_log(100)
        time.sleep(5)
Upload the script to the Linux machine and run it. Every 5 seconds it appends 100 randomly generated lines to /var/log/lyh.log; each line is phone number + longitude/latitude + time. The Flink job later only needs the coordinate field.
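Each generated line can be sanity-checked with a few lines of Python; the field order (phone, coordinates, time, separated by tabs) is exactly what the Flink job below relies on:

```python
# Parse one simulated log line: phone \t "lng, lat" \t timestamp
line = "18542360152\t123.450169, 41.740705\t2019-05-26 19:03:59"

phone, coords, ts = line.split("\t")
lng, lat = [float(p.strip()) for p in coords.split(",")]

print(phone)     # 18542360152
print(lng, lat)  # 123.450169 41.740705
```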
2. Use filebeat to capture the data continuously appended to /var/log/lyh.log and output it to kafka.
filebeat is the log-shipping agent of the ELK stack; here we use it to pick up the data generated by the Python script above.
Edit filebeat.yml to configure the file to watch and the output destination:
filebeat.prospectors:
- type: log              # captured lines are shipped as JSON documents
  paths:
    - /var/log/lyh.log   # the file to watch for new data
filebeat.config.modules:
  path: ${path.config}/modules.d/*.yml
  reload.enabled: false
# If you do not need logstash filtering, you could also output straight to es:
#output.elasticsearch:
#  hosts: ["172.24.112.17:9200"]
# Output to kafka
output.kafka:
  hosts: ["hadoop1:9092", "hadoop2:9092", "hadoop3:9092"]
  topic: 'log'
Note:
The filebeat and kafka versions must be compatible or it will throw errors; see the official docs for the exact compatibility matrix: https://www.elastic.co/guide/en/beats/filebeat/6.4/kafka-output.html
Command to start filebeat:
sudo -u elk ./filebeat -e -c filebeat.yml -d "publish"
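filebeat does not ship the raw line: it wraps each line in a JSON envelope and puts the original text into the message field, which is why the Flink job below first parses JSON and then extracts message. A minimal sketch of unwrapping such a record (the envelope fields other than message are illustrative; the real record carries more metadata):

```python
import json

# A simplified, hypothetical record as filebeat publishes it to kafka
record = json.dumps({
    "@timestamp": "2019-05-26T11:03:59.281Z",
    "beat": {"hostname": "hadoop1"},
    "message": "18542360152\t123.450169, 41.740705\t2019-05-26 19:03:59"
})

# The consumer only cares about the original log line:
line = json.loads(record)["message"]
print(line.split("\t")[1])  # 123.450169, 41.740705
```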
3. Write the Flink job: consume the data from kafka, clean it to extract the fields we need, and store it into ElasticSearch.
pom dependencies:
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.11</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>2.11.8</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-core</artifactId>
<version>1.6.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.11</artifactId>
<version>1.6.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-scala_2.11</artifactId>
<version>1.6.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_2.11</artifactId>
<version>1.6.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka-0.8 -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-0.8_2.11</artifactId>
<version>1.6.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/log4j/log4j -->
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.36</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-elasticsearch6_2.11</artifactId>
<version>1.6.1</version>
</dependency>
</dependencies>
Flink code:
import java.text.SimpleDateFormat
import java.util.{Date, Properties}

import com.alibaba.fastjson.JSON
import org.apache.flink.streaming.connectors.kafka._
import org.apache.flink.streaming.util.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.api.common.functions.RuntimeContext
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer
import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink
import org.apache.http.HttpHost
import org.elasticsearch.action.index.IndexRequest
import org.elasticsearch.client.Requests

object Flink_kafka {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Crucial: enable checkpointing!
    env.enableCheckpointing(5000)

    // kafka connection settings
    val props = new Properties()
    props.setProperty("bootstrap.servers", "192.168.199.128:9092,192.168.199.131:9092,192.168.199.132:9092")
    props.setProperty("zookeeper.connect", "192.168.199.128:2181,192.168.199.131:2181,192.168.199.132:2181")
    props.setProperty("group.id", "test")

    // The first argument is the kafka topic, i.e. the topic "log" configured in filebeat.yml above
    val consumer = new FlinkKafkaConsumer08[String]("log", new SimpleStringSchema(), props)
    // Only read the newest data
    consumer.setStartFromLatest()

    // Add kafka as the source. The "message" field of the filebeat JSON envelope holds a line like:
    // 18542360152    116.410588, 39.880172    2019-05-24 23:43:38
    val stream = env.addSource(consumer)
      .map(x => JSON.parseObject(x))
      .map(x => x.getString("message"))
      .map(x => {
        val jingwei = x.split("\\t")(1)
        // Note: the first field is actually the longitude and the second the latitude,
        // so "wei" and "jing" are swapped here; the front-end JS compensates by mapping
        // wei -> lng and jing -> lat, so the final map is still correct.
        val wei = jingwei.split(",")(0).trim
        val jing = jingwei.split(",")(1).trim
        // Adjust the time format: es stores dates as UTC by default, and +0800 marks the Beijing time zone
        val sdf = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS+0800")
        val time = sdf.format(new Date())
        wei + "," + jing + "," + time
      })
    // After cleaning, each element looks like: 123.450169,41.740705,2019-05-26T19:03:59.281+0800
    stream.print()

    // Sink the cleaned data into es
    val httpHosts = new java.util.ArrayList[HttpHost]
    // The es client talks to es over HTTP for all CRUD operations
    httpHosts.add(new HttpHost("192.168.199.128", 9200, "http"))
    val esSinkBuilder = new ElasticsearchSink.Builder[String](
      httpHosts,
      new ElasticsearchSinkFunction[String] { // element is one cleaned line in the format above
        def createIndexRequest(element: String): IndexRequest = {
          val json = new java.util.HashMap[String, String]
          json.put("wei", element.split(",")(0))
          json.put("jing", element.split(",")(1))
          json.put("time", element.split(",")(2))
          Requests.indexRequest()
            .index("location-index")
            .`type`("location")
            .source(json)
        }

        override def process(element: String, runtimeContext: RuntimeContext, requestIndexer: RequestIndexer): Unit = {
          requestIndexer.add(createIndexRequest(element))
        }
      }
    )
    // Bulk-request configuration: this makes the sink flush after every element;
    // otherwise requests would be buffered.
    esSinkBuilder.setBulkFlushMaxActions(1)
    stream.addSink(esSinkBuilder.build())

    env.execute("Kafka_Flink")
  }
}
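Stripped of the Flink plumbing, the cleaning chain above amounts to this transformation, sketched in plain Python (clean and its sample line are illustrative):

```python
from datetime import datetime

def clean(message: str) -> str:
    """Mirror of the Flink map chain: keep only the coordinate field of a
    tab-separated line and append an ES-friendly timestamp with +0800."""
    coords = message.split("\t")[1]
    first = coords.split(",")[0].strip()   # actually the longitude (named "wei" above)
    second = coords.split(",")[1].strip()  # actually the latitude (named "jing" above)
    ts = datetime.now().strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3] + "+0800"
    return "%s,%s,%s" % (first, second, ts)

print(clean("18542360152\t123.450169, 41.740705\t2019-05-26 19:03:59"))
# e.g. 123.450169,41.740705,2019-05-26T19:03:59.281+0800 (timestamp is the current time)
```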
Note: time format and time zone when storing dates in es.
elasticsearch natively supports the date type; in JSON a date is represented as a string, so when you submit a date-like string, es implicitly converts it to a date.
The date type carries time zone information. If the date string has no explicit time zone, es itself has no problem, but kibana does: when rendering in the browser, kibana uses js to pick up the client machine's time zone (UTC+8 here) and subtracts 8 hours from the dates it reads from es, so the displayed times are 8 hours off.
Best practice: when submitting date data to es, send a date string that carries explicit time zone information,
e.g. "2016-07-15T12:58:17.136+0800"
(an ISO 8601 style format with an explicit offset, which es parses by default).
Java formatting:
String FULL_FORMAT="yyyy-MM-dd\'T\'HH:mm:ss.SSS+0800";
Date now=new Date();
new SimpleDateFormat(FULL_FORMAT).format(now)
4. The data is now in es. Create a springboot web project that queries the data from es and displays it dynamically in the AMap heat-map component on the front end.
pom dependencies for the whole web project:
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-tomcat</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<!-- JavaServer Pages Standard Tag Library (JSTL) -->
<dependency>
<groupId>javax.servlet</groupId>
<artifactId>jstl</artifactId>
</dependency>
<!-- JSP support for the embedded Tomcat, used to compile JSPs -->
<dependency>
<groupId>org.apache.tomcat.embed</groupId>
<artifactId>tomcat-embed-jasper</artifactId>
<!--<scope>provided</scope>-->
</dependency>
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-high-level-client</artifactId>
<version>6.4.3</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>2.11.1</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.36</version>
</dependency>
</dependencies>
4.1 We want to query es for all data within 20 seconds of the current time, aggregated by longitude/latitude with a count per bucket.
The es query (shown as a screenshot in the original post; reconstructed here from the template used in the controller below):
{
  "query": {
    "bool": {
      "filter": {
        "range": {
          "time": { "gte": "now-20s", "lt": "now" }
        }
      }
    }
  },
  "aggs": {
    "group_by_jing": {
      "terms": {
        "script": "doc['jing'].values +'#split#'+ doc['wei'].values"
      }
    }
  }
}
Before running this aggregation, update the mapping and set the fielddata property of jing and wei to true (it defaults to false); otherwise the aggregation query throws an error.
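A sketch of that mapping change, assuming dynamic mapping created jing and wei as text fields (the index and type names mirror the Flink sink above; adjust to your actual mapping):

```
PUT location-index/_mapping/location
{
  "properties": {
    "jing": { "type": "text", "fielddata": true },
    "wei":  { "type": "text", "fielddata": true }
  }
}
```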
The statement first fetches all data within the last 20 seconds, then aggregates on jing and wei (the equivalent of SQL's GROUP BY) and counts each bucket. The count is the number of people at that coordinate; the bigger the number, the more people in that area.
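Outside es, the bucketing boils down to a group-by count over the composite '#split#' key that the terms script builds. A small Python sketch with made-up documents (note: the real keys coming back from es wrap each value in brackets, because doc[...].values is an array; the Java code below strips them):

```python
from collections import Counter

# Cleaned documents as the Flink job wrote them (hypothetical sample data)
docs = [
    {"wei": "123.450169", "jing": "41.740705"},
    {"wei": "123.450169", "jing": "41.740705"},
    {"wei": "123.451169", "jing": "41.741405"},
]

# terms aggregation on the script doc['jing'].values + '#split#' + doc['wei'].values
buckets = Counter("%s#split#%s" % (d["jing"], d["wei"]) for d in docs)
print(buckets.most_common())
# [('41.740705#split#123.450169', 2), ('41.741405#split#123.451169', 1)]
```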
In the web code we implement the query above in Java via the es API.
First create a Location entity class to hold the query results: the count and the two coordinate values (jing and wei).
public class Location {
private Integer count;
private double wei;
private double jing;
public Integer getCount() {
return count;
}
public void setCount(Integer count) {
this.count = count;
}
public double getWei() {
return wei;
}
public void setWei(double wei) {
this.wei = wei;
}
public double getJing() {
return jing;
}
public void setJing(double jing) {
this.jing = jing;
}
}
Next, an es utility class that creates the client connection to es, provides some basic helper methods, and implements the query above in Java:
import com.test.flink_web_show.controller.Location;
import org.apache.http.HttpHost;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.script.ScriptType;
import org.elasticsearch.script.mustache.SearchTemplateRequest;
import org.elasticsearch.script.mustache.SearchTemplateResponse;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.aggregations.Aggregation;
import org.elasticsearch.search.aggregations.Aggregations;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
public class RestClientUtils {
private RestHighLevelClient client = null;
public RestClientUtils() {
if (client == null){
synchronized (RestHighLevelClient.class){
if (client == null){
client = getClient();
}
}
}
}
private RestHighLevelClient getClient(){
RestHighLevelClient client = new RestHighLevelClient(
RestClient.builder(
new HttpHost("192.168.199.128", 9200, "http"),
new HttpHost("192.168.199.128", 9201, "http")));
return client;
}
public void closeClient(){
try {
if (client != null){
client.close();
}
} catch (IOException e) {
e.printStackTrace();
}
}
/*------------------------------------------------ search API: templated query start ----------------------------------------------*/
/**
 * Templated search: render the template with the given params and map the
 * aggregation buckets to Location objects.
 * @throws Exception
 */
public List<Location> searchTemplate(String indexName, String JsonStr, Map<String, Object> scriptParams) throws Exception{
//Inline Templates
SearchTemplateRequest request = new SearchTemplateRequest();
request.setRequest(new SearchRequest(indexName));
request.setScriptType(ScriptType.INLINE);
request.setScript(JsonStr);
request.setScriptParams(scriptParams);
//Synchronous Execution
SearchTemplateResponse response = client.searchTemplate(request, RequestOptions.DEFAULT);
//SearchTemplate Response
SearchResponse searchResponse = response.getResponse();
//Retrieving SearchHits: fetch the result data
SearchHits hits = searchResponse.getHits();
long totalHits = hits.getTotalHits();
float maxScore = hits.getMaxScore();
System.out.println("totalHits: " + totalHits);
System.out.println("maxScore: " + maxScore);
System.out.println("------------------------------------------");
SearchHit[] searchHits = hits.getHits();
/*for (SearchHit hit : searchHits) {
// do something with the SearchHit
String index = hit.getIndex();
String type = hit.getType();
String id = hit.getId();
float score = hit.getScore();
String sourceAsString = hit.getSourceAsString();
System.out.println("index: " + index);
System.out.println("type: " + type);
System.out.println("id: " + id);
System.out.println("score: " + score);
System.out.println(sourceAsString);
System.out.println("------------------------------------------");
}*/
//Extract the buckets under "aggregations"
ArrayList<Location> locations = new ArrayList<>();
Aggregations aggregations = searchResponse.getAggregations();
if(aggregations!=null){
Map<String, Aggregation> aggregationMap = aggregations.getAsMap();
Terms companyAggregation = (Terms) aggregationMap.get("group_by_jing");
List<? extends Terms.Bucket> buckets = companyAggregation.getBuckets();
for(Terms.Bucket bk:buckets){
Location location = new Location();
Object key = bk.getKey();
long docCount = bk.getDocCount();
System.out.println("key: "+key.toString());
System.out.println("doc_count: "+docCount);
String jingdu = key.toString().split("#split#")[0];
String substring_jing = jingdu.substring(1, jingdu.length() - 1);
location.setJing(Double.parseDouble(substring_jing));
String weidu = key.toString().split("#split#")[1];
String substring_wei = weidu.substring(1, weidu.length() - 1);
location.setWei(Double.parseDouble(substring_wei));
location.setCount((int)docCount);
locations.add(location);
}
}
return locations;
}
}
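The bucket keys returned by the aggregation look like "[41.740705]#split#[123.450169]", since doc['jing'].values is an array rendered with brackets; the substring calls above strip them. The same parsing, sketched in Python (parse_bucket_key is an illustrative name):

```python
def parse_bucket_key(key: str):
    """Split a '[jing]#split#[wei]' bucket key and strip the array brackets,
    mirroring the substring logic in RestClientUtils."""
    jing_part, wei_part = key.split("#split#")
    jing = float(jing_part[1:-1])  # drop '[' and ']'
    wei = float(wei_part[1:-1])
    return jing, wei

print(parse_bucket_key("[41.740705]#split#[123.450169]"))
# (41.740705, 123.450169)
```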
The es Java API is fairly involved; for details see my other Jianshu article on the ElasticSearch Java API.
Controller code:
import com.alibaba.fastjson.JSON;
import com.test.flink_web_show.es_utils.RestClientUtils;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.servlet.ModelAndView;
import javax.servlet.http.HttpServletResponse;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@Controller
public class HomeController {
@RequestMapping("/")
public ModelAndView home()
{
ModelAndView modelAndView = new ModelAndView();
modelAndView.setViewName("index");
return modelAndView;
}
@RequestMapping("/get_map")
public void getMap(HttpServletResponse response) throws Exception{
RestClientUtils restClientUtils = new RestClientUtils();
String searchJSON="{\n" +
" \"query\": {\n" +
" \"bool\": {\n" +
" \"filter\": {\n" +
" \"range\": {\n" +
" \"{{time}}\": {\n" +
" \"{{gte}}\": \"{{value1}}\", \n" +
" \"{{lt}}\": \"{{now}}\"\n" +
" }\n" +
" }\n" +
" }\n" +
" }\n" +
" },\n" +
" \"aggs\": {\n" +
" \"{{group_by_jing}}\": {\n" +
" \"terms\": {\n" +
" \"script\": \"{{doc['jing'].values +'#split#'+ doc['wei'].values}}\"\n" +
" }\n" +
" }\n" +
" }\n" +
"}";
Map<String, Object> map = new HashMap<>();
map.put("time","time");
map.put("gte","gte");
map.put("value1","now-20s");
map.put("lt","lt");
map.put("now","now");
map.put("group_by_jing","group_by_jing");
map.put("doc['jing'].values +'#split#'+ doc['wei'].values","doc['jing'].values +'#split#'+ doc['wei'].values");
List<Location> locations = restClientUtils.searchTemplate("location-index", searchJSON, map);
restClientUtils.closeClient();
String json = JSON.toJSONString(locations);
response.getWriter().print(json);
}
}
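The template above turns almost everything, even the field and aggregation names, into mustache parameters, which is why the params map looks redundant. After substitution the template renders to the concrete query from step 4.1; a Python sketch of that substitution for the range clause (plain string replacement, not a real mustache engine):

```python
# The range clause of the search template, with mustache-style placeholders
template = '{ "range": { "{{time}}": { "{{gte}}": "{{value1}}", "{{lt}}": "{{now}}" } } }'
params = {"time": "time", "gte": "gte", "value1": "now-20s", "lt": "lt", "now": "now"}

# Substitute each {{name}} placeholder with its value
rendered = template
for name, value in params.items():
    rendered = rendered.replace("{{%s}}" % name, value)

print(rendered)
# { "range": { "time": { "gte": "now-20s", "lt": "now" } } }
```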
Front-end JSP code:
<%--
Created by IntelliJ IDEA.
User: ttc
Date: 2018/7/6
Time: 14:06
To change this template use File | Settings | File Templates.
--%>
<%@ page contentType="text/html;charset=UTF-8" language="java" %>
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8"/>
<title>高德地图</title>
<link rel="stylesheet" href="http://cache.amap.com/lbs/static/main1119.css"/>
</head>
<body>
<script src="https://cdn.bootcss.com/echarts/4.1.0.rc2/echarts.min.js"></script>
<script src="https://cdn.bootcss.com/jquery/3.3.1/jquery.min.js"></script>
<script src="http://webapi.amap.com/maps?v=1.4.9&key=d16808eab90b7545923a1c2f4bb659ef"></script>
<div id="container"></div>
<script>
    // Center and zoom used when the map first opens
    var map = new AMap.Map("container", {
        resizeEnable: true,
        center: [123.453169, 41.742567], // longitude/latitude of the center point
        zoom: 17                         // initial zoom level
    });
    var heatmap;
    map.plugin(["AMap.Heatmap"], function () { // load the heat-map plugin
        // Overlay the heat map on the map object; see the API docs for all options
        heatmap = new AMap.Heatmap(map, {
            radius: 50,
            opacity: [0, 0.7]
        });
    });
    // Every second, send an ajax request to query es, assign the result to the
    // points object, and update the heatmap object with the new data
    setInterval(function () {
        var points = (function () { //<![CDATA[
            var city = [];
            $.ajax({
                type: "POST",
                url: "/get_map",
                dataType: 'json',
                async: false, // block so that "city" is filled before we return it
                success: function (result) {
                    for (var i = 0; i < result.length; i++) {
                        // "wei" actually holds the longitude and "jing" the latitude
                        // (the names are swapped upstream), so map wei -> lng, jing -> lat
                        city.push({"lng": result[i].wei, "lat": result[i].jing, "count": result[i].count});
                    }
                }
            });
            return city;
        })(); //]]>
        heatmap.setDataSet({data: points, max: 70}); // update the heat-map data set
    }, 1000);
    // var map = new AMap.Map('container', {
    //     pitch: 75,      // map pitch angle, valid range 0-83 degrees
    //     viewMode: '3D'  // map mode
    // });
</script>
</body>
</html>
That is all of the code. Start the whole pipeline in order:
Start zookeeper:
zkServer.sh start
Start kafka:
bin/kafka-server-start.sh config/server.properties
Start es:
sudo -u elk bin/elasticsearch
Start filebeat:
sudo -u elk ./filebeat -e -c filebeat.yml -d "publish"
Start the Python script that generates the simulated data:
python phoneData_every5second.py
Start the Flink job, which receives and processes the data in real time and stores it into es.
Start the web project for the dynamic map display.
(screenshot: the finished heat map)
I could not capture an animated gif, and the simulated data is not great, but the colors of the heat-map circles keep changing as the simulated data streams in.