Maxcompute数据清洗和
2019-07-10 本文已影响0人
zishen
上手dataworks
关于maxcompute
- maxcompute原名是odps(open-data-processing-server)就是阿里提供的一个大数据分布式计算服务系统,分布式系统用来提升企业处理海量数据能力,可能是新改的名字很多借口都用odps命名。阿里官方文档.
-
架构如下
image
集成了很多计算框架,也很方便接口开发。
关于dataworks
- dataworks就是基于maxcompute的PaaS平台,可以简单理解为dataworks是一个web形式的开发管理工具,而maxcompute就是一台云端的很强的大数据计算主机。你可以通过dataworks来用maxcompute,也可以通过maxcompute的sdk(下节会说)。
- dataworks提供了可视化的开发流程,一个“业务流程”就相当于一个项目工程(业务流程和数据库表命名规则)
- 业务流程里面的“数据集成”模块主要用来同步多端数据,“数据开发”模块可以可视化建立节点进行开发。可以通过dataworks在maxcompute中建表上传数据。具体操作入门可以看dataworks文档
PyODPS连接maxcompute:
- PyODPS是Maxcompute的Python版的SDK,它提供了对MaxCompute对象的基本操作,同时提供DataFrame框架,可以在MaxCompute上进行数据分析。同时兼容Python2.7和3.x,很方便。
安装调试pyodps:
-
如果是用的anaconda的环境的话直接pip install pyodps就能安装了。具体参考:pyodps安装指南
-
在本地Python环境里面连接maxcompute的时候官方文档没说明白。安装好pyodps后导入包。
from odps import ODPS
#实例化maxcompute的对象
o = ODPS(access_id='xxxxxx'
, access_secret='xxxxxx'
, project='工作空间名'
,end_point='https://service.odps.aliyun.com/api')
- accesskey主账号设置了发到邮箱,end_point取决于网络环境和maxcompute服务器地址,按照上面就可以,如果是外网或者vpc参考:end_point设置。连上后就可以像在dataworks上的pyodps节点一样在本地对远端maxcompute进行操作了。
jupyter-notebook利用pyodps和pyecharts统计可视化简单例子:
- pyechart我之前做股票分析的时候发现的是百度开发的js包echarts的py版本,自从用了他,再也没用过matplotlib,图标都是可交互的还能自定义主题也可以集成到Django、flask里面很方便。PyEcharts文档
from odps import ODPS
from odps.df import DataFrame
o = ODPS(access_id=' ', access_secret=' ', project=' ', end_point='https://service.odps.aliyun.com/api')
t = o.get_table('data_product')
df = DataFrame(t)
deepdraw = df[df['source']=='_deepdraw']
leycloud = df[df['source']=='_leycloud']
source_agg = df.groupby(df.source).agg(count=df.count())
print(source_agg)
from pyecharts.charts import Pie
from pyecharts import options as opts
source_pie =(Pie()
.add("产品数",[('_leycloud',15588349),('_deepdraw',1035428)])
.set_global_opts(title_opts=opts.TitleOpts(title="source分布"))
)
source_pie.render_notebook()
pyecharts可视化效果
Jupyter的交互增强和日志服务
对Maxcompute中的表进行操作
建表
- 在datawork里面就可以建表,在这里直接建表,也可以在业务流程中建表。建的表都是存在Maxcompute里面
工作空间建表
业务流程建表
在DDL模式中用SQL建表,- MaxcomputeSQL概述,- SQL规范。 dataworks创建表 - 也可以通过PyODPS创建和操作表pyodps基本操作。
maxcompute表的设计
数据中台表各字段对应的英文名称
字段表格图示例tops表代码
CREATE TABLE `wzs_tops` (
`uuid` string COMMENT '全局唯一uuid,unique key,有索引',
`key_word` string COMMENT '分类的关键词',
`title` string COMMENT '标题',
`images` string COMMENT '图片url,json字段',
`price` bigint COMMENT '价格',
`comments` bigint COMMENT '全局唯一uuid,unique key,有索引',
`brand` string COMMENT '品牌品',
`create_date` datetime,
`source_pictures` string COMMENT '所属图片的source id,有索引',
`product_name` string COMMENT '商品名称',
`style` string COMMENT '风格',
`craft` string COMMENT '工艺',
`color_pattern` string COMMENT '色彩花纹',
`main_fabric` string COMMENT '主面料',
`source` string COMMENT '信息来源',
`model` string COMMENT '版型',
`profile` string COMMENT '廓形',
`coat_length` string COMMENT '衣长',
`collar_design` string COMMENT '领型',
`sleeve_length` string COMMENT '袖长',
`sleeve_design` string COMMENT '袖型',
`placket_design` string COMMENT '门襟类型',
`hem_design` string COMMENT '下摆设计'
) ;
对ODPS的DataFrame使用自定义函数
-
apply(axis = 0,reduce = False)调用自定义函数
axis = 0的时候为对每一行调用自定义函数,默认直接传入collection的一行,函数处理返回后再传入下一行。reduce = False时返回的是sequence,否则返回的是collection,reduce为False时,也可以使用yield关键字来返回多行结果。
-
每一次对有odps表生成的dataframe进行聚合、数据变换或自定义函数等操作时都会在dataworks中生成一张对应变换dataframe的临时表(生命周期为1天),可以在表管理中找到也可以到数据地图(Meta)中对表进行操作,如修改生命周期,可以将运行生成的结果保存方便调用。
生成的临时表 - 也可以在运行出结果后用.persist方法将返回的新dataframe保存为odps表。
- 导入包尽量在函数内导入,自定义函数中调用其他函数要写成闭包的形式
用@output指定自定义函数返回的字段名称和类型
@output(['uuid','title','brand','create_date','source','source_picture','key_word',
'product_name','images','price','comments','style', 'craft', 'color_pattern',
'main_fabric', 'model', 'profile','coat_length','collar_design', 'sleeve_length',
'sleeve_design','placket_design', 'hem_design']
,['string','string','string','datetime','string','string','string','string','string'
,'string','string','string','string','string','string','string','string','string'
,'string','string','string','string','string'])
def df_clean(row):
import json
import pandas as pd
import sys
reload(sys)
sys.setdefaultencoding('utf-8')
-
map_reduce调用自定义函数
df.map_reduce(mapper = df_clean)#等价于
df.apply(df_clean,axis = 0, reduce =False)
在odps上使用第三方包:
odps上只有numpy一个第三方包,如果想用pandas或其他包就得上传包和依赖包到odps资源。可以通过jupyter上pyodps的接口上传,但是在上传数据较大的包如pandas有20mb会出现timeout报错。解决办法是在dataworks上“业务流程”——“资源”——“新建Archive资源”中上传,上传时打钩“上传为ODPS资源”就可以在当前工作空间中使用资源,也可以将小型的包的源码上传为新建Python资源中就可以在odps中进行引用。也可以将需要调用的文件上传到File资源中进行调用。文档和包
-
包要找适配Linux和Python2.7的
上传csv文件 -
注意每一个第三方包要改成.zip或.tar.gz的后缀
包后缀
在代码中调用第三方包和文件:
from odps import options
options.sql.settings = { 'odps.isolation.session.enable': True }
options.df.libraries = ['pandas.zip','pytz.zip','dateutil.zip','six.tar.gz'] #导入资源库中的pandas包和依赖
resource = o.get_resource('category.csv')
with resource.open('r') as fp:
category = pd.read_csv(fp) #在dataworks中运行要先通过resource打开文件,如果本地运行直接打开