MLSQL平台执行python代码示例
MLSQL平台介绍
1、基于Apache Spark开发并支持通过写sql的方式执行批程序、流程序、AI、爬虫任务的分布式机器学习平台。
2、可运行在EC2, Hadoop YARN, Mesos, Kubernetes。
3、可操作HDFS,Alluxio,Cassandra,HBase,Hive等多种数据。
4、兼容Spark 2.2.x/2.3.x/2.4.x。
MLSQL项目地址:https://github.com/allwefantasy/streamingpro
https://github.com/allwefantasy/streamingpro/tree/master/examples/sklearn_elasticnet_wine
本文主要介绍如何在MLSQL平台无缝执行python任务脚本,MLSQL有两种集成 Python脚本的方式。
第一种:主要用于python脚本要获取全量数据的情况,比如做机器学习,参考下面示例(一)和示例(二)。
注:示例(一)和示例(二)其实就是将python项目目录放到hdfs上面,通过在python项目目录里新增2个配置文件conda.yaml和MLproject,然后就可以在MLSQL上面写sql脚本调用该python项目代码,最后数据保存到了HDFS中python项目目录下的model里面。
第二种:主要用来作数据处理,需要并行运行的情况,参考示例(三)。
注:示例(三)其实就是将python代码、conda.yaml、MLproject和sql代码直接写在MLSQL上面执行,最后数据保存到了HDFS中python项目目录下的model里面。另外示例(三)是通过set data = '''{"jack":1}'''语句构造的testData测试数据,也可以通过load语法加载其他数据源中的数据。
load语法参考:https://github.com/allwefantasy/streamingpro/blob/master/docs/mlsql-grammar.md
MLSQL平台执行python代码示例(一)
目标:通过MLSQL执行python文件,最终数据保存到hdfs
执行流程:配置python环境依赖、编写python主程序代码、编写MLSQL运行脚本
说明:testStreamingA为hdfs上的文件夹
1、配置python环境依赖的2个文件, 分别为conda.yaml和MLproject
conda.yaml文件内容如下:
name: tutorial
dependencies:
- python=3.6
- pip
- pip:
- --index-url https://mirrors.aliyun.com/pypi/simple/
- kafka==1.3.5
- numpy==1.14.3
MLproject文件内容如下:
name: tutorial
conda_env: conda.yaml
entry_points:
main:
train:
parameters:
alpha: {type: float, default: 0.5}
l1_ratio: {type: float, default: 0.1}
command: "python train.py 0.5 0.1"
Note: command: "python train.py 0.5 0.1" 是执行命令
2、编写python运行文件train.py
import os
import warnings
import sys
import mlsql
if __name__ == "__main__":
warnings.filterwarnings("ignore")
tempDataLocalPath = mlsql.internal_system_param["tempDataLocalPath"]
alpha = float(sys.argv[1]) if len(sys.argv) > 1 else 0.5
l1_ratio = float(sys.argv[2]) if len(sys.argv) > 2 else 0.5
isp = mlsql.params()["internalSystemParam"]
tempModelLocalPath = isp["tempModelLocalPath"]
if not os.path.exists(tempModelLocalPath):
os.makedirs(tempModelLocalPath)
with open(tempModelLocalPath + "/result.txt", "w") as f:
f.write(str(alpha))
f.write(str(l1_ratio))
3、编写MLSQL平台脚本
set modelPath="/testStreamingA";
load csv.`/testStreamingA` as testData;
-- train sklearn model
run testData as PythonAlg.`${modelPath}`
where pythonScriptPath="${HOME}/testStreamingA" -- python 项目所在目录
and keepVersion="false"; -- 是否覆盖模型, true为不覆盖, false为覆盖
load text.`/testStreamingA/model/0` as output; -- 查看目标文件
MLSQL平台执行python代码示例(二)
目标:通过MLSQL执行python文件,统计hdfs的{HOME}/testStreamingB/testDirB下面文件数量保存到hdfs
执行流程:配置python环境依赖、编写python主程序代码、编写MLSQL运行脚本
说明:testStreamingB为hdfs上的文件夹
1、配置python环境依赖的2个文件, 分别为conda.yaml和MLproject
conda.yaml文件内容如下:
name: tutorial
dependencies:
- python=3.6
- pip
- pip:
- --index-url https://mirrors.aliyun.com/pypi/simple/
- numpy==1.14.3
- kafka==1.3.5
- pandas==0.22.0
- scikit-learn==0.19.1
- scipy==1.1.0
MLproject文件内容如下:
name: tutorial
conda_env: conda.yaml
entry_points:
main:
train:
command: "python train.py"
Note: command: "python train.py" 是执行命令
2、编写python运行文件train.py
import os
import warnings
import sys
import mlsql
if __name__ == "__main__":
warnings.filterwarnings("ignore")
tempDataLocalPath = mlsql.internal_system_param["tempDataLocalPath"]
isp = mlsql.params()["internalSystemParam"]
tempModelLocalPath = isp["tempModelLocalPath"]
if not os.path.exists(tempModelLocalPath):
os.makedirs(tempModelLocalPath)
dir_list_A = os.listdir(mlsql.internal_system_param["resource"]["a"])
dir_list_A = [i for i in dir_list_A if i.endswith(".txt")]
dir_list_B = os.listdir(mlsql.internal_system_param["resource"]["b"])
dir_list_B = [i for i in dir_list_B if i.endswith(".txt")]
with open(tempModelLocalPath + "/result.txt", "w") as f:
f.write(str(len(dir_list_A) + len(dir_list_B)))
3、编写MLSQL平台运行脚本
set modelPath="/testStreamingB";
load csv.`/testStreamingB` as testData;
-- train sklearn model
run testData as PythonAlg.`${modelPath}`
where pythonScriptPath="${HOME}/testStreamingB" -- python 项目所在目录
and `fitParam.0.resource.a`="${HOME}/testStreamingB/testDirA" -- 指定要加载文件目录
and `fitParam.0.resource.b`="${HOME}/testStreamingB/testDirB" -- 指定要加载文件目录
and keepVersion="false"; -- 是否覆盖模型, true为不覆盖, false为覆盖
load text.`/testStreamingB/model/0` as output;
MLSQL平台执行python代码示例(三)
升级:对比示例一和示例二,可将配置代码、python代码都写在MLSQL脚本里面运行
MLSQL平台运行脚本如下:
-- 要执行的python代码
set pythonScript='''
import os
import warnings
import sys
import mlsql
if __name__ == "__main__":
warnings.filterwarnings("ignore")
tempDataLocalPath = mlsql.internal_system_param["tempDataLocalPath"]
isp = mlsql.params()["internalSystemParam"]
tempModelLocalPath = isp["tempModelLocalPath"]
if not os.path.exists(tempModelLocalPath):
os.makedirs(tempModelLocalPath)
with open(tempModelLocalPath + "/result.txt", "w") as f:
f.write("jack")
''';
-- 配置文件代码
set dependencies='''
name: tutorial
dependencies:
- python=3.6
- pip
- pip:
- --index-url https://mirrors.aliyun.com/pypi/simple/
- numpy==1.14.3
- kafka==1.3.5
- pyspark==2.3.2
- pandas==0.22.0
''';
set modelPath="/tmp/jack2";
set data='''
{"jack":1}
''';
load jsonStr.`data` as testData;
load script.`pythonScript` as pythonScript;
load script.`dependencies` as dependencies;
run testData as RepartitionExt.`` where partitionNum="5" as newdata; --partitionNum=5即将数据分成5个分区
-- train sklearn model
run newdata as PythonParallelExt.`${modelPath}`
where partitionNum="3" -- 可以修改任何一张表的分区数
and scripts="pythonScript"
and entryPoint="pythonScript"
and condaFile="dependencies"
and keepLocalDirectory="true";
Q&A
Q1:HDFS有1万多张图片,需要在MLSQL上面执行Python的深度学习训练代码,发现在MLSQL执行python的代码时,MLSQL进程一直存在,但是没有后续日志输出,也没有Python模型结果输出?
A1:MLSQL调用Python代码执行流程:复制HDFS数据到本地磁盘——>执行python代码——>将目标模型保存到HDFS模型目录。
原因:由于图片有1万多张,实测复制HDFS到本地用时18分钟,比较耗时。
优化方案:将HDFS文件进行压缩,下载到本地后解压使用。