2020-05-09: Python on yarn-cluster

2020-05-09  logi

Packaging the environment

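After creating the environment, go into the folder that contains it. For example, if your environment lives at ***/***/project_env, cd into project_env and zip the files in the current directory, so that bin/python ends up at the root of the archive: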

zip -r project_env.zip ./*
Then, still in the same folder, upload the archive to HDFS:
hadoop fs -put ***/***/project_env/project_env.zip hdfs://***/***/***/env/
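
The reason for running zip from inside project_env is the archive layout: the submit script later refers to ./project_env.zip/bin/python, so bin/ has to be a top-level entry in the zip rather than sitting under a project_env/ prefix. A minimal Python sketch of the same packaging step, in case you prefer to script it. The helper name zip_env and the throwaway demo directory are made up for illustration; the sketch skips empty directories and does not follow symlinked directories.

```python
import os
import tempfile
import zipfile


def zip_env(env_dir, zip_path):
    """Zip the *contents* of env_dir so that bin/python sits at the archive root."""
    names = []
    with zipfile.ZipFile(zip_path, "w", zipfile.ZIP_DEFLATED) as zf:
        for root, _dirs, files in os.walk(env_dir):
            for name in files:
                full = os.path.join(root, name)
                # Store paths relative to env_dir itself, not to its parent.
                arcname = os.path.relpath(full, env_dir)
                zf.write(full, arcname)
                names.append(arcname.replace(os.sep, "/"))
    return sorted(names)


# Tiny stand-in for a real environment, only to show the resulting layout.
demo_root = tempfile.mkdtemp()
env_dir = os.path.join(demo_root, "project_env")
os.makedirs(os.path.join(env_dir, "bin"))
with open(os.path.join(env_dir, "bin", "python"), "w") as fh:
    fh.write("#!/bin/sh\n")

zip_path = os.path.join(demo_root, "project_env.zip")
entries = zip_env(env_dir, zip_path)
print(entries)  # ['bin/python']
```

The zip file is written next to the environment rather than inside it, so the archive does not try to include itself.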

Submit script

HOME_PATH=$(cd "$(dirname "$0")"; pwd)

xx/spark-2.2/bin/spark-submit \
--queue xxx \
--executor-memory 32G \
--packages com.databricks:spark-csv_2.10:1.5.0 \
--driver-memory 12G \
--master yarn \
--deploy-mode cluster \
--executor-cores 4 \
--num-executors 100 \
--name "xxxx" \
--conf spark.dynamicAllocation.enabled=true \
--conf spark.shuffle.service.enabled=true \
--conf spark.yarn.appMasterEnv.PYSPARK_PYTHON=./project_env.zip/bin/python \
--conf spark.executorEnv.PYSPARK_PYTHON=./project_env.zip/bin/python \
--conf spark.yarn.appMasterEnv.LD_LIBRARY_PATH=/opt/rh/python27/root/usr/lib64 \
--conf spark.executorEnv.LD_LIBRARY_PATH=/opt/rh/python27/root/usr/lib64 \
--archives hdfs://***/***/***/env/project_env.zip \
"$HOME_PATH/ann_pyspark.py" "${version}" "${strategy}"

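LD_LIBRARY_PATH: if importing some packages fails with an error saying that libffi.so.6 does not exist, set LD_LIBRARY_PATH as shown above. Here it points at the copy of that library that ships with an older Python environment.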
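
One detail in the submit script that is easy to get wrong: YARN unpacks each --archives entry into the container's working directory under the archive's file name, or under the alias given after a # sign, and the PYSPARK_PYTHON settings have to use exactly that name as their first path component. A rough sketch that derives both from a single alias so they cannot drift apart. The function build_submit_args and the arguments "v1" and "s1" are hypothetical, and only the options relevant to this point are included.

```python
def build_submit_args(archive_uri, alias, script, script_args=()):
    """Return spark-submit arguments in which the archive alias and
    the interpreter path are derived from the same name."""
    python_path = "./{}/bin/python".format(alias)
    return [
        "--master", "yarn",
        "--deploy-mode", "cluster",  # same meaning as the older yarn-cluster master
        "--conf", "spark.yarn.appMasterEnv.PYSPARK_PYTHON=" + python_path,
        "--conf", "spark.executorEnv.PYSPARK_PYTHON=" + python_path,
        # "uri#alias" asks YARN to expose the unpacked archive as ./alias
        "--archives", "{}#{}".format(archive_uri, alias),
        script,
    ] + list(script_args)


args = build_submit_args(
    "hdfs://***/***/***/env/project_env.zip",  # placeholder path from the upload step
    "project_env.zip",
    "ann_pyspark.py",
    ["v1", "s1"],  # hypothetical values for version and strategy
)
print(" ".join(args))
```

The resulting list can be appended to the spark-submit executable and passed to subprocess.run, or simply printed and compared against a hand-written script.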

Python demo

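# coding=utf-8

from annoy import AnnoyIndex
import pickle
import re
import numpy as np
import json
import traceback
import sys
from pyspark.sql import SparkSession
from pyspark import SparkFiles


def do(lines):
    """Process one partition; meant to be passed to mapPartitions."""
    def do_line(line):
        try:
            # The per-line logic is omitted in the original post.
            # Returning the line unchanged is only a placeholder.
            return line
        except Exception:
            # Log the error to the executor's stderr and emit an empty line.
            traceback.print_exc()
            return ""
    return (do_line(x) for x in lines)


if __name__ == "__main__":
    version = sys.argv[1]
    strategy = sys.argv[2]
    hadoop_path = ""
    # path1, path2, path3 and output_path are not defined in the original post;
    # set them to your own HDFS paths before running.

    spark = SparkSession.builder.getOrCreate()
    spark.conf.set("spark.sql.shuffle.partitions", 5000)
    spark.conf.set("spark.hadoopRDD.ignoreEmptySplits", True)
    spark.conf.set("spark.hadoopRDD.targetBytesInPartition", 67108864)
    sc = spark.sparkContext
    sc.setLogLevel("INFO")

    # Ship the HDFS files with the job so that every machine can read them
    # (the local copy can be located with SparkFiles.get(file_name)).
    sc.addFile(path1)
    sc.addFile(path2)

    result = sc.textFile(path3).repartition(500).mapPartitions(do)
    result.saveAsTextFile(output_path)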
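
A note on why the demo uses mapPartitions rather than map: anything expensive, such as opening a file shipped with sc.addFile, can then be done once per partition instead of once per line. Below is a minimal sketch of that pattern, written without Spark so it can be tried locally. make_partition_fn, fake_loader and lookup are illustrative names that are not part of the original script; on a cluster the loader would typically open the path returned by SparkFiles.get(...).

```python
import traceback


def make_partition_fn(load_resource, process_line):
    """Build a function suitable for RDD.mapPartitions.

    load_resource: zero-argument callable, run once per partition
    process_line:  callable (resource, line) -> str
    """
    def process_partition(lines):
        resource = load_resource()  # once per partition, not once per line
        for line in lines:
            try:
                yield process_line(resource, line)
            except Exception:
                traceback.print_exc()  # goes to stderr
                yield ""               # same fallback as the demo above
    return process_partition


# Local stand-ins so the sketch runs without a cluster.
calls = []


def fake_loader():
    calls.append(1)
    return {"a": "1", "b": "2"}


def lookup(table, line):
    return table[line]  # raises KeyError for unknown keys


fn = make_partition_fn(fake_loader, lookup)
out = list(fn(iter(["a", "x", "b"])))
print(out)         # ['1', '', '2']
print(len(calls))  # 1
```

With Spark, the returned function would be passed as rdd.mapPartitions(fn); the failing line "x" produces an empty string while the rest of the partition is still processed.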