hadoop处理字典文件

2019-01-01  本文已影响0人  鹅鹅鹅_

一、背景说明


      最近得到一批字典文件需要处理,当然原始的字典文件格式是不能直接当做字典来用的,而且这一批字典格式也不同一。刚开始都是一批压缩文件,经过脚本处理后解压出一批文本文件:

[hadoop@master dict]$ ll TXT
总用量 6676700
-rw-r--r--. 1 hadoop hadoop  102532669 3月  28 16:05 11-15.txt
-rw-r--r--. 1 hadoop hadoop   12515529 3月  28 16:05 123.txt
-rw-r--r--. 1 hadoop hadoop     180052 3月  28 16:05 133127.txt
-rw-r--r--. 1 hadoop hadoop    8537098 3月  28 16:05 142183.txt
-rw-r--r--. 1 hadoop hadoop  156511269 3月  28 16:05 14365003.txt
-rw-r--r--. 1 hadoop hadoop  121252829 3月  28 16:05 1-5.txt
-rw-r--r--. 1 hadoop hadoop   97542553 3月  28 16:05 16-20.txt
...

      可以看一下几个文件的格式内容:

    [hadoop@master dict]$ cat TXT/acg | tail -3
    zzzzzzzzyy
    zzzzzzzzz
    zzzzzzzzzz
    [hadoop@master dict]$ cat TXT/www.csdn.net | tail -3
    suwei2007  suwei2007  love_flyweiwei@163.com
    fangchengli  19860601  fangchengli@gmail.com
    jxjaxa  05040603  jxjaxa@hotmail.com
    [hadoop@master dict]$ cat TXT/hak5-withcount.txt | tail -3
          1 #th1992#
          1 #hjkyui67
          1  b55273236542107
    [hadoop@master dict]$ cat cred  | tail -3
    78450578-|--|-jbrueneman@cox.net-|-u22m/i9KQBI=-|-new coffee mug|--
    78450579-|--|-matthew_j_gould@hotmail.com-|-hLctj76LgIHioxG6CatHBw==-|-It goes 1x2x3x|--
    78450580-|--|-marialali@telefonica.net-|-qMtZ8ulqWL1fr+u4/BG3[hadoop@master dict]$ 

      可以看到,文件内容格式是不统一的,而且不是每个字段都可以直接作为密码,比如邮箱字段,最好是将邮箱的用户名作为可参考的密码,比如123456@qq.com中的123456.

二、技术思路


      整个技术处理流程可分为以下几个阶段:

  1. 预处理
    将所有的文本文件的格式略处理一下,形成比较统一的、可方便集中处理的文本格式。
    然后可以选择将所有的文本文件内容抽出几行,组合成一个包含了所有文本文件内容格式的文件,用于观察和测试。
  2. map阶段
    map阶段可以用于提取一行文本的各个字段,并做处理,输出所需字段。
  3. reduce阶段
    reduce阶段可用来去重。因为map阶段已经默认自动排序过了,所以这个阶段去下重就可以了。

三、技术实现


  1. 预处理
  1. 编写map程序
    从look.txt文件内容格式来看,各个需要的字段都是以空格或者tab分割的。邮箱需要特殊处理,mapper脚本如下:
    #!/usr/bin/env python3.6
    
    import sys 
    import re
    
    # input comes from STDIN (standard input)
    for line in sys.stdin:
        line = line.strip()
        words = filter(lambda word: word, line.split())
        for word in words:
            word = re.sub(r'@[^.]*.com','',word)
            #只需要输出key就可以了,hadoop会自动按照key排序
            print(word) 
    
  2. 编写reduce程序
    reduce阶段重点在于去重,可使用Python的字典来去重。
    #! /usr/bin/env python3.6
    
    import sys 
    import subprocess
    import os
    dict = {}
    for line in sys.stdin:
        word = line.strip('\n')
        if word not in dict.keys(): 
            print(word)
        else:
            dict[word]=1
    
    
  3. 单机测试
    直接执行如下命令进行单机测试:
    [hadoop@master dict]$ cat look.txt | ./mapper.py | ./reducer.py 
    zangzt1985
    88300667
    sunguangjin2006
    sunguangjin
    shanwenyi3427.cn
    1103887448
    !@#$%^&*
    !@#$%^&*(
    !@#$%^&*()
    %null%
    %username%
    !@#$
    !8zj39le
    
    
    显然,执行结果正确。
  4. 集群测试
    可将测试文件look.txt上传至hadoop文件系统运行mapreduce来测试,具体方法略。
  5. 真正运行程序
    [hadoop@master dict]$ hadoop jar ../hadoop-2.7.3/share/hadoop/tools/lib/hadoop-streaming-2.7.3.jar -input dict -output output -mapper mapper.py -reducer reducer.py -file mapper.py -file reducer.py 
    
    程序最终成功执行,从运行日志来看,耗时26分钟,原始文件6.4G,输出文件6.7G。
    17/04/01 20:14:31 INFO client.RMProxy: Connecting to ResourceManager at master/10.10.18.230:8032
    17/04/01 20:14:31 INFO client.RMProxy: Connecting to ResourceManager at master/10.10.18.230:8032
    ...
    17/04/01 20:40:31 INFO mapreduce.Job: Job job_1491015080715_0001 completed successfully
    17/04/01 20:40:31 INFO mapreduce.Job: Counters: 51
    
    
  6. reducer简单的优化
    由于reducer接收到的key/value已经根据key排序了,在java程序中可以得到key-valuelist对:
    keyA value1 value2 value3 ...
    keyB value1 value2 value3 ...
    keyC value1 value2 value3 ...
    
    但是在streaming应用中,貌似得到的是key/value对:
    keyA value1
    keyA value2
    keyA value3
    keyB value1
    keyB value2
    keyB value3
    
    所以,其实不用Python字典也可以进行去重:
    #! /usr/bin/env python3.6
    
    import sys 
    old=""
    for line in sys.stdin:
        word = line.strip('\n')
        if word == old :
            continue
        else :
            print(word)
            old = word
    
    
    但是实际仍然运行了26分钟。
    进一步而言,reducer其实也可以分布式执行。
  7. 问题

      在mapper.py处理输入文件时,Python报错:

```
'utf-8' codec can't decode byte 0xb4 in position 0:invalid start byte
```

      然而若Python版本换为2.6就没有问题。具体原因不在这里分析,最后通过将源文件转码解决问题:

    [hadoop@master dict]$ iconv -f gb2312 -t utf-8 -c all.txt -o all.t
上一篇下一篇

猜你喜欢

热点阅读