扣丁学堂Python培训

扣丁学堂解析PySpark如何设置worker的python命令

2018-08-06  本文已影响3人  994d14631d16

  今天扣丁学堂Python培训老师给大家介绍一下关于研究spark-deep-learning项目,来回顾一下Pyspark的知识,下面我们要求来看那一下吧,希望能够对本文的读者有所帮助。

  问题描述

  关于PySpark的基本机制我就不讲太多,你google搜索“PySpark原理”就会有不少还不错的文章。我这次是遇到一个问题,因为我原先安装了python2.7,python3.6。后面为了方便我在我的电脑上使用virtualenv来做环境隔离,这个时候就发生一个比较诡异的事情:

  在driver端能够正常使用PIL图片处理模块,但是executor端则不行。那显然是我在~/.bash_profile的配置在executor启动pythonworker时没有生效,程序依然走了我早先安装的python2.7,而早先的2.7里我没有安装PIL。那么应该怎么解决这个问题呢?

  Python里的RDD和JVM的RDD如何进行关联

  要解答上面的问题,核心是要判定JVM里的PythonRunner启动pythonworker时,python的地址是怎么指定的。

  我们以pythonrdd里的map作为起点,

  defmap(self,f,preservesPartitioning=False):

  deffunc(_,iterator):

  returnmap(f,iterator)

  returnself.mapPartitionsWithIndex(func,preservesPartitioning)

  进入self.mapPartitionsWithIndex:

  defmapPartitionsWithIndex(self,f,preservesPartitioning=False):

  returnPipelinedRDD(self,f,preservesPartitioning)

  可以看到PipelinedRDD,进入PipelinedRDD._jrdd里,可以看到:

  wrapped_func=_wrap_function(self.ctx,self.func,self._prev_jrdd_deserializer,

  self._jrdd_deserializer,profiler)

  python_rdd=self.ctx._jvm.PythonRDD(self._prev_jrdd.rdd(),wrapped_func,

  self.preservesPartitioning)

  self._jrdd_val=python_rdd.asJavaRDD()

  这里和JVM里的PythonRDD建立了联系。进入_wrap_function:

  pickled_command,broadcast_vars,env,includes=_prepare_for_python_RDD(sc,command)

  returnsc._jvm.PythonFunction(bytearray(pickled_command),env,includes,sc.pythonExec,sc.pythonVer,broadcast_vars,sc._javaAccumulator)

  我们看到了sc.pythonExec对象,这个是传入到PythonRDD里的python命令。为了看的更清楚,我们看看sc.pythonExec的申明:

  self.pythonExec=os.environ.get("PYSPARK_PYTHON",'python')

  也就是你在很多文档中看到的,通过设置PYSPARK_PYTHON变量来设置启用哪个python。那么pythonExec是JVM里是怎么用的呢?

  private[spark]classPythonRDD(

  parent:RDD[_],

  func:PythonFunction,

  preservePartitoning:Boolean)

  extendsRDD[Array[Byte]](parent){

  PythonRDD是在python中通过_jvm对象在JVM里创建的,里面哟给重要的对象是PythonFunction,这个PythonFunction就是wrapped_func,wrapped_func里包含了env,pythonExec等。PythonRDD的compute方法里会调用PythonRunner的compute方法:

  valrunner=PythonRunner(func,bufferSize,reuse_worker)

  runner.compute(firstParent.iterator(split,context),split.index,context)

  上面的func其实就是PythonFunction,在PythonRunner里你可以看到:

  //AllthePythonfunctionsshouldhavethesameexec,versionandenvvars.

  privatevalenvVars=funcs.head.funcs.head.envVars

  privatevalpythonExec=funcs.head.funcs.head.pythonExec

  privatevalpythonVer=funcs.head.funcs.head.pythonVer

  三个变量的申明,具体使用在这:

  valworker:Socket=env.createPythonWorker(pythonExec,envVars.asScala.toMap)

  这里通过pythonRunner运行启动pythonworker。

  额外福利:Python如何启动JVM,从而启动Spark

  建议配置一套spark的开发环境,然后debug进行跟踪。Python启动时,首先启动SparkContext(context.py),在init方法里会_ensure_initialized方法确保Java里的SparkContext被初始化:

  @classmethod

  def_ensure_initialized(cls,instance=None,gateway=None,conf=None):

  withSparkContext._lock:

  ifnotSparkContext._gateway:

  SparkContext._gateway=gatewayorlaunch_gateway(conf)

  SparkContext._jvm=SparkContext._gateway.jvm

  初始时会调用lauch_gateway(java_gateway.py),该方法首先会到环境变量里找SPARK_HOME,然后使用里面的./bin/spark-submit进行Spark的启动,通过环境变量中的PYSPARK_SUBMIT_ARGS获取一些参数,默认是pyspark-shell,最后通过Popen启动Spark进程,返回一个JavaGateWay,之后持有这个对象,就可以向JVM发送指令了。

  解决问题

  有了上面的铺垫后,问题就变得很好解决了,下面的单元测试原先是跑步过去的

  deftest_readImages(self):

  #Testthatreading

  imageDF=imageIO._readImages("some/path",2,self.binaryFilesMock)

  self.assertTrue("image"inimageDF.schema.names)

  self.assertTrue("filePath"inimageDF.schema.names)

  #TheDFshouldhave2imagesand1null.

  self.assertEqual(imageDF.count(),3)

  validImages=imageDF.filter(col("image").isNotNull())

  self.assertEqual(validImages.count(),2)

  img=validImages.first().image

  self.assertEqual(img.height,array.shape[0])

  self.assertEqual(img.width,array.shape[1])

  self.assertEqual(imageIO.imageType(img).nChannels,array.shape[2])

  self.assertEqual(img.data,array.tobytes())

  现在我该如何让他通过呢?可以在setUp的时候添加

  importos

  os.environ["PYSPARK_PYTHON"]="your-python-path"

  即可。

上一篇下一篇

猜你喜欢

热点阅读