Hadoop MapReduce 程序实例(Python)

2021-11-11  本文已影响0人  HAO延WEI

转载地址:https://www.cnblogs.com/surimj/p/9863841.html

问题背景

现在有两份数据,file1是校园新闻版块,每一条新闻点击记录;file2是校园新闻版块使用活跃度高的学生记录。用mr统计出某一天的点击记录里,使用ios/android手机的活跃学生的总的点击次数
原始数据格式如下:
file 1,校园新闻点击记录,记录了每一次学生点击校园新闻的行为
格式:学号\t姓名\t手机端\t新闻id\新闻关键词

20170001 xiaoming android 331 学费
20170002 xiaohong ios 332 食堂
20170003 xiaohua android 333 考研
20170001 xiaoming android 222 评优
20170001 xiaoming android 225 学费
20170003 xiaohua android 111 期末考试
20170002 xiaohong ios 117 空调安装

file2,校园新闻活跃学生记录,记录了对校园新闻版块点击活跃度高的学生信息
格式:格式:学号\t姓名\t年龄\t手机端

20170001 xiaoming 20 android
20170002 xiaohong 19 ios
20170001 xiaoqiang 20 android

问题分析
1、key的选择:选取学号+手机端共同作为key。学号是用于对是否活跃用户的判断,手机端是用于统计时的分类。注意,这里如果仅仅将手机端作为key有一个坏处,就是key值将只有两种,这样不利于将海量的数据分为多个小部分处理,因此选择学号+手机端共同作为key。
2、关键点:在对每一行数据进行处理时,利用两份数据,做好是否活跃用户的标记;在对同一个key进行处理时,根据前一步处理的结果,判断是否进行累加。

代码实现
1、mapper.py
解析原始数据,将其处理为一系列的键值对(key-value pair),这里选取"学号_手机端"作为key。

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import sys


def main():
    for line in sys.stdin:
        data = line.strip().split(" ")
        if len(data) == 5:
            student_id = data[0]
            os_name = data[2]
            key = student_id + '_' + os_name
            print("\t".join([key, "file1"]))
        elif len(data) == 4:
            student_id = data[0]
            os_name = data[-1]
            key = student_id + '_' + os_name
            print("\t".join([key, "file2"]))


if __name__ == '__main__':
    main()

2、reducer.py
由MapReduce框架的机制,同一个key的所有数据都会被传给同一个reducer,同一个reducer可以接受多个键值对。
reducer将相同键的一组值,进行处理产生一组规模更小的值(如累加得到一个值)。

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import sys
import json

dic_result = {'android': 0, 'ios': 0}
pre_key = ""


def post_deal(pre_key):
    # print pre_key
    # 相同key的数据的处理
    global dic
    os_name = pre_key.strip().split("_")[1]
    print("==>>>", pre_key, os_name)
    # print dic
    # if 'click' in dic and dic['click'] == 'yes' :#and 'active' in dict and dic['active'] == 'yes':
    if 'click' in dic and dic['click'] == 'yes' and 'active' in dic and dic['active'] == 'yes':

        # if os_name not in dic_result:
        #     dic_result[os_name] =0
        dic_result[os_name] += 1


def deal(data):
    # 对每一行数据的处理
    if data[1].strip() == "file1":
        dic['click'] = 'yes'
    if data[1].strip() == "file2":
        dic['active'] = 'yes'


def pre_deal():
    # 预处理,如将用于存储同一个key的一组value中间结果的字典置零
    global dic
    dic = dict()


def main():
    # reducer
    # data_list = ['20170001_android    file1',
    #              '20170002_ios    file1',
    #              '20170001_android    file1',
    #              '20170001_android    file1',
    #              '20170001_android    file2',
    #              '20170001_android    file2',
    #              '20170002_ios    file1',
    #              '20170002_ios    file2',
    #              '20170003_android    file1',
    #              '20170003_android    file1']
    #
    # for line in data_list:
    #     data = line.strip().split("    ")
    for line in sys.stdin:
        data = line.strip().split("\t")
        key = data[0]
        global pre_key
        # 当同一个key的数据全部deal完,到下一个key时
        if key != pre_key:
            if pre_key != "":
                # 处理同一个key所有数据
                post_deal(pre_key)
            # 处理完成后,预处理准备下一轮处理
            pre_deal()
            # 将pre_key更新为下一个要处理的key
            pre_key = key
        # 将同一个key的一组值按需要进行处理/整合
        deal(data)

        # 处理最后一个key的数据
        if pre_key != "":
            post_deal(pre_key)
        # 打印结果
    print(json.dumps(dic_result))


if __name__ == "__main__":
   main()

3、线下测试 cat file|map|sort|reduce

STEP1:测试map
cat file1_test file2_test | python mapper.py > map_out
测试中,map_out中的数据将如下所示:

20170001_andorid file1
20170002_ios file1
20170003_andorid file1
20170001_andorid file1
...
20170001_andorid file2
20170002_ios file2

STEP2:对map_out按照key(key默认放在第一列)排序,使相同key的数据在一起,然后测试reducer
cat map_out |sort -k1,1|python reducer.py > red_out
reducer的输出是json,形如:{'ios':1123, 'android':3333}

4、hadoop线上运行 run.sh

#!/bin/bash
WORKROOT="..."
HADOOP="hadoop客户端路径"
INPUT1="file1文件路径"
INPUT2="file2文件路径"
OUTPUT_DIR="输出的HDFS路径"
PYTHON="python路径"

#设置一系列参数,并启动MR任务
$HADOOP streaming \
    -D mapred.map.tasks=200 \
    -D mapred.job.map.capacity=400 \
    -D mapred.red.tasks=100 \
    -D mapred.job.red.capacity=400 \
    -D mapred.job.priority=HIGH \
    -D stream.num.map.output.key.fields=1 \
    ...
    -partitionor ... \
    -cacheArchive ... \
    -input $INPUT1 \
    -input $INPUT2 \
    -output $OUTPUT_DIR \
    -mapper "$PYTHON mapper.py" \
    -reducer "$PYTHON reducer.py" \
    -file $WORKROOT/mapper.py \
    -flle $WORTROOT/reducer.py
"""
 成功运行完成,创建一个空文件,加上成功的标签
"""
 if [ ${?} -eq 0 ]; then
   $HADOOP fs -touchz $OUTPUT_DIR succuss.tag
部分参数的解释:

mapred.map.tasks:map数目
mapred.reduce.tasks:reduce数目
mapred.job.map.capacity:map并发数目
mapred.job.reduce.capacity:educe并发数目
stream.num.map.output.key.fields:设置map程序分隔符的位置,该位置之前的部分作为key,之后的部分作为value

上一篇 下一篇

猜你喜欢

热点阅读