大数据运维程序员

大数据运维问题记录(五)

2016-07-28  本文已影响94人  火车飞侠

问题描述:集群中原有采集程序从源文件入hbase出现积压,优化修改程序都无济于事,需要赶紧出个方案进行解决

问题解决:集群中的采集程序还有一条线是把源文件写入hdfs的,因此计划以hdfs中的数据作为源数据,先用mapreduce生成hfile,再通过bulkload的方式导入hbase,从而省去了对原始数据的清洗操作。

以下是开发的程序

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FsShell;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
import org.apache.hadoop.hbase.mapreduce.KeyValueSortReducer;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
import org.apache.hadoop.hbase.mapreduce.SimpleTotalOrderPartitioner;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * MapReduce driver that turns pipe-delimited text files on HDFS into HFiles
 * and bulk-loads them into an HBase table.
 *
 * <p>Usage: {@code HFileGenerator [inputPath [outputPath [tableName]]]}.
 * Every argument is optional; omitted arguments fall back to the defaults
 * declared below.
 */
public class HFileGenerator {

    /** Default HDFS directory holding the source text files. */
    private static final String DEFAULT_INPUT = "hdfs://lip1:8020/user/lipeng/hbase/input";

    /** Default HDFS directory the HFiles are written to (deleted first if it exists). */
    private static final String DEFAULT_OUTPUT = "hdfs://lip1:8020/user/lipeng/hbase/output";

    /** Default target HBase table. */
    private static final String DEFAULT_TABLE = "hbase_test";

    /**
     * Emits one {@link KeyValue} per input line. The row key is the first five
     * {@code |}-separated fields joined with {@code _}; the cell value is the
     * whole original line, stored under {@code f1:column1}.
     */
    public static class HFileMapper extends
            Mapper<LongWritable, Text, ImmutableBytesWritable, KeyValue> {

        private static final String SEPARATOR = "_";
        private static final int ROWKEY_FIELD_COUNT = 5;
        private static final byte[] FAMILY = Bytes.toBytes("f1");
        private static final byte[] QUALIFIER = Bytes.toBytes("column1");
        private static final String COUNTER_GROUP = "HFileGenerator";
        private static final String MALFORMED_COUNTER = "MALFORMED_LINES";

        /**
         * Converts one text line into a row key / KeyValue pair.
         *
         * @param key     byte offset of the line in the input split (unused)
         * @param value   the raw input line
         * @param context MapReduce context used to emit the pair
         * @throws IOException          if writing to the context fails
         * @throws InterruptedException if the task is interrupted while writing
         */
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            if (line.isEmpty()) {
                return;
            }
            // Limit -1 keeps trailing empty fields, so the field count stays stable.
            String[] items = line.split("\\|", -1);
            if (items.length < ROWKEY_FIELD_COUNT) {
                // A short line cannot form a row key; count it instead of
                // failing the task with ArrayIndexOutOfBoundsException.
                context.getCounter(COUNTER_GROUP, MALFORMED_COUNTER).increment(1);
                return;
            }
            // Compose the row key from the leading fields, as the business rules require.
            StringBuilder rowKey = new StringBuilder(items[0]);
            for (int i = 1; i < ROWKEY_FIELD_COUNT; i++) {
                rowKey.append(SEPARATOR).append(items[i]);
            }
            byte[] row = Bytes.toBytes(rowKey.toString());
            KeyValue kv = new KeyValue(row, FAMILY, QUALIFIER,
                    System.currentTimeMillis(), Bytes.toBytes(line));
            context.write(new ImmutableBytesWritable(row), kv);
        }
    }

    /**
     * Entry point. Exits with status 1 when the MapReduce job reports failure;
     * any exception propagates so the JVM also terminates with a non-zero status.
     *
     * @param args optional {@code [inputPath [outputPath [tableName]]]}
     * @throws Exception if job setup, the job itself, chmod or the bulk load fails
     */
    public static void main(String[] args) throws Exception {
        String input = args.length > 0 ? args[0] : DEFAULT_INPUT;
        String output = args.length > 1 ? args[1] : DEFAULT_OUTPUT;
        String tableName = args.length > 2 ? args[2] : DEFAULT_TABLE;

        // Exit only after run() has returned, so the connection and table
        // have already been closed by try-with-resources.
        if (!run(input, output, tableName)) {
            System.exit(1);
        }
    }

    /**
     * Runs the HFile-generating job and, on success, bulk-loads its output.
     *
     * @return {@code true} if the job completed and the load was issued,
     *         {@code false} if the MapReduce job reported failure
     */
    private static boolean run(String input, String output, String tableName)
            throws Exception {
        Configuration conf = HBaseConfiguration.create();
        try (Connection connection = ConnectionFactory.createConnection(conf);
             Table table = connection.getTable(TableName.valueOf(tableName))) {
            Job job = Job.getInstance(conf);
            job.setJobName("HFile bulk load test");
            job.setJarByClass(HFileGenerator.class);

            job.setOutputKeyClass(ImmutableBytesWritable.class);
            job.setOutputValueClass(KeyValue.class);

            job.setMapperClass(HFileMapper.class);
            job.setReducerClass(KeyValueSortReducer.class);
            job.setPartitionerClass(SimpleTotalOrderPartitioner.class);

            // The job refuses to start if the output directory already exists,
            // so remove any leftover from a previous run.
            Path outputPath = new Path(output);
            FileSystem fileSystem = outputPath.getFileSystem(conf);
            if (fileSystem.exists(outputPath)) {
                fileSystem.delete(outputPath, true);
            }

            FileInputFormat.addInputPath(job, new Path(input));
            FileOutputFormat.setOutputPath(job, outputPath);
            // NOTE(review): called last on purpose; it configures the job for
            // the table's regions and presumably overrides the reducer and
            // partitioner set above - confirm against the HBase version in use.
            HFileOutputFormat.configureIncrementalLoad(job, (HTable) table);

            if (!job.waitForCompletion(true)) {
                return false;
            }

            // Open up the generated files (777) before handing them to HBase.
            int chmodStatus = new FsShell(conf).run(
                    new String[]{"-chmod", "-R", "777", output});
            if (chmodStatus != 0) {
                throw new IOException("chmod -R 777 " + output
                        + " failed with exit code " + chmodStatus);
            }

            // Load the generated HFiles into the HBase table.
            LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
            loader.doBulkLoad(outputPath, (HTable) table);
            return true;
        }
    }
}

执行时需要把hbase的classpath加入hadoop的hadoop-env.sh中(例如:export HADOOP_CLASSPATH=$HADOOP_CLASSPATH:$(hbase classpath)),否则会报找不到hbase相关类的错误。

上一篇下一篇

猜你喜欢

热点阅读