HBase batch writes and batch reads

2018-12-03  愤怒的小猥琐

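Requirement 1

Write a large volume of data (100,000 points, each a double value) into HBase in real time. HBase runs as a standalone (single-node) deployment.

Core implementation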

  1. put
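import java.util.List;

import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.BufferedMutator;
import org.apache.hadoop.hbase.client.BufferedMutatorParams;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;

// Writes all puts through a client-side BufferedMutator and returns the elapsed time in ms.
// getConnection(), closeConnect() and logger are helpers of the enclosing HBaseUtil class (not shown).
// Note: an HBase Connection is heavyweight and thread-safe, so sharing one across calls
// is usually cheaper than opening and closing one per batch as done here.
public static long put(String tablename, List<Put> puts) throws Exception {
    long currentTime = System.currentTimeMillis();
    Connection conn = getConnection();
    // Invoked for mutations that still fail after the client has exhausted its retries.
    final BufferedMutator.ExceptionListener listener = new BufferedMutator.ExceptionListener() {
        @Override
        public void onException(RetriesExhaustedWithDetailsException e, BufferedMutator mutator) {
            for (int i = 0; i < e.getNumExceptions(); i++) {
                logger.error("Failed to send put " + e.getRow(i) + ".", e.getCause(i));
            }
        }
    };
    BufferedMutatorParams params = new BufferedMutatorParams(TableName.valueOf(tablename))
            .listener(listener)
            .writeBufferSize(5 * 1024 * 1024); // send a batch whenever about 5 MB is buffered
    // try-with-resources closes the mutator (flushing it); the finally block closes the
    // connection even if getBufferedMutator() throws.
    try (BufferedMutator mutator = conn.getBufferedMutator(params)) {
        mutator.mutate(puts); // buffered on the client, sent in write-buffer-sized batches
        mutator.flush();      // send whatever is still buffered
    } finally {
        closeConnect(conn);
    }
    return System.currentTimeMillis() - currentTime;
}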
  2. Multithreaded execution
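// threadPool is assumed to be a java.util.concurrent.ExecutorService (needs
// import java.util.concurrent.TimeUnit); tableName, puts and waiting come from the
// enclosing method, and puts must be effectively final to be captured below.
threadPool.execute(new Runnable() {
    @Override
    public void run() {
        try {
            HBaseUtil.put(tableName, puts);
        } catch (Exception e) {
            logger.error("batchPut failed.", e);
        }
    }
});

if (waiting) {
    // awaitTermination() needs a timeout and only returns early after shutdown();
    // the 10-minute limit is illustrative.
    threadPool.shutdown();
    try {
        if (!threadPool.awaitTermination(10, TimeUnit.MINUTES)) {
            logger.error("HBase put job thread pool await termination timed out.");
        }
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        logger.error("Interrupted while waiting for HBase put jobs to finish.", e);
    }
}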
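The two write steps above (batch put plus thread pool) can be sketched end to end with only the JDK. This is a minimal sketch, not the author's code: it assumes the 100,000 points are split into fixed-size chunks, one pool task per chunk, and the hypothetical `BatchWriter` interface stands in for `HBaseUtil.put` so the example runs without an HBase server. The chunk size (10,000), pool size (4) and one-minute timeout are arbitrary choices.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class ParallelBatchWrite {

    // Hypothetical stand-in for HBaseUtil.put(tableName, puts).
    public interface BatchWriter<T> {
        void write(List<T> batch) throws Exception;
    }

    // Splits items into consecutive chunks of at most chunkSize elements.
    public static <T> List<List<T>> partition(List<T> items, int chunkSize) {
        List<List<T>> chunks = new ArrayList<>();
        for (int i = 0; i < items.size(); i += chunkSize) {
            chunks.add(new ArrayList<>(items.subList(i, Math.min(i + chunkSize, items.size()))));
        }
        return chunks;
    }

    // Submits one task per chunk, then waits for all of them; returns false on timeout.
    public static <T> boolean writeAll(List<T> items, int chunkSize, int threads,
                                       BatchWriter<T> writer) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (List<T> chunk : partition(items, chunkSize)) {
            pool.execute(() -> {
                try {
                    writer.write(chunk);
                } catch (Exception e) {
                    System.err.println("batch write failed: " + e);
                }
            });
        }
        pool.shutdown(); // accept no new tasks; already-submitted chunks keep running
        return pool.awaitTermination(1, TimeUnit.MINUTES);
    }

    public static void main(String[] args) throws InterruptedException {
        List<Double> points = new ArrayList<>();
        for (int i = 0; i < 100_000; i++) {
            points.add(i * 0.5);
        }
        AtomicInteger written = new AtomicInteger();
        AtomicInteger batches = new AtomicInteger();
        boolean finished = writeAll(points, 10_000, 4, batch -> {
            written.addAndGet(batch.size());
            batches.incrementAndGet();
        });
        System.out.println(finished + " " + batches.get() + " " + written.get());
    }
}
```

Running `main` should print `true 10 100000`. Chunking keeps each write call bounded and the pool size caps concurrent writers; on a standalone HBase every write still lands on the same server, so adding threads is likely to help only up to a point.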

Requirement 2

Batch data reads and optimization

  1. Read code
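import java.io.IOException;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.ColumnRangeFilter;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.util.Bytes;
import org.joda.time.DateTime; // assumed Joda-Time (DateTime.getMillis())

// Reads all values of one point with timestamps in [startTime, endTime).
// Layout: row key = point id, column qualifier = timestamp in ms (8-byte long).
// `table` and TimeValue are defined elsewhere in the author's code.
public List<TimeValue> getPeroidDate(String point, DateTime startTime, DateTime endTime) throws IOException {
    List<TimeValue> series = new ArrayList<>();
    byte[] start = Bytes.toBytes(startTime.getMillis());
    byte[] end = Bytes.toBytes(endTime.getMillis());
    byte[] id = Bytes.toBytes(point);
    // Equal start and stop rows: HBase treats this as a scan of the single row `id`.
    Scan scan = new Scan(id, id);
    scan.addFamily(Bytes.toBytes("history"));
    // Keep only qualifiers in [start, end); big-endian bytes of non-negative longs
    // sort in numeric order, so this byte-range filter is a time-range filter.
    Filter f = new ColumnRangeFilter(start, true, end, false);
    scan.setFilter(f);
    scan.setCaching(5000);      // Results fetched per RPC round trip
    scan.setBatch(5000);        // at most 5000 cells per Result, so the wide row arrives in chunks
    scan.setCacheBlocks(false); // one-off range read: don't fill the block cache
    //TODO Add parameters to support the pagination
    // The scanner is closed automatically; an IOException now propagates to the caller
    // instead of leaving the scanner null and failing with a NullPointerException.
    try (ResultScanner r = table.getScanner(scan)) {
        for (Result result : r) {
            for (Cell cell : result.rawCells()) {
                long ts = Bytes.toLong(CellUtil.cloneQualifier(cell));
                series.add(new TimeValue(new Timestamp(ts), Bytes.toString(CellUtil.cloneValue(cell))));
            }
        }
    }
    return series;
}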
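  2. Optimization
    Split the queried time range into consecutive segments, read each segment in its own thread, then merge the results.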
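The segmented read described in step 2 can be sketched as follows. This is a minimal JDK-only sketch under stated assumptions: the hypothetical `RangeReader` interface stands in for `getPeroidDate` (one call per sub-range, half-open `[from, to)` like the `ColumnRangeFilter` above), times are epoch milliseconds, and the segment count is a free parameter. Because the segments are contiguous and non-overlapping, concatenating their results in segment order gives a time-ordered series with no duplicates.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class SegmentedRead {

    // Hypothetical stand-in for getPeroidDate: returns the timestamps found in [from, to).
    public interface RangeReader {
        List<Long> read(long from, long to);
    }

    // Splits [start, end) into at most `segments` contiguous half-open ranges.
    public static List<long[]> split(long start, long end, int segments) {
        if (segments <= 0) {
            throw new IllegalArgumentException("segments must be positive");
        }
        List<long[]> ranges = new ArrayList<>();
        long step = Math.max(1, (end - start + segments - 1) / segments); // ceiling division
        for (long from = start; from < end; from += step) {
            ranges.add(new long[] {from, Math.min(from + step, end)});
        }
        return ranges;
    }

    // Reads every segment on the pool, then concatenates the results in segment order.
    public static List<Long> readParallel(long start, long end, int segments,
                                          RangeReader reader, ExecutorService pool) {
        List<CompletableFuture<List<Long>>> futures = new ArrayList<>();
        for (long[] r : split(start, end, segments)) {
            futures.add(CompletableFuture.supplyAsync(() -> reader.read(r[0], r[1]), pool));
        }
        List<Long> merged = new ArrayList<>();
        for (CompletableFuture<List<Long>> f : futures) {
            merged.addAll(f.join()); // joining in submission order keeps the series sorted
        }
        return merged;
    }

    public static void main(String[] args) {
        // Fake reader: one sample every 1000 ms.
        RangeReader fake = (from, to) -> {
            List<Long> out = new ArrayList<>();
            for (long t = ((from + 999) / 1000) * 1000; t < to; t += 1000) {
                out.add(t);
            }
            return out;
        };
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            List<Long> series = readParallel(0, 60_000, 4, fake, pool);
            System.out.println(series.size());
        } finally {
            pool.shutdown();
        }
    }
}
```

Running `main` should print `60`. Since the layout keeps one point in one row, and an HBase row never spans regions, every segment of the same point is served by the same region; the parallel speedup therefore depends on that server's capacity and may be modest on a standalone deployment.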