对HBase进行过滤
2016-12-13 本文已影响443人
_helloliang
包括表过滤、列标签过滤、值过滤
// Builds the scan list for the MapReduce job: rows are selected by value
// (cf:cq equals "lunch" OR "pageview") and the returned cells are restricted
// to a fixed set of column-qualifier prefixes.
HBaseAdmin admin = null;
List<Filter> filters = new ArrayList<>();

// Value filters.
// Matches rows whose cf:cq value equals "lunch".
Filter lunchFilter = new SingleColumnValueFilter(
        Bytes.toBytes("cf"),
        Bytes.toBytes("cq"),
        CompareOp.EQUAL,
        Bytes.toBytes("lunch"));
// Matches rows whose cf:cq value equals "pageview".
Filter pageviewFilter = new SingleColumnValueFilter(
        Bytes.toBytes("cf"),
        Bytes.toBytes("cq"),
        CompareOp.EQUAL,
        Bytes.toBytes("pageview"));
filters.add(lunchFilter);
filters.add(pageviewFilter);
// The two value conditions are alternatives, so they are OR'ed together.
FilterList valueFilterList = new FilterList(FilterList.Operator.MUST_PASS_ONE, filters);

// Column filter: only fetch the listed columns (qualifier prefixes).
String[] columns = new String[]{"UUID", "SERVER_TIME", "PLATFORM", "BROWSER_NAME"};

// The column restriction must hold IN ADDITION to the value condition.
// Adding it to the MUST_PASS_ONE list above would make it just another
// alternative, so it would no longer narrow the result; combine the OR group
// and the column filter under MUST_PASS_ALL instead.
// NOTE(review): the value filters test cf:cq, which is not among the retained
// column prefixes - confirm the value check still sees that column with the
// HBase version in use.
FilterList filterList = new FilterList(FilterList.Operator.MUST_PASS_ALL);
filterList.addFilter(valueFilterList);
filterList.addFilter(this.getColumnFilter(columns));

// Build the scans.
List<Scan> scans = new ArrayList<>();
try {
    admin = new HBaseAdmin(conf);
    byte[] tableName = Bytes.toBytes("test");
    if (admin.tableExists(tableName)) {
        // The table exists: create a scan bound to it.
        Scan scan = new Scan();
        // The multi-scan mapper input reads the target table from this attribute.
        scan.setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, tableName);
        scan.setFilter(filterList);
        // More Scan objects (e.g. for other tables) can be added here.
        scans.add(scan);
    }
} catch (Exception e) {
    throw new RuntimeException("创建HBaseAdmin发生异常", e);
} finally {
    if (admin != null) {
        try {
            admin.close();
        } catch (IOException ignored) {
            // Best-effort close: the scans are already built, so a failure
            // while releasing the admin connection is deliberately ignored.
        }
    }
}
if (scans.isEmpty()) {
    throw new IOException("没有表存在,无法创建scan集合");
}
TableMapReduceUtil.initTableMapperJob(scans, ActiveVisitorMapper.class, UserStatisticD.class, Text.class, job, false);
/**
 * Builds a filter from the given column names.
 *
 * <p>Each name is converted to bytes with {@code Bytes.toBytes} and the
 * resulting array is handed to a {@code MultipleColumnPrefixFilter}.
 *
 * @param columns column names to convert; one prefix is produced per entry
 * @return a {@code MultipleColumnPrefixFilter} built from the converted names
 */
private Filter getColumnFilter(String[] columns) {
    final byte[][] prefixes = new byte[columns.length][];
    int idx = 0;
    for (String column : columns) {
        prefixes[idx++] = Bytes.toBytes(column);
    }
    return new MultipleColumnPrefixFilter(prefixes);
}