Distributed Computing (1): Hadoop Examples
2019-10-10
Computing the dot product of two vectors
Each input line is a tab-separated triple of vector label, component index, and value, e.g. A 11 0.3.
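For example (hypothetical data), two vectors A and B stored one component per line:

A	0	0.3
A	1	1.0
B	0	0.5
B	1	2.0

The mapper re-keys each line by index, the shuffle brings the A and B components with the same index together, and the single reducer accumulates 0.3*0.5 + 1.0*2.0 = 2.15.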
import sys
import os

def map():
    # Emit (index, vector label, value) so that components of the two
    # vectors that share an index end up adjacent after the shuffle/sort.
    for line in sys.stdin:
        line = line.strip()
        fields = line.split('\t')
        AB = fields[0]
        index = fields[1]
        val = fields[2]
        print("\t".join([index, AB, val]))

def red():
    # Input arrives sorted by index; whenever two consecutive records share
    # an index, multiply their values and accumulate the products.
    res = 0
    preIndex = preVal = None
    for line in sys.stdin:
        line = line.strip()
        fields = line.split('\t')
        index = fields[0]
        val = float(fields[2])  # values arrive as strings
        if index == preIndex:
            res += preVal * val
        preVal = val
        preIndex = index
    print(res)
def run():
    # Remove any previous output directory, then submit the streaming job.
    # hadoop_home, script, input_dir and output_dir are assumed to be
    # defined at module level (see the sketch after this listing).
    cmd = "%(hadoop_home)s/hadoop fs -rmr %(output)s" % {'hadoop_home': hadoop_home, 'output': output_dir}
    os.system(cmd)
    cmd = "%(hadoop_home)s/hadoop streaming \
        -D mapred.job.priority=NORMAL \
        -D mapred.reduce.tasks=%(red_tasks)d \
        -cacheArchive '/share/python2.7.tar.gz#python2.7' \
        -file %(script)s \
        -mapper 'python2.7/bin/python %(map)s' \
        -reducer 'python2.7/bin/python %(red)s' \
        -input %(input_dir)s \
        -output %(output_dir)s " % {
            'hadoop_home': hadoop_home,
            'script': script,
            'input_dir': input_dir,
            'output_dir': output_dir,
            'red_tasks': 1,
            'map': '%s map' % (script),
            'red': '%s red' % (script)
        }
    os.system(cmd)
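The listing above leaves out the module-level configuration and the command-line dispatcher that lets Hadoop invoke the same file as "script map" and "script red". A minimal sketch, with hypothetical paths (the later scripts in this post use exactly this pattern):

# Hypothetical values; replace with your own cluster paths.
hadoop_home = '/home/work/hadoop-client/hadoop/bin'
script = 'dot_product.py'  # the name of this file
input_dir = '/app/demo/vectors/part-*'
output_dir = '/app/demo/dot_product_out'

if __name__ == '__main__':
    if len(sys.argv) < 2:
        run()
    elif sys.argv[1] == 'map':
        map()
    elif sys.argv[1] == 'red':
        red()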
Counting word occurrences
Each input line is assumed to already be a tab-separated word/count pair (e.g. a partial count from an upstream step); the mapper passes the pairs through and the reducer sums the counts for each word.
import sys
import os

def map():
    # Pass (word, count) pairs through; the shuffle groups identical words.
    for line in sys.stdin:
        line = line.strip()
        fields = line.split('\t')
        word = fields[0]
        cnt = fields[1]
        print("\t".join([word, cnt]))

def red():
    # Input arrives sorted by word; sum the counts of each run of identical
    # words and emit the total when the word changes.
    res = 0
    preWord = None
    for line in sys.stdin:
        line = line.strip()
        fields = line.split('\t')
        word = fields[0]
        cnt = int(fields[1])  # counts arrive as strings
        if preWord is None:
            res = cnt
            preWord = word
        elif word == preWord:
            res += cnt
        else:
            print("\t".join([preWord, str(res)]))
            res = cnt
            preWord = word
    if preWord is not None:
        print("\t".join([preWord, str(res)]))
A fuller example: sampling cases from the display logs
The script below reads one day of display (shitu) logs, thins them in the mapper, draws a fixed-size random sample in the reducer, pulls the result back to the local machine, and extracts the fields needed for manual review.
#!/usr/bin/python
import sys
import random
import os
import urllib
import re
is_wise = 1 #wise:1 pc:0
date = '20141218' # format: 20140115
num_of_set = 1200 #case number
if is_wise == 1:
    output_dir = "/app/ecom/fcr-ad/wangqinglei/lixingpingu_wise/"
else:
    output_dir = "/app/ecom/fcr-ad/wangqinglei/lixingpingu_pc/"
#Range:'204','225','201'
position_pc = ['204','225']
#Range:'222','223','228', '229'
position_wise = ['222','223']
mapper_out_radio = 10 # percentage of input lines each mapper keeps
script = "hadoop_sampling_0123.py"
hadoop_home ='/home/work/wangqinglei/hadoop-client-nmg/hadoop/bin'
if is_wise == 0:
    input_dir = "/app/ecom/fcr-important/shitu-log-new/fc_shitu_new_204/%(date)s/*/part-* -input /app/ecom/fcr-important/shitu-log-new/fc_shitu_new_225/%(date)s/*/part-* -input /app/ecom/fcr-important/shitu-log-new/201/%(date)s/*/part-*" % {'date': date}
else:
    input_dir = "/app/ecom/fcr-important/shitu-log-wise/222_223/%(date)s/*/part-*" % {'date': date}
def map():
    # Keep only records from the target ad positions, and let each one
    # through with probability mapper_out_radio/100 to thin the stream early.
    for line in sys.stdin:
        line = line.strip()
        fields = line.split('\t')
        if (is_wise == 1) and (fields[9] in position_wise):
            randnum = random.randint(1, 100)
            if randnum <= mapper_out_radio:
                print line
        if (is_wise == 0) and (fields[9] in position_pc):
            randnum = random.randint(1, 100)
            if randnum <= mapper_out_radio:
                print line
def red():
    # Reservoir sampling: with a single reduce task this keeps a uniform
    # random sample of num_of_set lines from a stream of unknown length.
    i = 0
    sample = []
    for line in sys.stdin:
        fields = line.strip().split('\t')
        if i < num_of_set:
            # Fill the reservoir with the first num_of_set lines.
            sample.append(fields)
        else:
            # Then replace a random slot with probability num_of_set/(i+1).
            randnum = random.randint(0, i)
            if randnum < num_of_set:
                sample[randnum] = fields
        i += 1
    for rec in sample:
        sys.stdout.write('\t'.join(rec) + '\n')
def run():
    ret = 0
    # Clear the previous output directory, then launch the streaming job.
    cmd = "%(hadoop_home)s/hadoop fs -rmr %(output)s" % {'hadoop_home': hadoop_home, 'output': output_dir}
    ret = os.system(cmd)
    cmd = "%(hadoop_home)s/hadoop streaming \
        -D mapred.job.priority=NORMAL \
        -D mapred.reduce.tasks=%(red_tasks)d \
        -cacheArchive '/share/python2.7.tar.gz#python2.7' \
        -file %(script)s \
        -mapper 'python2.7/bin/python %(map)s' \
        -reducer 'python2.7/bin/python %(red)s' \
        -input %(input_dir)s \
        -output %(output_dir)s " % {
            'hadoop_home': hadoop_home,
            'script': script,
            'input_dir': input_dir,
            'output_dir': output_dir,
            'red_tasks': 1,
            'map': '%s map' % (script),
            'red': '%s red' % (script)
        }
    ret = os.system(cmd)
    if ret != 0:
        print "ERROR in hadoop step, cmd[%s]" % (cmd)
        exit(-1)
    # Pull the single output file back to the local machine.
    if is_wise == 0:
        cmd = "/bin/rm ./case_%(date)s_pc_ori" % {'date': date}
        os.system(cmd)
        cmd = "%(hadoop_home)s/hadoop fs -get %(output)s/part-00000 ./case_%(date)s_pc_ori" % {'hadoop_home': hadoop_home, 'output': output_dir, 'date': date}
    else:
        cmd = "/bin/rm ./case_%(date)s_%(position)s_wise_ori" % {'date': date, 'position': position_wise[0]}
        os.system(cmd)
        cmd = "%(hadoop_home)s/hadoop fs -get %(output)s/part-00000 ./case_%(date)s_%(position)s_wise_ori" % {'hadoop_home': hadoop_home, 'output': output_dir, 'date': date, 'position': position_wise[0]}
    ret = os.system(cmd)
    if ret != 0:
        print "ERROR in get file from hadoop, cmd[%s]" % (cmd)
        exit(-1)
    # Extract the human-readable fields from the downloaded sample.
    if is_wise == 0:
        cmd = "python %(script)s extract case_%(date)s_pc <./case_%(date)s_pc_ori" % {'script': script, 'date': date}
    else:
        cmd = "python %(script)s extract case_%(date)s_%(position)s_wise <./case_%(date)s_%(position)s_wise_ori" % {'script': script, 'date': date, 'position': position_wise[0]}
    ret = os.system(cmd)
    if ret != 0:
        print "ERROR in extract data, cmd[%s]" % (cmd)
        exit(-1)
def extract(filename_prefix):
    # Read the downloaded sample from stdin, give each case an index,
    # URL-decode the interesting fields and write a tab-separated review file.
    index = 0
    if is_wise == 0:
        t_url_index = 73
    else:
        t_url_index = 92
    ori_with_index = open("%s_ori_with_index" % (filename_prefix), "w")
    final_output = open("%s_final.txt" % (filename_prefix), "w")
    print >>final_output, "index\tmark\tquery\ttitle\tdesc1\tdesc2\tcmatch\twmatch\tbidword\ttarget-url"
    for line in sys.stdin:
        array = line.strip().split("\t")
        print >>ori_with_index, "%d\t%s" % (index, line.strip())
        fol = "%(index)d \t \t %(query)s \t %(title)s \t %(desc1)s \t %(desc2)s \t %(cmatch)s \t %(wmatch)s \t %(bidword)s \t %(target_url)s \r\n" % {
            'index': index,
            'query': re.sub("\t", " ", urllib.unquote(array[3])),
            'cmatch': array[9],
            'wmatch': array[10],
            'title': re.sub("\t", " ", urllib.unquote(array[24])),
            'desc1': re.sub("\t", " ", urllib.unquote(array[25])),
            'desc2': re.sub("\t", " ", urllib.unquote(array[26])),
            'bidword': re.sub("\t", " ", urllib.unquote(array[23])),
            'target_url': "%s" % (urllib.unquote(array[t_url_index])),
        }
        print >>final_output, fol.decode('gbk', 'ignore').encode('gbk'),
        index += 1
    ori_with_index.close()
    final_output.close()
if __name__ == '__main__':
    if len(sys.argv) < 2:
        run()
        exit(0)
    elif sys.argv[1] == "map":
        map()
        exit(0)
    elif sys.argv[1] == "red":
        red()
        exit(0)
    elif sys.argv[1] == "extract":
        extract(sys.argv[2])
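The red() above is the classic reservoir-sampling algorithm: after n lines have been seen, every line is in the reservoir with probability num_of_set/n, which is why a single reduce task yields a uniform sample of the whole day. A quick standalone sketch of that property (hypothetical, self-contained):

import random
from collections import Counter

def reservoir(stream, k):
    sample = []
    for i, item in enumerate(stream):
        if i < k:
            sample.append(item)
        else:
            j = random.randint(0, i)
            if j < k:
                sample[j] = item
    return sample

# Each of the 100 items should land in the 10-slot reservoir in roughly
# 10% of the 10000 trials.
hits = Counter()
for _ in range(10000):
    hits.update(reservoir(range(100), 10))
print(min(hits.values()), max(hits.values()))

The last script below reuses the same map/run skeleton: it ships a local target_url list to the mappers with -file, keeps only the log records (cmatch 222/223) whose decoded target URL is on that list, and writes compact (query, bidword, userid, url, show, clk) records; the reducer is simply cat.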
import sys
import os
import datetime
import time
import json
import urllib
def map():
    # target_url is shipped with -file and lands in the task's working
    # directory; load it into a dict for fast membership tests.
    url_dict = {}
    cmatch_list = ['222', '223']
    for line in open('target_url', 'r'):
        fields = line.strip().split('\t')
        url = fields[0]
        url_dict[url] = 0
    # Keep only records from the wanted cmatch positions whose decoded
    # target URL is in the list, and emit a compact record.
    for line in sys.stdin:
        fields = line.strip().split('\t')
        show = fields[0]
        clk = fields[1]
        query = fields[3]
        userid = fields[17]
        bidword = fields[23]
        cmatch = fields[9]
        url = urllib.unquote(fields[92])
        if cmatch not in cmatch_list:
            continue
        if url not in url_dict:
            continue
        content = []
        content.append(query)
        content.append(bidword)
        content.append(userid)
        content.append(url)
        content.append(show)
        content.append(clk)
        print '\t'.join(content)
def reduce():
    # Placeholder: the job below uses 'cat' as its reducer, so this function
    # is only reachable through the "red" entry point when run by hand.
    for line in sys.stdin:
        fields = line.strip().split()
def run():
    hadoop_home = '/home/maling03/hadoop/hadoop-client-kehan/kehan_hadoop/bin/'
    input_dir1 = 'hdfs://nmg01-mulan-hdfs.dmop.baidu.com:54310/app/ecom/fcr-important/shitu-log-wise/222_223/201710*/*'
    output_dir = '/app/ecom/baitong/maling03/fc-app/201710'
    ret = 0
    script = "log_cost.py"
    cmd = "%(hadoop_home)s/hadoop fs -rmr %(output)s" % {
        'hadoop_home': hadoop_home,
        'output': output_dir}
    ret = os.system(cmd)
    cmd = "%(hadoop_home)s/hadoop streaming \
        -D mapred.job.priority=VERY_HIGH \
        -D mapred.reduce.tasks=%(red_tasks)d \
        -D mapred.map.tasks=3000 \
        -D stream.memory.limit=4000 \
        -D mapred.job.name=%(workname)s \
        -D mapred.map.memory.limit=4000 \
        -D mapred.job.map.capacity=3000 \
        -partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner \
        -jobconf stream.num.map.output.key.fields=2 \
        -jobconf num.key.fields.for.partition=1 \
        -cacheArchive /share/python2.7.tar.gz#python2.7 \
        -file %(script)s \
        -file target_url \
        -mapper 'python2.7/bin/python %(map)s' \
        -reducer 'cat' \
        -input %(input_dir1)s \
        -output %(output_dir)s " % {
            'hadoop_home': hadoop_home,
            'script': script,
            'workname': "log_cost",
            'input_dir1': input_dir1,
            'output_dir': output_dir,
            'red_tasks': 100,
            'map': '%s map' % (script),
            'red': '%s red' % (script)
        }
    ret = os.system(cmd)
    if ret != 0:
        print "ERROR in hadoop step, cmd[%s]" % (cmd)
if __name__ == '__main__':
    if sys.argv[1] == "run":
        run()
    elif sys.argv[1] == "map":
        map()
        exit(0)
    elif sys.argv[1] == "red":
        reduce()
        exit(0)
    else:
        print "error"