APIhbase

Hbase——java API

2023-03-20  本文已影响0人  小波同学

HBase API

1 环境准备

新建项目后在 pom.xml 中添加依赖:

注意:会报错 javax.el 包不存在,是一个测试用的依赖,不影响使用。

<dependencies>
    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-server</artifactId>
        <version>2.4.11</version>
        <exclusions>
        <exclusion>
        <groupId>org.glassfish</groupId>
        <artifactId>javax.el</artifactId>
        </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>org.glassfish</groupId>
        <artifactId>javax.el</artifactId>
        <version>3.0.1-b06</version>
    </dependency>
</dependencies>

2 创建连接

根据官方 API 介绍,HBase 的客户端连接由 ConnectionFactory 类来创建,用户使用完成 之后需要手动关闭连接。同时连接是一个重量级的,推荐一个进程使用一个连接,对 HBase 的命令通过连接中的两个属性 Admin 和 Table 来实现。

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import java.io.IOException;

/**
 * @Author: huangyibo
 * @Date: 2023/3/21 14:24
 * @Description: Hbase操作工具
 */

public class HBaseConnection {

    public static Configuration config;//管理HBase的配置信息
    public static Connection connection;//管理HBase的连接
    public static Admin admin;//管理HBase数据库的连接

    /**
     * 创建相关连接
     *
     * @throws IOException 可能出现的异常
     */
    public static void init() throws IOException {
        config = HBaseConfiguration.create();
        //配置Zookeeper的ip地址
        config.set("hbase.zookeeper.quorum", "node01:2181,node02:2181,node03:2181");

        connection = ConnectionFactory.createConnection(config);
        admin = connection.getAdmin();
    }


    /**
     * 关闭所有连接
     *
     * @throws IOException 可能出现的异常
     */
    public static void close() throws IOException {
        if (admin != null){
            admin.close();
        }
        if (connection != null){
            connection.close();
        }
    }
}

3 添加配置文件

在 resources 文件夹中创建配置文件 hbase-site.xml,添加以下内容

<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<!--
/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
-->
<configuration>
    <!-- HBase数据在HDFS中的存放的路径 -->
    <property>
        <name>hbase.rootdir</name>
        <value>hdfs://node1.yibo.cn:8020/hbase</value>
    </property>
    <!-- Hbase的运行模式。false是单机模式,true是分布式模式。若为false,Hbase和Zookeeper会运行在同一个JVM里面 -->
    <property>
        <name>hbase.cluster.distributed</name>
        <value>true</value>
    </property>
    <!-- ZooKeeper的地址 -->
    <property>
        <name>hbase.zookeeper.quorum</name>
        <value>hadoop102,hadoop103,hadoop104</value>
    </property>
    <!-- ZooKeeper快照的存储位置 -->
    <property>
        <name>hbase.zookeeper.property.dataDir</name>
        <value>/export/server/apache-zookeeper-3.6.0-bin/data</value>
    </property>
    <!--  V2.1版本,在分布式情况下, 设置为false -->
    <property>
        <name>hbase.unsafe.stream.capability.enforce</name>
        <value>false</value>
    </property>
        <property>
            <name>phoenix.schema.isNamespaceMappingEnabled</name>
            <value>true</value>
        </property>

</configuration>

在 resources 文件夹中创建配置文件 core-site.xml,添加以下内容

<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<!--
  Licensed under the Apache License, Version 2.0 (the "License");
  you may not use this file except in compliance with the License.
  You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

  Unless required by applicable law or agreed to in writing, software
  distributed under the License is distributed on an "AS IS" BASIS,
  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  See the License for the specific language governing permissions and
  limitations under the License. See accompanying LICENSE file.
-->

<!-- Put site-specific property overrides in this file. -->

<configuration>
<!-- 用于设置Hadoop的文件系统,由URI指定 -->
 <property>
        <name>fs.defaultFS</name>
        <value>hdfs://node1.yibo.cn:8020</value>
 </property>
<!-- 配置Hadoop存储数据目录,默认/tmp/hadoop-${user.name} -->
 <property>
       <name>hadoop.tmp.dir</name>
       <value>/export/server/hadoop-2.7.5/data/tempDatas</value>
</property>

<!--  缓冲区大小,实际工作中根据服务器性能动态调整 -->
 <property>
       <name>io.file.buffer.size</name>
       <value>4096</value>
 </property>

<!--  开启hdfs的垃圾桶机制,删除掉的数据可以从垃圾桶中回收,单位分钟 -->
 <property>
       <name>fs.trash.interval</name>
       <value>10080</value>
 </property>
<property>
    <name>hadoop.proxyuser.root.hosts</name>
        <value>*</value>
    </property>
    <property>
        <name>hadoop.proxyuser.root.groups</name>
        <value>*</value>
    </property>
</configuration>

如果代码设置了hbase.zookeeper.quorum,并且可以与自己的集群正常交互,那么可以不加hbase-site.xml等配置文件;

那如果不能正常交互,还是将集群的hbase-site.xml、core-site.xml、mapred-site.xml添加到resources下吧。

4 DDL

创建 HBaseDDL 类,添加静态方法即可作为工具类

public class HBaseDDLUtil {

    // 添加静态属性 connection 指向单例连接
    public static Connection connection = HBaseConnection.connection;
    
}

4.1 创建命名空间

/**
 * 创建命名空间
 * @param namespace 命名空间名称
 */
public static void createNamespace(String namespace) throws IOException {
    //1.获取admin
    //admin 连接是轻量级的,不是线程安全的  不推荐池化,或者缓存这个连接
    Admin admin = connection.getAdmin();

    //2.调用方法,创建命名空间
   /* 代码相对shel1更加底层 所以shel1能够实现的功能代码一定能实现
    11所以需要填写完整的命名空间描述*/

    //2.1 创建命名空间描述建造者  => 设计师
    NamespaceDescriptor.Builder builder = NamespaceDescriptor.create(namespace);
    //2.2 给命名空间添加需求
    builder.addConfiguration("user","mazankang");

    //2.3 使用builder构造出对应的NamespaceDescriptor添加完参数的对象
    //完成创建
    // 创建命名空间出现的问题  都属于本方法自身的问题  不应该抛出异常

    try {
        admin.createNamespace(builder.build());
    } catch (IOException e) {
        System.out.println("命名空间已经存在");
        e.printStackTrace();
    }

    //3 关闭admin
    admin.close();
}

4.2 判断表格是否存在

/**
 *判断表格是否存在
 * @param namespace 命名空间名称
 * @param tableName 表格名称
 * @return true表示存在
 */
public static boolean isTableExists(String namespace,String tableName) throws IOException {
    //1.获取admin
    Admin admin = connection.getAdmin();

    //2.使用对象的方法

    boolean exist = false;
    try {
        exist = admin.tableExists(TableName.valueOf(namespace, tableName));
    } catch (IOException e) {
        e.printStackTrace();
    }

    admin.close();
    return exist;
}

4.3 创建表

/**
 * 创建表格
 * @param namespace 命名空间名称
 * @param tableName  表格名称
 * @param columnFamilys 列族名称     可以有多个
 */
public static void createTable(String namespace,String tableName,String... columnFamilys) throws IOException {
    //判断至少有一个列族
    if (columnFamilys.length ==0 ){
        System.out.println("创建表格需要至少一个列族");
        return;
    }
    //判断表格是否存在
    if (isTableExists(namespace,tableName)){
        System.out.println("表格已经存在");
        return;
    }

    //获取admin
    Admin admin = connection.getAdmin();

    //2.调用方法创建表格
    //2.1创建表格描述的建造者
    TableDescriptorBuilder tableDescriptorBuilder = TableDescriptorBuilder.newBuilder(TableName.valueOf(namespace, tableName));

    //2.2添加参数
    for (String columnFamily : columnFamilys) {
        //2.3创建列族描述的建造者
        ColumnFamilyDescriptorBuilder columnFamilyDescriptorBuilder = ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(columnFamily));
        //2.4对应当前列族添加版本
        //添加版本参数
        columnFamilyDescriptorBuilder.setMaxVersions(5);//版本
        // ------>>>>>>> 在这里可以加创造表属性所需要的所以方法
        //2.5创建添加完参数的列族描述
        tableDescriptorBuilder.setColumnFamily(columnFamilyDescriptorBuilder.build());
    }

    //2.6创建对应的表格描述
    try {
        admin.createTable(tableDescriptorBuilder.build());
    } catch (IOException e) {
        e.printStackTrace();
    }

    admin.close();
}

4.4 修改表

/**
 * 修改表格中一个列族的版本
 * @param namespace 命名空间名称
 * @param tableName 表格名称
 * @param columnFamily 列族名称
 * @param version 版本号
 */
public static void modifyTable(String namespace,String tableName,String columnFamily,int version) throws IOException {
    //判断表格是否存在
    if (!isTableExists(namespace,tableName)){
        System.out.println("表格不存在");
        return;
    }

    //1.获取admin
    Admin admin = connection.getAdmin();

    //2.调用方法修改表格
    //2.0 获取之前的表格描述
    TableDescriptor descriptor = admin.getDescriptor(TableName.valueOf(namespace, tableName));
    // 需要填写旧的列族描述
    ColumnFamilyDescriptor columnFamily1 = descriptor.getColumnFamily(Bytes.toBytes(columnFamily));


    //如果使用填写tableName的方法相当于创建了一个新的表格描述建造者没有之前的信息
    //1如果想要修改之前的信息必须调用方法填写一个旧的表格描述
    //2.1 创建一个表格描述建造者
    //--------------------------
    TableDescriptorBuilder tableDescriptorBuilder = TableDescriptorBuilder.newBuilder(descriptor);
    //2.2 对应建造者进行表格数据的修改
    //创建列族描述建造者
    ColumnFamilyDescriptorBuilder columnFamilyDescriptorBuilder =
            ColumnFamilyDescriptorBuilder.newBuilder(columnFamily1);

    //修改对应的版本
    columnFamilyDescriptorBuilder.setMaxVersions(version);

    //此处修改的时候 如果是新创建的 别的参数会初始化
    tableDescriptorBuilder.modifyColumnFamily(columnFamilyDescriptorBuilder.build());


    try {
        admin.modifyTable(tableDescriptorBuilder.build());
    } catch (IOException e) {
        throw new RuntimeException(e);
    }

    //关闭
    admin.close();
}

4.5 删除表

/**
 * 删除表格
 * @param namespace 命名空间名称
 * @param tableName 表格名称
 * @return true 表示删除成功
 */
public static boolean deleteTable(String namespace,String tableName) throws IOException {
    //1判断表格是否存在
    if (!isTableExists(namespace,tableName)) {
        System.out.println("表格不存在");
        return false;
    }
    //2.获取admin
    Admin admin = connection.getAdmin();
    // 3.调用 相关的方法删除表格

    try {
        //HBase删除表格之前 一定要标记表格为不可用disable
        TableName tableName1 = TableName.valueOf(namespace, tableName);
        admin.disableTable(tableName1);
        admin.deleteTable(tableName1);
    } catch (IOException e) {
        throw new RuntimeException(e);
    }
    //关闭admin
    admin.close();

    return true;
}

5 DML

创建类 HBaseDMLUtil

public class HBaseDMLUtil {

    //添加静态属性connection指向单例属性
    public static Connection connection = HBaseConnection.connection;
    
}

5.1 插入数据

/**
 * 插入数据
 * @param namespace 命名空间名称
 * @param tableName 表名称
 * @param rowKye 主键,
 * @param columnFamily 列族
 * @param columnName 列名
 * @param value 值
 */
public static void putCell(String namespace,String tableName,String rowKye,String columnFamily,String columnName,String value) throws IOException {
    //获取table
    Table table = connection.getTable(TableName.valueOf(namespace, tableName));

    //2,调用相关的方法往里面插入数据
    //创建put对象
    Put put = new Put(Bytes.toBytes(rowKye));
    //3,添加属性
    put.addColumn(Bytes.toBytes(columnFamily),Bytes.toBytes(columnName),Bytes.toBytes(value));
    //4.添加对象将对象写入相关的方法
    try {
        table.put(put);
    } catch (IOException e) {
        throw new RuntimeException(e);
    }

    table.close();
}

5.2 读取数据

/**
 * 读取数据 读取对应的一行中的某一列
 * @param namespace 命名空间名称
 * @param tableName 表名称
 * @param rowKye 主键
 * @param columnFamily 列族
 * @param columnName 列名
 */
public static void getCells(String namespace,String tableName,String rowKye,String columnFamily,String columnName) throws IOException {
    //获取table
    Table table = connection.getTable(TableName.valueOf(namespace, tableName));

    //2.创建get的对象
    Get get = new Get(Bytes.toBytes(rowKye));
    //如果现在调用get方法读取数据,此时读一整行数据
    get.addColumn(Bytes.toBytes(columnFamily),Bytes.toBytes(columnName));

    //设置读取数据的版本
    get.readAllVersions();

    //读取数据得到result对象
    Result result = null;
    try {
        result = table.get(get);
    } catch (IOException e) {
        throw new RuntimeException(e);
    }
    //处理数据
    ///1测试方法:直接把读取的数据打印到空制台
    //||如果是实际开发需要再额外写方法对应处理数据
    Cell[] cells = result.rawCells();
    for (Cell cell : cells) {
        //ce11存储数据比较底层
        String value = new String(CellUtil.cloneValue(cell));
        System.out.println(value);
    }

    //关闭
    table.close();
}

5.3 扫描数据

/**
 * 扫描数据
 * @param namespace 命名空间名称
 * @param tableName 表格名称
 * @param startRow 开始的row
 * @param stopRow 结束的Row 左闭右开
 */
public static void scanRows(String namespace,String tableName,String startRow,String stopRow) throws IOException {
    //获取table
    Table table = connection.getTable(TableName.valueOf(namespace, tableName));

    //2.创建scan 对象
    Scan scan = new Scan();
    //如果此时直接调用,会直接扫描整张表

    //添加参数 来控制扫描的数据
    scan.withStartRow(Bytes.toBytes(startRow));
    scan.withStopRow(Bytes.toBytes(stopRow));

    //读取多行数据 获得scanner
    ResultScanner scanner = null;
    try {
        scanner = table.getScanner(scan);
    } catch (IOException e) {
        throw new RuntimeException(e);
    }

    //ResultScanner来记录多行 result的数组
    for (Result result : scanner) {
        Cell[] cells = result.rawCells();
        for (Cell cell : cells) {
            System.out.print(new String(CellUtil.cloneRow(cell))+"---"+
                    new String(CellUtil.cloneFamily(cell))+"---"+
                    new String(CellUtil.cloneQualifier(cell))+"---"+
                    new String(CellUtil.cloneValue(cell))+"\t");
        }
        System.out.println();
    }

    table.close();
}

5.4 带过滤扫描

/**
 * 带过滤的扫描
 * @param namespace 命名空间名称
 * @param tableName 表名称
 * @param startRow 开始row
 * @param stopRow 结束row
 * @param columnFamily 列族
 * @param columnName 列名
 * @param value 值
 * @throws IOException 异常
 */
public static void filterScan(String namespace,String tableName,String startRow,String stopRow
        ,String columnFamily,String columnName,String value) throws IOException {
    //获取table
    Table table = connection.getTable(TableName.valueOf(namespace, tableName));

    //2.创建scan 对象
    Scan scan = new Scan();
    //如果此时直接调用,会直接扫描整张表

    //添加参数 来控制扫描的数据
    scan.withStartRow(Bytes.toBytes(startRow));
    scan.withStopRow(Bytes.toBytes(stopRow));

    //------>>>>>>
    //可以添加多个过滤
    FilterList filterList = new FilterList();
    //创建过滤器
    //(1)结果值保留当前列的数据
    ColumnValueFilter columnValueFilter = new ColumnValueFilter(
            //列族名称
            Bytes.toBytes(columnFamily),
            //列名
            Bytes.toBytes(columnName),
            // 比较关系
            CompareOperator.EQUAL,
            // 值
            Bytes.toBytes(value)
    );
    //(2)结果保留整行数据
    SingleColumnValueFilter singleColumnValueFilter = new SingleColumnValueFilter(
            //列族名称
            Bytes.toBytes(columnFamily),
            //列名
            Bytes.toBytes(columnName),
            // 比较关系
            CompareOperator.EQUAL,
            // 值
            Bytes.toBytes(value)
    );


//        filterList.addFilter(columnValueFilter);
    filterList.addFilter(singleColumnValueFilter);
    //添加过滤
    scan.setFilter(filterList);
    //读取多行数据 获得scanner
    ResultScanner scanner = null;
    try {
        scanner = table.getScanner(scan);
    } catch (IOException e) {
        throw new RuntimeException(e);
    }

    //ResultScanner来记录多行 result的数组
    for (Result result : scanner) {
        Cell[] cells = result.rawCells();
        for (Cell cell : cells) {
            System.out.print(new String(CellUtil.cloneRow(cell))+"---"+
                    new String(CellUtil.cloneFamily(cell))+"---"+
                    new String(CellUtil.cloneQualifier(cell))+"---"+
                    new String(CellUtil.cloneValue(cell))+"\t");
        }
        System.out.println();
    }

    table.close();
}

5.5 删除数据

/**
 * 删除一行中的一列数据
 * @param namespace 命名空间名称
 * @param tableName 表格名字
 * @param rowKey 主键
 * @param columnFamily 列族
 * @param columnName 列名
 */
public static void deleteColumn(String namespace,String tableName,String rowKey,String columnFamily,String columnName) throws IOException {
    //获取table
    Table table = connection.getTable(TableName.valueOf(namespace, tableName));

    //创建delete对象
    Delete delete = new Delete(Bytes.toBytes(rowKey));

    //添加列信息
    //addColumn删除一个版本的数据
    //addColumns删除多个版本的数据
    delete.addColumn(Bytes.toBytes(columnFamily),Bytes.toBytes(columnName));
    delete.addColumns(Bytes.toBytes(columnFamily),Bytes.toBytes(columnName));

    try {
        table.delete(delete);
    } catch (IOException e) {
        throw new RuntimeException(e);
    }

    //关闭
    table.close();
}

6 完全代码

HBaseConnection

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import java.io.IOException;

/**
 * @Author: huangyibo
 * @Date: 2023/3/21 14:24
 * @Description: Hbase操作工具
 */

public class HBaseConnection {

    public static Configuration config;//管理HBase的配置信息
    public static Connection connection;//管理HBase的连接
    public static Admin admin;//管理HBase数据库的连接

    /**
     * 创建相关连接
     *
     * @throws IOException 可能出现的异常
     */
    public static void init() throws IOException {
        config = HBaseConfiguration.create();
        //配置Zookeeper的ip地址
        config.set("hbase.zookeeper.quorum", "node01:2181,node02:2181,node03:2181");

        connection = ConnectionFactory.createConnection(config);
        admin = connection.getAdmin();
    }


    /**
     * 关闭所有连接
     *
     * @throws IOException 可能出现的异常
     */
    public static void close() throws IOException {
        if (admin != null){
            admin.close();
        }
        if (connection != null){
            connection.close();
        }
    }
}

HBaseDDLUtil

import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.util.Bytes;
import java.io.IOException;

/**
 * @Author: huangyibo
 * @Date: 2023/3/21 14:37
 * @Description: Hbase DDL操作工具
 */

public class HBaseDDLUtil {

    // 添加静态属性 connection 指向单例连接
    public static Connection connection = HBaseConnection.connection;

    /**
     * 创建命名空间
     * @param namespace 命名空间名称
     */
    public static void createNamespace(String namespace) throws IOException {
        //1.获取admin
        //admin 连接是轻量级的,不是线程安全的  不推荐池化,或者缓存这个连接
        Admin admin = connection.getAdmin();

        //2.调用方法,创建命名空间
       /* 代码相对shel1更加底层 所以shel1能够实现的功能代码一定能实现
        11所以需要填写完整的命名空间描述*/

        //2.1 创建命名空间描述建造者  => 设计师
        NamespaceDescriptor.Builder builder = NamespaceDescriptor.create(namespace);
        //2.2 给命名空间添加需求
        builder.addConfiguration("user","mazankang");

        //2.3 使用builder构造出对应的NamespaceDescriptor添加完参数的对象
        //完成创建
        // 创建命名空间出现的问题  都属于本方法自身的问题  不应该抛出异常

        try {
            admin.createNamespace(builder.build());
        } catch (IOException e) {
            System.out.println("命名空间已经存在");
            e.printStackTrace();
        }

        //3 关闭admin
        admin.close();
    }

    /**
     *判断表格是否存在
     * @param namespace 命名空间名称
     * @param tableName 表格名称
     * @return true表示存在
     */
    public static boolean isTableExists(String namespace,String tableName) throws IOException {
        //1.获取admin
        Admin admin = connection.getAdmin();

        //2.使用对象的方法

        boolean exist = false;
        try {
            exist = admin.tableExists(TableName.valueOf(namespace, tableName));
        } catch (IOException e) {
            e.printStackTrace();
        }

        admin.close();
        return exist;
    }

    /**
     * 创建表格
     * @param namespace 命名空间名称
     * @param tableName  表格名称
     * @param columnFamilys 列族名称     可以有多个
     */
    public static void createTable(String namespace,String tableName,String... columnFamilys) throws IOException {
        //判断至少有一个列族
        if (columnFamilys.length ==0 ){
            System.out.println("创建表格需要至少一个列族");
            return;
        }
        //判断表格是否存在
        if (isTableExists(namespace,tableName)){
            System.out.println("表格已经存在");
            return;
        }

        //获取admin
        Admin admin = connection.getAdmin();

        //2.调用方法创建表格
        //2.1创建表格描述的建造者
        TableDescriptorBuilder tableDescriptorBuilder = TableDescriptorBuilder.newBuilder(TableName.valueOf(namespace, tableName));

        //2.2添加参数
        for (String columnFamily : columnFamilys) {
            //2.3创建列族描述的建造者
            ColumnFamilyDescriptorBuilder columnFamilyDescriptorBuilder = ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(columnFamily));
            //2.4对应当前列族添加版本
            //添加版本参数
            columnFamilyDescriptorBuilder.setMaxVersions(5);//版本
            // ------>>>>>>> 在这里可以加创造表属性所需要的所以方法
            //2.5创建添加完参数的列族描述
            tableDescriptorBuilder.setColumnFamily(columnFamilyDescriptorBuilder.build());
        }

        //2.6创建对应的表格描述
        try {
            admin.createTable(tableDescriptorBuilder.build());
        } catch (IOException e) {
            e.printStackTrace();
        }

        admin.close();
    }

    /**
     * 修改表格中一个列族的版本
     * @param namespace 命名空间名称
     * @param tableName 表格名称
     * @param columnFamily 列族名称
     * @param version 版本号
     */
    public static void modifyTable(String namespace,String tableName,String columnFamily,int version) throws IOException {
        //判断表格是否存在
        if (!isTableExists(namespace,tableName)){
            System.out.println("表格不存在");
            return;
        }

        //1.获取admin
        Admin admin = connection.getAdmin();

        //2.调用方法修改表格
        //2.0 获取之前的表格描述
        TableDescriptor descriptor = admin.getDescriptor(TableName.valueOf(namespace, tableName));
        // 需要填写旧的列族描述
        ColumnFamilyDescriptor columnFamily1 = descriptor.getColumnFamily(Bytes.toBytes(columnFamily));


        //如果使用填写tableName的方法相当于创建了一个新的表格描述建造者没有之前的信息
        //1如果想要修改之前的信息必须调用方法填写一个旧的表格描述
        //2.1 创建一个表格描述建造者
        //--------------------------
        TableDescriptorBuilder tableDescriptorBuilder = TableDescriptorBuilder.newBuilder(descriptor);
        //2.2 对应建造者进行表格数据的修改
        //创建列族描述建造者
        ColumnFamilyDescriptorBuilder columnFamilyDescriptorBuilder =
                ColumnFamilyDescriptorBuilder.newBuilder(columnFamily1);

        //修改对应的版本
        columnFamilyDescriptorBuilder.setMaxVersions(version);

        //此处修改的时候 如果是新创建的 别的参数会初始化
        tableDescriptorBuilder.modifyColumnFamily(columnFamilyDescriptorBuilder.build());


        try {
            admin.modifyTable(tableDescriptorBuilder.build());
        } catch (IOException e) {
            throw new RuntimeException(e);
        }

        //关闭
        admin.close();
    }

    /**
     * 删除表格
     * @param namespace 命名空间名称
     * @param tableName 表格名称
     * @return true 表示删除成功
     */
    public static boolean deleteTable(String namespace,String tableName) throws IOException {
        //1判断表格是否存在
        if (!isTableExists(namespace,tableName)) {
            System.out.println("表格不存在");
            return false;
        }
        //2.获取admin
        Admin admin = connection.getAdmin();
        // 3.调用 相关的方法删除表格

        try {
            //HBase删除表格之前 一定要标记表格为不可用disable
            TableName tableName1 = TableName.valueOf(namespace, tableName);
            admin.disableTable(tableName1);
            admin.deleteTable(tableName1);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
        //关闭admin
        admin.close();

        return true;
    }
}

HBaseDMLUtil

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.CompareOperator;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.filter.ColumnValueFilter;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.util.Bytes;
import java.io.IOException;

/**
 * @Author: huangyibo
 * @Date: 2023/3/21 14:58
 * @Description: Hbase DML操作工具
 */

public class HBaseDMLUtil {

    //添加静态属性connection指向单例属性
    public static Connection connection = HBaseConnection.connection;

    /**
     * 插入数据
     * @param namespace 命名空间名称
     * @param tableName 表名称
     * @param rowKye 主键,
     * @param columnFamily 列族
     * @param columnName 列名
     * @param value 值
     */
    public static void putCell(String namespace,String tableName,String rowKye,String columnFamily,String columnName,String value) throws IOException {
        //获取table
        Table table = connection.getTable(TableName.valueOf(namespace, tableName));

        //2,调用相关的方法往里面插入数据
        //创建put对象
        Put put = new Put(Bytes.toBytes(rowKye));
        //3,添加属性
        put.addColumn(Bytes.toBytes(columnFamily),Bytes.toBytes(columnName),Bytes.toBytes(value));
        //4.添加对象将对象写入相关的方法
        try {
            table.put(put);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }

        table.close();
    }

    /**
     * 读取数据 读取对应的一行中的某一列
     * @param namespace 命名空间名称
     * @param tableName 表名称
     * @param rowKye 主键
     * @param columnFamily 列族
     * @param columnName 列名
     */
    public static void getCells(String namespace,String tableName,String rowKye,String columnFamily,String columnName) throws IOException {
        //获取table
        Table table = connection.getTable(TableName.valueOf(namespace, tableName));

        //2.创建get的对象
        Get get = new Get(Bytes.toBytes(rowKye));
        //如果现在调用get方法读取数据,此时读一整行数据
        get.addColumn(Bytes.toBytes(columnFamily),Bytes.toBytes(columnName));

        //设置读取数据的版本
        get.readAllVersions();

        //读取数据得到result对象
        Result result = null;
        try {
            result = table.get(get);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
        //处理数据
        ///1测试方法:直接把读取的数据打印到空制台
        //||如果是实际开发需要再额外写方法对应处理数据
        Cell[] cells = result.rawCells();
        for (Cell cell : cells) {
            //ce11存储数据比较底层
            String value = new String(CellUtil.cloneValue(cell));
            System.out.println(value);
        }

        //关闭
        table.close();
    }

    /**
     * 扫描数据
     * @param namespace 命名空间名称
     * @param tableName 表格名称
     * @param startRow 开始的row
     * @param stopRow 结束的Row 左闭右开
     */
    public static void scanRows(String namespace,String tableName,String startRow,String stopRow) throws IOException {
        //获取table
        Table table = connection.getTable(TableName.valueOf(namespace, tableName));

        //2.创建scan 对象
        Scan scan = new Scan();
        //如果此时直接调用,会直接扫描整张表

        //添加参数 来控制扫描的数据
        scan.withStartRow(Bytes.toBytes(startRow));
        scan.withStopRow(Bytes.toBytes(stopRow));

        //读取多行数据 获得scanner
        ResultScanner scanner = null;
        try {
            scanner = table.getScanner(scan);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }

        //ResultScanner来记录多行 result的数组
        for (Result result : scanner) {
            Cell[] cells = result.rawCells();
            for (Cell cell : cells) {
                System.out.print(new String(CellUtil.cloneRow(cell))+"---"+
                        new String(CellUtil.cloneFamily(cell))+"---"+
                        new String(CellUtil.cloneQualifier(cell))+"---"+
                        new String(CellUtil.cloneValue(cell))+"\t");
            }
            System.out.println();
        }

        table.close();
    }

    /**
     * 带过滤的扫描
     * @param namespace 命名空间名称
     * @param tableName 表名称
     * @param startRow 开始row
     * @param stopRow 结束row
     * @param columnFamily 列族
     * @param columnName 列名
     * @param value 值
     * @throws IOException 异常
     */
    public static void filterScan(String namespace,String tableName,String startRow,String stopRow
            ,String columnFamily,String columnName,String value) throws IOException {
        //获取table
        Table table = connection.getTable(TableName.valueOf(namespace, tableName));

        //2.创建scan 对象
        Scan scan = new Scan();
        //如果此时直接调用,会直接扫描整张表

        //添加参数 来控制扫描的数据
        scan.withStartRow(Bytes.toBytes(startRow));
        scan.withStopRow(Bytes.toBytes(stopRow));

        //------>>>>>>
        //可以添加多个过滤
        FilterList filterList = new FilterList();
        //创建过滤器
        //(1)结果值保留当前列的数据
        ColumnValueFilter columnValueFilter = new ColumnValueFilter(
                //列族名称
                Bytes.toBytes(columnFamily),
                //列名
                Bytes.toBytes(columnName),
                // 比较关系
                CompareOperator.EQUAL,
                // 值
                Bytes.toBytes(value)
        );
        //(2)结果保留整行数据
        SingleColumnValueFilter singleColumnValueFilter = new SingleColumnValueFilter(
                //列族名称
                Bytes.toBytes(columnFamily),
                //列名
                Bytes.toBytes(columnName),
                // 比较关系
                CompareOperator.EQUAL,
                // 值
                Bytes.toBytes(value)
        );


//        filterList.addFilter(columnValueFilter);
        filterList.addFilter(singleColumnValueFilter);
        //添加过滤
        scan.setFilter(filterList);
        //读取多行数据 获得scanner
        ResultScanner scanner = null;
        try {
            scanner = table.getScanner(scan);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }

        //ResultScanner来记录多行 result的数组
        for (Result result : scanner) {
            Cell[] cells = result.rawCells();
            for (Cell cell : cells) {
                System.out.print(new String(CellUtil.cloneRow(cell))+"---"+
                        new String(CellUtil.cloneFamily(cell))+"---"+
                        new String(CellUtil.cloneQualifier(cell))+"---"+
                        new String(CellUtil.cloneValue(cell))+"\t");
            }
            System.out.println();
        }

        table.close();
    }

    /**
     * 删除一行中的一列数据
     * @param namespace 命名空间名称
     * @param tableName 表格名字
     * @param rowKey 主键
     * @param columnFamily 列族
     * @param columnName 列名
     */
    public static void deleteColumn(String namespace,String tableName,String rowKey,String columnFamily,String columnName) throws IOException {
        //获取table
        Table table = connection.getTable(TableName.valueOf(namespace, tableName));

        //创建delete对象
        Delete delete = new Delete(Bytes.toBytes(rowKey));

        //添加列信息
        //addColumn删除一个版本的数据
        //addColumns删除多个版本的数据
        delete.addColumn(Bytes.toBytes(columnFamily),Bytes.toBytes(columnName));
        delete.addColumns(Bytes.toBytes(columnFamily),Bytes.toBytes(columnName));

        try {
            table.delete(delete);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }

        //关闭
        table.close();
    }
}

参考:
https://blog.csdn.net/m0_62783212/article/details/126474913

https://www.jianshu.com/p/028064e0a3ec

https://www.cnblogs.com/wdh01/p/16290470.html

https://www.cnblogs.com/djq1234/articles/13510912.html

上一篇下一篇

猜你喜欢

热点阅读