pyspark学习

11 打包Spark应用程序

2018-01-20  本文已影响253人  7125messi

测试时,我们使用jupyter notebook可以方便开发调试程序。但是,当我们实际生产过程中,需要执行一个计划作业时,该作业每小时都在运行,jupyter notebook就工作不了了。如何去利用可以重用的模块形式去编写脚本,打包好我们的Spark应用程序,直至最后提交Spark作业用于生产,这是需要我们掌握的一项技能。

1 spark-submit命令

# 一般级别的,语法如下:
spark-submit [options] <python file> [app arguments]

提交作业到Spark的入口点(本地或集群)是spark-submit脚本,该脚本不仅可以提交作业,也可以终止作业或检查其状态。
spark-submit命令提供了一个统一的API把应用程序部署到各种Spark支持的集群管理器上(如YARN),从而免除了单独配置每个应用程序。

PySpark命令行参数

--master:用于设置主节点的URL参数。
(1)local 用于执行本地机器的代码。
如果你传递local参数,Spark会运行一个单一的线程(不会利用任何并行线程)。
在一个多核机器上,local[n]来为Spark指定一个具体使用的内核数,n指的是使用的内核数。
通过loacl[*]来制定运行和Spark机器内核一样多的复杂线程。
(2)spark://host:port 这是一个URL和一个Spark单机集群的端口(不运行任何作业调度,如Mesos或者Yarn)
(3)mesos://host:port 这是一个URL和一个部署在Mesos上的Spark集群端口
(4)yarn 作为一个负载均衡器,用于从运行Yarn的主节点提交作业。

--deploy-mode:允许你决定是否在本地(使用client)启动Spark驱动程序的参数,或者在集群内(使用cluster)的任意一台机器上启动。此参数默认值为client。

--name:应用程序的名字,如果在创建SparkSession时,以编程方式指定应用程序名称,那么来自命令行的参数会被重写。

--py-files : .py、.egg或者.zip文件的逗号分隔列表,包括Python应用程序。这些文件将被交付给每一个执行器来使用。

--file :命令给出一个逗号分隔的文件列表,这些文件将被交付到每一个执行器来使用。

--conf : 参数通过命令行动态地更改应用程序的配置。语法是:

<Spark property>=<value for the property> 。
例如:
--conf spark.local.dir=/opt/Spark2.2.0/ 
--conf spark.app.name=com.xxx.passengerflow.metro_jh

需要注意的是Spark有3个地方使用配置参数:
(1)最高优先级的是在SparkContext时,指定了SparkConf的参数获得最高优先权;
(2)第二优先权:spark-submit传递给的参数;
(3)第三优先权:conf/spark-default.conf文件中指定的参数。

--properties-file :配置文件,它应该有和conf/spark-defaults.conf文件相同的属性设置,也是可读的。

--driver-memory:指定应用程序在驱动程序上分配多少内存的参数。允许的值有一个语法限制,类似于1000M,2G。默认值为1024M。

--executor-memory:参数指定每个执行器上为应用程序分配多少内存。默认值为1G。

--help:展示帮助信息和退出

--verbose:在运行应用程序时打印附加调试信息。

--version:打印spark版本

--driver-cores:(在spark单机cluster或者Yarn上部署cluster模式下),允许指定驱动程序的内核数量(默认值为1)

--kill:将完成的过程赋予submission_id

--status:如果指定了该命令,它将请求指定的应用程序的状态。

--total-executor-cores:(在spark单机或Mesos client部署模式下)该参数会为所有执行器(不是每一个)请求指定的内核数量。

在YARN集群提交时可以指定的:

--queue:该参数指定了YARN上的队列,以便于将该作业提交到队列(默认值是default)

--num-executors:指定需要多个执行器来请求该作业的参数。如果启动了动态分配,则执行器的初始数量至少是指定的数量。

2 以编程方式部署应用程序

如何创建和配置SparkSession?

如何对Spark使用外部模块?

2.1 配置你的SparkSession(或者sc)

以编程方式使用Jupyter和提交作业的主要区别是,你必须创建Spark context上下文背景环境,而使用Jupyter运行Spark,上下文背景会自动开始。

2.2 创建SparkSession

Spark2.2.0

from pyspark.sql import SparkSession
spark = SparkSession \
              .builder \
              .appName('CalculatingGeoDistances') \
              .getOrCreate()
print('Session created')
如果此时你想创建一个SparkContext,可以直接使用:
sc = spark.SparkContext

Spark1.6.2

from pyspark import SparkContext
sc = SparkContext \
              .builder \
              .appName('CalculatingGeoDistances') \
              .getOrCreate()
print('sc created')

2.3 模块化代码(重要重要!!!)

以模块化的形式构建代码,以便于以后重用该代码是一件值得做的事情。
例子:建立一个模块,并且在数据集上做一些计算,计算出上车和下车位置的直线距离(英里),并且将英里转换为公里。
(1)模块结构

image.png image.png

在Python包的结构中,在顶层有一个setup.py文件,所以可以打包我们的模块。

setup.py文件内容如下:

from setuptools import setup

setup(
    name='PySparkUtilities',
    version='0.1dev',
    packages=['utilities', 'utilities/converters'],
    license='''
        Creative Commons 
        Attribution-Noncommercial-Share Alike license''',
    long_description='''
        An example of how to package code for PySpark'''
)

关于如何定义其他项目的setup.py文件,可以参考:
https://pythonhosted.org/an_example_pypi_project/setuptools.html

image.png
init.py文件内容如下:
from .geoCalc import geoCalc
__all__ = ['geoCalc','converters']

(2)计算两点之间的距离
该代码位于该模块的geoCalc.py文件中

import math

class geoCalc(object):
    @staticmethod
    def calculateDistance(p1, p2):
        '''
            calculates the distance using Haversine formula
        '''
        R = 3959 # earth's radius in miles

        # get the coordinates
        lat1, lon1 = p1[0], p1[1]
        lat2, lon2 = p2[0], p2[1]

        # convert to radians
        deltaLat_radians = math.radians(lat2-lat1)
        deltaLon_radians = math.radians(lon2-lon1)

        lat1_radians = math.radians(lat1)
        lat2_radians = math.radians(lat2)

        # apply the formula
        hav = math.sin(deltaLat_radians / 2.0) * \
            math.sin(deltaLat_radians / 2.0) + \
            math.sin(deltaLon_radians / 2.0) * \
            math.sin(deltaLon_radians / 2.0) * \
            math.cos(lat1_radians) * \
            math.cos(lat2_radians) 

        dist = 2 * R * math.asin(math.sqrt(hav)) 

        return dist

if __name__ == '__main__':
    p1 = {'address': '301 S Jackson St, Seattle, WA 98104',
    'lat': 47.599200, 
    'long': -122.329841}

    p2 = {'address': 'Thunderbird Films Inc 533, Smithe St #401, Vancouver, BC V6B 6H1, Canada',
        'lat': 49.279688, 
        'long': -123.119190}

    print(geoCalc.calculateDistance((p1['lat'], p1['long']), (p2['lat'], p2['long'])))

calculateDistance()是geoCalc类的静态方法,它需要两个地理位置,表示为一个元祖或者一个具有两个元素的列表(按照顺序排列的维度和经度),并且使用Haversine公式计算距离(英里)。

(3)转变距离单位
为了便于使用,作为converter实现的任何类都应该公开类似的窗口,查看base.py文件内容:

from abc import ABCMeta, abstractmethod

class BaseConverter(metaclass=ABCMeta):
    @staticmethod
    @abstractmethod
    def convert(f, t):
        raise NotImplementedError

if __name__ == '__main__':
    i = BaseConverter()

distance.py内容:

from ..base import BaseConverter

class metricImperial(BaseConverter):
    pass

    @staticmethod
    def convert(f, t):
        conversionTable = {
            'in': {  
                'mm': 25.4,    'cm': 2.54,     'm': 0.0254, 
                'km': 0.0000254
            }, 'ft': {  
                'mm': 304.8,   'cm': 30.48,    'm': 0.3048,
                'km': 0.0003048
            }, 'yd': {  
                'mm': 914.4,   'cm': 91.44,    'm': 0.9144,
                'km': 0.0009144
            }, 'mile': {    
                'mm': 1609344, 'cm': 160934.4, 'm': 1609.344,
                'km': 1.609344
            }   
        }

        f_val, f_unit = f.split(' ')
        f_val = float(f_val)

        if f_unit in conversionTable.keys():
            if t in conversionTable[f_unit].keys():
                conv = 1 / conversionTable[f_unit][t]
            else:
                raise KeyError('Key {0} not found...' \
                    .format(t))
        elif t in conversionTable.keys():
            if f_unit in conversionTable[t].keys():
                conv = conversionTable[t][f_unit]
            else:
                raise KeyError('Key {0} not found...' \
                    .format(f_unit))
        else:
            raise KeyError('Neither {0} nor {1} key found'\
                .format(t, f_unit))

        return f_val / conv

if __name__ == '__main__':
    f = metricImperial()
    print(f.convert('10 mile', 'km'))

(4)打包:创建一个egg文件
PySpark文档指出,可以用逗号来分隔传递.py文件给spark-submit脚本。其实最方便的是把模块打包进一个.zip或者一个.egg。当setup.py文件方便使用时,调用additionalCode文件夹里的内容:

python setup.py bdist_egg

可以看到3个文件夹:PySparkUtilities.egg-info、build和dist
dist文件夹有:PySparkUtilities-0.1.dev0-py3.5.egg

(5)Spark中用户定义函数

为了对在PySpark中的DataFrame执行操作,有两个选择:使用内置函数来处理数据(大多数情况下都足以达到你的需求,且作为更高性能的代码而被推荐使用);但是有的时候需要自定义函数去实现若干功能选项。

为了定义一个UDF,必须把Python函数封装在.udf()方法中,并且定义它的返回值类型。以下是我们如何在脚本中实现
calculatingGeoDistance.py文件

import utilities.geoCalc as geo
from utilities.converters import metricImperial

from pyspark.sql import SparkSession
from pyspark.sql.types import *
import pyspark.sql.functions as func

def geoEncode(spark):
    # read the data in
    uber = spark.read.csv(
        'uber_data_nyc_2016-06_3m_partitioned.csv', 
        header=True, 
        inferSchema=True
        )\
        .repartition(4) \
        # .select('VendorID','tpep_pickup_datetime', 'pickup_longitude', 'pickup_latitude','dropoff_longitude','dropoff_latitude','total_amount')

    # prepare the UDFs
    getDistance = func.udf(
        lambda lat1, long1, lat2, long2: 
            geo.calculateDistance(
                (lat1, long1),
                (lat2, long2)
            )
        )

    convertMiles = func.udf(lambda m: 
        metricImperial.convert(str(m) + ' mile', 'km'))

    # create new columns
    uber = uber.withColumn(
        'miles', 
            getDistance(
                func.col('pickup_latitude'),
                func.col('pickup_longitude'), 
                func.col('dropoff_latitude'), 
                func.col('dropoff_longitude')
            )
        )

    uber = uber.withColumn(
        'kilometers', 
        convertMiles(func.col('miles')))

    # print 10 rows
    # uber.show(10)

    # save to csv (partitioned)
    uber.write.csv(
        'uber_data_nyc_2016-06_new.csv',
        mode='overwrite',
        header=True,
        compression='gzip'
    )

if __name__ == '__main__':
    spark = SparkSession \
        .builder \
        .appName('CalculatingGeoDistances') \
        .getOrCreate()

    print('Session created')

    try:
        geoEncode(spark)

    finally:
        spark.stop()

2.4 提交作业

可以在命令行键入以下命令:

./launch_spark_submit.sh \
--master local[4] \
--py-files additionalCode/dist/PySparkUtilities-0.1.dev0-
py3.5.egg \
calculatingGeoDistance.py

其中launch_spark_submit.sh是spark-submit命令的封装,通过对jupyter设置了PYSPARK_DRIVER_PYTHON系统变量:

#!/bin/bash
unset PYSPARK_DRIVER_PYTHON
spark-submit $*
export PYSPARK_DRIVER_PYTHON=jupyter

2.5 实际项目中,spark-submit的设置和打包

(1)命令行模式

集群模式:

读取的文件必须放在HDFS上:

spark-submit --master yarn \
             --deploy-mode cluster \
             --num-executors 25 \
             --executor-cores 2 \
             --driver-memory 4g \
             --executor-memory 4g \
             --conf spark.broadcast.compress=true \
             --jars "/data/spark/ojdbc6-11.2.0.3.jar" com.meihuichina.passengerflow.grid1km_laccell_grid100m.py > /home/ydzhao/log/.out 2>&1

(2)单节点模式

单节点启动PySpark Shell
(1)pyspark --jars "/data/spark/ojdbc6-11.2.0.3.jar"

(2)pyspark --jars "/data/spark/ojdbc6-11.2.0.3.jar" \
        --num-executors 25 \
        --executor-cores 2 \
        --driver-memory 4g \
        --executor-memory 8g
单节点提交任务
spark-submit --jars "/data/spark/ojdbc6-11.2.0.3.jar" /pyspark_app/test.py

./bin/spark-submit --master lcoal[*] /home/ydzhao/pyspark_app/com.meihuichina.passengerflow.grid1km_laccell_grid100m.py

(3)编辑打包成test.sh Shell脚本

#!/usr/bin/env bash

spark-submit --master yarn \
             --deploy-mode cluster \
             --num-executors 25 \
             --executor-cores 2 \
             --driver-memory 4g \
             --executor-memory 4g \
             --conf spark.broadcast.compress=true \
             --conf spark.yarn.executor.memoryOverhead=900 \
             --conf spark.sql.shuffle.partitions=20 \
             --jars "/data/spark/ojdbc6-11.2.0.3.jar" com.meihuichina.passengerflow.grid1km_laccell_grid100m.py > /home/ydzhao/log/.out 2>&1
# ./test.sh

# 给test.sh相关权限
chmod u+x test.sh

运行test.sh

./test.sh

2.6 监控执行

image.png
image.png
上一篇下一篇

猜你喜欢

热点阅读