尚硅谷大数据技术之电信客服

2018-12-25  本文已影响24人  尚硅谷教育

2) 新建测试单元类 :HBaseScanTest2

|

package com.atguigu;

import com.atguigu.utils.DateTimeUtil;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.hbase.Cell;

import org.apache.hadoop.hbase.CellUtil;

import org.apache.hadoop.hbase.HBaseConfiguration;

import org.apache.hadoop.hbase.client.HTable;

import org.apache.hadoop.hbase.client.Result;

import org.apache.hadoop.hbase.client.ResultScanner;

import org.apache.hadoop.hbase.client.Scan;

import org.apache.hadoop.hbase.util.Bytes;

import org.junit.Test;

import java.io.IOException;

import java.text.ParseException;

public class HBaseScanTest2 {

private static Configuration conf = null;

static {

conf = HBaseConfiguration.create();

}

@Test

public void scanTest() throws IOException, ParseException {

String call = "14473548449";

String startPoint = "2017-01-01";

String stopPoint = "2017-09-01";

HTable hTable = new HTable(conf, "ns_telecom:calllog");

Scan scan = new Scan();

ScanRowkeyUtil scanRowkeyUtil = new ScanRowkeyUtil (call, startPoint, stopPoint);

while (scanRowkeyUtil.hasNext()) {

String[] rowKeys = scanRowkeyUtil.next();

scan.setStartRow(Bytes.toBytes(rowKeys[0]));

scan.setStopRow(Bytes.toBytes(rowKeys[1]));

System.out.println("时间范围" + rowKeys[0].substring(15, 21) + "---" + rowKeys[1].substring(15, 21));

ResultScanner resultScanner = hTable.getScanner(scan);

//每一个rowkey对应一个result

for (Result result : resultScanner) {

//每一个rowkey里面包含多个cell

Cell[] cells = result.rawCells();

StringBuilder sb = new StringBuilder();

sb.append(Bytes.toString(result.getRow())).append(",");

for (Cell c : cells) {

sb.append(Bytes.toString(CellUtil.cloneValue(c))).append(",");

}

System.out.println(sb.toString());

}

}

}

}

|

3) 运行测试

观察是否已经按照时间范围查询出对应的数据。

3.2.5 数据消费方案优化

现在我们要使用

使用HBase查找数据时,尽可能的使用rowKey去精准的定位数据位置,而非使用ColumnValueFilter或者SingleColumnValueFilter,按照单元格Cell中的Value过滤数据,这样做在数据量巨大的情况下,效率是极低的——如果要涉及到全表扫描。所以尽量不要做这样可怕的事情。注意,这并非ColumnValueFilter就无用武之地。现在,我们将使用协处理器,将数据一分为二。

思路:

a) 编写协处理器类,用于协助处理HBase的相关操作(增删改查)

b) 在协处理器中,一条主叫日志成功插入后,将该日志切换为被叫视角再次插入一次,放入到与主叫日志不同的列族中。

c) 重新创建hbase表,并设置为该表设置协处理器。

d) 编译项目,发布协处理器的jar包到hbase的lib目录下,并群发该jar包

e) 修改hbase-site.xml文件,设置协处理器,并群发该hbase-site.xml文件

编码:

1) 新建协处理器类:CalleeWriteObserver,并覆写postPut方法,该方法会在数据成功插入之后被回调。

|

package com.atguigu.coprocessor;

import com.atguigu.utils.HBaseUtil;

import com.atguigu.utils.PropertiesUtil;

import org.apache.commons.lang.StringUtils;

import org.apache.hadoop.hbase.TableName;

import org.apache.hadoop.hbase.client.*;

import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;

import org.apache.hadoop.hbase.coprocessor.ObserverContext;

import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;

import org.apache.hadoop.hbase.regionserver.wal.WALEdit;

import org.apache.hadoop.hbase.util.Bytes;

import java.io.IOException;

import java.text.ParseException;

import java.text.SimpleDateFormat;

/**

*/

public class CalleeWriteObserver extends BaseRegionObserver{

@Override

public void postPut(ObserverContext<RegionCoprocessorEnvironment> e, Put put, WALEdit edit, Durability durability) throws IOException {

super.postPut(e, put, edit, durability);

//1、获取需要操作的表

String targetTableName = PropertiesUtil.getProperty("hbase.table.name");

//2、获取当前操作的表

String currentTableName = e.getEnvironment().getRegion().getRegionInfo().getTable().getNameAsString();

//3、判断需要操作的表是否就是当前表,如果不是,则return

if (!StringUtils.equals(targetTableName, currentTableName)) return;

//4、得到当前插入数据的值并封装新的数据,oriRowkey举例:01_15369468720_20170727081033_13720860202_1_0180

String oriRowKey = Bytes.toString(put.getRow());

String[] splits = oriRowKey.split("_");

String flag = splits[4];

//如果当前插入的是被叫数据,则直接返回(因为默认提供的数据全部为主叫数据)

if(StringUtils.equals(flag, "0")) return;

//当前插入的数据描述

String caller = splits[1];

String callee = splits[3];

String dateTime = splits[2];

String duration = splits[5];

String timestamp = null;

try {

SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMddHHmmss");

timestamp = String.valueOf(sdf.parse(dateTime).getTime());

} catch (ParseException e1) {

e1.printStackTrace();

}

//组装新的数据所在分区号

int regions = Integer.valueOf(PropertiesUtil.getProperty("hbase.regions.count"));

String regionHash = HBaseUtil.genPartitionCode(callee, dateTime, regions);

String newFlag = "0";

String rowKey = HBaseUtil.genRowKey(regionHash, callee, dateTime, caller, newFlag, duration);

//开始存放被叫数据

Put newPut = new Put(Bytes.toBytes(rowKey));

newPut.add(Bytes.toBytes("f2"), Bytes.toBytes("call1"), Bytes.toBytes(callee));

newPut.add(Bytes.toBytes("f2"), Bytes.toBytes("call2"), Bytes.toBytes(caller));

newPut.add(Bytes.toBytes("f2"), Bytes.toBytes("date_time"), Bytes.toBytes(dateTime));

newPut.add(Bytes.toBytes("f2"), Bytes.toBytes("date_time_ts"), Bytes.toBytes(timestamp));

newPut.add(Bytes.toBytes("f2"), Bytes.toBytes("duration"), Bytes.toBytes(duration));

newPut.add(Bytes.toBytes("f2"), Bytes.toBytes("flag"), Bytes.toBytes(newFlag));

HTableInterface hTable = e.getEnvironment().getTable(TableName.valueOf(targetTableName));

hTable.put(newPut);

hTable.close();

}

}

|

2) 重新创建hbase****表,并设置为该表设置协处理器。在“表描述器”中调用addCoprocessor****方法进行协处理器的设置,大概是这样的:(你需要找到你的建表的那部分代码,添加如下逻辑)

|

tableDescriptor.addCoprocessor("com.atguigu.coprocessor.CalleeWriteObserver");

|

本教程由尚硅谷教育大数据研究院出品,如需转载请注明来源,欢迎大家关注尚硅谷公众号(atguigu)了解更多。

上一篇 下一篇

猜你喜欢

热点阅读