Spark job version management and deployment
2019-05-14
王金松
Upload the code to Git
git add ./
git commit
git push git://....git dev
Create a merge request
Download the code to be deployed
Clone the specified branch: git clone -b dev git://....git
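For stricter version tracking it can also help to tag the revision that actually gets deployed, so a running jar can always be traced back to an exact commit. This is an optional sketch on top of the branch workflow above; the tag name v1.0.0 is a placeholder:
# tag the merged commit and push the tag (placeholder tag name)
git tag -a v1.0.0 -m "spark job release"
git push origin v1.0.0
# inside an existing clone, check out exactly that version for a rebuild
git fetch --tags
git checkout v1.0.0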
Build
build.sh
#!/bin/bash
###############################################################################
# This build script works by collecting the build result into the output
# directory. The template below is the most basic script for producing a jar
# package; adapt it as needed for special requirements.
# 1. A control file must exist in the root of the working directory.
# 2. The script accepts parameters, which are passed to the build_package
#    function (the actual build command) as $1, $2, ... for the 1st, 2nd, ... argument.
############### user-editable section ################
readonly PACKAGE_DIR_NAME="" # optional: set this if modules are defined in the pom file
readonly PACKAGE_JAR_NAME="*.jar" # required: name of the produced jar package
# The final package is assembled under $OUTPUT
###########################################
if [[ "${PACKAGE_JAR_NAME}" == "" ]];then
echo "Please set "PACKAGE_JAR_NAME" value"
exit 1
fi
function set_work_dir
{
readonly OUTPUT=$(pwd)/output
readonly WORKSPACE_DIR=$(pwd)
}
#Clean the previous build before building
function clean_before_build
{
cd ${WORKSPACE_DIR}
mvn clean
rm -rf ${OUTPUT}
}
#The actual build command
#$1, $2, ... can be used inside this function to access the 1st, 2nd, ... arguments
function build_package()
{
cd ${WORKSPACE_DIR}
mvn clean package -Dmaven.test.skip=true || return 1
}
#Create the directory layout for the final package
function build_dir
{
mkdir ${OUTPUT} || return 1
mkdir ${OUTPUT}/bin || return 1
mkdir ${OUTPUT}/logs || return 1
mkdir ${OUTPUT}/conf || return 1
mkdir ${OUTPUT}/lib || return 1
mkdir ${OUTPUT}/pid || return 1
}
function dir_not_empty()
{
if [[ ! -d $1 ]];then
return 1
fi
if [[ $(ls $1|wc -l) -eq 0 ]];then
return 1
fi
return 0
}
#Copy the build result into the publish directory
function copy_result
{
cd ${WORKSPACE_DIR}
#cp -r ./control ${OUTPUT}/bin || return 1
cp -r ./target/${PACKAGE_JAR_NAME} ${OUTPUT}/lib || return 1
cp -rf ./bin/* ${OUTPUT}/bin || return 1
cp -rf ./conf/* ${OUTPUT}/conf || return 1
#Add any other files that need to be copied here
}
#Entry point
function main()
{
cd $(dirname $0)
set_work_dir
echo "At: "$(date "+%Y-%m-%d %H:%M:%S") 'Cleaning...'
clean_before_build || exit 1
echo "At: "$(date "+%Y-%m-%d %H:%M:%S") 'Clean completed'
echo
echo "At: "$(date "+%Y-%m-%d %H:%M:%S") 'Building...'
build_package $@ || exit 1
echo "At: "$(date "+%Y-%m-%d %H:%M:%S") 'Build completed'
echo
echo "At: "$(date "+%Y-%m-%d %H:%M:%S") 'Making dir...'
build_dir || exit 1
echo "At: "$(date "+%Y-%m-%d %H:%M:%S") 'Make completed'
echo
echo "At: "$(date "+%Y-%m-%d %H:%M:%S") 'Copy result to publish dir...'
copy_result || exit 1
echo "At: "$(date "+%Y-%m-%d %H:%M:%S") 'Copy completed'
echo
exit 0
}
main $@
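With the script in place, the build is a single command run from the repository root; a quick usage sketch, assuming Maven and a JDK are already on the PATH:
# run from the root of the cloned repository; extra arguments are forwarded to build_package
sh build.sh
# on success the publishable package is assembled under ./output
ls output
# bin  conf  lib  logs  pid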
Run the job
control
#!/bin/bash
source ~/.bash_profile
PRO_NAME=spark_online
PRO_BIN=$(cd `dirname $0`; pwd)
PRO_HOME=$(dirname ${PRO_BIN})
PRO_PID=${PRO_HOME}/pid/pro.pid
#PRO_LOG=${PRO_HOME}/logs/spark.log
PRO_LOG=$(dirname ${PRO_HOME})/log/spark.log
OPTIONS_DIR=${PRO_HOME}/bin/start.options
parse_spark_options() {
if [ -f "$1" ]; then
echo "$(grep "^-" "$1" | grep -v "mainjar" | grep -v "param" | tr '\n' ' ')"
fi
}
function get_other_options() {
MAINJAR=`sed '/^--mainjar=/!d;s/.*=//' ${OPTIONS_DIR}`
PARAMS=`sed '/^--params=/!d;s/.*=//' ${OPTIONS_DIR}`
}
function start() {
SPARK=$(which spark-submit)
RUNNING_OPTIONS="$(parse_spark_options "${OPTIONS_DIR}")$RUNNING_OPTIONS"
get_other_options
local pid
if [[ -f "${PRO_PID}" ]]; then
pid=$(cat ${PRO_PID})
if kill -0 ${pid} >/dev/null 2>&1; then
echo "${PRO_NAME} is already running"
return 0;
fi
fi
exec "$SPARK" $RUNNING_OPTIONS ${MAINJAR} ${PARAMS} >> ${PRO_LOG} 2>&1 &
pid=$!
echo $pid
if [[ -z "${pid}" ]]; then
echo "${PRO_NAME} start error"
return 1;
else
echo "${PRO_NAME} start success"
echo ${pid} > ${PRO_PID}
fi
}
function stop() {
local pid
if [[ ! -f "${PRO_PID}" ]]; then
echo "${PRO_NAME} is not running"
else
pid=$(cat ${PRO_PID})
if [[ -z "${pid}" ]]; then
echo "${PRO_NAME} is not running"
else
wait_for_schedule_to_die $pid
rm -f ${PRO_PID}
echo "${PRO_NAME} stop success"
fi
fi
}
function wait_for_schedule_to_die() {
local pid
pid=$1
kill -9 ${pid} > /dev/null 2>&1
}
function find_schedule_process() {
local pid
if [[ -f "${PRO_PID}" ]]; then
pid=$(cat ${PRO_PID})
if ! kill -0 ${pid} > /dev/null 2>&1; then
echo "${PRO_NAME} running but process is dead"
return 1
else
echo "${PRO_NAME} is running"
fi
else
echo "${PRO_NAME} is not running"
return 1
fi
}
case "${1}" in
start)
start
;;
stop)
stop
;;
status|health|checkhealth)
find_schedule_process
;;
restart)
stop && start
;;
*)
echo "Usage: $0 {start|stop|status|restart}"
esac
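Once the package is on the target machine, the job is driven entirely through control; usage follows directly from the case statement above:
# start the Spark job described by bin/start.options (pid is written to pid/pro.pid)
./bin/control start
# check whether the recorded pid is still alive
./bin/control status
# stop (kill -9 on the recorded pid), or stop and start again
./bin/control stop
./bin/control restart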
start.options
## Spark start configuration
################################################################
## resource settings
################################################################
##
## --num-executors NUM Number of executors to launch (Default: 2).
## --executor-memory MEM Memory per executor (e.g. 1000M, 2G) (Default: 1G).
## --driver-memory MEM Memory for driver (e.g. 1000M, 2G) (Default: 1024M).
## --total-executor-cores NUM Total cores for all executors (must be set when running on Spark standalone).
## --class CLASS_NAME Your application's main class (for Java / Scala apps).
## --jars Comma-separated list of additional jars for the driver and executor classpaths.
##
################################################################
#--executor-cores=1
#--executor-memory=1g
#--driver-memory=1g
#--total-executor-cores=5
################################################################
## running pro settings
################################################################
##
## --class CLASS_NAME Your application's main class (for Java / Scala apps).
## --mainjar Path to the application jar (read by control and passed to spark-submit as the jar).
## --params Application arguments appended after the jar (read by control).
##
################################################################
#--class=com.jd.Waf2CSA
#--mainjar=./lib/WafDistinct-1.0-SNAPSHOT-jar-with-dependencies.jar
#--params=out ./conf/waf.conf
--class=com.jd.sa.job.RiskModelCompute
--mainjar=./lib/aggjob-1.0-SNAPSHOT-jar-with-dependencies.jar
--params=./conf/dev_agg.conf
################################################################
## Expert settings
################################################################
##
## --files FILES Comma-separated list of files to be placed in the working directory of each executor. File paths of these files
## in executors can be accessed via SparkFiles.get(fileName).
## --conf PROP=VALUE Arbitrary Spark configuration property.
## Don't change these unless you understand what you are doing.
##
################################################################
# --files a.txt
# --conf spark.memory.fraction=0.8
--conf spark.default.parallelism=100
--conf spark.es.nodes=127.0.0.1
--conf spark.es.port=9200
--conf spark.streaming.concurrentJobs=1
--conf spark.streaming.kafka.maxRatePerPartition=1000
--conf spark.driver.port=10000
--conf spark.es.batch.size.entries=300
--conf spark.es.batch.size.bytes=2mb
--conf spark.driver.cores=4
--conf spark.driver.memory=16g
--conf spark.executor.cores=4
--conf spark.executor.memory=16g
--conf spark.cores.max=8
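For reference, start() in control greps every line of this file that begins with "-" (except the mainjar/params lines, which are extracted separately) and hands them to spark-submit. With the uncommented options above, the submitted command is roughly the single line below, shown here with line breaks for readability; the log path comes from PRO_LOG in control (../log/spark.log relative to the output directory):
spark-submit \
  --class=com.jd.sa.job.RiskModelCompute \
  --conf spark.default.parallelism=100 \
  --conf spark.es.nodes=127.0.0.1 \
  --conf spark.es.port=9200 \
  ... remaining --conf lines ... \
  ./lib/aggjob-1.0-SNAPSHOT-jar-with-dependencies.jar \
  ./conf/dev_agg.conf >> ../log/spark.log 2>&1 &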
Overall directory layout
[admin@hostname output]$ tree
.
├── bin
│ ├── control
│ ├── setup_env.sh
│ ├── start.options
│ ├── start.sh
│ ├── waf2csa-online.sh
│ └── waf2csa.sh
├── conf
│ ├── agg.conf
│ ├── attack.conf
│ ├── attack-es.conf
│ ├── dev_agg.conf
│ ├── dga-online.conf
│ ├── dga-online-pre.conf
│ ├── dga-test.conf
│ ├── dga-test-pre.conf
│ ├── flow_agg.conf
│ ├── risk.conf
│ ├── waf.conf
│ └── waf-online.conf
├── default_env.sh
├── deploy_config.json
├── lib
│ ├── aggjob-1.0-SNAPSHOT.jar
│ ├── aggjob-1.0-SNAPSHOT-jar-with-dependencies.jar
│ ├── ml-1.0.0.jar
│ ├── ml-1.0.0-jar-with-dependencies.jar
│ ├── WafDistinct-1.0-SNAPSHOT.jar
│ └── WafDistinct-1.0-SNAPSHOT-jar-with-dependencies.jar
├── logs
├── pid
│ └── pro.pid
└── spark-warehouse
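The output directory above is the unit that gets shipped to the runtime host. A minimal deployment sketch using rsync over ssh; admin@hostname matches the prompt above, but the remote path /export/app/spark_online is a placeholder:
# copy the assembled package to the deploy host (placeholder remote path)
rsync -az output/ admin@hostname:/export/app/spark_online/output/
# start the job remotely through control
ssh admin@hostname "cd /export/app/spark_online/output && ./bin/control start"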