扣丁学堂解析PySpark如何设置worker的python命令
今天扣丁学堂Python培训老师给大家介绍一下关于研究spark-deep-learning项目,来回顾一下Pyspark的知识,下面我们要求来看那一下吧,希望能够对本文的读者有所帮助。
问题描述
关于PySpark的基本机制我就不讲太多,你google搜索“PySpark原理”就会有不少还不错的文章。我这次是遇到一个问题,因为我原先安装了python2.7,python3.6。后面为了方便我在我的电脑上使用virtualenv来做环境隔离,这个时候就发生一个比较诡异的事情:
在driver端能够正常使用PIL图片处理模块,但是executor端则不行。那显然是我在~/.bash_profile的配置在executor启动pythonworker时没有生效,程序依然走了我早先安装的python2.7,而早先的2.7里我没有安装PIL。那么应该怎么解决这个问题呢?
Python里的RDD和JVM的RDD如何进行关联
要解答上面的问题,核心是要判定JVM里的PythonRunner启动pythonworker时,python的地址是怎么指定的。
我们以pythonrdd里的map作为起点,
defmap(self,f,preservesPartitioning=False):
deffunc(_,iterator):
returnmap(f,iterator)
returnself.mapPartitionsWithIndex(func,preservesPartitioning)
进入self.mapPartitionsWithIndex:
defmapPartitionsWithIndex(self,f,preservesPartitioning=False):
returnPipelinedRDD(self,f,preservesPartitioning)
可以看到PipelinedRDD,进入PipelinedRDD._jrdd里,可以看到:
wrapped_func=_wrap_function(self.ctx,self.func,self._prev_jrdd_deserializer,
self._jrdd_deserializer,profiler)
python_rdd=self.ctx._jvm.PythonRDD(self._prev_jrdd.rdd(),wrapped_func,
self.preservesPartitioning)
self._jrdd_val=python_rdd.asJavaRDD()
这里和JVM里的PythonRDD建立了联系。进入_wrap_function:
pickled_command,broadcast_vars,env,includes=_prepare_for_python_RDD(sc,command)
returnsc._jvm.PythonFunction(bytearray(pickled_command),env,includes,sc.pythonExec,sc.pythonVer,broadcast_vars,sc._javaAccumulator)
我们看到了sc.pythonExec对象,这个是传入到PythonRDD里的python命令。为了看的更清楚,我们看看sc.pythonExec的申明:
self.pythonExec=os.environ.get("PYSPARK_PYTHON",'python')
也就是你在很多文档中看到的,通过设置PYSPARK_PYTHON变量来设置启用哪个python。那么pythonExec是JVM里是怎么用的呢?
private[spark]classPythonRDD(
parent:RDD[_],
func:PythonFunction,
preservePartitoning:Boolean)
extendsRDD[Array[Byte]](parent){
PythonRDD是在python中通过_jvm对象在JVM里创建的,里面哟给重要的对象是PythonFunction,这个PythonFunction就是wrapped_func,wrapped_func里包含了env,pythonExec等。PythonRDD的compute方法里会调用PythonRunner的compute方法:
valrunner=PythonRunner(func,bufferSize,reuse_worker)
runner.compute(firstParent.iterator(split,context),split.index,context)
上面的func其实就是PythonFunction,在PythonRunner里你可以看到:
//AllthePythonfunctionsshouldhavethesameexec,versionandenvvars.
privatevalenvVars=funcs.head.funcs.head.envVars
privatevalpythonExec=funcs.head.funcs.head.pythonExec
privatevalpythonVer=funcs.head.funcs.head.pythonVer
三个变量的申明,具体使用在这:
valworker:Socket=env.createPythonWorker(pythonExec,envVars.asScala.toMap)
这里通过pythonRunner运行启动pythonworker。
额外福利:Python如何启动JVM,从而启动Spark
建议配置一套spark的开发环境,然后debug进行跟踪。Python启动时,首先启动SparkContext(context.py),在init方法里会_ensure_initialized方法确保Java里的SparkContext被初始化:
@classmethod
def_ensure_initialized(cls,instance=None,gateway=None,conf=None):
withSparkContext._lock:
ifnotSparkContext._gateway:
SparkContext._gateway=gatewayorlaunch_gateway(conf)
SparkContext._jvm=SparkContext._gateway.jvm
初始时会调用lauch_gateway(java_gateway.py),该方法首先会到环境变量里找SPARK_HOME,然后使用里面的./bin/spark-submit进行Spark的启动,通过环境变量中的PYSPARK_SUBMIT_ARGS获取一些参数,默认是pyspark-shell,最后通过Popen启动Spark进程,返回一个JavaGateWay,之后持有这个对象,就可以向JVM发送指令了。
解决问题
有了上面的铺垫后,问题就变得很好解决了,下面的单元测试原先是跑步过去的
deftest_readImages(self):
#Testthatreading
imageDF=imageIO._readImages("some/path",2,self.binaryFilesMock)
self.assertTrue("image"inimageDF.schema.names)
self.assertTrue("filePath"inimageDF.schema.names)
#TheDFshouldhave2imagesand1null.
self.assertEqual(imageDF.count(),3)
validImages=imageDF.filter(col("image").isNotNull())
self.assertEqual(validImages.count(),2)
img=validImages.first().image
self.assertEqual(img.height,array.shape[0])
self.assertEqual(img.width,array.shape[1])
self.assertEqual(imageIO.imageType(img).nChannels,array.shape[2])
self.assertEqual(img.data,array.tobytes())
现在我该如何让他通过呢?可以在setUp的时候添加
importos
os.environ["PYSPARK_PYTHON"]="your-python-path"
即可。