离线计算组件篇-Spark-读写Hbase

2022-12-07  本文已影响0人  CoderInsight
5).Spark-core读写HBase的数据
<!--此时需要的Maven依赖 -->
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>wyq</groupId>
    <artifactId>ScalaReadHBase</artifactId>
    <version>1.0-SNAPSHOT</version>

<properties>
        <spark.version>2.0.0</spark.version>
        <scala.version>2.11</scala.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_${scala.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_${scala.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_${scala.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_${scala.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-mllib_${scala.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.hbase/hbase-common -->
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-common</artifactId>
            <version>1.2.11</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.hbase/hbase-client -->
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>1.2.11</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.hbase/hbase-server -->
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-server</artifactId>
            <version>1.2.11</version>
        </dependency>

    </dependencies>


    <build>
         <plugins>
             <!-- This plugin lets Maven compile and test-compile the Scala sources of the project
                  (it is bound to the compile and testCompile goals below). -->
             <!-- NOTE(review): org.scala-tools:maven-scala-plugin appears to be the legacy coordinates of
                  what is now published as net.alchim31.maven:scala-maven-plugin; confirm before upgrading. -->
            <plugin>
                <groupId>org.scala-tools</groupId>
                <artifactId>maven-scala-plugin</artifactId>
                <version>2.15.2</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

# 进入spark的安装目录下,有一个专门放jar包的路径(jars目录,Spark 2.0 及之后的版本才有)
cd /usr/local/spark/jars
# Copy the HBase jars (and the extra libraries listed below) into the current directory
cp /usr/local/hbase/lib/hbase*.jar ./ # copies every jar whose name starts with "hbase" into the jars directory
cp /usr/local/hbase/lib/guava-12.0.1.jar ./
cp /usr/local/hbase/lib/htrace-core-3.1.0-incubating.jar ./
cp /usr/local/hbase/lib/protobuf-java-2.5.0.jar ./
cp /usr/local/hbase/lib/metrics-core-2.2.0.jar ./

import org.apache.hadoop.hbase._
import org.apache.hadoop.hbase.client._
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf

/**
 * Reads the HBase table `student` with Spark and prints its row count followed by
 * the `info:name`, `info:gender` and `info:age` columns of every row.
 */
object SparkOperateHBase {

  /** Column family that holds all student attributes. */
  private val InfoFamily: Array[Byte] = Bytes.toBytes("info")

  /**
   * Reads one `info:<qualifier>` cell of a row as a string.
   *
   * @param result    the HBase row returned by the scan
   * @param qualifier column qualifier inside the `info` family
   * @return the cell value decoded as a string, or `null` when the cell is absent
   *         (this mirrors `Bytes.toString(null)`, so missing cells print as "null")
   */
  private def column(result: Result, qualifier: String): String =
    Bytes.toString(result.getValue(InfoFamily, Bytes.toBytes(qualifier)))

  /**
   * Program entry point.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    // HBase client configuration (picks up hbase-site.xml from the classpath).
    val conf = HBaseConfiguration.create()
    // Spark settings (master, app name, ...) are expected to come from spark-submit.
    val sc = new SparkContext(new SparkConf())
    try {
      // Name of the table to scan.
      conf.set(TableInputFormat.INPUT_TABLE, "student")

      // Key/value classes are dictated by TableInputFormat: (row key wrapper, row content).
      val hbaseRDD = sc.newAPIHadoopRDD(
        conf,
        classOf[TableInputFormat],
        classOf[ImmutableBytesWritable],
        classOf[Result])

      // Hadoop record readers may reuse their Writable objects, so the raw RDD should not be
      // cached directly. Copy the needed cells into immutable strings first, then cache
      // BEFORE the first action so that count() fills the cache and foreach() reuses it
      // instead of scanning HBase a second time.
      val students = hbaseRDD.map { case (_, result) =>
        (Bytes.toString(result.getRow),
          column(result, "name"),
          column(result, "gender"),
          column(result, "age"))
      }.cache()

      // Number of rows in the table.
      val count = students.count()
      println("Students RDD Count:" + count)

      // Print every row; on a cluster this output goes to the executors' stdout.
      students.foreach { case (key, name, gender, age) =>
        println("Row key:" + key + " Name:" + name + " Gender:" + gender + " Age:" + age)
      }
    } finally {
      // Always release the Spark resources, even when the job fails.
      sc.stop()
    }
  }
}

# Default case: spark-submit automatically loads every jar located directly in the jars directory.
spark-submit \
--class SparkOperateHBase \
/usr/local/spark/mycode/simple-project_2.11-1.0.jar

# If the HBase jars were placed in a separate sub-directory of the jars folder, that directory
# has to be passed explicitly. Options must appear BEFORE the application jar: everything after
# the jar is handed to the program's main() as arguments instead of being read by spark-submit.
spark-submit \
--class SparkOperateHBase \
--driver-class-path "/usr/local/spark/jars/hbase/*:/usr/local/hbase/conf" \
/usr/local/spark/mycode/simple-project_2.11-1.0.jar

import org.apache.hadoop.hbase.client.{Put, Result}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.{SparkConf, SparkContext}

/**
 * Writes two hard-coded student records into the HBase table `student`
 * through `TableOutputFormat` and Spark's `saveAsNewAPIHadoopDataset`.
 *
 * Each input line has the form `rowKey,name,gender,age`; the three attributes are
 * stored in the `info` column family.
 */
object SparkWriteHBase {

  /**
   * Program entry point.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    // Spark settings (master, app name, ...) are expected to come from spark-submit.
    val conf = new SparkConf()
    val sc = new SparkContext(conf)
    try {
      // Target table for TableOutputFormat.
      val tableName = "student"
      sc.hadoopConfiguration.set(TableOutputFormat.OUTPUT_TABLE, tableName)

      // Job is only used as a builder for the output configuration.
      // Job.getInstance replaces the deprecated `new Job(conf)` constructor.
      val job = Job.getInstance(sc.hadoopConfiguration)
      job.setOutputKeyClass(classOf[ImmutableBytesWritable])
      // The RDD below emits Put mutations, so declare Put (not Result) as the value class.
      job.setOutputValueClass(classOf[Put])
      job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])

      val indataRDD = sc.makeRDD(Array("3,dufu,M,26", "4,xingzhesun,M,27"))
      val rdd = indataRDD.map(_.split(',')).map { arr =>
        // Row key.
        val put = new Put(Bytes.toBytes(arr(0)))
        // addColumn replaces Put.add(family, qualifier, value), which is deprecated in
        // HBase 1.x and no longer available in HBase 2.x.
        // info:name
        put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes(arr(1)))
        // info:gender
        put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("gender"), Bytes.toBytes(arr(2)))
        // info:age
        put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("age"), Bytes.toBytes(arr(3)))
        // (key, value) pair expected by the output format; the row key itself is carried
        // by the Put, so an empty ImmutableBytesWritable is used as the key.
        (new ImmutableBytesWritable, put)
      }
      rdd.saveAsNewAPIHadoopDataset(job.getConfiguration())
    } finally {
      // Always release the Spark resources, even when the write fails.
      sc.stop()
    }
  }
}

上一篇下一篇

猜你喜欢

热点阅读