Sequence File

2019-03-10  本文已影响0人  436048bfc6a1
  1. 代码中使用Sequence File
val file = sc.sequenceFile[BytesWritable, String]("hdfs://hadoop:12345/user/hive/warehouse/spark_seq/000000_0")
file.map(x => (x._1.copyBytes(), x._2)).foreach(println)

1.1 代码输出

([B@291d5e78,AT Austria)
([B@52a6f1dc,BE Belgium)
([B@688e2655,BY Belarus)
([B@578acd05,EE Estonia)
([B@1427b712,FR France)
([B@716a8c5b,DE Germany)
([B@68cfb0ad,GB United Kingdom)
([B@692cba54,US United States)

1.2 上述代码存在的问题

key是其hashcode值
工作中无法使用, 所以不使用该方式

1.3 对以上代码的改进

对之前代码的x_2根据\t进行拆分

1.4 注意
(1)

val file = sc.textFile("hdfs://hadoop:9000/user/hive/warehouse/spark_seq/000000_0")

输出结果为

BY Belarus
EE  Estonia FR  France
DE  GermanyGB   United KingdomUS    United States
SEQ"org.apache.hadoop.io.BytesWritableorg.apache.hadoop.io.TextjoK]g
AT Austria
BE  Belgium

使用textFile函数对其进行读取,无法对sequencefile进行正确的读取,因为不同的format底层存储是不一样的

(2) 由于sequence file是key-value形式的,有一些head信息,所以建议是[byteWritable: String]形式

byteWritable
  A byte sequence that is usable as a key or value
  (一个byte sequence,以key或者value形式使用)
  1. Sequence file源码与解析

2.1 源码

def sequenceFile[K, V]
        //(参数是指定的路径, 最小Partitions)
        //输入数据文件的路径
       (path: String, minPartitions: Int = defaultMinPartitions)
       (implicit km: ClassTag[K], vm: ClassTag[V],
        kcf: () => WritableConverter[K], vcf: () => WritableConverter[V]): RDD[(K, V)] = {
    withScope {
      assertNotStopped()
      val kc = clean(kcf)()
      val vc = clean(vcf)()
      val format = classOf[SequenceFileInputFormat[Writable, Writable]]
      val writables = hadoopFile(path, format,
        kc.writableClass(km).asInstanceOf[Class[Writable]],
        vc.writableClass(vm).asInstanceOf[Class[Writable]], minPartitions)
      writables.map { case (k, v) => (kc.convert(k), vc.convert(v)) }
    }
}

2.2 获得hadoop的sequence file的函数的实现

//Get an RDD for a Hadoop SequenceFile with given key and value types
def sequenceFile[K, V](path: String,
      keyClass: Class[K],
      valueClass: Class[V],
      minPartitions: Int
      ): RDD[(K, V)] = withScope {
    assertNotStopped()
    val inputFormatClass = classOf[SequenceFileInputFormat[K, V]]
    hadoopFile(path, inputFormatClass, keyClass, valueClass, minPartitions)
}

源代码解释

(1) 底层调用hadoopFile(path, inputFormatClass, keyClass, valueClass, minPartitions)

(2) 只要有inputFormat结果集就都能读取出来

因为val inputFormatClass = classOf[SequenceFileInputFormat[K, V]]
其调用SequenceFileInputFormat类
SequenceFileInputFormat类继承FileInputFormat
FileInputFormat实现了InputFormat
  1. 如何创建sequenceFile

3.1 在hive中创建表

create table spark_seq_raw(code string, name string) 
row format delimited fields terminated by '\t' ;
load data local inpath '/home/hadoop/data/seq.txt' 
overwrite into spark_seq_raw;

3.2 存储成sequenceFile

create table spark_seq(code string, name string) 
row format delimited fields terminated by '\t' stored as sequencefile;
insert into table spark_seq select * from spark_seq_raw;

3.3 查看sequencefile

hadoop fs -text /user/hive/warehouse/spark_seq/*
上一篇下一篇

猜你喜欢

热点阅读