玩转大数据程序员成长园大数据,机器学习,人工智能

HBase使用

2019-06-24  本文已影响6人  若与

一、HBase介绍

1、基本概念

HBase是一种Hadoop数据库,经常被描述为一种稀疏的,分布式的,持久化的,多维有序映射,它基于行键、列键和时间戳建立索引,是一个可以随机访问的存储和检索数据的平台。HBase不限制存储的数据的种类,允许动态的、灵活的数据模型,不用SQL语言,也不强调数据之间的关系。HBase被设计成在一个服务器集群上运行,可以相应地横向扩展。

2、HBase使用场景和成功案例

上面简单介绍一下hbase, 至于hbase的原理,以及架构, 后面我整理完, 再发出来。 现在只是对hbase会使用。 就先从使用开始入门。

二、 HBase使用

hbase是数据库, 数据库那就是存储数据的, 那就离不开curd.
类似mysql, 有shell客户端以及语言的sdk方式。

2.1 HBASE shell

hbase shell 类似mysql的客户端

help可以查看所有的命名帮助

下面是命令分组:

COMMAND GROUPS:
  Group name: general
  Commands: processlist, status, table_help, version, whoami

  Group name: ddl
  Commands: alter, alter_async, alter_status, create, create_layered, describe, disable, disable_all, drop, drop_all, enable, enable_all, exists, get_table, is_disabled, is_enabled, list, list_regions, locate_region, show_filters

  Group name: namespace
  Commands: alter_namespace, create_namespace, describe_namespace, drop_namespace, list_namespace, list_namespace_tables

  Group name: dml
  Commands: append, count, delete, deleteall, get, get_counter, get_splits, incr, put, scan, truncate, truncate_preserve

  Group name: tools
  Commands: assign, balance_switch, balancer, balancer_enabled, catalogjanitor_enabled, catalogjanitor_run, catalogjanitor_switch, cleaner_chore_enabled, cleaner_chore_run, cleaner_chore_switch, clear_block_cache, clear_compaction_queues, clear_deadservers, close_region, compact, compact_rs, compaction_state, flush, is_in_maintenance_mode, list_deadservers, major_compact, merge_region, move, normalize, normalizer_enabled, normalizer_switch, split, splitormerge_enabled, splitormerge_switch, trace, unassign, wal_roll, zk_dump

  Group name: replication
  Commands: add_peer, append_peer_namespaces, append_peer_tableCFs, disable_peer, disable_table_replication, enable_peer, enable_table_replication, get_peer_config, list_peer_configs, list_peers, list_replicated_tables, remove_peer, remove_peer_namespaces, remove_peer_tableCFs, set_peer_bandwidth, set_peer_exclude_namespaces, set_peer_exclude_tableCFs, set_peer_namespaces, set_peer_replicate_all, set_peer_tableCFs, show_peer_tableCFs, update_peer_config

  Group name: snapshots
  Commands: clone_snapshot, delete_all_snapshot, delete_snapshot, delete_table_snapshots, list_snapshots, list_table_snapshots, restore_snapshot, snapshot

  Group name: configuration
  Commands: update_all_config, update_config

  Group name: quotas
  Commands: list_quota_snapshots, list_quota_table_sizes, list_quotas, list_snapshot_sizes, set_quota

  Group name: security
  Commands: grant, list_security_capabilities, revoke, user_permission

  Group name: procedures
  Commands: abort_procedure, list_locks, list_procedures

  Group name: visibility labels
  Commands: add_labels, clear_auths, get_auths, list_labels, set_auths, set_visibility

  Group name: rsgroup
  Commands: add_rsgroup, balance_rsgroup, get_rsgroup, get_server_rsgroup, get_table_rsgroup, list_rsgroups, move_namespaces_rsgroup, move_servers_namespaces_rsgroup, move_servers_rsgroup, move_servers_tables_rsgroup, move_tables_rsgroup, remove_rsgroup, remove_servers_rsgroup

1. 常规命名:

  1. 集群状态 status
hbase(main):005:0> status
1 active master, 0 backup masters, 1 servers, 0 dead, 793.0000 average load
Took 0.9453 seconds
  1. 版本 version
hbase(main):006:0> version
2.0.2, rc6f16dff66b5d7c4fb66d3bf7eda4f56515c63f3, Fri Jan 25 19:23:41 CST 2019
Took 0.0004 seconds

2. DDL命令

命令 命令含义 命令使用示例
alter 修改表的列族的描述属性 aliter 't1',NAME => 'f1',VERSIONS => 5
alter_async 异步修改表的列族的描述属性,并不需要等待所有Region都完成操作。用法和alter命令相同 alter_async 't1',NAME => 'f1',VERSIONS => 5
alter_status 获取alter命令的状态,会标注已经有多少region更改了Schema。 命令的参数是表名 alter_status 't1'
create 创建表 create 't1' ,{NAME => 'f1', VERSIONS => 5}; create 't1','f1','f2', 'f3'
describe 获取表的元数据信息和是否可用的的状态 describe 't1'
disable 禁用某个表 disable 't1'
disable_all 禁用所有正则匹配的表 disable_all 't1.*'
drop 删除表 drop 't1'
enable 启用表 enable 't1'
enable_all 启用正则匹配的表 enable_all 't1.*'
exists 判断表是否存在 exists 't1'
is_disable 判断表是否是禁用的 is_disable 't1'
is_enbale 判断表是否是启用的 is_disable 't1'
show_filter 查看所支持的所有过滤器的名称 show_filters
list 列出所有表的名称 list

DML

  1. count
    统计表的总行数
count 't1'
count 't1', INTERVAL => 1000
count 't1', CACHE => 1000,
count 't1', INTERVAL => 10, CACHE => 1000

  1. delete
    删除一个单元格
delete 't1', 'r1', 'c1', ts1
  1. deleteall
    删除一行或一列
deleteall 't1','r1'
deleteall 't1','r1','c1'
deleteall 't1', 'r1','c1', ts1
  1. get
    单行读
  hbase> get 'ns1:t1', 'r1'
  hbase> get 't1', 'r1'
  hbase> get 't1', 'r1', {TIMERANGE => [ts1, ts2]}
  hbase> get 't1', 'r1', {COLUMN => 'c1'}
  hbase> get 't1', 'r1', {COLUMN => ['c1', 'c2', 'c3']}
  hbase> get 't1', 'r1', {COLUMN => 'c1', TIMESTAMP => ts1}
  hbase> get 't1', 'r1', {COLUMN => 'c1', TIMERANGE => [ts1, ts2], VERSIONS => 4}
  hbase> get 't1', 'r1', {COLUMN => 'c1', TIMESTAMP => ts1, VERSIONS => 4}
  hbase> get 't1', 'r1', {FILTER => "ValueFilter(=, 'binary:abc')"}
  hbase> get 't1', 'r1', 'c1'
  hbase> get 't1', 'r1', 'c1', 'c2'
  hbase> get 't1', 'r1', ['c1', 'c2']
  hbase> get 't1', 'r1', {COLUMN => 'c1', ATTRIBUTES => {'mykey'=>'myvalue'}}
  hbase> get 't1', 'r1', {COLUMN => 'c1', AUTHORIZATIONS => ['PRIVATE','SECRET']}
  hbase> get 't1', 'r1', {CONSISTENCY => 'TIMELINE'}
  hbase> get 't1', 'r1', {CONSISTENCY => 'TIMELINE', REGION_REPLICA_ID => 1}
  1. get_counter
    读取计数器
  hbase> get_counter 'ns1:t1', 'r1', 'c1'
  hbase> get_counter 't1', 'r1', 'c1'
  1. incr
    自增写入
  hbase> incr 'ns1:t1', 'r1', 'c1'
  hbase> incr 't1', 'r1', 'c1'
  hbase> incr 't1', 'r1', 'c1', 1
  hbase> incr 't1', 'r1', 'c1', 10
  hbase> incr 't1', 'r1', 'c1', 10, {ATTRIBUTES=>{'mykey'=>'myvalue'}}
  hbase> incr 't1', 'r1', 'c1', {ATTRIBUTES=>{'mykey'=>'myvalue'}}
  hbase> incr 't1', 'r1', 'c1', 10, {VISIBILITY=>'PRIVATE|SECRET'}

  1. put
    数据写入
  hbase> put 'ns1:t1', 'r1', 'c1', 'value'
  hbase> put 't1', 'r1', 'c1', 'value'
  hbase> put 't1', 'r1', 'c1', 'value', ts1
  hbase> put 't1', 'r1', 'c1', 'value', {ATTRIBUTES=>{'mykey'=>'myvalue'}}
  hbase> put 't1', 'r1', 'c1', 'value', ts1, {ATTRIBUTES=>{'mykey'=>'myvalue'}}
  hbase> put 't1', 'r1', 'c1', 'value', ts1, {VISIBILITY=>'PRIVATE|SECRET'}
  1. scan
    扫描表
  hbase> scan 'hbase:meta'
  // 显示指定列
  hbase> scan 'hbase:meta', {COLUMNS => 'info:regioninfo'}
  
  // limit start
  hbase> scan 'ns1:t1', {COLUMNS => ['c1', 'c2'], LIMIT => 10, STARTROW => 'xyz'}
  hbase> scan 't1', {COLUMNS => ['c1', 'c2'], LIMIT => 10, STARTROW => 'xyz'}
  
 //  时间范围
  hbase> scan 't1', {COLUMNS => 'c1', TIMERANGE => [1303668804000, 1303668904000]}
  hbase> scan 't1', {REVERSED => true}
  hbase> scan 't1', {ALL_METRICS => true}
  hbase> scan 't1', {METRICS => ['RPC_RETRIES', 'ROWS_FILTERED']}
  
  // 使用过滤器, show_filters查看所有可以使用的过滤器
  hbase> scan 't1', {ROWPREFIXFILTER => 'row2', FILTER => "
    (QualifierFilter (>=, 'binary:xyz')) AND (TimestampsFilter ( 123, 456))"}
  hbase> scan 't1', {FILTER =>
    org.apache.hadoop.hbase.filter.ColumnPaginationFilter.new(1, 0)}
  hbase> scan 't1', {CONSISTENCY => 'TIMELINE'}
For setting the Operation Attributes
  hbase> scan 't1', { COLUMNS => ['c1', 'c2'], ATTRIBUTES => {'mykey' => 'myvalue'}}
  hbase> scan 't1', { COLUMNS => ['c1', 'c2'], AUTHORIZATIONS => ['PRIVATE','SECRET']}
  1. truncate
    清空表
truncate 't1'

还有其他命令, 就不多介绍了, 自己使用 help查看了

2.2 go操作 hbase

介绍一下go操作hbase

Install

go get github.com/tsuna/gohbase

Create a client

client := gohbase.NewClient("localhost")

Insert a cell

// Values maps a ColumnFamily -> Qualifiers -> Values.
values := map[string]map[string][]byte{"cf": map[string][]byte{"a": []byte{0}}}
putRequest, err := hrpc.NewPutStr(context.Background(), "table", "key", values)
rsp, err := client.Put(putRequest)

Get an entire row

getRequest, err := hrpc.NewGetStr(context.Background(), "table", "row")
getRsp, err := client.Get(getRequest)

Get a specific cell

// Perform a get for the cell with key "15", column family "cf" and qualifier "a"
family := map[string][]string{"cf": []string{"a"}}
getRequest, err := hrpc.NewGetStr(context.Background(), "table", "15",
    hrpc.Families(family))
getRsp, err := client.Get(getRequest)

Get a specific cell with a filter

pFilter := filter.NewKeyOnlyFilter(true)
family := map[string][]string{"cf": []string{"a"}}
getRequest, err := hrpc.NewGetStr(context.Background(), "table", "15",
    hrpc.Families(family), hrpc.Filters(pFilter))
getRsp, err := client.Get(getRequest)

Scan with a filter

pFilter := filter.NewPrefixFilter([]byte("7"))
scanRequest, err := hrpc.NewScanStr(context.Background(), "table",
        hrpc.Filters(pFilter))
scanRsp, err := client.Scan(scanRequest)

我们看一下代码架构

├── AUTHORS
├── COPYING
├── Makefile
├── README.md
├── admin_client.go
├── caches.go
├── check_line_len.awk
├── client.go
├── discovery_test.go
├── filter
├── hrpc
├── install_ci.sh
├── integration_test.go
├── metacache_test.go
├── pb
├── region
├── rpc.go
├── rpc_test.go
├── scanner.go
├── scanner_test.go
├── table_test.go
├── test
└── zk

上面代码的整理的很有条理,
hrpc主要是rpc调用的方法
filter是get或scan的filter过滤器
region是 region的一些接口
cache是缓存,hbase中为了提高性能,很多地方都采用cache方式。
zk就是zookeeper相关的。

我们下面阅读以下源码
gohbase操作的入口主要是 clientadmin_client

我们围绕 clientadmin_client

// AdminClient to perform admistrative operations with HMaster
type AdminClient interface {
    CreateTable(t *hrpc.CreateTable) error
    DeleteTable(t *hrpc.DeleteTable) error
    EnableTable(t *hrpc.EnableTable) error
    DisableTable(t *hrpc.DisableTable) error
    ClusterStatus() (*pb.ClusterStatus, error)
}



// CreateTable represents a CreateTable HBase call
type CreateTable struct {
    base

    families  map[string]map[string]string
    splitKeys [][]byte
}

// NewCreateTable creates a new CreateTable request that will create the given
// table in HBase. 'families' is a map of column family name to its attributes.
// For use by the admin client.
func NewCreateTable(ctx context.Context, table []byte,
    families map[string]map[string]string,
    options ...func(*CreateTable)) *CreateTable {
    ct := &CreateTable{
        base: base{
            table:    table,
            ctx:      ctx,
            resultch: make(chan RPCResult, 1),
        },
        families: make(map[string]map[string]string, len(families)),
    }
    for _, option := range options {
        option(ct)
    }
    for family, attrs := range families {
        ct.families[family] = make(map[string]string, len(defaultAttributes))
        for k, dv := range defaultAttributes {
            if v, ok := attrs[k]; ok {
                ct.families[family][k] = v
            } else {
                ct.families[family][k] = dv
            }
        }
    }
    return ct
}

主要是DDL

再看 client

// Client a regular HBase client
type Client interface {
    Scan(s *hrpc.Scan) hrpc.Scanner
    Get(g *hrpc.Get) (*hrpc.Result, error)
    Put(p *hrpc.Mutate) (*hrpc.Result, error)
    Delete(d *hrpc.Mutate) (*hrpc.Result, error)
    Append(a *hrpc.Mutate) (*hrpc.Result, error)
    Increment(i *hrpc.Mutate) (int64, error)
    CheckAndPut(p *hrpc.Mutate, family string, qualifier string,
        expectedValue []byte) (bool, error)
    Close()
}

主要是DML相关的。
我们看一下put

// NewPut creates a new Mutation request to insert the given
// family-column-values in the given row key of the given table.
func NewPut(ctx context.Context, table, key []byte,
    values map[string]map[string][]byte, options ...func(Call) error) (*Mutate, error) {
    m, err := baseMutate(ctx, table, key, values, options...)
    if err != nil {
        return nil, err
    }
    m.mutationType = pb.MutationProto_PUT
    return m, nil
}

// NewPutStr is just like NewPut but takes table and key as strings.
func NewPutStr(ctx context.Context, table, key string,
    values map[string]map[string][]byte, options ...func(Call) error) (*Mutate, error) {
    return NewPut(ctx, []byte(table), []byte(key), values, options...)
}


其中

// baseMutate returns a Mutate struct without the mutationType filled in.
func baseMutate(ctx context.Context, table, key []byte, values map[string]map[string][]byte,
    options ...func(Call) error) (*Mutate, error) {
    m := &Mutate{
        base: base{
            table:    table,
            key:      key,
            ctx:      ctx,
            resultch: make(chan RPCResult, 1),
        },
        values:    values,
        timestamp: MaxTimestamp,
    }
    err := applyOptions(m, options...)
    if err != nil {
        return nil, err
    }
    return m, nil
}


// 注意
func applyOptions(call Call, options ...func(Call) error) error {
    call.(withOptions).setOptions(options)
    for _, option := range options {
        err := option(call)
        if err != nil {
            return err
        }
    }
    return nil
}

其中option的使用如下:


    client := gohbase.NewClient("localhost")

    pFilter := filter.NewKeyOnlyFilter(true)
    family := map[string][]string{"cf": []string{"a"}}
    getRequest, _ := hrpc.NewGetStr(context.Background(), "table", "15",
        hrpc.Families(family), hrpc.Filters(pFilter), hrpc.MaxVersions(2))
    _, _ := client.Get(getRequest)



    values := map[string]map[string][]byte{"cf": map[string][]byte{"a": []byte{0}}}
    putRequest, err := hrpc.NewPutStr(context.Background(), "table", "key", values, hrpc.Timestamp(time.Time{}), hrpc.MaxVersions(1))
    rsp, err := client.Put(putRequest)
}
上一篇下一篇

猜你喜欢

热点阅读