尚硅谷大数据技术之电信客服
2) 新建测试单元类 :HBaseScanTest2
|
package com.atguigu;
import com.atguigu.utils.DateTimeUtil;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.Test;
import java.io.IOException;
import java.text.ParseException;
public class HBaseScanTest2 {
private static Configuration conf = null;
static {
conf = HBaseConfiguration.create();
}
@Test
public void scanTest() throws IOException, ParseException {
String call = "14473548449";
String startPoint = "2017-01-01";
String stopPoint = "2017-09-01";
HTable hTable = new HTable(conf, "ns_telecom:calllog");
Scan scan = new Scan();
ScanRowkeyUtil scanRowkeyUtil = new ScanRowkeyUtil (call, startPoint, stopPoint);
while (scanRowkeyUtil.hasNext()) {
String[] rowKeys = scanRowkeyUtil.next();
scan.setStartRow(Bytes.toBytes(rowKeys[0]));
scan.setStopRow(Bytes.toBytes(rowKeys[1]));
System.out.println("时间范围" + rowKeys[0].substring(15, 21) + "---" + rowKeys[1].substring(15, 21));
ResultScanner resultScanner = hTable.getScanner(scan);
//每一个rowkey对应一个result
for (Result result : resultScanner) {
//每一个rowkey里面包含多个cell
Cell[] cells = result.rawCells();
StringBuilder sb = new StringBuilder();
sb.append(Bytes.toString(result.getRow())).append(",");
for (Cell c : cells) {
sb.append(Bytes.toString(CellUtil.cloneValue(c))).append(",");
}
System.out.println(sb.toString());
}
}
}
}
|
3) 运行测试
观察是否已经按照时间范围查询出对应的数据。
3.2.5 数据消费方案优化
现在我们要使用
使用HBase查找数据时,尽可能的使用rowKey去精准的定位数据位置,而非使用ColumnValueFilter或者SingleColumnValueFilter,按照单元格Cell中的Value过滤数据,这样做在数据量巨大的情况下,效率是极低的——如果要涉及到全表扫描。所以尽量不要做这样可怕的事情。注意,这并非ColumnValueFilter就无用武之地。现在,我们将使用协处理器,将数据一分为二。
思路:
a) 编写协处理器类,用于协助处理HBase的相关操作(增删改查)
b) 在协处理器中,一条主叫日志成功插入后,将该日志切换为被叫视角再次插入一次,放入到与主叫日志不同的列族中。
c) 重新创建hbase表,并设置为该表设置协处理器。
d) 编译项目,发布协处理器的jar包到hbase的lib目录下,并群发该jar包
e) 修改hbase-site.xml文件,设置协处理器,并群发该hbase-site.xml文件
编码:
1) 新建协处理器类:CalleeWriteObserver,并覆写postPut方法,该方法会在数据成功插入之后被回调。
|
package com.atguigu.coprocessor;
import com.atguigu.utils.HBaseUtil;
import com.atguigu.utils.PropertiesUtil;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.util.Bytes;
import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
/**
- 用于实现主叫日志插入成功之后,同时插入一条被叫日志
*/
public class CalleeWriteObserver extends BaseRegionObserver{
@Override
public void postPut(ObserverContext<RegionCoprocessorEnvironment> e, Put put, WALEdit edit, Durability durability) throws IOException {
super.postPut(e, put, edit, durability);
//1、获取需要操作的表
String targetTableName = PropertiesUtil.getProperty("hbase.table.name");
//2、获取当前操作的表
String currentTableName = e.getEnvironment().getRegion().getRegionInfo().getTable().getNameAsString();
//3、判断需要操作的表是否就是当前表,如果不是,则return
if (!StringUtils.equals(targetTableName, currentTableName)) return;
//4、得到当前插入数据的值并封装新的数据,oriRowkey举例:01_15369468720_20170727081033_13720860202_1_0180
String oriRowKey = Bytes.toString(put.getRow());
String[] splits = oriRowKey.split("_");
String flag = splits[4];
//如果当前插入的是被叫数据,则直接返回(因为默认提供的数据全部为主叫数据)
if(StringUtils.equals(flag, "0")) return;
//当前插入的数据描述
String caller = splits[1];
String callee = splits[3];
String dateTime = splits[2];
String duration = splits[5];
String timestamp = null;
try {
SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMddHHmmss");
timestamp = String.valueOf(sdf.parse(dateTime).getTime());
} catch (ParseException e1) {
e1.printStackTrace();
}
//组装新的数据所在分区号
int regions = Integer.valueOf(PropertiesUtil.getProperty("hbase.regions.count"));
String regionHash = HBaseUtil.genPartitionCode(callee, dateTime, regions);
String newFlag = "0";
String rowKey = HBaseUtil.genRowKey(regionHash, callee, dateTime, caller, newFlag, duration);
//开始存放被叫数据
Put newPut = new Put(Bytes.toBytes(rowKey));
newPut.add(Bytes.toBytes("f2"), Bytes.toBytes("call1"), Bytes.toBytes(callee));
newPut.add(Bytes.toBytes("f2"), Bytes.toBytes("call2"), Bytes.toBytes(caller));
newPut.add(Bytes.toBytes("f2"), Bytes.toBytes("date_time"), Bytes.toBytes(dateTime));
newPut.add(Bytes.toBytes("f2"), Bytes.toBytes("date_time_ts"), Bytes.toBytes(timestamp));
newPut.add(Bytes.toBytes("f2"), Bytes.toBytes("duration"), Bytes.toBytes(duration));
newPut.add(Bytes.toBytes("f2"), Bytes.toBytes("flag"), Bytes.toBytes(newFlag));
HTableInterface hTable = e.getEnvironment().getTable(TableName.valueOf(targetTableName));
hTable.put(newPut);
hTable.close();
}
}
|
2) 重新创建hbase****表,并设置为该表设置协处理器。在“表描述器”中调用addCoprocessor****方法进行协处理器的设置,大概是这样的:(你需要找到你的建表的那部分代码,添加如下逻辑)
|
tableDescriptor.addCoprocessor("com.atguigu.coprocessor.CalleeWriteObserver");
|
本教程由尚硅谷教育大数据研究院出品,如需转载请注明来源,欢迎大家关注尚硅谷公众号(atguigu)了解更多。