Getting Started with HBase

2019-09-26  djm猿

1 HBase Overview

1.1 What is HBase?

HBase is a highly reliable, high-performance, column-oriented, scalable distributed storage system. With HBase you can build large-scale structured storage clusters on inexpensive commodity PC servers.

HBase is designed to store and process very large datasets; more concretely, it can handle tables made up of many thousands of rows and columns using nothing more than ordinary hardware.

1.2 Characteristics of HBase

1. Massive storage

HBase is suited to storing data at the PB scale. Even at that scale, and on cheap commodity PC storage, it can return data within tens to hundreds of milliseconds. This is closely tied to HBase's scalability: it is precisely because HBase scales out so well that storing massive datasets becomes practical.

2. Column-family storage

"Column-oriented" here really means column-family (ColumnFamily) storage: HBase stores data by column family, a column family can contain a very large number of columns, and column families must be declared when the table is created.

3. Easy to scale

HBase scales in two dimensions: processing capacity at the upper layer (RegionServer) and storage capacity underneath (HDFS).
Adding RegionServer machines horizontally increases HBase's processing capacity and lets it serve more Regions.

4. High concurrency

Because most HBase deployments run on cheap commodity PCs, the latency of a single IO is not small, typically tens to hundreds of milliseconds. "High concurrency" here means that under heavily concurrent load the latency of an individual IO does not degrade much, so HBase can still deliver a high-concurrency, low-latency service.

5. Sparse

Sparseness refers to the flexibility of HBase columns: within a column family you can define as many columns as you like, and columns with no value take up no storage space at all.

1.3 Architecture

(Figure: HBase architecture)

1. Client

The Client provides the interfaces for accessing HBase and maintains caches (for example, the locations of Regions) to speed up access.

2. ZooKeeper

HBase uses ZooKeeper for Master high availability, RegionServer monitoring, the entry point to the metadata, and maintenance of cluster configuration. Concretely, ZooKeeper:

- elects the active Master and fails over to a backup Master if it goes down;
- tracks which RegionServers are alive and notifies the Master when one fails;
- stores the location of the metadata (hbase:meta) that clients use as their entry point;
- keeps shared cluster configuration and state.

3. HMaster

The main responsibilities of this node are:

- assigning Regions to RegionServers and rebalancing them;
- handling RegionServer failover;
- processing metadata/DDL changes such as creating, deleting, and altering tables.

4. HRegionServer

The HRegionServer directly serves user read and write requests; it is the node that does the real work. In summary it:

- manages the Regions assigned to it by the Master;
- handles client read and write requests for those Regions;
- flushes MemStores, splits Regions that grow too large, and compacts StoreFiles.

5. HDFS

HDFS provides the underlying data storage for HBase and supports its high availability (the HLog is stored on HDFS). Concretely it:

- stores the underlying data files (StoreFiles/HFiles) and the write-ahead log (HLog);
- provides multi-replica fault tolerance for that data.

1.4 Roles in HBase

1. HMaster

The master process: it monitors RegionServers, assigns and balances Regions, and handles schema (DDL) operations.

2. RegionServer

The worker process: it hosts Regions and serves the actual read and write requests.

3. Other components

The write-ahead log (HLog), Region, Store, MemStore, StoreFile (HFile), and BlockCache; these appear again in the read/write paths in section 5.

2 HBase Installation and Deployment

1. Extract the archive

[djm@hadoop102 software]$ tar -zxvf hbase-1.3.1-bin.tar.gz -C /opt/module

2. Rename the directory to hbase

[djm@hadoop102 software]$ sudo mv /opt/module/hbase-1.3.1 /opt/module/hbase

3. Edit hbase-env.sh

[djm@hadoop102 conf]$ vim hbase-env.sh
export JAVA_HOME=/opt/module/jdk1.8.0_144
export HBASE_MANAGES_ZK=false
# Configure PermSize. Only needed in JDK7. You can safely remove it for JDK8+
# export HBASE_MASTER_OPTS="$HBASE_MASTER_OPTS -XX:PermSize=128m -XX:MaxPermSize=128m"
# export HBASE_REGIONSERVER_OPTS="$HBASE_REGIONSERVER_OPTS -XX:PermSize=128m -XX:MaxPermSize=128m"

4. Edit hbase-site.xml

[djm@hadoop102 conf]$ vim hbase-site.xml
<configuration>
    <property>     
        <name>hbase.rootdir</name>     
        <value>hdfs://hadoop102:9000/hbase</value>   
    </property>

    <property>   
        <name>hbase.cluster.distributed</name>
        <value>true</value>
    </property>

    <!-- New in 0.98: earlier versions had no .port property and the default port was 60000 -->
    <property>
        <name>hbase.master.port</name>
        <value>16000</value>
    </property>

    <property>   
        <name>hbase.zookeeper.quorum</name>
         <value>hadoop102:2181,hadoop103:2181,hadoop104:2181</value>
    </property>

    <property>   
        <name>hbase.zookeeper.property.dataDir</name>
         <value>/opt/module/zookeeper-3.4.10/zkData</value>
    </property>
</configuration>

5. Edit regionservers

[djm@hadoop102 conf]$ vim regionservers
hadoop102
hadoop103
hadoop104

6. Symlink the Hadoop configuration files into the HBase conf directory (the target of the link must be an absolute path, hence $(pwd))

[djm@hadoop102 hadoop]$ ln -s $(pwd)/core-site.xml /opt/module/hbase/conf/core-site.xml
[djm@hadoop102 hadoop]$ ln -s $(pwd)/hdfs-site.xml /opt/module/hbase/conf/hdfs-site.xml

7. Distribute HBase to the other nodes (xsync here is a custom cluster-distribution script)

[djm@hadoop102 hadoop]$ xsync /opt/module/hbase

8. Start and stop the cluster

[djm@hadoop102 hbase]$ bin/start-hbase.sh
[djm@hadoop102 hbase]$ bin/stop-hbase.sh

9. Web UI

http://hadoop102:16010 

3 HBase Shell

Enter the HBase shell

[djm@hadoop102 hbase]$ bin/hbase shell

View the help

hbase(main):001:0> help

List the tables in the current database

hbase(main):001:0> list

Create a table

hbase(main):002:0> create 'student','info'

Insert data into the table

hbase(main):003:0> put 'student','1001','info:sex','male'
hbase(main):004:0> put 'student','1001','info:age','18'
hbase(main):005:0> put 'student','1002','info:name','Janna'
hbase(main):006:0> put 'student','1002','info:sex','female'
hbase(main):007:0> put 'student','1002','info:age','20'

Scan the table (STARTROW is inclusive, STOPROW is exclusive)

hbase(main):008:0> scan 'student'
hbase(main):009:0> scan 'student',{STARTROW => '1001', STOPROW  => '1001'}
hbase(main):010:0> scan 'student',{STARTROW => '1001'}

Describe the table

hbase(main):011:0> describe 'student'

Update specific columns

hbase(main):012:0> put 'student','1001','info:name','Nick'
hbase(main):013:0> put 'student','1001','info:age','100'

Get a specific row, or a specific column family:column

hbase(main):014:0> get 'student','1001'
hbase(main):015:0> get 'student','1001','info:name'

Count the rows in the table

hbase(main):021:0> count 'student'

Delete data

hbase(main):016:0> deleteall 'student','1001'
hbase(main):017:0> delete 'student','1002','info:sex'

Truncate the table

hbase(main):018:0> truncate 'student'

Drop a table

First disable the table:
hbase(main):019:0> disable 'student'
Then drop it:
hbase(main):020:0> drop 'student'
Note: dropping a table that is still enabled fails with: ERROR: Table student is enabled. Disable it first.

Alter the table

hbase(main):022:0> alter 'student',{NAME=>'info',VERSIONS=>3}
hbase(main):022:0> get 'student','1001',{COLUMN=>'info:name',VERSIONS=>3}

4 HBase Data Structures

4.1 RowKey

The RowKey is the primary key used to retrieve records. There are only three ways to access rows in a table:

- a Get on a single RowKey;
- a Scan over a range of RowKeys;
- a full table scan.

A RowKey can be an arbitrary string (up to 64 KB long). Inside HBase the RowKey is stored as a byte array, and rows are stored sorted by the byte order of their RowKeys. The sketch below shows the three access paths through the Java client.
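
These three access paths map directly onto the client API. A minimal sketch, assuming the student table from section 3 and the 1.3.1 client dependency from section 6:

import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;

public class RowKeyAccessDemo {

    public static void main(String[] args) throws Exception {
        Connection conn = ConnectionFactory.createConnection(HBaseConfiguration.create());
        Table table = conn.getTable(TableName.valueOf("student"));
        // 1. point lookup on a single RowKey
        Result one = table.get(new Get(Bytes.toBytes("1001")));
        System.out.println(one);
        // 2. range scan over a RowKey interval (start inclusive, stop exclusive, byte order)
        Scan range = new Scan();
        range.setStartRow(Bytes.toBytes("1001"));
        range.setStopRow(Bytes.toBytes("1003"));
        for (Result result : table.getScanner(range)) {
            System.out.println(result);
        }
        // 3. full table scan
        for (Result result : table.getScanner(new Scan())) {
            System.out.println(result);
        }
        table.close();
        conn.close();
    }
}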

4.2 Column Family

Every column in a table belongs to a column family. Column families are part of the table schema (individual columns are not) and must be defined before the table is used; column names are all prefixed with their column family.

4.3 Cell

A cell is the unit uniquely identified by {rowkey, column family:column qualifier, version}. The data in a cell has no type; it is stored entirely as raw bytes.
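
Because a cell only ever holds bytes, the client is responsible for serialization; the Bytes utility used throughout section 6 covers the common types. A small sketch (the values are arbitrary):

import org.apache.hadoop.hbase.util.Bytes;

public class BytesDemo {

    public static void main(String[] args) {
        // everything written into a cell must first be turned into a byte[]
        byte[] asString = Bytes.toBytes("18");  // stored as the characters '1' and '8'
        byte[] asLong = Bytes.toBytes(18L);     // stored as an 8-byte big-endian long
        // ...and read back with the matching toXxx method
        System.out.println(Bytes.toString(asString)); // 18
        System.out.println(Bytes.toLong(asLong));     // 18
    }
}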

4.4 Timestamp

The storage unit determined by RowKey and column is called a cell, and each cell holds multiple versions of the same piece of data, indexed by timestamp. The timestamp is a 64-bit integer. It can be assigned by HBase automatically at write time, in which case it is the current system time in milliseconds, or it can be set explicitly by the client; applications that need to avoid version conflicts must generate unique timestamps themselves. Within a cell, versions are sorted by timestamp in descending order, so the newest data comes first.

To keep too many versions from becoming a management burden (both for storage and for indexing), HBase offers two version-retention policies: keep only the last n versions of the data, or keep only the versions written within a recent time window (for example, the last seven days). Both can be configured per column family, as in the sketch below.
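
Both retention policies are set per column family. A minimal sketch using the 1.3 admin API, mirroring the shell command alter 'student',{NAME=>'info',VERSIONS=>3} from section 3 (the 7-day TTL is just an example value):

import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;

public class VersionConfigDemo {

    public static void main(String[] args) throws Exception {
        Connection conn = ConnectionFactory.createConnection(HBaseConfiguration.create());
        Admin admin = conn.getAdmin();
        HColumnDescriptor info = new HColumnDescriptor("info");
        info.setMaxVersions(3);             // keep at most the last 3 versions
        info.setTimeToLive(7 * 24 * 3600);  // and drop versions older than 7 days (seconds)
        admin.modifyColumn(TableName.valueOf("student"), info);
        admin.close();
        conn.close();
    }
}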

4.5 NameSpace

(Figure: namespace structure)

1. Table: all tables are members of a namespace; a table always belongs to some namespace, and if none is specified it goes into default (creating a namespace and a table inside it is sketched below).

2. RegionServer Group: a namespace contains a default RegionServer group.

3. Permission: namespaces let us define access control lists (ACLs).

4. Quota: quotas can limit the number of Regions a namespace may contain.
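
A minimal sketch of creating a namespace and a table inside it through the Java admin API (the names bigdata and bigdata:student are just examples):

import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.*;

public class NamespaceDemo {

    public static void main(String[] args) throws Exception {
        Connection conn = ConnectionFactory.createConnection(HBaseConfiguration.create());
        Admin admin = conn.getAdmin();
        // create the namespace; tables created without a namespace end up in 'default'
        admin.createNamespace(NamespaceDescriptor.create("bigdata").build());
        // the full table name is "namespace:table"
        HTableDescriptor descriptor = new HTableDescriptor(TableName.valueOf("bigdata:student"));
        descriptor.addFamily(new HColumnDescriptor("info"));
        admin.createTable(descriptor);
        admin.close();
        conn.close();
    }
}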

5 HBase Internals

5.1 Read Path

(Figure: read path)

1. The client first contacts ZooKeeper to get the location of the meta table's Region, then reads the data in the meta table; meta stores the Region information of the user tables.

2. Using the namespace, table name, and RowKey, it finds the matching Region in the meta table.

3. It locates the RegionServer that hosts this Region.

4. The RegionServer locates the Region.

5. Data is looked up in the MemStore first; if it is not there, the BlockCache is checked.

6. If the BlockCache does not have it either, the data is read from the StoreFiles.

7. Data read from a StoreFile is not returned to the client directly; it is first written into the BlockCache and then returned to the client.

5.2 Write Path

(Figure: write path)

1. The client sends the write request to the HRegionServer.

2. The HRegionServer writes the data to the HLog (write-ahead log).

3. The HRegionServer writes the data to memory (the MemStore).

4. The write is acknowledged to the client as successful (the client-side view of these steps is sketched below).
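
From the client's point of view a write is simply a Put; the HLog and MemStore steps above happen inside the RegionServer. A minimal sketch against the student table (SYNC_WAL is already the default durability; it is set explicitly here only to highlight step 2):

import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;

public class WriteDemo {

    public static void main(String[] args) throws Exception {
        Connection conn = ConnectionFactory.createConnection(HBaseConfiguration.create());
        Table table = conn.getTable(TableName.valueOf("student"));
        Put put = new Put(Bytes.toBytes("1003"));
        put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes("Tom"));
        put.setDurability(Durability.SYNC_WAL); // sync the HLog entry before acknowledging
        table.put(put);                         // returns once steps 2-4 have completed
        table.close();
        conn.close();
    }
}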

5.3 Flush

1. When the MemStore reaches its threshold (128 MB by default; 64 MB in older versions), its contents are flushed to disk, the in-memory data is dropped, and the corresponding historical entries in the HLog are removed (a flush can also be requested manually, as sketched after this list).

2. The flushed data is stored on HDFS as StoreFiles.

3. A checkpoint is recorded in the HLog.
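
The flush threshold is controlled by hbase.hregion.memstore.flush.size; a flush can also be requested explicitly, either with the shell command flush 'student' or through the admin API. A minimal sketch:

import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;

public class FlushDemo {

    public static void main(String[] args) throws Exception {
        Connection conn = ConnectionFactory.createConnection(HBaseConfiguration.create());
        Admin admin = conn.getAdmin();
        // force the MemStores of every region of 'student' to be written out as StoreFiles
        admin.flush(TableName.valueOf("student"));
        admin.close();
        conn.close();
    }
}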

5.4 Compaction and Split

1. When a store accumulates 3 StoreFiles, a compaction is triggered: the Region loads the files and merges them into one (both compaction and split can also be triggered manually, as sketched after this list).

2. When the merged data exceeds 256 MB, the Region is split, and the resulting Regions are assigned to different HRegionServers.

3. When an HRegionServer goes down, its HLog is split and handed to other HRegionServers to replay, and the meta table is updated.

Note: the HLog is persisted on HDFS.
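
Compaction and splitting normally happen automatically (hbase.hstore.compactionThreshold and hbase.hregion.max.filesize control the thresholds), but both can be triggered by hand. A minimal sketch:

import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;

public class CompactSplitDemo {

    public static void main(String[] args) throws Exception {
        Connection conn = ConnectionFactory.createConnection(HBaseConfiguration.create());
        Admin admin = conn.getAdmin();
        TableName student = TableName.valueOf("student");
        admin.majorCompact(student); // merge all StoreFiles of each store into one
        admin.split(student);        // ask the RegionServers to split the table's regions
        admin.close();
        conn.close();
    }
}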

6 Operating HBase from Java

Add the dependencies

<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-server</artifactId>
    <version>1.3.1</version>
</dependency>
<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-client</artifactId>
    <version>1.3.1</version>
</dependency>

6.1 HBaseApi

HBaseUtil

package com.djm.hbase;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.util.Bytes;

import java.io.IOException;

public class HBaseUtil {

    private static ThreadLocal<Connection> connHolder = new ThreadLocal<Connection>();

    private HBaseUtil() {

    }

    public static Connection makeHBaseConnection() throws IOException {
        Connection conn = connHolder.get();
        if (conn == null) {
            Configuration conf = HBaseConfiguration.create();
            conn = ConnectionFactory.createConnection(conf);
            connHolder.set(conn);
        }
        return conn;
    }

    /**
     * Generate the split keys ("0|", "1|", ...) used to pre-split a table into regionCount regions.
     *
     * @param regionCount the number of regions
     * @return regionCount - 1 split keys
     */
    public static byte[][] genRegionKeys(int regionCount) {
        byte[][] bs = new byte[regionCount - 1][];
        for (int i = 0; i < regionCount - 1; i++) {
            bs[i] = Bytes.toBytes(i + "|");
        }
        return bs;
    }

    /**
     * Generate the region prefix for a row key so that writes are spread evenly
     * across the regions pre-split with genRegionKeys().
     *
     * @param rowKey      the original row key
     * @param regionCount the number of regions the table was pre-split into
     * @return the row key prefixed with its region number, e.g. "2_rowKey"
     */
    public static String genRegionNum(String rowKey, int regionCount) {
        int regionNum;
        // mask the sign bit so a negative hashCode cannot yield a negative region number
        int hash = rowKey.hashCode() & Integer.MAX_VALUE;
        if (regionCount > 0 && (regionCount & (regionCount - 1)) == 0) {
            // power of two: bit-masking is equivalent to modulo
            regionNum = hash & (regionCount - 1);
        } else {
            regionNum = hash % regionCount;
        }
        return regionNum + "_" + rowKey;
    }

    public static void close() throws IOException {
        Connection conn = connHolder.get();
        if (conn != null) {
            conn.close();
            connHolder.remove();
        }
    }
}
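
The two helpers above are meant to be used together: pre-split a table with the keys from genRegionKeys, then prefix every RowKey with genRegionNum so that writes spread across the regions. A minimal sketch (the access_log table, the info family, and the sample row key are hypothetical):

package com.djm.hbase;

import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class PreSplitDemo {

    public static void main(String[] args) throws Exception {
        int regionCount = 4;
        Connection conn = HBaseUtil.makeHBaseConnection();
        Admin admin = conn.getAdmin();
        // pre-split the table into 4 regions using the "0|", "1|", "2|" split keys
        HTableDescriptor descriptor = new HTableDescriptor(TableName.valueOf("access_log"));
        descriptor.addFamily(new HColumnDescriptor("info"));
        admin.createTable(descriptor, HBaseUtil.genRegionKeys(regionCount));
        // prefix the row key with its region number (e.g. "2_20190926001") so writes are spread out
        String rowKey = HBaseUtil.genRegionNum("20190926001", regionCount);
        Put put = new Put(Bytes.toBytes(rowKey));
        put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("url"), Bytes.toBytes("/index.html"));
        Table table = conn.getTable(TableName.valueOf("access_log"));
        table.put(put);
        table.close();
        admin.close();
        HBaseUtil.close();
    }
}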

HBaseApi

package com.djm.hbase;

import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.filter.*;
import org.apache.hadoop.hbase.util.Bytes;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class HBaseApi {

    private static Connection conn = null;

    private static Admin admin = null;

    static {
        try {
            conn = HBaseUtil.makeHBaseConnection();
            admin = conn.getAdmin();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    private static boolean isTableExist(String tableName) throws IOException {
        return admin.tableExists(TableName.valueOf(tableName));
    }

    public static void createTable(String tableName, String... columnFamily) throws IOException {
        if (!isTableExist(tableName)) {
            HTableDescriptor descriptor = new HTableDescriptor(TableName.valueOf(tableName));
            // attach the coprocessor from section 6.2.3; its class must be available on the RegionServers
            descriptor.addCoprocessor("com.djm.hbase.InsertStudentCoprocesser");
            for (String family : columnFamily) {
                descriptor.addFamily(new HColumnDescriptor(family));
            }
            admin.createTable(descriptor);
        } else {
            System.out.println("-> can't create");
        }
    }

    public static void dropTable(String tableName) throws IOException {
        if (isTableExist(tableName)) {
            TableName table = TableName.valueOf(tableName);
            admin.disableTable(table);
            admin.deleteTable(table);
        } else {
            System.out.println("-> can't remove");
        }
    }

    public static void insertRowData(String tableName, String rowKey, String family, String column, String value) throws IOException {
        Table table = conn.getTable(TableName.valueOf(tableName));
        Put put = new Put(Bytes.toBytes(rowKey));
        put.addColumn(Bytes.toBytes(family), Bytes.toBytes(column), Bytes.toBytes(value));
        table.put(put);
        table.close();
    }

    public static void deleteMultiRow(String tableName, String... rows) throws IOException {
        Table table = conn.getTable(TableName.valueOf(tableName));
        List<Delete> deleteList = new ArrayList<Delete>();
        for (String row : rows) {
            deleteList.add(new Delete(Bytes.toBytes(row)));
        }
        table.delete(deleteList);
        table.close();
    }

    public static void getRow(String tableName, String rowKey) throws IOException {
        Table table = conn.getTable(TableName.valueOf(tableName));
        Get get = new Get(Bytes.toBytes(rowKey));
        Result result = table.get(get);
        for (Cell cell : result.rawCells()) {
            System.out.print("row key: " + Bytes.toString(CellUtil.cloneRow(cell)) + "\t");
            System.out.print("column family: " + Bytes.toString(CellUtil.cloneFamily(cell)) + "\t");
            System.out.print("column: " + Bytes.toString(CellUtil.cloneQualifier(cell)) + "\t");
            System.out.print("value: " + Bytes.toString(CellUtil.cloneValue(cell)) + "\t");
            System.out.println("timestamp: " + cell.getTimestamp());
        }
        table.close();
    }

    public static void getAllRows(String tableName) throws IOException {
        Table table = conn.getTable(TableName.valueOf(tableName));
        Scan scan = new Scan();
        ResultScanner resultScanner = table.getScanner(scan);
        for (Result result : resultScanner) {
            for (Cell cell : result.rawCells()) {
                System.out.print("row key: " + Bytes.toString(CellUtil.cloneRow(cell)) + "\t");
                System.out.print("column family: " + Bytes.toString(CellUtil.cloneFamily(cell)) + "\t");
                System.out.print("column: " + Bytes.toString(CellUtil.cloneQualifier(cell)) + "\t");
                System.out.print("value: " + Bytes.toString(CellUtil.cloneValue(cell)) + "\n");
            }
        }
        table.close();
    }

    public static void searchData(String tableName, String family) throws IOException {
        Table table = conn.getTable(TableName.valueOf(tableName));
        Scan scan = new Scan();
        scan.addFamily(Bytes.toBytes(family));
        // two row filters: an exact match on "1001" and a regex requiring a four-digit row key
        Filter exactFilter = new RowFilter(CompareFilter.CompareOp.EQUAL, new BinaryComparator(Bytes.toBytes("1001")));
        Filter regexFilter = new RowFilter(CompareFilter.CompareOp.EQUAL, new RegexStringComparator("^\\d{4}$"));
        // MUST_PASS_ALL = and, MUST_PASS_ONE = or
        FilterList filterList = new FilterList(FilterList.Operator.MUST_PASS_ALL);
        filterList.addFilter(exactFilter);
        filterList.addFilter(regexFilter);
        scan.setFilter(filterList);
        ResultScanner scanner = table.getScanner(scan);
        for (Result result : scanner) {
            for (Cell cell : result.rawCells()) {
                System.out.print("row key: " + Bytes.toString(CellUtil.cloneRow(cell)) + "\t");
                System.out.print("column family: " + Bytes.toString(CellUtil.cloneFamily(cell)) + "\t");
                System.out.print("column: " + Bytes.toString(CellUtil.cloneQualifier(cell)) + "\t");
                System.out.print("value: " + Bytes.toString(CellUtil.cloneValue(cell)) + "\n");
            }
        }
        table.close();
    }
}
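
A small driver is enough to exercise these methods; a minimal sketch, assuming the student table with the info family from section 3 already exists (the row 1003 and the value Tom are arbitrary):

package com.djm.hbase;

public class HBaseApiDemo {

    public static void main(String[] args) throws Exception {
        HBaseApi.insertRowData("student", "1003", "info", "name", "Tom");
        HBaseApi.getRow("student", "1003");
        HBaseApi.getAllRows("student");
        HBaseApi.searchData("student", "info");
        HBaseApi.deleteMultiRow("student", "1003");
        HBaseUtil.close();
    }
}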

6.2 MapReduce

First configure the HBase and Hadoop environment variables.

Then edit hadoop-env.sh and add the following:

export HADOOP_CLASSPATH=$HADOOP_CLASSPATH:/opt/module/hbase/lib/*

6.2.1 Writing Data from HDFS into an HBase Table

ReadFileMapper

package com.djm.mr1.mapper;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class ReadFileMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {

    @Override
    protected void map(LongWritable key, Text line, Context context) throws IOException, InterruptedException {
        // each input line is expected to look like "rowkey,name"
        String[] values = line.toString().split(",");
        String rowKey = values[0];
        byte[] bs = Bytes.toBytes(rowKey);
        Put put = new Put(bs);
        put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes(values[1]));
        context.write(new ImmutableBytesWritable(bs), put);
    }
}

InsertDataReduce

package com.djm.mr1.reduce;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.io.NullWritable;

import java.io.IOException;

public class InsertDataReduce extends TableReducer<ImmutableBytesWritable, Put, NullWritable> {

    @Override
    protected void reduce(ImmutableBytesWritable key, Iterable<Put> values, Context context) throws IOException, InterruptedException {
        for (Put put : values) {
            context.write(NullWritable.get(), put);
        }
    }
}

File2HBaseTool

package com.djm.mr1.tool;

import com.djm.mr1.mapper.ReadFileMapper;
import com.djm.mr1.reduce.InsertDataReduce;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobStatus;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.util.Tool;

public class File2HBaseTool implements Tool {

    public int run(String[] args) throws Exception {
        Job job = Job.getInstance();
        job.setJarByClass(File2HBaseTool.class);
        // format
        Path path = new Path("hdfs://hadoop102:9000/data/student.csv");
        FileInputFormat.addInputPath(job, path);
        // mapper
        job.setMapperClass(ReadFileMapper.class);
        job.setOutputKeyClass(ImmutableBytesWritable.class);
        job.setOutputValueClass(Put.class);
        // reduce
        TableMapReduceUtil.initTableReducerJob("user", InsertDataReduce.class, job);
        // run the job and wait for it to finish
        boolean wait = job.waitForCompletion(true);
        return wait ? JobStatus.State.SUCCEEDED.getValue() : JobStatus.State.FAILED.getValue();
    }

    public void setConf(Configuration conf) {

    }

    public Configuration getConf() {
        return null;
    }
}

File2HBaseApplication

package com.djm.mr1;

import com.djm.mr1.tool.File2HBaseTool;
import org.apache.hadoop.util.ToolRunner;

public class File2HBaseApplication {

    public static void main(String[] args) throws Exception {
        ToolRunner.run(new File2HBaseTool(), args);
    }
}

6.2.2 Writing Data from an HBase Table into a MySQL Table

CacheData

package com.djm.hbase.bean;

import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class CacheData implements WritableComparable<CacheData> {

    private String name;

    private int count;

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public int getCount() {
        return count;
    }

    public void setCount(int count) {
        this.count = count;
    }

    public int compareTo(CacheData o) {
        return name.compareTo(o.name);
    }

    public void write(DataOutput out) throws IOException {
        out.writeUTF(name);
        out.writeInt(count);
    }

    public void readFields(DataInput in) throws IOException {
        name = in.readUTF();
        count = in.readInt();
    }
}

ScanHBaseMapper

package com.djm.hbase.mapper;

import com.djm.hbase.bean.CacheData;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;

import java.io.IOException;

public class ScanHBaseMapper extends TableMapper<Text, CacheData> {

    @Override
    protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {
        for (Cell cell : value.rawCells()) {
            String name = Bytes.toString(CellUtil.cloneValue(cell));
            CacheData data = new CacheData();
            data.setName(name);
            data.setCount(1);
            context.write(new Text(name), data);
        }
    }
}

HBase2MysqlReducer

package com.djm.hbase.reducer;

import com.djm.hbase.bean.CacheData;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class HBase2MysqlReducer extends Reducer<Text, CacheData, Text, CacheData> {

    @Override
    protected void reduce(Text key, Iterable<CacheData> values, Context context) throws IOException, InterruptedException {
        int sum = 0;
        for (CacheData data : values) {
            sum = sum + data.getCount();
        }
        CacheData sumData = new CacheData();
        sumData.setName(key.toString());
        sumData.setCount(sum);
        context.write(key, sumData);
    }
}

MysqlOutputFormat

package com.djm.hbase.output;

import com.djm.hbase.bean.CacheData;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;

public class MysqlOutputFormat extends OutputFormat<Text, CacheData> {

    class MysqlRecordWriter extends RecordWriter<Text, CacheData> {

        private static final String MYSQL_DRIVE_CLASS = "com.mysql.jdbc.Driver";
        private static final String MYSQL_URL = "jdbc:mysql://hadoop102:3306/company?useUnicode=true&characterEncoding=UTF-8";
        private static final String MYSQL_USERNAME = "root";
        private static final String MYSQL_PASSWORD = "123456";

        private Connection connection;

        public MysqlRecordWriter() {
            try {
                Class.forName(MYSQL_DRIVE_CLASS);
                connection = DriverManager.getConnection(MYSQL_URL, MYSQL_USERNAME, MYSQL_PASSWORD);
            } catch (ClassNotFoundException e) {
                e.printStackTrace();
            } catch (SQLException e) {
                e.printStackTrace();
            }

        }

        public void write(Text key, CacheData data) throws IOException, InterruptedException {
            String sql = "insert into statresult (name, sumcnt) values(?, ?)";
            PreparedStatement preparedStatement = null;
            try {
                preparedStatement = connection.prepareStatement(sql);
                preparedStatement.setObject(1, key.toString());
                preparedStatement.setObject(2, data.getCount());
                preparedStatement.executeUpdate();
            } catch (SQLException e) {
                e.printStackTrace();
            } finally {
                if (preparedStatement != null) {
                    try {
                        preparedStatement.close();
                    } catch (SQLException e) {
                        e.printStackTrace();
                    }
                }
            }
        }

        public void close(TaskAttemptContext context) throws IOException, InterruptedException {
            if (connection != null) {
                try {
                    connection.close();
                } catch (SQLException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    public RecordWriter<Text, CacheData> getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException {
        return new MysqlRecordWriter();
    }

    public void checkOutputSpecs(JobContext context) throws IOException, InterruptedException {

    }

    private FileOutputCommitter committer = null;

    public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, InterruptedException {
        if (committer == null) {
            Path output = getOutputPath(context);
            committer = new FileOutputCommitter(output, context);
        }
        return committer;
    }

    public static Path getOutputPath(JobContext job) {
        String name = job.getConfiguration().get(FileOutputFormat.OUTDIR);
        return name == null ? null : new Path(name);
    }
}
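
The RecordWriter above inserts into a statresult table in the company database and assumes that table already exists; a possible definition (the column types are my assumption, not taken from the original):

CREATE TABLE statresult (
  name   VARCHAR(255),
  sumcnt INT
);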

HBaseMysqlTool

package com.djm.hbase.tool;

import com.djm.hbase.output.MysqlOutputFormat;
import com.djm.hbase.bean.CacheData;
import com.djm.hbase.mapper.ScanHBaseMapper;
import com.djm.hbase.reducer.HBase2MysqlReducer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobStatus;
import org.apache.hadoop.util.Tool;

public class HBaseMysqlTool implements Tool {

    public int run(String[] args) throws Exception {
        Job job = Job.getInstance();
        job.setJarByClass(HBaseMysqlTool.class);
        TableMapReduceUtil.initTableMapperJob("student", new Scan(), ScanHBaseMapper.class, Text.class, CacheData.class, job);
        job.setReducerClass(HBase2MysqlReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(CacheData.class);
        job.setOutputFormatClass(MysqlOutputFormat.class);
        boolean wait = job.waitForCompletion(true);
        return wait ? JobStatus.State.SUCCEEDED.getValue() : JobStatus.State.FAILED.getValue();
    }

    public void setConf(Configuration conf) {

    }

    public Configuration getConf() {
        return null;
    }
}

HBase2MysqlApplication

package com.djm.hbase;

import com.djm.hbase.tool.HBaseMysqlTool;
import org.apache.hadoop.util.ToolRunner;

public class HBase2MysqlApplication {

    public static void main(String[] args) throws Exception {
        ToolRunner.run(new HBaseMysqlTool(), args);
    }
}

6.2.3 Coprocessor

package com.djm.hbase;

import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;

import java.io.IOException;

/**
 * Coprocessor: after every Put on the table it is attached to, the same Put is also
 * written into the student table. Do not attach it to the student table itself,
 * otherwise postPut would fire again for its own writes.
 */
public class InsertStudentCoprocesser extends BaseRegionObserver {

    @Override
    public void postPut(ObserverContext<RegionCoprocessorEnvironment> e, Put put, WALEdit edit, Durability durability) throws IOException {
        Table table = e.getEnvironment().getTable(TableName.valueOf("student"));
        table.put(put);
        table.close();
    }
}

Then drop the table and recreate it; the coprocessor is attached at table-creation time (see createTable in section 6.1). Note that the coprocessor class must be available on the RegionServers' classpath.

6.3 Integrating with Hive

6.3.1 HBase & Hive

Hive is a data-warehouse tool: it maps structured files to tables and analyzes them with SQL that is compiled into MapReduce jobs, so queries are batch-oriented and high-latency.

HBase is a distributed, column-family-oriented NoSQL database aimed at low-latency random reads and writes over very large tables; it is not a SQL engine.

6.3.2 Using HBase and Hive Together

1. Recompile hive-hbase-handler-1.2.2.jar

2. Link the HBase jars into Hive's lib directory

export HBASE_HOME=/opt/module/hbase
export HIVE_HOME=/opt/module/hive
ln -s $HBASE_HOME/lib/hbase-common-1.3.1.jar  $HIVE_HOME/lib/hbase-common-1.3.1.jar
ln -s $HBASE_HOME/lib/hbase-server-1.3.1.jar $HIVE_HOME/lib/hbase-server-1.3.1.jar
ln -s $HBASE_HOME/lib/hbase-client-1.3.1.jar $HIVE_HOME/lib/hbase-client-1.3.1.jar
ln -s $HBASE_HOME/lib/hbase-protocol-1.3.1.jar $HIVE_HOME/lib/hbase-protocol-1.3.1.jar
ln -s $HBASE_HOME/lib/hbase-it-1.3.1.jar $HIVE_HOME/lib/hbase-it-1.3.1.jar
ln -s $HBASE_HOME/lib/htrace-core-3.1.0-incubating.jar $HIVE_HOME/lib/htrace-core-3.1.0-incubating.jar
ln -s $HBASE_HOME/lib/hbase-hadoop2-compat-1.3.1.jar $HIVE_HOME/lib/hbase-hadoop2-compat-1.3.1.jar
ln -s $HBASE_HOME/lib/hbase-hadoop-compat-1.3.1.jar $HIVE_HOME/lib/hbase-hadoop-compat-1.3.1.jar

Edit hive-site.xml and add the following:

<property>
  <name>hive.zookeeper.quorum</name>
  <value>hadoop102,hadoop103,hadoop104</value>
  <description>The list of ZooKeeper servers to talk to. This is only needed for read/write locks.</description>
</property>
<property>
  <name>hive.zookeeper.client.port</name>
  <value>2181</value>
  <description>The port of ZooKeeper servers to talk to. This is only needed for read/write locks.</description>
</property>

Create a Hive table associated with an HBase table, so that inserting data into the Hive table also updates the HBase table.

1. Create the table in Hive and associate it with HBase

CREATE TABLE hive_hbase_emp_table(
empno int,
ename string,
job string,
mgr int,
hiredate string,
sal double,
comm double,
deptno int)
STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
WITH SERDEPROPERTIES ("hbase.columns.mapping" = ":key,info:ename,info:job,info:mgr,info:hiredate,info:sal,info:comm,info:deptno")
TBLPROPERTIES ("hbase.table.name" = "hbase_emp_table");

2. Create a temporary intermediate table in Hive for loading data from a file

CREATE TABLE emp(
empno int,
ename string,
job string,
mgr int,
hiredate string,
sal double,
comm double,
deptno int)
row format delimited fields terminated by '\t';

3. Load data into the Hive intermediate table

hive> load data local inpath '/home/admin/softwares/data/emp.txt' into table emp;

4. Use an insert statement to move the data from the intermediate table into the Hive table associated with HBase

hive> insert into table hive_hbase_emp_table select * from emp;
hive> select * from hive_hbase_emp_table;

Suppose a table named hbase_emp_table already exists in HBase. Create an external table in Hive associated with hbase_emp_table, so that Hive can be used to analyze the data stored in that HBase table.

1. Create the external table in Hive

CREATE EXTERNAL TABLE relevance_hbase_emp(
empno int,
ename string,
job string,
mgr int,
hiredate string,
sal double,
comm double,
deptno int)
STORED BY 
'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
WITH SERDEPROPERTIES ("hbase.columns.mapping" = 
":key,info:ename,info:job,info:mgr,info:hiredate,info:sal,info:comm,info:deptno") 
TBLPROPERTIES ("hbase.table.name" = "hbase_emp_table");
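
Once the external table exists, the HBase data can be queried from Hive directly, for example:

hive> select * from relevance_hbase_emp;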