Java操作HBase
2020-08-07 本文已影响0人
xiaogp
图：HBase Java API 概览（Hbase Java API.png）
简单测试
import java.io.IOException;
import java.util.*;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.PrefixFilter;
import org.apache.hadoop.hbase.util.Bytes;
/**
 * Demonstrates the HBase client API against the {@code test:gp} table: single and batched
 * gets, puts, deletes, filtered and range scans, and admin-level create/delete table.
 */
public class HbaseTest {

    /** Column family used by the demo rows. */
    private static final byte[] INFO = Bytes.toBytes("info");
    /** "name" qualifier inside the {@code info} family. */
    private static final byte[] NAME = Bytes.toBytes("name");

    /**
     * Runs every demo operation in sequence. The connection, table, admin and scanners all
     * hold network/server-side resources, so each is closed with try-with-resources.
     */
    public static void main(String[] args) throws IOException {
        try (Connection connection = initHbase();
             Table table = connection.getTable(TableName.valueOf("test:gp"));
             Admin admin = connection.getAdmin()) {
            System.out.println("对table的操作");
            // All cells of one rowkey
            Map<String, String> res = getOneRow(table, "rk001");
            System.out.println(res);
            System.out.println(res.get("name"));
            // One cell addressed by rowkey, family and qualifier
            String res2 = getOneRowColumn(table, "rk001", "info", "age");
            System.out.println(res2);
            // Batched lookup of several rowkeys
            List<String> res3 = getMultiRowColumn(table, "info", "name", "rk001", "rk002", "rk003");
            System.out.println(res3);
            // Insert one cell: rowkey, family, qualifier, value
            insertDataTest(table, "rk004", "info", "name", "xgp");
            // Insert several columns under one rowkey
            insertDataTest2(table, "rk003", "info");
            // Batched insert
            insertBatchTest(table);
            // Delete every cell of a rowkey
            deleteRowkey(table, "rk004");
            // Delete a single cell
            deleteRowkeyCell(table, "rk003", "info", "age");
            // Rowkey prefix scan with PrefixFilter
            System.out.println("filter scan");
            PrefixFilter filter = new PrefixFilter(Bytes.toBytes("rk"));
            try (ResultScanner resultScanner = fuzzyScan(table, filter)) {
                for (Result result : resultScanner) {
                    System.out.println(Bytes.toString(result.getRow()));
                    System.out.println(Bytes.toString(result.getValue(INFO, NAME)));
                }
            }
            // Rowkey range scan: rk001 <= rowkey < rk003
            rowkeyRangeScanTest(table, "rk001", "rk003");
            System.out.println("对admin的操作");
            // Create a table, then delete it again
            createTable(admin, "test:gp2", "info1", "info2");
            deleteTable(admin, "test:gp2");
        }
    }

    /**
     * Opens a new HBase connection using the ZooKeeper quorum hard-coded below.
     * The caller owns the returned connection and must close it.
     *
     * @return an open connection
     * @throws IOException if the connection cannot be created
     */
    public static Connection initHbase() throws IOException {
        Configuration configuration = HBaseConfiguration.create();
        configuration.set("hbase.zookeeper.quorum", "cloudera01,cloudera02,cloudera03");
        return ConnectionFactory.createConnection(configuration);
    }

    /**
     * Reads every cell of one row, keyed by qualifier.
     * Qualifiers that appear in more than one family overwrite each other in the map.
     *
     * @param table  table to read from
     * @param rowKey rowkey to fetch
     * @return qualifier -> value; empty if the row does not exist
     */
    public static Map<String, String> getOneRow(Table table, String rowKey) throws IOException {
        Result result = table.get(new Get(Bytes.toBytes(rowKey)));
        Map<String, String> map = new HashMap<>();
        // listCells() returns null (not an empty list) for a missing row.
        if (result.isEmpty()) {
            return map;
        }
        for (Cell cell : result.listCells()) {
            String colName = Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength());
            String value = Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
            map.put(colName, value);
        }
        return map;
    }

    /**
     * Reads a single cell.
     *
     * @return the cell value decoded as UTF-8, or {@code null} if it does not exist
     */
    public static String getOneRowColumn(Table table, String rowkey, String columnFamily, String column) throws IOException {
        byte[] family = Bytes.toBytes(columnFamily);
        byte[] qualifier = Bytes.toBytes(column);
        Get get = new Get(Bytes.toBytes(rowkey));
        get.addColumn(family, qualifier);
        Result result = table.get(get);
        return Bytes.toString(result.getValue(family, qualifier));
    }

    /**
     * Fetches several rows in one batched call and extracts one column from each.
     * Also prints a rowkey -> Result map of the rows that exist.
     *
     * @return values in the same order as {@code rowkeys}; {@code null} where a row/cell is missing
     */
    public static List<String> getMultiRowColumn(Table table, String columnFamily, String column, String... rowkeys) throws IOException {
        byte[] family = Bytes.toBytes(columnFamily);
        byte[] qualifier = Bytes.toBytes(column);
        List<Get> getList = new ArrayList<>(rowkeys.length);
        for (String rowkey : rowkeys) {
            getList.add(new Get(Bytes.toBytes(rowkey)));
        }
        Result[] results = table.get(getList);
        // Plain list of values
        List<String> resList = new ArrayList<>(results.length);
        // rowkey -> Result pairs (missing rows are skipped: their getRow() is null)
        Map<String, Result> resMap = new HashMap<>();
        for (Result res : results) {
            resList.add(Bytes.toString(res.getValue(family, qualifier)));
            if (!res.isEmpty()) {
                resMap.put(Bytes.toString(res.getRow()), res);
            }
        }
        System.out.println(resMap);
        return resList;
    }

    /**
     * Opens a full-table scanner restricted by the given filters.
     * A row is returned only if it passes ALL filters. The caller must close the scanner.
     *
     * @param filters zero or more filters; with none, every row is returned
     */
    public static ResultScanner fuzzyScan(Table table, Filter... filters) throws IOException {
        Scan s = new Scan();
        if (filters.length == 1) {
            s.setFilter(filters[0]);
        } else if (filters.length > 1) {
            // Scan.setFilter replaces any previous filter, so several filters must be combined.
            s.setFilter(new FilterList(FilterList.Operator.MUST_PASS_ALL, filters));
        }
        return table.getScanner(s);
    }

    /**
     * Scans rowkeys in [startRow, endRow) and prints each rowkey, its info:name value and cell count.
     */
    public static void rowkeyRangeScanTest(Table table, String startRow, String endRow) throws IOException {
        Scan s = new Scan(Bytes.toBytes(startRow), Bytes.toBytes(endRow));
        try (ResultScanner res = table.getScanner(s)) {
            for (Result result : res) {
                System.out.println(Bytes.toString(result.getRow()));
                System.out.println(Bytes.toString(result.getValue(INFO, NAME)));
                System.out.println(result.size()); // size() is the number of cells in the row
            }
        }
    }

    /**
     * Writes one cell.
     */
    public static void insertDataTest(Table table, String rowkey, String columnFamily, String column, String value) throws IOException {
        Put put = new Put(Bytes.toBytes(rowkey));
        put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(column), Bytes.toBytes(value));
        table.put(put);
    }

    /**
     * Writes several fixed demo columns (name, age, city) under one rowkey in a single Put.
     */
    public static void insertDataTest2(Table table, String rowkey, String columnFamily) throws IOException {
        Map<String, String> data = new HashMap<>();
        data.put("name", "gp");
        data.put("age", "12");
        data.put("city", "suz");
        System.out.println(data);
        byte[] family = Bytes.toBytes(columnFamily);
        Put put = new Put(Bytes.toBytes(rowkey));
        for (Map.Entry<String, String> item : data.entrySet()) {
            put.addColumn(family, Bytes.toBytes(item.getKey()), Bytes.toBytes(item.getValue()));
        }
        // Send the row once, after all columns are added (not once per column).
        table.put(put);
    }

    /**
     * Writes demo rows "rk_&lt;item&gt;" with info:name and info:age, sent in batches of 4 Puts.
     */
    public static void insertBatchTest(Table table) throws IOException {
        List<String> data = Arrays.asList("1", "2", "3", "4", "a", "6", "8", "10", "e");
        byte[] age = Bytes.toBytes("age");
        List<Put> putList = new ArrayList<>();
        for (String item : data) {
            Put put = new Put(Bytes.toBytes("rk_" + item));
            put.addColumn(INFO, NAME, Bytes.toBytes("name_" + item));
            put.addColumn(INFO, age, Bytes.toBytes("age_" + item));
            putList.add(put);
            if (putList.size() > 3) {
                table.put(putList);
                putList.clear();
            }
        }
        // Flush the remainder, skipping an empty round trip.
        if (!putList.isEmpty()) {
            table.put(putList);
            putList.clear();
        }
    }

    /**
     * Deletes every cell of the given rowkey.
     */
    public static void deleteRowkey(Table table, String rowkey) throws IOException {
        table.delete(new Delete(Bytes.toBytes(rowkey)));
    }

    /**
     * Deletes one cell of the given rowkey.
     */
    public static void deleteRowkeyCell(Table table, String rowkey, String columnFamily, String column) throws IOException {
        Delete delete = new Delete(Bytes.toBytes(rowkey));
        delete.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(column));
        table.delete(delete);
    }

    /**
     * Creates a table with the given column families, unless it already exists.
     *
     * @param tableName table name, optionally namespace-qualified ("ns:table")
     * @param cols      column family names
     */
    public static void createTable(Admin admin, String tableName, String... cols) throws IOException {
        TableName name = TableName.valueOf(tableName);
        if (admin.tableExists(name)) {
            System.out.println("表已存在!");
            return;
        }
        HTableDescriptor tableDesc = new HTableDescriptor(name);
        for (String col : cols) {
            tableDesc.addFamily(new HColumnDescriptor(col));
        }
        admin.createTable(tableDesc);
        System.out.println(tableName + "创建成功");
    }

    /**
     * Deletes a table if it exists, disabling it first when needed.
     */
    public static void deleteTable(Admin admin, String tableName) throws IOException {
        TableName name = TableName.valueOf(tableName);
        if (!admin.tableExists(name)) {
            System.out.println("表不存在!");
            return;
        }
        // A table must be disabled before deletion; disabling an already-disabled table throws.
        if (admin.isTableEnabled(name)) {
            admin.disableTable(name);
        }
        admin.deleteTable(name);
    }
}
创建单例模式的 HBase 工具类
import com.amarsoft.Main;
import com.amarsoft.model.entModel;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.*;
/**
 * Lazily initialised singleton wrapping one HBase {@link Table} whose ZooKeeper quorum and
 * table name come from {@code Config}. Reads and writes rows of the {@code info} column family.
 */
public class HBaseUtils {
    private static final Logger LOGGER = LoggerFactory.getLogger(HBaseUtils.class);
    /** volatile is required for the double-checked locking in {@link #getInstance()} to be safe. */
    private static volatile HBaseUtils instance;
    private static final byte[] INFO = Bytes.toBytes("info");
    private static final byte[] RID = Bytes.toBytes("rid");
    private static final byte[] SITE_NAME = Bytes.toBytes("sitename");
    // NOTE(review): constant is named RID_COUNT but the qualifier is "id_count" - confirm intended.
    private static final byte[] RID_COUNT = Bytes.toBytes("id_count");
    private static final byte[] SITE_NAME_COUNT = Bytes.toBytes("sitename_count");
    private final Table table;

    /**
     * Connects to HBase and opens the configured table.
     *
     * @throws IllegalStateException if the connection or table cannot be opened; previously the
     *         failure was only logged and left {@code table} null, causing NPEs on first use
     */
    private HBaseUtils() {
        Configuration configuration = HBaseConfiguration.create();
        configuration.set("hbase.zookeeper.quorum", Config.getString("hbase.zookeeper.quorum"));
        Connection conn = null;
        try {
            conn = ConnectionFactory.createConnection(configuration);
            table = conn.getTable(TableName.valueOf(Config.getString("hbase.tableName")));
        } catch (Exception e) {
            // Don't leak the connection if only the table lookup failed.
            if (conn != null) {
                try {
                    conn.close();
                } catch (IOException closeError) {
                    e.addSuppressed(closeError);
                }
            }
            LOGGER.error("HBase链接失败", e);
            throw new IllegalStateException("HBase链接失败", e);
        }
    }

    /**
     * Returns the shared instance, creating it on first call.
     *
     * @throws IllegalStateException if HBase cannot be reached; a later call will retry
     */
    public static HBaseUtils getInstance() {
        HBaseUtils local = instance;
        if (local == null) {
            synchronized (HBaseUtils.class) {
                local = instance;
                if (local == null) {
                    local = new HBaseUtils();
                    instance = local;
                }
            }
        }
        return local;
    }

    /**
     * Fetches several rows in one batched call.
     *
     * @param rowKeys rowkeys to look up
     * @return rowkey -> model (info:rid and info:sitename) for rows that exist; missing rows are omitted
     */
    public Map<String, entModel> getBatchRow(String... rowKeys) throws IOException {
        Map<String, entModel> existData = new HashMap<>();
        if (rowKeys.length == 0) {
            return existData;
        }
        List<Get> getList = new ArrayList<>(rowKeys.length);
        for (String rowKey : rowKeys) {
            getList.add(new Get(Bytes.toBytes(rowKey)));
        }
        Result[] results = table.get(getList);
        for (Result res : results) {
            if (res.isEmpty()) {
                continue;
            }
            // Plain instance instead of a double-brace anonymous subclass.
            entModel model = new entModel();
            model.setAllRid(Bytes.toString(res.getValue(INFO, RID)));
            model.setAllSiteName(Bytes.toString(res.getValue(INFO, SITE_NAME)));
            existData.put(Bytes.toString(res.getRow()), model);
        }
        return existData;
    }

    /**
     * Writes each entry as one row. rid and sitename are stored as comma-joined strings,
     * along with their set sizes. Puts are sent in batches of at most "hbase.put.batch".
     */
    public void putBatchRow(Map<String, entModel> batchData) throws IOException {
        if (batchData.isEmpty()) {
            return;
        }
        // Read the flush threshold once rather than re-parsing the config for every row.
        int batchSize = Integer.parseInt(Config.getString("hbase.put.batch"));
        List<Put> putList = new ArrayList<>();
        for (Map.Entry<String, entModel> item : batchData.entrySet()) {
            Set<String> ridSet = item.getValue().getRid();
            Set<String> siteNames = item.getValue().getSiteName();
            Put put = new Put(Bytes.toBytes(item.getKey()));
            put.addColumn(INFO, RID, Bytes.toBytes(String.join(",", ridSet)));
            put.addColumn(INFO, SITE_NAME, Bytes.toBytes(String.join(",", siteNames)));
            put.addColumn(INFO, RID_COUNT, Bytes.toBytes(String.valueOf(ridSet.size())));
            put.addColumn(INFO, SITE_NAME_COUNT, Bytes.toBytes(String.valueOf(siteNames.size())));
            putList.add(put);
            LOGGER.debug("pending puts: {}", putList.size());
            // ">=" so a batch never exceeds batchSize (">" sent batchSize + 1 puts).
            if (putList.size() >= batchSize) {
                table.put(putList);
                putList.clear();
            }
        }
        if (!putList.isEmpty()) {
            table.put(putList);
            putList.clear();
        }
    }

    public static void main(String[] args) {
    }
}