大数据

NCDC气象大数据处理

2022-03-13  本文已影响0人  小胡_鸭

1、下载数据

  通过 ftp://ftp.ncdc.noaa.gov/pub/data/noaa 可以下载《hadoop权威指南》第4版的数据,每个年份的数据放在对应的目录下,如 1901 年数据的目录为 ftp://ftp.ncdc.noaa.gov/pub/data/noaa/1901, 使用 getdata.sh 脚本下载数据,内容如下:

#!/bin/bash

baseDir="$(cd `dirname $0`; pwd)"

# 打日志的函数,追加时间标识
log(){
  time=$(date "+%Y-%m-%d %H:%M:%S")
  echo "[$time] - $1"
}


log "========== start to download ncdc noaa data (Year: $1 ~ $2) ============"
for i in $(seq $1 $2)
do
    cd $baseDir
    rm -rf $i
    mkdir $i
    cd $i
    log "current dir is : `pwd`"
    wget --execute robots=off —accept=tar -r -np -nH --cut-dirs=4 - R index.html* ftp://ftp.ncdc.noaa.gov/pub/data/noaa/$i/
    log "download data of year $i success."
done
log "======================== download finished!!! =========================="

  先给脚本赋予执行权限

chmod +x getdata.sh

  指定要下载的年份区间作为参数传递给脚本,如 sh getdata.sh 1901 1930 ,下载从1901年到1930年的气象数据

sh getdata.sh 1901 1930


2、打包数据

  每个年份的目录下都会有很多 *.gz 文件,将其打包成 tar 文件,使用 mergedata.sh 脚本,内容如下:

#!/bin/bash

log() {
  time=$(date "+%Y-%m-%d %H:%M:%S")
  echo "[$time] - $1"
}

# 将某个年份下的所有压缩文件打包成一个tar
tardata() {
  log "tar $1/*.gz to $1/$1.tar"
  cd $1
  rm -rf $1.tar
  tar -cvf $1.tar .
}


baseDir="$(cd `dirname $0`; pwd)"

if [ $# -eq 1 ]
then
   cd $baseDir
   tardata $1
else
   for i in $(seq $1 $2)
   do
        cd $baseDir
        tardata $i
   done
fi

  可以单独指定某一年份,也可以指定年份区间,最终会将年份目录下的所有 gz 文件打包成一个文件名为年份的 tar 文件

sh mergedata.sh 1901 1930


3、上传数据到HDFS存储

  启动 hadoop 集群,使用 upload_data.sh 将2中生成的 tar 文件上传到 HDFS 中,脚本内容如下:

#!/bin/bash

hdfsDir="/hadoop/data/ncdc/noaa"

for i in $(seq $1 $2)
do
    hdfs dfs -rm -r $hdfsDir/$i
    echo "Clear dest dir $hdfsDir/$i"
    hdfs dfs -mkdir -p $hdfsDir/$i
    hdfs dfs -put $i/$i.tar $hdfsDir/$i
    echo "Upload $i/$i.tar to $hdfsDir/$i success!"
done

  脚本同样是指定年份区间,并且保存在 HDFS 的 /hadoop/data/ncdc/noaa 目录下,每个年份一个目录,目录下有一个2中打包的 tar 文件

sh upload_data.sh 1901 1930

  脚本执行成功后查看 HDFS web console,可以看到文件正确上传了


4、生成 ncdc_files.txt

  经过上述3步的处理,可以写个脚本为 HDFS 上的 tar 包生成一个 ncdc_files.txt,内容为所有 tar 包的存储路径,如:

/hadoop/data/ncdc/noaa/1901/1901.tar
/hadoop/data/ncdc/noaa/1902/1902.tar
/hadoop/data/ncdc/noaa/1903/1903.tar
/hadoop/data/ncdc/noaa/1904/1904.tar
/hadoop/data/ncdc/noaa/1905/1905.tar
...

  使用 create_ncdc_files.sh 脚本生成该文件,内容如下:

#!/bin/bash

rm -rf ncdc_files.txt
hdfsDir="/hadoop/data/ncdc/noaa"
hdfs dfs -rm -r $hdfsDir/ncdc_files.txt

for i in $(seq $1 $2)
do
   fileName=$hdfsDir/$i/$i.tar
   echo "echo $fileName to ncdc_files.txt"
   echo "$fileName" >> ncdc_files.txt
done

hdfs dfs -put ncdc_files.txt $hdfsDir


5、文件合并压缩

  将 4 生成的文件作为本步操作的数据源信息,写一个脚本 load_ncdc_map.sh ,实现以下功能:

PS. 这样做的目的是为了将每个年份下的小文件合并成一个大文件,提高 MapReduce 程序处理的效率。

#!/bin/bash

# [测试] 只处理ncdc_files.txt中的第一行数据指定的文件
#read hdfs_file
#echo "$hdfs_file"

# [hadoop streaming] 运行MR程序执行合并操作
read offset hdfs_file
echo -e "$offset\t$hdfs_file"

# 1.从HDFS获取ncdc_files.txt中指定的tar文件下载到本地当前目录
#
# Retrieve file from HDFS to local disk
echo "reporter:status:Retrieving $hdfs_file" >&2
hdfs dfs -get $hdfs_file .


# 2.创建本地目录,eg:为1901.tar创建1901的目录
#
# Create local directory
target=`basename $hdfs_file .tar`
mkdir $target


# 3.解压文件到本地目录,即将1901.tar解压到1901目录中
#   1901.tar打包了多个*.gz文件
#
echo "reporter:status:Un-tarring $hdfs_file to $target" >&2
tar xf `basename $hdfs_file` -C $target


# 4.将1901目录中的所有*.gz文件解压到一个1901.all文件中
#
# Unzip each station file and concat into one file
echo "reporter:status:Un-gzipping $target" >&2
for file in $target/*
do
        gunzip -c $file >> $target.all
        echo "reporter:status:Processed $file" >&2
done


# 5.将1901.all以gzip压缩并复制至HDFS中
#   删除1901.tar、1901目录、1901.all
#
# Put gzipped version into HDFS
echo "reporter:status:Gzipping $target and putting in HDFS" >&2
gzip -c $target.all | hdfs dfs -put - /hadoop/data/ncdc/noaa_all/$target.gz
rm `basename $hdfs_file`
rm -r $target
rm $target.all

  使用注释的测试代码,这里只会处理 ncdc_files.txt 里指定的第一个包

[root@bigdata111 MRzip]# cat ncdc_files.txt | ./load_ncdc_map.sh 
/hadoop/data/ncdc/noaa/1901/1901.tar
reporter:status:Retrieving /hadoop/data/ncdc/noaa/1901/1901.tar
reporter:status:Un-tarring /hadoop/data/ncdc/noaa/1901/1901.tar to 1901
reporter:status:Un-gzipping 1901
reporter:status:Processed 1901/029070-99999-1901.gz
reporter:status:Processed 1901/029500-99999-1901.gz
reporter:status:Processed 1901/029600-99999-1901.gz
reporter:status:Processed 1901/029720-99999-1901.gz
reporter:status:Processed 1901/029810-99999-1901.gz
reporter:status:Processed 1901/227070-99999-1901.gz
reporter:status:Gzipping 1901 and putting in HDFS

  查看 HDFS 可以看到生成了一个压缩文件,目录为 /hadoop/data/ncdc/noaa_all,说明脚本的处理是正常的。


6、在 MR Streaming 上运行脚本

  编写一个执行脚本 start_MRZip.sh,内容如下:

#!/bin/bash

hadoop jar ${HADOOP_HOME}/share/hadoop/tools/lib/hadoop-streaming-2.7.3.jar \
    -D mapreduce.job.reduces=0 \
    -D mapreduce.map.speculative=false \
    -D mapreduce.task.timeout=12000000 \
    -inputformat org.apache.hadoop.mapred.lib.NLineInputFormat \
    -input /input/ncdc_files.txt \
    -output /output/noaa \
    -mapper load_ncdc_map.sh \
    -file load_ncdc_map.sh

  执行脚本之前,先上传 ncdc_files.txt 到 HDFS

hdfs dfs -put ncdc_files.txt /input

  执行脚本

sh start_MRZip.sh

  查看 HDFS 控制台发现目录下并未生成对应的压缩文件,查看 作业日志,发现是执行 shell 时无法识别 hdfs 命令,容器无法识别到宿主机的环境设置。

  因此要指定命令的绝对路径,将脚本改成:

#!/bin/bash

# [测试] 只处理ncdc_files.txt中的第一行数据指定的文件
#read hdfs_file
#echo "$hdfs_file"

read offset hdfs_file
echo -e "$offset\t$hdfs_file"

# 1.从HDFS获取ncdc_files.txt中指定的tar文件下载到本地当前目录
#
# Retrieve file from HDFS to local disk
echo "reporter:status:Retrieving $hdfs_file" >&2
/root/training/hadoop-2.7.3/bin/hdfs dfs -get $hdfs_file .


# 2.创建本地目录,eg:为1901.tar创建1901的目录
#
# Create local directory
target=`basename $hdfs_file .tar`
mkdir $target


# 3.解压文件到本地目录,即将1901.tar解压到1901目录中
#   1901.tar打包了多个*.gz文件
#
echo "reporter:status:Un-tarring $hdfs_file to $target" >&2
tar xf `basename $hdfs_file` -C $target


# 4.将1901目录中的所有*.gz文件解压到一个1901.all文件中
#
# Unzip each station file and concat into one file
echo "reporter:status:Un-gzipping $target" >&2
for file in $target/*
do
        gunzip -c $file >> $target.all
        echo "reporter:status:Processed $file" >&2
done


# 5.将1901.all以gzip压缩并复制至HDFS中
#   删除1901.tar、1901目录、1901.all
#
# Put gzipped version into HDFS
echo "reporter:status:Gzipping $target and putting in HDFS" >&2
gzip -c $target.all | /root/training/hadoop-2.7.3/bin/hdfs dfs -put - /hadoop/data/ncdc/noaa_all/$target.gz
rm `basename $hdfs_file`
rm -r $target
rm $target.all

  再次执行,job 下的所有 task 都失败了,并且每个 task 的每次 attemp 都报错如下:

  Current usage: 147.1 MB of 1 GB physical memory used; 4.1 GB of 2.1 GB virtual memory used. Killing container.

  容器需要的 4.1GB 内存,而虚拟内存只有 2.1 GB,不够用了,所以容器被杀死,而下次尝试执行任务创建的容器又因为同样的原因而被杀死,不断循环。

  这是因为容器最大内存默认为 1GB,在 mapred-site.xml 通过以下参数设置:

  <property>
    <name>mapreduce.map.memory.mb</name>
    <value>1024</value>
  </property>
  
  <property>
    <name>mapreduce.reduce.memory.mb</name>
    <value>1024</value>
  </property>

  而当容器使用的内存超过 1GB 时,就会启用虚拟内存,但是有限制最大倍数,在 yarn-site.xml 通过以下参数设置:

<property>
    <name>yarn.nodemanager.vmem-pmem-ratio</name>
    <value>2.1</value>
</property>

  所以 1GB x 2.1 = 2.1GB,容器最大能够使用的内存大小为 2.1GB,超过这个大小,容器就会被 nodemanager 中止。

  所以这里我简单调大容器最大物理内存大小为 4GB

  <property>
    <name>mapreduce.map.memory.mb</name>
    <value>4096</value>
  </property>
  
  <property>
    <name>mapreduce.reduce.memory.mb</name>
    <value>4096</value>
  </property>

  重启hadoop让配置生效,然后重新执行任务,不再报错,执行正常

  查看 HDFS 目录,压缩文件生成了!!!o(╥﹏╥)o

上一篇下一篇

猜你喜欢

热点阅读