浅谈Apache Kylin二次开发
1、生产环境
Apache Kylin-1.5.3 for CDH
CDH 5.7
由于工作的需要,仅仅在页面操作Apache Kylin,其实时性以及所要花费的人力成本远远达不到预期的目的。
2、需求
需要对Apache kylin的Resetful API进行充分调研,开发一个类似客户端的功能,大致可以实现kafka更新到HBase中的数据可以实时或者定时地进行自动化的更新。
3、原始设计方案
总体设计分为两部分,第一部分为与kafka到HBase中程序对接的部分,第二个部分为与Kylin直接交互的部分。
整体说明:事实表即为主表,lookup表即为分表。
HBase作为Hive的外表
Apache kylin web界面汉化
在整个设计的过程中,需要涉及到两份配置文件:
[图片上传中。。。(2)]配置文件说明
下面,分别以两张截图,对这两个配置文件做进一步的说明:
①配置文件1
以上图为例,此时在kylin中有一个名为cube3的cube,它包含Hive中test数据库的两张表OR1和OR3,其中OR3为事实表,OR1为lookup表,事实表的时间字段为approvaltime,事实表与lookup表的关联方式为left join,关联字段均为两表中的workflownumber字段,事实表与lookup表的主键均为id。配置文件中的不同的列间均以“\t”分隔,由上图不难看出,第一行第四列不存在SQL语句,第二行的第四列是有SQL语句的。这是为什么呢?其实是这样子的,为了适应实时refresh(cube)的需要,拟定一开始的segment以一个统一大时间段规定,但之后的segment均以天做区分。一个cube只对应一张事实表,所以kylin中的以天作为分区,说具体点,其实就是依据事实表的时间字段,然后再以时间作分区。再次回到上图,你会发现,第一行中的第一列与第三列的值是相等的,都是OR1,但第二行的第一列与第三列的值并不相等,它的第一列的值为OR3。之所以,出现这样的情况,是因为,kafka传过来的数据,既包含了事实表,又包含了lookup表,而配置文件中的第一列对应的就是kafka传过来的数据中待建模的表,而第三列对应的是,各表所对应的事实表。如果待建模的表如第一行所示,就是事实表,那么我需要刷新的对应的segment时间段,就是这个表本身带有的时间字段,毫无问题。但如果待建模的表是lookup表呢?能直接拿着它本身的时间字段去refresh或者build吗?显然不可以,因为之前说了,kylin中以天做分区,它的时间依据是事实表的时间字段,并不是lookup表的时间字段,所以需要通过第四列的SQL语句去查询该lookup表关联事实表的时间字段,自然第一行就不需要SQL语句了。
tip1:这里判断传过来的数据是分表还是主表,主要是对配置文件1进行处理,遍历按行读取,然后在每行的基础上再判断该行的第一列与第三列是否相等,相等则为事实表,无需查询,否则为lookup表,需要查询。具体实现的时候,一开始打算用二维数组,后来经同事提醒,二维数组每次比较的时候都需要遍历,开销太大,所以就转用HashMap了。具体的kylin查询语句如下,需要两个入参:SQL语句以及SQL中等于号后分表id具体值:
public void testCubeQuery(String SQL,String lookupid) throws JSchException, IOException
{
String cmd = "curl -X POST -H \"Authorization: Basic QURNSU46S1lMSU4=\" -H 'Content-Type: application/json' -d '{\"sql\":\""+SQL+lookupid+"\",\"offset\":0,\"limit\":500,\"acceptPartial\":false, \"project\":\"Test\"}' http://168.168.207.3:7070/kylin/api/query";
JschUtil.exec(cmd);
}
②配置文件2
配置文件2中包含2列,第一列为对应的需更新的数据时间,第二列为要更新数据对应事实表的名称。该数据源来自两处,其一是根据配置文件,判断更新数据为事实表数据后,直接将kafka传来的数据时间以及数据的表名写入配置文件2,否则则通过配置文件1中查得lookup表关联主表的时间字段,将其与主表名合并成'时间+“ ”+主表名’的格式写入配置文件。
设计的第一部分
旨在将kafka传来的数据,持久化到磁盘成为供更新的数据格式。这里的更新分为两种操作,对旧数据的Refresh,以及对新数据的AddSegment。
设计的第二部分
在更新的时候需要着重的注意的是两处。
其一:Kylin中的Refresh操作,是对一个cube中已有的segment进行Refresh,如果segment不存在,会报找不到某个segment的错误,因此会导致无法刷新。同时Refresh的时间点,也必须严格按照之前所建的进行操作,由于数据需要经常进行refresh,确保新增的segment可以以天区间长久存在,所以我们忍痛将kylin默认的7天一小merge,28天一大merge取消掉了。同时kylin中如果页面所建的segment是今天0点到次日0点,通过resetful api调用时,均要改为8点。为了较强的控制更新这一机制,使之模式化,我的思路是这样子的:
①首先对每个业务流程的数据,以统一的大segment建立一个cube,比如对应5个业务流程,分别建立5个cube,每个cube起初的build开始结束时间即最初的segment时间区间都为”1970-01-01 00:00:00”到“2016-09-18 00:00:00”,确保最开始大的segment的时间跨度包含当前所有数据的时间跨度。以后针对新增数据以新增数据对应的天数新建一个segment,比如新增数据为2016-09-20,则segment为“2016-09-20 00:00:00”到“2016-09-21 00:00:00”。这样子当kafka的数据持久化到文件中后,对文件中的数据大概会有三种操作:
**a、首先判断文件中的时间对应的segment在需要刷新的cube中是否存在,如果存在,说明要进行的是Refresh操作,否则进行的是新增及Build操作:
/*
*获取某个cube的信息,目的在于获取该cube已有的segment。
*/
public StringBuffer Cubeseg(String cubename) throws JSchException, IOException{
String cmd = "curl -X GET -H \"Authorization: Basic QURNSU46S1lMSU4=\" -H 'Content-Type: application/json' http://168.168.207.3:7070/kylin/api/cubes/"+cubename;
cubeseg=JschUtil.exec(cmd);
return cubeseg;
}
**b.判断是Refresh操作后,如果对应的segment的时间小于“2016-09-18”也即大segment的结束时间,则Refresh大segment,否则refresh小segment
/*
*Refresh原始的大segment,拟定大segment的起始时间为:1970-01-01 08:00:00,结束时间为:2016-09-012 08:00:00
*/
public StringBuffer RefreshOriginalCube(String ocube) throws ParseException {
SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd hh:mm:ss");
try {
Date start = format.parse("1970-01-01 08:00:00");
Date end = format.parse("2016-09-18 08:00:00");
String cmd = "curl -X PUT -H \"Authorization: Basic QURNSU46S1lMSU4=\" -H 'Content-Type: application/json' -d '{\"startTime\":'"+start.getTime()+"', \"endTime\":'"+end.getTime()+"', \"buildType\":\"REFRESH\"}' http://168.168.207.3:7070/kylin/api/cubes/"+ocube+"/rebuild";
buffer1=JschUtil.exec(cmd);
}catch (JSchException e){
e.printStackTrace();
}catch (IOException e){
e.printStackTrace();
}
return buffer1;
}
/*
*Refresh以天做分区的小Segment,默认小Segment的起始时间为当天的0点,结束时间为次日0点。
*/
public StringBuffer RefreshCube(String rstarttime,String rendtime,String rcube) throws ParseException {
SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd hh:mm:ss");
try {
Date start = format.parse(rstarttime+" "+"08:00:00");
Date end = format.parse(rendtime+" "+"08:00:00");
buffer2 = new StringBuffer();
String cmd = "curl -X PUT -H \"Authorization: Basic QURNSU46S1lMSU4=\" -H 'Content-Type: application/json' -d '{\"startTime\":'"+start.getTime()+"', \"endTime\":'"+end.getTime()+"', \"buildType\":\"REFRESH\"}' http://168.168.207.3:7070/kylin/api/cubes/"+rcube+"/rebuild";
buffer2=JschUtil.exec(cmd);}
catch (JSchException e){
e.printStackTrace();
}
catch (IOException e){
e.printStackTrace();
}
return buffer2;
}
其二:Segment的忙时等待设计:
①需求
每次读取数据,要检查该数据对应segment的状态,当查得有job在刷segment的时候,不能再刷,以防漏刷数据。
②问题
查询segment是通过查询cube信息获得segment信息,(以查询cube3信息为例)
public void testCube() throws JSchException, IOException
{
String cmd = "curl -X GET -H \"Authorization: Basic QURNSU46S1lMSU4=\" -H 'Content-Type: application/json' http://168.168.207.3:7070/kylin/api/cubes/cube3";
JschUtil.exec(cmd);
}
经测试发现,如果某个cube中存在segment,则该segment的状态无论是cube在enable还是disable或者该seg是否正在刷新,都显示READY状态,不具有辨识性。
segment状态
③拟定解决方案
如果一个segment正在刷新,此时再请求刷新该segment时,返回的buf会提示overlap(重叠)信息,如下图所示:
可以先自定义个字符串(overlap),每次请求操作数据时,将返回的buf内容与该字符串进行比较,如果返回的buf中存在此字符串时,说明该数据对应的segment正在刷新,将这条数据存放到set集合中。匹配过程拟用如下代码实现:
package testone;
public class compare {
public static boolean containsAny(String str,String searchChars){
return str.contains(searchChars);
}
public static void main(String[] args){
String str="{\"url\":\"http://168.168.207.3:7070/kylin/api/cubes/cube3/rebuild\",\"exception\":\"Segments overlap: cube3[20140101000000_20150101155959] and cube3[20140101000000_20150101155959]\"}";
String searchChars="overlap";
boolean a=containsAny(str,searchChars);
System.out.println(a);
}
}
Compare
如果segment正在刷新的时候,连续来了多条该segment的数据,由于是set集合,只保留一条,因此不存在冗余,定时从徐铉的文件中得到数据去刷,并且set里面的内容也定时的去刷。Set的存在在这里起到了一个分流的作用,数据从文件中读取后,一批对应的segment没在进行的,拿去刷了,另外一批存到了set中,set本身也在遍历请求刷新,之前忙碌过段时间空闲的segment,也从set中拿去刷了,还忙碌的依旧保留,当然随着定时读取文件的操作,也会有新的数据源源不断的补充到set中,这样的一个闭环,目的在于,一个都不能少。
在第二部分,除了着重注意的两处,还有一些也是需要考虑的,就是对kylin job的监控措施,kylin对于job的监控,提供了2个api,但这两个api在kylin-1.5.3 for cdh里均涉及到job id,但获取job id的api,kylin却没有提供,所以这里我们采取的是改变kylin的源码,改变后的获取job id的方法如下:
/*
*获取某个cube中的所有job的id
*/
public StringBuffer jobid(String cubename) throws JSchException, IOException {
StringBuffer job = new StringBuffer();
String cmd = "curl -X GET -H \"Authorization: Basic QURNSU46S1lMSU4=\" -H 'Content-Type: application/json' http://168.168.207.3:7070/kylin/api/jobs/list/OADepartment/"+cubename;
job= JschUtil.exec(cmd);
return job;
}
同时,为了提高程序的可用性,当程序崩溃了,可能Set集合待更新的数据就会丢失,我们又在遍历set前,将其持久化到了本地。
4、更新后的设计方案
更新后的设计方案对原始设计方案的第一部分以及配置文件的设置,没有大的改动,主要是对第二部分的刷新机制以及一些小的细节方面做了优化,这里,主要讲一下更新后的刷新机制。在原始设计方案中,对kafka到hbase中的数据,主要进行了3步操作,更新后减少为2步,通过一个返回bool值的方法,判断我要更新的数据,是否在对应cube中已经存在Segment,如果已经存在则执行Refresh操作,否则则执行Build操作。刷新的开始结束时间,如果是Refresh操作的话,则是该数据对应segment的开始结束时间,如果不是,则直接将数据的时间作为参数传进去,具体代码如下:
/*
*判断需要建模的表对应的segment是否在其对应cube中存在,存在则执行Refresh操作,否则则执行Build操作。
*/
public boolean useLoop(String cubename,String compare) throws JSONException, JSchException, IOException {boolean ok = false;json.jsonseg(cubename);for(int j=0;j0&&compare.compareTo(json.end1[j])<0);
starttime.put(compare,json.start1[j]);
endtime.put(compare, json.end1[j]);
ok=true;//如果存在,则将ok置为true
}
return ok;
}
这里if判断为最关键性地步骤,通过它来判断数据是进行Refresh还是Build操作。设置一个bool值变量ok,初始化为false,一旦数据对应的Segment已经在cube中存在,则置为true,在主函数中执行Refresh操作,否则为false时执行Build操作。
经过测试,发现方案一无论在性能上和功能上都存在一些不足,例如我有一个大的segment,其区间为1970-01-01~2016-09-12,这时候假设我们需要操作的数据为2个,其对应的时间分别为2016-04-20,2015-08-06,通过方案一的useLoop方法,会判断出这两条数据对应的segment已经存在,所以需要执行的均是refresh操作,但是它会执行两次,即2016-04-20会拿去refresh,这时候2015-08-06也会拿去refrsh,但是当后者做refresh请求时,发现该segment正在忙碌,因为去刷2016-04-20了,所以需要等待,2条其实在性能瓶颈上看的还不是特别明显,那100条呢?1000条呢?所以必须重新设计一个方案针对批量数据的处理。
方案2应运而生,并且该方案是我们最终采纳的方案,通过一个数据清洗方法-------cleanData,在请求kylin之前,对拿到的那个文件夹中的数据统一做一次处理,即将待刷新的数据放到一个refreshSet中,而新增数据则存放至addSegment中,之后只要分别遍历这两个set,再执行对应的请求就行了。具体方法如下:
/**
* 将经Kafka到HBase处理过后的数据分为需Refresh操作和需新增操作这两批
* @param cubeName
* @param dataTime
*/
public void cleanData(String cubeName, String dataTime) {
logger.info("cleanData" + ":" + cubeName + "--------" + dataTime);
json.compileSegmentJson(cubeName);
for (int j = 0; j < json.sSize; j++) {
if (dataTime.compareTo(json.segStartTime[j]) >= 0 && dataTime.compareTo(json.segEndTime[j]) < 0) {
refreshSet.add(json.segStartTime[j]);
refreshMap.put(json.segStartTime[j], json.segEndTime[j]);
CompareTime compareTime = new CompareTime(dataTime);
if (timeMap.get(json.segStartTime[j])==null) {
timeMap.put(json.segStartTime[j], new HashMap());
}
timeMap.get(json.segStartTime[j]).put(dataTime, compareTime);
}
else {
addSet.add(dataTime);
addMap.put(dataTime, time.getNextDate(dataTime));
}
}
}
其中方案2在功能以及可靠性上也做了一些改进,功能上,之前是将经kafka到HBase生成的数据文件,读到一个map中,以时间列为key值,以主表名为value值,但是一个key只能对应一个value,如果出现不同的表需要操作的数据时间相同,显然之前的设计就不适用,这里做了一点改进,将value即主表名读到一个map中,这样子,就可以在获取一个对应时间后,将它涉及到的所有主表都进行操作,具体操作如下:
Map <String,Map> initMap = new HashMap>();
class MainTableInfo{
String table;
public MainTableInfo(String table) {
super();
this.table = table;
}
}
//读取处理kafka数据后生成的文件,读取完毕后删除
public void readfile1(String file) {
String refreshFolderPath=file;
File file1 = new File(refreshFolderPath);
//获得当前文件夹所有的文件
String [] fileNames = file1.list();
FileInputStream fis = null;
InputStreamReader isr = null;
BufferedReader br = null;
String line = null;
List thisFileNames=new ArrayList();
Arrays.sort(fileNames);
for (int i = 0; i < fileNames.length; i++) {
if (!fileNames[i].contains("tmp")) {
thisFileNames.add(fileNames[i]);
}
}
//遍历读取文件
for (String fileName:thisFileNames){
try {
fis = new FileInputStream(refreshFolderPath+fileName);
isr = new InputStreamReader(fis);
br = new BufferedReader(isr);
}
catch (FileNotFoundException e) {
e.printStackTrace();
}
try {
while ((line = br.readLine()) != null) {
String[] infos = line.split(" ");
if (infos.length==2) {
String time = infos[0];
String maintablename = infos[1];
timeSet.add(time);
MainTableInfo mainTableInfo =new MainTableInfo(maintablename);
if (initMap.get(time)==null) {
initMap.put(time, new HashMap());
}
initMap.get(time).put(maintablename, mainTableInfo);
}
}
}
catch (IOException e){
e.printStackTrace();
}
finally {
try{
br.close();
isr.close();
fis.close();
}
catch (IOException e){
e.printStackTrace();
}
}
}
//删除这批读取的文件
for (String fileName:thisFileNames)
{
File sonFile=new File(refreshFolderPath+fileName);
sonFile.delete();
}
}
在可靠性上主要做了两点的改进
其一,之前是通过setfile将正在忙碌的数据持久化到本地,但是并未做删除操作,只是通过“覆盖写入”这种方式保证每次存放至它中的文件是最新一批需要等待的,但是这会引起一个问题,就是如果从kafka那边不来数据了,setfile中的值会保持不变,会一直拿去刷,但是只要可以成功刷一遍就行了哈,所以显然之前的设计是不合理的,所以对setfile又执行了一次删除操作,每次刷完删除,有新数据来了,才会生成,但这又陷入了性能和可靠性相博弈的永久话题中,如果加入这个删除操作,那么在程序读取后并删除到新的等待生成这一段时间中,程序挂掉了,那么原先的可靠性保障便不适用了。所以采取的是业内通用的方法-----冗余备份,将需要等待的数据写入两个文件,两个文件中的内容均为最近批次的待等待数据,一直读取删除其中一份,另一份只少一个删除步骤,这样如果程序挂了(只要不是运行第一次就挂了),我总可以得到最近一批次的待等待数据,可能在这最近一批次中也有一部分可以剔除了,但是为了可靠,冗余的体现就是,哪怕我的cube多刷一遍,我也不能让我的数据少更新一条!!!具体代码如下:
/**
* Segment忙时等待设计,将存放等待数据的waitSet持久化到本地
*/
public void waitSegment() {
for (String waiting : waitSet) {
logger.info("需要等待的数据为:" + " " + waiting);
Map mainTableMap=rw.initMap.get(waiting);
if (mainTableMap!=null) {
Set mainTableSet=mainTableMap.keySet();
for (String mainTable:mainTableSet) {
waitBuffer.append(waiting + " " + mainTable + "\r\n");
}
}
}
String waitString = waitBuffer.toString();
logger.info(waitString);
// rw.writeToSetFile("/xx/setfile.txt", waitString);// 将Set集合中的内容持久化到磁盘文件
// rw.writeToSetFile("/xx/setfile1.txt", waitString);
rw.writeToSetFile("C://setfile.txt", waitString);
rw.writeToSetFile("C://setfile1.txt", waitString);
//清空相应的内容,包括buffer值、Set值以及Map
waitBuffer.setLength(0);
waitSet.clear();
rw.timeSet.clear();
refreshSet.clear();
addSet.clear();
refreshMap.clear();
addMap.clear();
rw.initMap.clear();
}
可靠性改动的另一方面在于,如果正在操作的segment,无论refresh或者add,出现错误,job状态变为ERROR时,如果不做处理,就会一直阻塞,该segment就会搁置,后续想对这个segment进行操作也不行,必须先discarded掉。所以新增了一个定时监听程序,遍历所有cube,如果发现某个job处于ERROR状态,则先discarded掉该job,然后重新执行该job对应的操作,具体代码如下:
public void run() {
json.compileCubeJson();
rw.readConfigFile("C://one.txt");
for(String cubeName : json.cubeSet){
json.compileJobJson(rw.cubeProject.get(cubeName), cubeName);
Map UuidMap=json.jobIdMap.get("ERROR");
if (UuidMap!=null) {
Set jobIdSet=UuidMap.keySet();
for (String jobIdSet1:jobIdSet) {
try {
je.discardJob(jobIdSet1);
} catch (JSchException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
String jobName=json.jobSegmentMap.get(jobIdSet1);
time.getSegmentTime(jobName);
if(judge.isSegmentExist(idata.refreshCube(time.startTime,time.endTime,cubeName).toString()));{
idata.addSegment(time.startTime,time.endTime,cubeName);
}
logger.info(time.startTime+"-------"+jobName);
}
}
}
}
}
5、注意事项
①Kylin-1.5.3 for CDH不支持一个Cube中的多个segment同时刷,需要修改源码;
②kylin-1.5.3 for CDH没有开放出获取job id的接口,同样需要修改源码。