Java学习 · 大数据学习

Java操作Hbase

2020-08-07  本文已影响0人  xiaogp
Hbase Java API.png

简单测试

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.PrefixFilter;
import org.apache.hadoop.hbase.util.Bytes;

import java.io.IOException;
import java.util.*;


/**
 * Small demonstration of the HBase Java client API: point gets, batched gets,
 * puts, deletes, filtered and ranged scans, and table administration.
 *
 * <p>All row keys, families, qualifiers and values are encoded with
 * {@link Bytes#toBytes(String)} (UTF-8) so that they round-trip with the
 * {@link Bytes#toString(byte[])} calls used when reading.
 */
public class HbaseTest {

    /**
     * Runs every demo operation against the table {@code test:gp}.
     *
     * <p>The connection, table and admin handles are opened in a
     * try-with-resources statement so they are released even when one of the
     * operations throws.
     *
     * @param args unused
     * @throws IOException if any HBase call fails
     */
    public static void main(String[] args) throws IOException {
        try (Connection connection = initHbase();
             Table table = connection.getTable(TableName.valueOf("test:gp"));
             Admin admin = connection.getAdmin()) {

            System.out.println("对table的操作");
            // Fetch every cell of one row key.
            Map<String, String> res = getOneRow(table, "rk001");
            System.out.println(res);
            System.out.println(res.get("name"));

            // Fetch one cell: row key + column family + qualifier.
            String res2 = getOneRowColumn(table, "rk001", "info", "age");
            System.out.println(res2);

            // Batched get over several row keys.
            List<String> res3 = getMultiRowColumn(table, "info", "name", "rk001", "rk002", "rk003");
            System.out.println(res3);

            // Insert a single cell.
            insertDataTest(table, "rk004", "info", "name", "xgp");

            // Insert several columns under one row key.
            insertDataTest2(table, "rk003", "info");

            // Insert many rows in batches.
            insertBatchTest(table);

            // Delete a whole row.
            deleteRowkey(table, "rk004");

            // Delete one cell of a row.
            deleteRowkeyCell(table, "rk003", "info", "age");

            // Row-key prefix search using PrefixFilter.
            System.out.println("filter scan");
            PrefixFilter filter = new PrefixFilter(Bytes.toBytes("rk"));
            try (ResultScanner resultScanner = fuzzyScan(table, filter)) {
                for (Result result : resultScanner) {
                    System.out.println(Bytes.toString(result.getRow()));
                    System.out.println(Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("name"))));
                }
            }

            // Row-key range scan: start row inclusive, end row exclusive.
            rowkeyRangeScanTest(table, "rk001", "rk003");

            System.out.println("对admin的操作");
            // Create a table.
            createTable(admin, "test:gp2", "info1", "info2");
            // Drop the table again.
            deleteTable(admin, "test:gp2");
        }
    }

    /**
     * Opens a connection to HBase through the hard-coded ZooKeeper quorum.
     *
     * @return a new connection; the caller is responsible for closing it
     * @throws IOException if the connection cannot be created
     */
    public static Connection initHbase() throws IOException {
        Configuration configuration = HBaseConfiguration.create();
        configuration.set("hbase.zookeeper.quorum", "cloudera01,cloudera02,cloudera03");
        return ConnectionFactory.createConnection(configuration);
    }

    /**
     * Reads every cell of a single row.
     *
     * <p>The returned map is keyed by column qualifier only, so if two column
     * families contain the same qualifier the later cell overwrites the
     * earlier one.
     *
     * @param table  table to read from
     * @param rowKey row key to fetch
     * @return qualifier-to-value map; empty when the row does not exist
     * @throws IOException if the get fails
     */
    public static Map<String, String> getOneRow(Table table, String rowKey) throws IOException {
        Result result = table.get(new Get(Bytes.toBytes(rowKey)));
        Map<String, String> map = new HashMap<>();
        // Result.listCells() returns null for an empty result, so guard before iterating.
        if (result.isEmpty()) {
            return map;
        }
        for (Cell cell : result.listCells()) {
            String colName = Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength());
            String value = Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
            map.put(colName, value);
        }
        return map;
    }

    /**
     * Reads the value of one cell identified by row key, family and qualifier.
     *
     * @param table        table to read from
     * @param rowkey       row key to fetch
     * @param columnFamily column family name
     * @param column       column qualifier
     * @return the cell value as a string, or {@code null} when the cell is absent
     * @throws IOException if the get fails
     */
    public static String getOneRowColumn(Table table, String rowkey, String columnFamily, String column) throws IOException {
        byte[] family = Bytes.toBytes(columnFamily);
        byte[] qualifier = Bytes.toBytes(column);
        Get get = new Get(Bytes.toBytes(rowkey));
        get.addColumn(family, qualifier);
        Result result = table.get(get);
        return Bytes.toString(result.getValue(family, qualifier));
    }

    /**
     * Fetches one column for several row keys with a single batched get.
     *
     * <p>As a demonstration it also prints a row-key-to-{@link Result} map of
     * the rows that were found and, when row {@code rk001} is among them, its
     * {@code info:name} value.
     *
     * @param table        table to read from
     * @param columnFamily column family name
     * @param column       column qualifier
     * @param rowkeys      row keys to fetch
     * @return one entry per requested row key, in request order; {@code null}
     *         where the row or the cell does not exist
     * @throws IOException if the batched get fails
     */
    public static List<String> getMultiRowColumn(Table table, String columnFamily, String column, String... rowkeys) throws IOException {
        List<Get> getList = new ArrayList<>(rowkeys.length);
        for (String rowkey : rowkeys) {
            getList.add(new Get(Bytes.toBytes(rowkey)));
        }
        Result[] results = table.get(getList);

        byte[] family = Bytes.toBytes(columnFamily);
        byte[] qualifier = Bytes.toBytes(column);

        // Plain list of values, plus a row-key -> Result map for key/value style access.
        List<String> resList = new ArrayList<>(results.length);
        Map<String, Result> resMap = new HashMap<>();
        for (Result res : results) {
            resList.add(Bytes.toString(res.getValue(family, qualifier)));
            // A missing row yields an empty Result whose row key is null; keep it out of the map.
            if (!res.isEmpty()) {
                resMap.put(Bytes.toString(res.getRow()), res);
            }
        }
        System.out.println(resMap);

        // The sample row is only printed when present; the unguarded lookup used to throw a NullPointerException.
        Result sample = resMap.get("rk001");
        if (sample != null) {
            System.out.println(Bytes.toString(sample.getValue(Bytes.toBytes("info"), Bytes.toBytes("name"))));
        }

        return resList;
    }

    /**
     * Opens a scan over the whole table restricted by the given filters.
     *
     * <p>A {@link Scan} holds a single filter, so several filters are combined
     * in a {@link FilterList}, which by default requires every filter to pass.
     * With no filters the scan is unfiltered.
     *
     * @param table   table to scan
     * @param filters filters that must all match; may be empty
     * @return an open scanner; the caller must close it
     * @throws IOException if the scanner cannot be opened
     */
    public static ResultScanner fuzzyScan(Table table, Filter... filters) throws IOException {
        Scan s = new Scan();
        if (filters != null && filters.length == 1) {
            s.setFilter(filters[0]);
        } else if (filters != null && filters.length > 1) {
            s.setFilter(new FilterList(filters));
        }
        return table.getScanner(s);
    }

    /**
     * Scans the row-key range {@code [startRow, endRow)} and prints, for each
     * row, its key, its {@code info:name} value and its cell count.
     *
     * @param table    table to scan
     * @param startRow first row key, inclusive
     * @param endRow   last row key, exclusive
     * @throws IOException if the scan fails
     */
    public static void rowkeyRangeScanTest(Table table, String startRow, String endRow) throws IOException {
        Scan s = new Scan(Bytes.toBytes(startRow), Bytes.toBytes(endRow));
        // The scanner holds server-side resources, so close it when done.
        try (ResultScanner res = table.getScanner(s)) {
            for (Result result : res) {
                System.out.println(Bytes.toString(result.getRow()));
                System.out.println(Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("name"))));
                System.out.println(result.size());  // size() is the number of cells in the row
            }
        }
    }

    /**
     * Writes a single cell.
     *
     * @param table        table to write to
     * @param rowkey       row key
     * @param columnFamily column family name
     * @param column       column qualifier
     * @param value        cell value
     * @throws IOException if the put fails
     */
    public static void insertDataTest(Table table, String rowkey, String columnFamily, String column, String value) throws IOException {
        Put put = new Put(Bytes.toBytes(rowkey));
        put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(column), Bytes.toBytes(value));
        table.put(put);
    }

    /**
     * Writes a fixed set of sample columns (name, age, city) under one row key
     * using a single {@link Put}.
     *
     * @param table        table to write to
     * @param rowkey       row key
     * @param columnFamily column family that receives all columns
     * @throws IOException if the put fails
     */
    public static void insertDataTest2(Table table, String rowkey, String columnFamily) throws IOException {
        Map<String, String> data = new HashMap<>();
        data.put("name", "gp");
        data.put("age", "12");
        data.put("city", "suz");
        System.out.println(data);

        byte[] family = Bytes.toBytes(columnFamily);
        Put put = new Put(Bytes.toBytes(rowkey));
        for (Map.Entry<String, String> item : data.entrySet()) {
            put.addColumn(family, Bytes.toBytes(item.getKey()), Bytes.toBytes(item.getValue()));
        }
        // Send once, after every column has been added to the Put.
        table.put(put);
    }

    /**
     * Writes a fixed set of sample rows ({@code rk_<item>}) in batches: the
     * buffer is flushed whenever it holds more than three puts, and whatever
     * remains is flushed at the end.
     *
     * @param table table to write to
     * @throws IOException if a batched put fails
     */
    public static void insertBatchTest(Table table) throws IOException {
        List<String> data = new ArrayList<>(Arrays.asList("1", "2", "3", "4", "a", "6", "8", "10", "e"));
        List<Put> putList = new ArrayList<>();
        byte[] family = Bytes.toBytes("info");

        for (String item : data) {
            Put put = new Put(Bytes.toBytes("rk_" + item));
            put.addColumn(family, Bytes.toBytes("name"), Bytes.toBytes("name_" + item));
            put.addColumn(family, Bytes.toBytes("age"), Bytes.toBytes("age_" + item));
            putList.add(put);

            if (putList.size() > 3) {
                table.put(putList);
                putList.clear();
            }
        }
        // Flush the tail; skip the call when the last batch was already sent.
        if (!putList.isEmpty()) {
            table.put(putList);
            putList.clear();
        }
    }

    /**
     * Deletes every cell stored under the given row key.
     *
     * @param table  table to delete from
     * @param rowkey row key to remove
     * @throws IOException if the delete fails
     */
    public static void deleteRowkey(Table table, String rowkey) throws IOException {
        table.delete(new Delete(Bytes.toBytes(rowkey)));
    }

    /**
     * Deletes one cell of a row.
     *
     * <p>NOTE(review): per the HBase client documentation,
     * {@code Delete.addColumn} removes only the latest version of the column;
     * {@code addColumns} would remove all versions. Confirm which is intended.
     *
     * @param table        table to delete from
     * @param rowkey       row key
     * @param columnFamily column family name
     * @param column       column qualifier
     * @throws IOException if the delete fails
     */
    public static void deleteRowkeyCell(Table table, String rowkey, String columnFamily, String column) throws IOException {
        Delete delete = new Delete(Bytes.toBytes(rowkey));
        delete.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(column));
        table.delete(delete);
    }

    /**
     * Creates a table with the given column families unless it already exists.
     *
     * @param admin     admin handle
     * @param tableName table name, optionally namespace-qualified
     * @param cols      column family names
     * @throws IOException if the existence check or the creation fails
     */
    public static void createTable(Admin admin, String tableName, String... cols) throws IOException {
        TableName name = TableName.valueOf(tableName);
        if (admin.tableExists(name)) {
            System.out.println("表已存在!");
            return;
        }
        HTableDescriptor tableDesc = new HTableDescriptor(name);
        for (String col : cols) {
            tableDesc.addFamily(new HColumnDescriptor(col));
        }
        admin.createTable(tableDesc);
        System.out.println(tableName + "创建成功");
    }

    /**
     * Drops a table, disabling it first when necessary. Does nothing when the
     * table does not exist.
     *
     * @param admin     admin handle
     * @param tableName table name, optionally namespace-qualified
     * @throws IOException if disabling or deleting fails
     */
    public static void deleteTable(Admin admin, String tableName) throws IOException {
        TableName name = TableName.valueOf(tableName);
        if (!admin.tableExists(name)) {
            return;
        }
        // A table must be disabled before deletion; disabling an already-disabled table throws.
        if (admin.isTableEnabled(name)) {
            admin.disableTable(name);
        }
        admin.deleteTable(name);
    }

}

创建单例模式


import com.amarsoft.Main;
import com.amarsoft.model.entModel;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.*;


/**
 * Lazily-initialized singleton that reads and writes {@code entModel} rows in
 * the HBase table named by the {@code hbase.tableName} configuration entry.
 *
 * <p>The instance keeps one shared {@link Connection} and opens a short-lived
 * {@link Table} per operation, because the HBase client documents
 * {@code Connection} as thread-safe and {@code Table} as not thread-safe.
 */
public class HBaseUtils {
    private static final Logger LOGGER = LoggerFactory.getLogger(HBaseUtils.class);

    // volatile is required for the double-checked initialization in getInstance().
    private static volatile HBaseUtils instance;
    private static final byte[] INFO = Bytes.toBytes("info");
    private static final byte[] RID = Bytes.toBytes("rid");
    private static final byte[] SITE_NAME = Bytes.toBytes("sitename");
    private static final byte[] RID_COUNT = Bytes.toBytes("id_count");
    private static final byte[] SITE_NAME_COUNT = Bytes.toBytes("sitename_count");

    private final Connection connection;
    private final TableName tableName;

    /**
     * Connects to HBase using the {@code hbase.zookeeper.quorum} and
     * {@code hbase.tableName} configuration entries.
     *
     * @throws IllegalStateException if the configuration is unusable or the
     *         connection cannot be created; failing here avoids a later
     *         NullPointerException on first use
     */
    private HBaseUtils() {
        Configuration configuration = HBaseConfiguration.create();
        configuration.set("hbase.zookeeper.quorum", Config.getString("hbase.zookeeper.quorum"));
        TableName name;
        Connection conn;
        try {
            // Resolve the table name first so a bad name cannot leak an open connection.
            name = TableName.valueOf(Config.getString("hbase.tableName"));
            conn = ConnectionFactory.createConnection(configuration);
        } catch (IOException | RuntimeException e) {
            LOGGER.error("HBase链接失败", e);
            throw new IllegalStateException("HBase链接失败", e);
        }
        this.tableName = name;
        this.connection = conn;
    }

    /**
     * Returns the shared instance, creating it on first call.
     *
     * @return the singleton instance
     * @throws IllegalStateException if the HBase connection cannot be established
     */
    public static HBaseUtils getInstance() {
        HBaseUtils local = instance;
        if (local == null) {
            synchronized (HBaseUtils.class) {
                local = instance;
                if (local == null) {
                    local = new HBaseUtils();
                    instance = local;
                }
            }
        }
        return local;
    }

    /**
     * Fetches several rows in one batched get and converts each row that
     * exists into an {@code entModel} populated from {@code info:rid} and
     * {@code info:sitename}.
     *
     * @param rowKeys row keys to look up
     * @return map from row key to model, containing only the rows that exist
     * @throws IOException if the batched get fails
     */
    public Map<String, entModel> getBatchRow(String... rowKeys) throws IOException {
        Map<String, entModel> existData = new HashMap<>();
        if (rowKeys == null || rowKeys.length == 0) {
            return existData;
        }

        List<Get> getList = new ArrayList<>(rowKeys.length);
        for (String rowKey : rowKeys) {
            getList.add(new Get(Bytes.toBytes(rowKey)));
        }

        Result[] results;
        try (Table table = connection.getTable(tableName)) {
            results = table.get(getList);
        }

        for (Result res : results) {
            if (res.isEmpty()) {
                continue;
            }
            entModel model = new entModel();
            model.setAllRid(Bytes.toString(res.getValue(INFO, RID)));
            model.setAllSiteName(Bytes.toString(res.getValue(INFO, SITE_NAME)));
            existData.put(Bytes.toString(res.getRow()), model);
        }
        return existData;
    }

    /**
     * Writes the given models to HBase. For each entry the rid set and the
     * site-name set are stored comma-joined, together with their sizes.
     *
     * <p>Puts are buffered and flushed whenever the buffer grows beyond the
     * {@code hbase.put.batch} configuration value; the remainder is flushed at
     * the end. Values are encoded as UTF-8 so they round-trip with the
     * {@link Bytes#toString(byte[])} calls in {@link #getBatchRow(String...)}.
     *
     * @param batchData map from row key to the model to store
     * @throws IOException if a batched put fails
     */
    public void putBatchRow(Map<String, entModel> batchData) throws IOException {
        if (batchData == null || batchData.isEmpty()) {
            return;
        }
        // Parse the threshold once instead of on every iteration.
        int batchSize = Integer.parseInt(Config.getString("hbase.put.batch"));
        List<Put> putList = new ArrayList<>();

        try (Table table = connection.getTable(tableName)) {
            for (Map.Entry<String, entModel> item : batchData.entrySet()) {
                Set<String> ridSet = item.getValue().getRid();
                Set<String> siteName = item.getValue().getSiteName();

                Put put = new Put(Bytes.toBytes(item.getKey()));
                put.addColumn(INFO, RID, Bytes.toBytes(String.join(",", ridSet)));
                put.addColumn(INFO, SITE_NAME, Bytes.toBytes(String.join(",", siteName)));
                put.addColumn(INFO, RID_COUNT, Bytes.toBytes(String.valueOf(ridSet.size())));
                put.addColumn(INFO, SITE_NAME_COUNT, Bytes.toBytes(String.valueOf(siteName.size())));
                putList.add(put);

                LOGGER.debug("buffered puts: {}", putList.size());
                if (putList.size() > batchSize) {
                    table.put(putList);
                    putList.clear();
                }
            }
            // Flush whatever is left after the last full batch.
            if (!putList.isEmpty()) {
                table.put(putList);
                putList.clear();
            }
        }
    }

    /**
     * Empty entry point kept as a placeholder; it performs no work.
     *
     * @param args unused
     */
    public static void main(String[] args) {

    }

}

上一篇 | 下一篇

猜你喜欢

热点阅读