spark-submit

    print("cfg_file_base", Path(__file__).absolute())
    lists = os.listdir(".")
    print(lists)
    print(Path(__file__).parent.absolute())
    print(Path(__file__).parent.parent.absolute())
    cfg_file = Path(__file__).parent.parent.joinpath('cfg.yaml')
    print("cfg_file", cfg_file.absolute())

    cfg = yaml.load(Path('cfg.yaml').read_text(), Loader=yaml.FullLoader)

Output in the YARN container:

cfg_file_base /data/yarn/nm/usercache/root/appcache/application_1629353712286_3572/container_1629353712286_3572_01_000001/address_format_normal.py
['tmp', 'depend.zip', '__spark_conf__', 'cfg.yaml', 'python_dir', 'address_format_normal.py']
/data/yarn/nm/usercache/root/appcache/application_1629353712286_3572/container_1629353712286_3572_01_000001
/data/yarn/nm/usercache/root/appcache/application_1629353712286_3572/container_1629353712286_3572_01_000001
cfg_file /data/yarn/nm/usercache/root/appcache/application_1629353712286_3572/container_1629353712286_3572_01_000001/cfg.yaml
    cfg_file = Path('cfg.yaml')
    cfg = yaml.load(cfg_file.read_text(), Loader=yaml.FullLoader)
    print(cfg)

    spark = SparkSession.builder \
        .appName('appName') \
        .getOrCreate()

    # Read the shipped config line by line as an extra check.
    with open(cfg_file) as f:
        for line in f:
            print(line)

    spark.stop()
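
Files shipped with --files can also be resolved through SparkFiles instead of relying on the container working directory. A minimal sketch, assuming the same cfg.yaml as above:

import yaml
from pyspark import SparkFiles
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('appName').getOrCreate()

# SparkFiles.get returns the absolute local path of a --files entry on this node.
cfg_path = SparkFiles.get('cfg.yaml')
with open(cfg_path) as f:
    cfg = yaml.load(f, Loader=yaml.FullLoader)
print(cfg)

spark.stop()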

submit_depend.sh

export SPARK_YARN_USER_ENV=PYTHONHASHSEED=0
export PYSPARK_PYTHON=./python_dir/venv/bin/python3
export PYSPARK_DRIVER_PYTHON=./python_dir/venv/bin/python3

spark-submit --master yarn \
    --deploy-mode cluster \
    --driver-memory 30g \
    --num-executors 4 \
    --executor-cores 5 \
    --executor-memory 30g \
    --name hnApp \
    --archives hdfs:///user/sll/venv.zip#python_dir \
    --conf spark.yarn.appMasterEnv.PYSPARK_PYTHON=./python_dir/venv/bin/python3 \
    --conf spark.kryoserializer.buffer.max=256m \
    --conf spark.driver.maxResultSize=10g \
    --conf spark.sql.broadcastTimeout=600 \
    --conf spark.yarn.appMasterEnv.PYSPARK_DRIVER_PYTHON=./python_dir/venv/bin/python3 \
    --conf spark.yarn.appMasterEnv.LTP_MODEL_DIR=./python_dir/venv/lib/python3.7/site-packages \
    --files /data/sll/code/cfg.yaml \
    --py-files /data/sll/code/depend.zip \
    /data/sll/code/spark_test.py
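
Because depend.zip is distributed via --py-files, it is added to the PYTHONPATH of the driver and executors, so code inside it can be imported directly, assuming the zip holds a top-level depend/ package (as built with zip -r depend.zip depend/). A minimal sketch:

# depend.zip contains a top-level depend/ package, so this import works on driver and executors.
from depend.address_parser import QueryParser

queryParser = QueryParser()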

submit.sh

export SPARK_YARN_USER_ENV=PYTHONHASHSEED=0
export PYSPARK_PYTHON=./python_dir/venv/bin/python3
export PYSPARK_DRIVER_PYTHON=./python_dir/venv/bin/python3

spark-submit --master yarn \
    --deploy-mode cluster \
    --driver-memory 30g \
    --num-executors 5 \
    --executor-cores 4 \
    --executor-memory 30g \
    --name hnApp \
    --archives hdfs:///user/sll/venv.zip#python_dir \
    --conf spark.yarn.appMasterEnv.PYSPARK_PYTHON=./python_dir/venv/bin/python3 \
    --conf spark.yarn.appMasterEnv.PYSPARK_DRIVER_PYTHON=./python_dir/venv/bin/python3 \
    --conf spark.yarn.appMasterEnv.LTP_MODEL_DIR=./python_dir/venv/lib/python3.7/site-packages \
    --conf spark.network.timeout=10000000 \
    --py-files /data/sll/code/prpcrypt_util.py,/data/sll/code/cfg.yaml \
     /data/sll/code/spark_test.py

Data files used by the code are stored inside the packaged venv, e.g.:
self.user_dict = './py3/venv/lib/python3.7/site-packages/userDict2.txt'
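
Since the submit scripts export LTP_MODEL_DIR to the unpacked site-packages directory, the path can also be derived from the environment instead of being hard-coded; a small sketch under that assumption:

import os

# LTP_MODEL_DIR is set by the submit scripts to ./python_dir/venv/lib/python3.7/site-packages;
# fall back to the hard-coded path above when it is not set.
model_dir = os.environ.get('LTP_MODEL_DIR', './py3/venv/lib/python3.7/site-packages')
user_dict = os.path.join(model_dir, 'userDict2.txt')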

zip -r venv.zip venv/
hdfs dfs -put venv.zip /user/sll/
zip -r -q depend.zip depend/
The approach that actually worked for building the venv:

# Create a Python 3.7 env, then make it self-contained by copying the conda
# distribution's bin/ and lib/python3.7 into it before zipping.
conda create --name venv python=3.7
zip -r venv.zip venv

cd anaconda3/
cp -r bin ../venv/
cd /root/anaconda3/lib
cp -r python3.7 ../../venv/lib/

zip -r venv.zip venv/
hdfs dfs -put venv.zip /user/sll/
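
Because --archives hdfs:///user/sll/venv.zip#python_dir unpacks the archive under ./python_dir in every container, the interpreter has to sit at venv/bin/python3 inside the zip to match PYSPARK_PYTHON. A quick local check of the archive layout before uploading:

import zipfile

# ./python_dir/venv/bin/python3 on the container corresponds to 'venv/bin/python3' inside venv.zip.
with zipfile.ZipFile('venv.zip') as zf:
    assert 'venv/bin/python3' in zf.namelist(), 'unexpected venv.zip layout'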


Submitting with the Sedona jars:

export SPARK_YARN_USER_ENV=PYTHONHASHSEED=0
export PYSPARK_PYTHON=./python_dir/venv/bin/python3
export PYSPARK_DRIVER_PYTHON=./python_dir/venv/bin/python3



spark-submit --master yarn \
    --deploy-mode cluster \
    --driver-memory 30g \
    --num-executors 10 \
    --executor-cores 10 \
    --executor-memory 30g \
    --name hnApp \
    --jars /data/sll/code/jars/sedona-python-adapter-2.4_2.11-1.1.0-incubating.jar,/data/sll/code/jars/geotools-wrapper-geotools-24.1.jar \
    --archives hdfs:///user/sll/venv.zip#python_dir \
    --conf spark.yarn.appMasterEnv.PYSPARK_PYTHON=./python_dir/venv/bin/python3 \
    --conf spark.kryoserializer.buffer.max=256m \
    --conf spark.driver.maxResultSize=10g \
    --conf spark.sql.broadcastTimeout=600 \
    --conf spark.yarn.appMasterEnv.PYSPARK_DRIVER_PYTHON=./python_dir/venv/bin/python3 \
    --conf spark.yarn.appMasterEnv.LTP_MODEL_DIR=./python_dir/venv/lib/python3.7/site-packages \
    --files /data/sll/code/cfg.yaml \
    --py-files /data/sll/code/depend.zip \
    /data/sll/code/sedona_poi_admin2.py                                                                        
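
With the Sedona jars on the classpath, the spatial SQL functions still have to be registered on the SparkSession before use. A minimal sketch using the Sedona 1.1.x Python API, assuming the apache-sedona package is installed inside venv.zip:

from pyspark.sql import SparkSession
from sedona.register import SedonaRegistrator

spark = SparkSession.builder.appName('hnApp').getOrCreate()
SedonaRegistrator.registerAll(spark)  # registers the ST_* SQL functions

spark.sql("SELECT ST_Point(116.0, 40.0)").show()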

import os
import sys
import json
from collections import OrderedDict
from pathlib import Path

project_path = Path(__file__).parent
print(project_path)
sys.path.insert(0, str(project_path))
sys.path.insert(0, str(project_path.joinpath('depend')))

from depend.address_parser import QueryParser


def address_split(address: str, queryParser):
    try:
        if address:
            return queryParser.recognise(address)['entities']
    except Exception as e:
        print("address_split:%s error:%s" % (address, e))
    return None


def address_extract(address, queryParser=None):
    # Accept an existing parser (mapPartitions path) or build one per call.
    if queryParser is None:
        queryParser = QueryParser()
    split_address = address_split(address, queryParser)
    if not split_address:
        print(f"address_split error:{address}")
        return "", ""
    format_address, level_dict, index_dict = dict_to_format_address(split_address)
    gate = level_dict.get("12")[0] if level_dict.get("12") else None
    place_num = level_dict.get("26")[0] if level_dict.get("26") else None
    return gate, place_num


def dict_to_format_address(address_splits: list):
    if address_splits:
        address_splits_sorted = sorted(address_splits, key=lambda x: x['index'])
        format_address_list = []
        format_address_level_dict = OrderedDict()
        format_address_index_dict = OrderedDict()
        for address_split in address_splits_sorted:
            level = str(address_split['level'])
            format_address_list.append(f"{address_split['word']},{level}")
            format_address_index_dict[str(address_split['index'])] = (level, address_split['word'])
            if level in format_address_level_dict:
                format_address_level_dict[level].append(address_split['word'])
            else:
                format_address_level_dict[level] = [address_split['word']]
        return "|".join(format_address_list), format_address_level_dict, format_address_index_dict


def gate_extract_tuple(hn_id, address):
    print(hn_id, address)
    gate, place_num = address_extract(address)
    return hn_id, gate, place_num


def gate_extract_tuple_partition(rows):
    # mapPartitions variant: one QueryParser per partition instead of one per row.
    queryParser = QueryParser()
    result = []
    for row in rows:
        gate, place_num = address_extract(row[1], queryParser)
        result.append([row[0], gate, place_num])
    return result

# Local development settings for running the script outside the cluster.
os.environ['SPARK_HOME'] = '/Users/xxx/Downloads/soft/spark-3.0.3-bin-hadoop2.7'
os.environ['PYTHONPATH'] = '/Users/xxx/Downloads/soft/spark-3.0.3-bin-hadoop2.7/python'

if __name__ == '__main__':
    # print(gate_extract_tuple((12, "河南省焦作市武陟县S104")))
    from pyspark.sql import SparkSession
    from pyspark.sql.types import StructType, StringType, StructField

    from_database = "poi_product"
    from_user = "xxx"
    from_pw = "xxx"
    from_host = "192.168.160.12"
    from_port = "15450"
    jdbcUrl = f"jdbc:postgresql://{from_host}:{from_port}/{from_database}"
    connectionProperties = {
        "user": from_user,
        "password": from_pw,
        "driver": "org.postgresql.Driver"
    }

    spark = SparkSession.builder.appName('hnApp').getOrCreate()

    sql = "(select hn_id,address from poi_hn_edit_0826 limit 20) tmp"
    df = spark.read.jdbc(url=jdbcUrl, table=sql, properties=connectionProperties)
    df.show(2)

    rdd2 = df.rdd.map(lambda e: gate_extract_tuple(e['hn_id'], e['address']))
    # rdd2 = df.rdd.mapPartitions(gate_extract_tuple_partition)
    gate_schema = StructType([
        StructField("hn_id", StringType(), True),
        StructField("gate", StringType(), True),
        StructField("place_num", StringType(), True)
    ])
    print(rdd2.take(2))
    df2 = spark.createDataFrame(rdd2, gate_schema)
    df2.show(2)

    df2.write.jdbc(url=jdbcUrl, table='hn_gate_2', mode='append', properties=connectionProperties)

    spark.stop()
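
To avoid constructing a QueryParser for every row, the commented mapPartitions line above can replace the per-row map inside the __main__ block; a sketch using the partition variant defined earlier:

    # One QueryParser per partition instead of one per row.
    rdd2 = df.rdd.mapPartitions(gate_extract_tuple_partition)
    df2 = spark.createDataFrame(rdd2, gate_schema)
    df2.show(2)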
