11 打包Spark应用程序
测试时,我们使用jupyter notebook可以方便开发调试程序。但是,当我们实际生产过程中,需要执行一个计划作业时,该作业每小时都在运行,jupyter notebook就工作不了了。如何去利用可以重用的模块形式去编写脚本,打包好我们的Spark应用程序,直至最后提交Spark作业用于生产,这是需要我们掌握的一项技能。
1 spark-submit命令
# 一般级别的,语法如下:
spark-submit [options] <python file> [app arguments]
提交作业到Spark的入口点(本地或集群)是spark-submit脚本,该脚本不仅可以提交作业,也可以终止作业或检查其状态。
spark-submit命令提供了一个统一的API把应用程序部署到各种Spark支持的集群管理器上(如YARN),从而免除了单独配置每个应用程序。
PySpark命令行参数
--master:用于设置主节点的URL参数。
(1)local 用于执行本地机器的代码。
如果你传递local参数,Spark会运行一个单一的线程(不会利用任何并行线程)。
在一个多核机器上,local[n]来为Spark指定一个具体使用的内核数,n指的是使用的内核数。
通过loacl[*]来制定运行和Spark机器内核一样多的复杂线程。
(2)spark://host:port 这是一个URL和一个Spark单机集群的端口(不运行任何作业调度,如Mesos或者Yarn)
(3)mesos://host:port 这是一个URL和一个部署在Mesos上的Spark集群端口
(4)yarn 作为一个负载均衡器,用于从运行Yarn的主节点提交作业。
--deploy-mode:允许你决定是否在本地(使用client)启动Spark驱动程序的参数,或者在集群内(使用cluster)的任意一台机器上启动。此参数默认值为client。
--name:应用程序的名字,如果在创建SparkSession时,以编程方式指定应用程序名称,那么来自命令行的参数会被重写。
--py-files : .py、.egg或者.zip文件的逗号分隔列表,包括Python应用程序。这些文件将被交付给每一个执行器来使用。
--file :命令给出一个逗号分隔的文件列表,这些文件将被交付到每一个执行器来使用。
--conf : 参数通过命令行动态地更改应用程序的配置。语法是:
<Spark property>=<value for the property> 。
例如:
--conf spark.local.dir=/opt/Spark2.2.0/
--conf spark.app.name=com.xxx.passengerflow.metro_jh
需要注意的是Spark有3个地方使用配置参数:
(1)最高优先级的是在SparkContext时,指定了SparkConf的参数获得最高优先权;
(2)第二优先权:spark-submit传递给的参数;
(3)第三优先权:conf/spark-default.conf文件中指定的参数。
--properties-file :配置文件,它应该有和conf/spark-defaults.conf文件相同的属性设置,也是可读的。
--driver-memory:指定应用程序在驱动程序上分配多少内存的参数。允许的值有一个语法限制,类似于1000M,2G。默认值为1024M。
--executor-memory:参数指定每个执行器上为应用程序分配多少内存。默认值为1G。
--help:展示帮助信息和退出
--verbose:在运行应用程序时打印附加调试信息。
--version:打印spark版本
--driver-cores:(在spark单机cluster或者Yarn上部署cluster模式下),允许指定驱动程序的内核数量(默认值为1)
--kill:将完成的过程赋予submission_id
--status:如果指定了该命令,它将请求指定的应用程序的状态。
--total-executor-cores:(在spark单机或Mesos client部署模式下)该参数会为所有执行器(不是每一个)请求指定的内核数量。
在YARN集群提交时可以指定的:
--queue:该参数指定了YARN上的队列,以便于将该作业提交到队列(默认值是default)
--num-executors:指定需要多个执行器来请求该作业的参数。如果启动了动态分配,则执行器的初始数量至少是指定的数量。
2 以编程方式部署应用程序
如何创建和配置SparkSession?
如何对Spark使用外部模块?
2.1 配置你的SparkSession(或者sc)
以编程方式使用Jupyter和提交作业的主要区别是,你必须创建Spark context上下文背景环境,而使用Jupyter运行Spark,上下文背景会自动开始。
2.2 创建SparkSession
Spark2.2.0
from pyspark.sql import SparkSession
spark = SparkSession \
.builder \
.appName('CalculatingGeoDistances') \
.getOrCreate()
print('Session created')
如果此时你想创建一个SparkContext,可以直接使用:
sc = spark.SparkContext
Spark1.6.2
from pyspark import SparkContext
sc = SparkContext \
.builder \
.appName('CalculatingGeoDistances') \
.getOrCreate()
print('sc created')
2.3 模块化代码(重要重要!!!)
以模块化的形式构建代码,以便于以后重用该代码是一件值得做的事情。
例子:建立一个模块,并且在数据集上做一些计算,计算出上车和下车位置的直线距离(英里),并且将英里转换为公里。
(1)模块结构
在Python包的结构中,在顶层有一个setup.py文件,所以可以打包我们的模块。
setup.py文件内容如下:
from setuptools import setup
setup(
name='PySparkUtilities',
version='0.1dev',
packages=['utilities', 'utilities/converters'],
license='''
Creative Commons
Attribution-Noncommercial-Share Alike license''',
long_description='''
An example of how to package code for PySpark'''
)
关于如何定义其他项目的setup.py文件,可以参考:
https://pythonhosted.org/an_example_pypi_project/setuptools.html
init.py文件内容如下:
from .geoCalc import geoCalc
__all__ = ['geoCalc','converters']
(2)计算两点之间的距离
该代码位于该模块的geoCalc.py文件中
import math
class geoCalc(object):
@staticmethod
def calculateDistance(p1, p2):
'''
calculates the distance using Haversine formula
'''
R = 3959 # earth's radius in miles
# get the coordinates
lat1, lon1 = p1[0], p1[1]
lat2, lon2 = p2[0], p2[1]
# convert to radians
deltaLat_radians = math.radians(lat2-lat1)
deltaLon_radians = math.radians(lon2-lon1)
lat1_radians = math.radians(lat1)
lat2_radians = math.radians(lat2)
# apply the formula
hav = math.sin(deltaLat_radians / 2.0) * \
math.sin(deltaLat_radians / 2.0) + \
math.sin(deltaLon_radians / 2.0) * \
math.sin(deltaLon_radians / 2.0) * \
math.cos(lat1_radians) * \
math.cos(lat2_radians)
dist = 2 * R * math.asin(math.sqrt(hav))
return dist
if __name__ == '__main__':
p1 = {'address': '301 S Jackson St, Seattle, WA 98104',
'lat': 47.599200,
'long': -122.329841}
p2 = {'address': 'Thunderbird Films Inc 533, Smithe St #401, Vancouver, BC V6B 6H1, Canada',
'lat': 49.279688,
'long': -123.119190}
print(geoCalc.calculateDistance((p1['lat'], p1['long']), (p2['lat'], p2['long'])))
calculateDistance()是geoCalc类的静态方法,它需要两个地理位置,表示为一个元祖或者一个具有两个元素的列表(按照顺序排列的维度和经度),并且使用Haversine公式计算距离(英里)。
(3)转变距离单位
为了便于使用,作为converter实现的任何类都应该公开类似的窗口,查看base.py文件内容:
from abc import ABCMeta, abstractmethod
class BaseConverter(metaclass=ABCMeta):
@staticmethod
@abstractmethod
def convert(f, t):
raise NotImplementedError
if __name__ == '__main__':
i = BaseConverter()
distance.py内容:
from ..base import BaseConverter
class metricImperial(BaseConverter):
pass
@staticmethod
def convert(f, t):
conversionTable = {
'in': {
'mm': 25.4, 'cm': 2.54, 'm': 0.0254,
'km': 0.0000254
}, 'ft': {
'mm': 304.8, 'cm': 30.48, 'm': 0.3048,
'km': 0.0003048
}, 'yd': {
'mm': 914.4, 'cm': 91.44, 'm': 0.9144,
'km': 0.0009144
}, 'mile': {
'mm': 1609344, 'cm': 160934.4, 'm': 1609.344,
'km': 1.609344
}
}
f_val, f_unit = f.split(' ')
f_val = float(f_val)
if f_unit in conversionTable.keys():
if t in conversionTable[f_unit].keys():
conv = 1 / conversionTable[f_unit][t]
else:
raise KeyError('Key {0} not found...' \
.format(t))
elif t in conversionTable.keys():
if f_unit in conversionTable[t].keys():
conv = conversionTable[t][f_unit]
else:
raise KeyError('Key {0} not found...' \
.format(f_unit))
else:
raise KeyError('Neither {0} nor {1} key found'\
.format(t, f_unit))
return f_val / conv
if __name__ == '__main__':
f = metricImperial()
print(f.convert('10 mile', 'km'))
(4)打包:创建一个egg文件
PySpark文档指出,可以用逗号来分隔传递.py文件给spark-submit脚本。其实最方便的是把模块打包进一个.zip或者一个.egg。当setup.py文件方便使用时,调用additionalCode文件夹里的内容:
python setup.py bdist_egg
可以看到3个文件夹:PySparkUtilities.egg-info、build和dist
dist文件夹有:PySparkUtilities-0.1.dev0-py3.5.egg
(5)Spark中用户定义函数
为了对在PySpark中的DataFrame执行操作,有两个选择:使用内置函数来处理数据(大多数情况下都足以达到你的需求,且作为更高性能的代码而被推荐使用);但是有的时候需要自定义函数去实现若干功能选项。
为了定义一个UDF,必须把Python函数封装在.udf()方法中,并且定义它的返回值类型。以下是我们如何在脚本中实现
calculatingGeoDistance.py文件
import utilities.geoCalc as geo
from utilities.converters import metricImperial
from pyspark.sql import SparkSession
from pyspark.sql.types import *
import pyspark.sql.functions as func
def geoEncode(spark):
# read the data in
uber = spark.read.csv(
'uber_data_nyc_2016-06_3m_partitioned.csv',
header=True,
inferSchema=True
)\
.repartition(4) \
# .select('VendorID','tpep_pickup_datetime', 'pickup_longitude', 'pickup_latitude','dropoff_longitude','dropoff_latitude','total_amount')
# prepare the UDFs
getDistance = func.udf(
lambda lat1, long1, lat2, long2:
geo.calculateDistance(
(lat1, long1),
(lat2, long2)
)
)
convertMiles = func.udf(lambda m:
metricImperial.convert(str(m) + ' mile', 'km'))
# create new columns
uber = uber.withColumn(
'miles',
getDistance(
func.col('pickup_latitude'),
func.col('pickup_longitude'),
func.col('dropoff_latitude'),
func.col('dropoff_longitude')
)
)
uber = uber.withColumn(
'kilometers',
convertMiles(func.col('miles')))
# print 10 rows
# uber.show(10)
# save to csv (partitioned)
uber.write.csv(
'uber_data_nyc_2016-06_new.csv',
mode='overwrite',
header=True,
compression='gzip'
)
if __name__ == '__main__':
spark = SparkSession \
.builder \
.appName('CalculatingGeoDistances') \
.getOrCreate()
print('Session created')
try:
geoEncode(spark)
finally:
spark.stop()
2.4 提交作业
可以在命令行键入以下命令:
./launch_spark_submit.sh \
--master local[4] \
--py-files additionalCode/dist/PySparkUtilities-0.1.dev0-
py3.5.egg \
calculatingGeoDistance.py
其中launch_spark_submit.sh是spark-submit命令的封装,通过对jupyter设置了PYSPARK_DRIVER_PYTHON系统变量:
#!/bin/bash
unset PYSPARK_DRIVER_PYTHON
spark-submit $*
export PYSPARK_DRIVER_PYTHON=jupyter
2.5 实际项目中,spark-submit的设置和打包
(1)命令行模式
集群模式:
读取的文件必须放在HDFS上:
spark-submit --master yarn \
--deploy-mode cluster \
--num-executors 25 \
--executor-cores 2 \
--driver-memory 4g \
--executor-memory 4g \
--conf spark.broadcast.compress=true \
--jars "/data/spark/ojdbc6-11.2.0.3.jar" com.meihuichina.passengerflow.grid1km_laccell_grid100m.py > /home/ydzhao/log/.out 2>&1
(2)单节点模式
单节点启动PySpark Shell
(1)pyspark --jars "/data/spark/ojdbc6-11.2.0.3.jar"
(2)pyspark --jars "/data/spark/ojdbc6-11.2.0.3.jar" \
--num-executors 25 \
--executor-cores 2 \
--driver-memory 4g \
--executor-memory 8g
单节点提交任务
spark-submit --jars "/data/spark/ojdbc6-11.2.0.3.jar" /pyspark_app/test.py
./bin/spark-submit --master lcoal[*] /home/ydzhao/pyspark_app/com.meihuichina.passengerflow.grid1km_laccell_grid100m.py
(3)编辑打包成test.sh Shell脚本
#!/usr/bin/env bash
spark-submit --master yarn \
--deploy-mode cluster \
--num-executors 25 \
--executor-cores 2 \
--driver-memory 4g \
--executor-memory 4g \
--conf spark.broadcast.compress=true \
--conf spark.yarn.executor.memoryOverhead=900 \
--conf spark.sql.shuffle.partitions=20 \
--jars "/data/spark/ojdbc6-11.2.0.3.jar" com.meihuichina.passengerflow.grid1km_laccell_grid100m.py > /home/ydzhao/log/.out 2>&1
# ./test.sh
# 给test.sh相关权限
chmod u+x test.sh
运行test.sh
./test.sh
2.6 监控执行
image.pngimage.png