javaapi 访问 hbase
Hbase介绍
HBASE是一个高可靠性、高性能、面向列、可伸缩的分布式存储系统,利用HBASE技术可在廉价PC Server上搭建起大规模结构化存储集群。
HBASE的目标是存储并处理大型的数据,更具体来说是仅需使用普通的硬件配置,就能够处理由成千上万的行和列所组成的大型数据。
HBASE是Google Bigtable的开源实现,但是也有很多不同之处。比如:Google Bigtable利用GFS作为其文件存储系统,HBASE利用Hadoop HDFS作为其文件存储系统;Google运行MAPREDUCE来处理Bigtable中的海量数据,HBASE同样利用Hadoop MapReduce来处理HBASE中的海量数据;Google Bigtable利用Chubby作为协同服务,HBASE利用Zookeeper作为对应。
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>0.99.2</version>
</dependency>
1、创建表
public class HBaseTest {
Configuration config = null;
private Connection connection = null;
private Table table = null;
@Before
public void init() throws Exception {
config = HBaseConfiguration.create();// 配置
config.set("hbase.zookeeper.quorum", "192.168.25.127,192.168.25.129,192.168.25.130");// zookeeper地址
config.set("hbase.zookeeper.property.clientPort", "2181");// zookeeper端口
connection = ConnectionFactory.createConnection(config);
table = connection.getTable(TableName.valueOf("user1"));
}
/**
* 创建表
* @throws Exception
*/
@Test
public void testCreateTable() throws Exception{
//创建表管理类
HBaseAdmin admin = new HBaseAdmin(config);
//创建表描述类
TableName tableName = TableName.valueOf("user2");
HTableDescriptor descriptor = new HTableDescriptor(tableName);
//创建列族描述类
HColumnDescriptor info1 = new HColumnDescriptor("info1");
//列族加入表中
descriptor.addFamily(info1);
HColumnDescriptor info2 = new HColumnDescriptor("info2");
descriptor.addFamily(info2);
//创建表
admin.createTable(descriptor);
}
}
如果执行之后发现程序卡住不动,或者过了很久之后出现下面的异常
org.apache.hadoop.hbase.client.RetriesExhaustedException: Failed after attempts=35, exceptions:
...
Caused by: org.apache.hadoop.hbase.MasterNotRunningException: com.google.protobuf.ServiceException: java.net.UnknownHostException: unknown host: mini1
...
Caused by: com.google.protobuf.ServiceException: java.net.UnknownHostException: unknown host: mini1
...
首先确保关闭了hadoop的安全模式,然后linux下的ip地址跟主机名必须对应,最后windows下的ip地址跟主机名也要对应,我这在linux下/etc/hosts文件中
[root@mini1 ~]# cat /etc/hosts
127.0.0.1 localhost localhost.localdomain localhost4 localhost4.localdomain4
::1 localhost localhost.localdomain localhost6 localhost6.localdomain6 localhost.jinbm
192.168.25.127 mini1
192.168.25.129 mini2
192.168.25.130 mini3
但是当时出现windows下的hosts文件没有配置后三个映射导致出现上面的异常。
执行之后去hbase集群查看
user2表已经创建
hbase(main):002:0> list
TABLE
user1
user2
2 row(s) in 0.0130 seconds
=> ["user1", "user2"]
2、删除表
删除表跟shell命令一样,也是要先disable表之后才能删除
@Test
public void testDeleteTable() throws Exception{
HBaseAdmin admin = new HBaseAdmin(config);
admin.disableTable("user2");
admin.deleteTable("user2");
}
3、单条插入(修改)
/**
* 向表中插入数据
* 单条插入(包括修改)
* @throws Exception
*/
@Test
public void testPut() throws Exception{
//rowkey
Put put = new Put(Bytes.toBytes("1234"));
//列族,列,值
put.add(Bytes.toBytes("info1"), Bytes.toBytes("gender"), Bytes.toBytes("1"));
put.add(Bytes.toBytes("info2"), Bytes.toBytes("name"), Bytes.toBytes("wangwu"));
table.put(put);
//提交
table.flushCommits();
}
查看
hbase(main):008:0> scan 'user1'
ROW COLUMN+CELL
1234 column=info2:age, timestamp=1509315527064, value=18
1234 column=info2:name, timestamp=1509315500250, value=zhangsan
12345 column=info2:age, timestamp=1509315533683, value=18
12345 column=info2:name, timestamp=1509315548481, value=lisi
2 row(s) in 0.1030 seconds
hbase(main):009:0> scan 'user1'
ROW COLUMN+CELL
1234 column=info1:gender, timestamp=1509315890353, value=1
1234 column=info2:age, timestamp=1509315527064, value=18
1234 column=info2:name, timestamp=1509315890353, value=wangwu
12345 column=info2:age, timestamp=1509315533683, value=18
12345 column=info2:name, timestamp=1509315548481, value=lisi
2 row(s) in 0.0440 seconds
发现添加了一条数据和修改了一条数据
4、批量插入数据
Table有2个重载的方法,一个是table.put(Put put)也就是单条插入,一个是table.put(list)list泛型是Put,这就是批量插入。
/**
* 向表中插入数据
* 多条插入,使用list
* @throws Exception
*/
@Test
public void testPut2() throws Exception{
//可以通过将自动刷新设置为false来激活缓冲区
table.setAutoFlushTo(false);
//设置数据将被写入的缓冲区大小
table.setWriteBufferSize(534534534);
List<Put> putList = new ArrayList<>();
for (int i=20;i<=30;i++){
//rowkey
Put put = new Put(Bytes.toBytes("jbm_"+i));
//列族,列,值
put.add(Bytes.toBytes("info1"), Bytes.toBytes("age"), Bytes.toBytes(i));
put.add(Bytes.toBytes("info1"), Bytes.toBytes("name"), Bytes.toBytes("lucy"+i));
putList.add(put);
}
table.put(putList);
//提交
table.flushCommits();
}
执行之后查看
hbase(main):010:0> scan 'user1'
ROW COLUMN+CELL
1234 column=info1:gender, timestamp=1509315890353, value=1
1234 column=info2:age, timestamp=1509315527064, value=18
1234 column=info2:name, timestamp=1509315890353, value=wangwu
12345 column=info2:age, timestamp=1509315533683, value=18
12345 column=info2:name, timestamp=1509315548481, value=lisi
jbm_20 column=info1:age, timestamp=1509316527223, value=\x00\x00\x00\x14
jbm_20 column=info1:name, timestamp=1509316527223, value=lucy20
jbm_21 column=info1:age, timestamp=1509316527223, value=\x00\x00\x00\x15
jbm_21 column=info1:name, timestamp=1509316527223, value=lucy21
jbm_22 column=info1:age, timestamp=1509316527223, value=\x00\x00\x00\x16
jbm_22 column=info1:name, timestamp=1509316527223, value=lucy22
jbm_23 column=info1:age, timestamp=1509316527223, value=\x00\x00\x00\x17
jbm_23 column=info1:name, timestamp=1509316527223, value=lucy23
jbm_24 column=info1:age, timestamp=1509316527223, value=\x00\x00\x00\x18
jbm_24 column=info1:name, timestamp=1509316527223, value=lucy24
jbm_25 column=info1:age, timestamp=1509316527223, value=\x00\x00\x00\x19
jbm_25 column=info1:name, timestamp=1509316527223, value=lucy25
jbm_26 column=info1:age, timestamp=1509316527223, value=\x00\x00\x00\x1A
jbm_26 column=info1:name, timestamp=1509316527223, value=lucy26
jbm_27 column=info1:age, timestamp=1509316527223, value=\x00\x00\x00\x1B
jbm_27 column=info1:name, timestamp=1509316527223, value=lucy27
jbm_28 column=info1:age, timestamp=1509316527223, value=\x00\x00\x00\x1C
jbm_28 column=info1:name, timestamp=1509316527223, value=lucy28
jbm_29 column=info1:age, timestamp=1509316527223, value=\x00\x00\x00\x1D
jbm_29 column=info1:name, timestamp=1509316527223, value=lucy29
jbm_30 column=info1:age, timestamp=1509316527223, value=\x00\x00\x00\x1E
jbm_30 column=info1:name, timestamp=1509316527223, value=lucy30
5、修改数据
/**
* 修改数据
* @throws Exception
*/
@Test
public void testUpdate() throws Exception{
Put put = new Put(Bytes.toBytes("1234"));
put.add(Bytes.toBytes("info2"), Bytes.toBytes("name"), Bytes.toBytes("tom"));
table.put(put);
table.flushCommits();
}
执行之后查看
hbase(main):010:0> scan 'user1'
ROW COLUMN+CELL
1234 column=info1:gender, timestamp=1509315890353, value=1
1234 column=info2:age, timestamp=1509315527064, value=18
1234 column=info2:name, timestamp=1509315890353, value=wangwu
...
hbase(main):013:0> scan 'user1'
ROW COLUMN+CELL
1234 column=info1:gender, timestamp=1509315890353, value=1
1234 column=info2:age, timestamp=1509315527064, value=18
1234 column=info2:name, timestamp=1509316978243, value=tom
发现wangwu已被改为了tom
6、删除整行数据
删除rowkey为1234整行数据
/**
* 删除数据
* @throws Exception
*/
@Test
public void testDeleteData() throws Exception{
Delete delete = new Delete(Bytes.toBytes("1234"));
table.delete(delete);
table.flushCommits();
}
7、单条查询
/**
* 单条查询
* @throws Exception
*/
@Test
public void testGetSingle() throws Exception{
//rowkey
Get get = new Get(Bytes.toBytes("12345"));
Result result = table.get(get);
//列族,列名
byte[] name = result.getValue(Bytes.toBytes("info2"), Bytes.toBytes("name"));
byte[] age = result.getValue(Bytes.toBytes("info2"), Bytes.toBytes("age"));
System.out.println(Bytes.toString(name));
System.out.println(Bytes.toString(age));
}
执行后hbase查看和控制台查看
hbase(main):001:0> scan 'user1'
12345 column=info2:age, timestamp=1509315533683, value=18
12345 column=info2:name, timestamp=1509315548481, value=lisi
...
控制台输出
lisi
18
8、多条查询
这里叫做扫描更适合吧,先用全表扫描,和命令行的scan ‘表名’一样
/**
* 多条查询
* 全表扫描
* @throws Exception
*/
@Test
public void testGetMany() throws Exception{
Scan scan = new Scan();
//字典序 类似于分页
scan.setStartRow(Bytes.toBytes("jbm_20"));
scan.setStopRow(Bytes.toBytes("jbm_30"));
ResultScanner resultScanner = table.getScanner(scan);
for (Result result : resultScanner) {
//Single row result of a Get or Scan query. Result
//Result 一次获取一个rowkey对应的记录
//列族,列名
byte[] name = result.getValue(Bytes.toBytes("info1"), Bytes.toBytes("name"));
byte[] age = result.getValue(Bytes.toBytes("info1"), Bytes.toBytes("age"));
System.out.print(Bytes.toString(name)+",");
System.out.print(Bytes.toInt(age));
System.out.println();
}
}
执行控制台输出结果
lucy20,20
lucy21,21
lucy22,22
lucy23,23
lucy24,24
lucy25,25
lucy26,26
lucy27,27
lucy28,28
lucy29,29
9、Hbase过滤器
1)、列值过滤器
SingleColumnValueFilter
过滤列值的相等、不等、范围等
/**
* 全表扫描过滤器
* 列值过滤器
* @throws Exception
*/
@Test
public void testFilter() throws Exception{
Scan scan = new Scan();
//列值过滤器
SingleColumnValueFilter columnValueFilter = new SingleColumnValueFilter(Bytes.toBytes("info1"),
Bytes.toBytes("name"), CompareOp.EQUAL, Bytes.toBytes("lisi"));
//设置过滤器
scan.setFilter(columnValueFilter);
//获取结果集
ResultScanner resultScanner = table.getScanner(scan);
for (Result result : resultScanner) {
byte[] name = result.getValue(Bytes.toBytes("info2"), Bytes.toBytes("name"));
byte[] age = result.getValue(Bytes.toBytes("info2"), Bytes.toBytes("age"));
System.out.print(Bytes.toString(name)+",");
System.out.print(Bytes.toString(age));
System.out.println();
}
}
执行查看输出
hbase(main):001:0> scan 'user1'
ROW COLUMN+CELL
12345 column=info2:age, timestamp=1509315533683, value=18
12345 column=info2:name, timestamp=1509315548481, value=lisi
jbm_20 column=info1:age, timestamp=1509316527223, value=\x00\x00\x00\x14
...
控制台输出
lisi,18
2)、rowkey过滤器
RowFilter 通过正则,过滤rowKey值。
/**
* 全表扫描过滤器
* rowkey过滤
* @throws Exception
*/
@Test
public void testRowkeyFilter() throws Exception{
Scan scan = new Scan();
//rowkey过滤器
//匹配以jbm开头的
RowFilter filter = new RowFilter(CompareOp.EQUAL, new RegexStringComparator("^jbm"));
//设置过滤器
scan.setFilter(filter);
//获取结果集
ResultScanner resultScanner = table.getScanner(scan);
for (Result result : resultScanner) {
byte[] name = result.getValue(Bytes.toBytes("info1"), Bytes.toBytes("name"));
byte[] age = result.getValue(Bytes.toBytes("info1"), Bytes.toBytes("age"));
System.out.print(Bytes.toString(name)+",");
System.out.print(Bytes.toInt(age));
System.out.println();
}
}
控制台输出
lucy20,20
lucy21,21
lucy22,22
lucy23,23
lucy24,24
lucy25,25
lucy26,26
lucy27,27
lucy28,28
lucy29,29
lucy30,30
3)、列名前缀过滤器
ColumnPrefixFilter列名前缀过滤
/**
* 全表扫描过滤器
* 列名前缀过滤
* @throws Exception
*/
@Test
public void testColumnPrefixFilter() throws Exception{
Scan scan = new Scan();
//列名前缀过滤器 列名前缀为na(注:不是指值的前缀)
ColumnPrefixFilter filter = new ColumnPrefixFilter(Bytes.toBytes("na"));
//设置过滤器
scan.setFilter(filter);
//获取结果集
ResultScanner resultScanner = table.getScanner(scan);
for (Result result : resultScanner) {
byte[] name = result.getValue(Bytes.toBytes("info1"), Bytes.toBytes("name"));
byte[] age = result.getValue(Bytes.toBytes("info1"), Bytes.toBytes("age"));
if(name!=null){
System.out.print(Bytes.toString(name)+" ");
}
if(age!=null){
System.out.print(age);
}
System.out.println();
}
}
从输出结果就能看到,只会拿到name列,age列是拿不到的
lucy20
lucy21
lucy22
lucy23
lucy24
lucy25
lucy26
lucy27
lucy28
lucy29
lucy30
4)、过滤器集合
/**
* 全表扫描过滤器
* 过滤器集合
* @throws Exception
*/
@Test
public void testFilterList() throws Exception{
Scan scan = new Scan();
//过滤器集合:MUST_PASS_ALL(and),MUST_PASS_ONE(or)
FilterList filterList = new FilterList(Operator.MUST_PASS_ALL);
//ROWKEY过滤器
RowFilter rowFilter = new RowFilter(CompareOp.EQUAL, new RegexStringComparator("^jbm"));
//列值过滤器 age大于25
SingleColumnValueFilter columnValueFilter = new SingleColumnValueFilter(Bytes.toBytes("info1"),
Bytes.toBytes("age"), CompareOp.GREATER, Bytes.toBytes(25));
filterList.addFilter(columnValueFilter);
filterList.addFilter(rowFilter);
//设置过滤器
scan.setFilter(filterList);
//获取结果集
ResultScanner resultScanner = table.getScanner(scan);
for (Result result : resultScanner) {
byte[] name = result.getValue(Bytes.toBytes("info1"), Bytes.toBytes("name"));
byte[] age = result.getValue(Bytes.toBytes("info1"), Bytes.toBytes("age"));
if(name!=null){
System.out.print(Bytes.toString(name)+" ");
}
if(age!=null){
System.out.print(Bytes.toInt(age)+" ");
}
System.out.println();
}
}
输出
lucy26 26
lucy27 27
lucy28 28
lucy29 29
lucy30 30