数据分析引擎 —— Pig
一、Pig
1、简介
Pig是一个基于Apache Hadoop的大规模数据分析平台,它提供的SQL-LIKE语言叫Pig Latin,该语言的编译器会把类SQL的数据分析请求转换为一系列经过优化处理的MapReduce运算。Pig为复杂的海量数据并行计算提供了一个简单的操作和编程接口,使用者可以透过Python或者JavaScript编写Java,之后再重新转写。
Pig 的特点总结如下:
- 1)将 Pig Latin 翻译成 MapReduce 任务执行,可以看成是一个翻译器、映射器;
- 2)可以看成是 Hadoop 的客户端软件,为数据分析人员提供通过简单的类 SQL,连接到 Hadoop 集群并进行数据分析的接口和工具;
- 3)Pig Latin 可以进行排序、过滤、求和、分组、关联等常用操作,可以自定义函数,是一种面向数据分析处理的轻量级脚本语言;
- 4)Pig 可自动对集群进行分配和回收,自动地对 MapReduce 程序进行优化。
2、与 Hive 的对比
Pig与Hive作为一种高级数据语言,均运行于HDFS之上,是hadoop上层的衍生架构,用于简化hadoop任务,并对MapReduce进行一个更高层次的封装。Pig与Hive的区别如下:
- Pig是一种面向过程的数据流语言;Hive是一种数据仓库语言,并提供了完整的sql查询功能。
- Pig更轻量级,执行效率更快,适用于实时分析;Hive适用于离线数据分析。
- Hive查询语言为Hql,支持分区;Pig查询语言为Pig Latin,不支持分区。
- Hive支持JDBC/ODBC;Pig不支持JDBC/ODBC。
- Pig适用于半结构化数据(如:日志文件);Hive适用于结构化数据。
二、安装配置
解压
tar -zxvf pig-0.17.0.tar.gz -C ~/training/
配置环境变量
PIG_HOME=/root/training/pig-0.17.0
export PIG_HOME
# 本地模式不需要,但是集群模式需要的变量
PIG_CLASSPATH=$HADOOP_HOME/etc/hadoop
export PIG_CLASSPATH
1、本地模式
启动
pig -x local
可以看到配置好环境变量之后,在命令行中输入 pig 按 tab 键会自动提示可执行的命令或脚本,以本地模式启动后,可以看到 Pig 连接到的是本地文件系统。
2、集群模式
集群模式需要先启动 hadoop,再启动 pig
pig
可以看到集群模式启动后,pig 连接到的是 hadoop 文件系统。
三、常用命令
pig 常用命令非常简单,并且对 hdfs 执行一些操作时会发现比 hdfs 自带的命令执行要快。
# 查看文件系统当前目录下的所有文件目录
ls
# 切换到某个目录下
cd <dir>
# 输出某个文件的内容
cat <file>
# 查看当前目录
pwd
# 创建目录
mkdir <dir>
# 从本地文件系统拷贝到 HDFS 上
copyFromLocal <localfile> <hdfsDir>
# 将 HDFS 文件拷贝到本地文件系统
copyToLocal <hdfsFile> <localDir>
# 通过 sh cmd 可将命令操作的目标从HDFS改为本地文件系统
sh命令:调用操作系统的命令,操作本地的文件系统
# pig 自定义函数的定义和注册
define
register
四、数据模型
pig 的表不是矩形的(即每一行都有相同的列),pig 的表被称为包(bag),包中存在行(Tuple)准确地说叫元组,每个元组中存在多个列,表允许不同的元组有完全不相同的列。
列可以是基本类型int,long,float,double,chararray(string:注意以单引号),bytearray(byte[]),也可以嵌套 map,type,bag 等复杂类型。
如果人为把每一行都设置成具有相同的列,则叫做一个关系;Pig 的物理存储结构是 JSON 格式。
五、PigLatin 处理数据
首先需要启动 hadoop 的 historyserver
mr-jobhistory-daemon.sh start historyserver
常用的 Pig Latin 语句有:
1、load:加载数据到表(Bag)
2、foreach:相当于循环,对bag中的每一条数据(Tuple)进行处理
3、filter:相当于where
4、group by:分组
5、join:连接
6、generate:提取列
7、union、intersect:集合计算
8、输出:dump 直接打印在屏幕上
store 保存到HDFS中
数据准备,在 HDFS 的 /scott
目录下存在两个 csv 文件,分别是员工表 emp.csv
empno | ename | job | mgr | hiredate | sal | comm | deptno |
---|---|---|---|---|---|---|---|
7369 | SMITH | CLERK | 7902 | 1980/12/17 | 800 | 0 | 20 |
7499 | ALLEN | SALESMAN | 7698 | 1981/2/20 | 1600 | 300 | 30 |
7521 | WARD | SALESMAN | 7698 | 1981/2/22 | 1250 | 500 | 30 |
7566 | JONES | MANAGER | 7839 | 1981/4/2 | 2975 | 0 | 20 |
7654 | MARTIN | SALESMAN | 7698 | 1981/9/28 | 1250 | 1400 | 30 |
7698 | BLAKE | MANAGER | 7839 | 1981/5/1 | 2850 | 0 | 30 |
7782 | CLARK | MANAGER | 7839 | 1981/6/9 | 2450 | 0 | 10 |
7788 | SCOTT | ANALYST | 7566 | 1987/4/19 | 3000 | 0 | 20 |
7839 | KING | PRESIDENT | -1 | 1981/11/17 | 5000 | 0 | 10 |
7844 | TURNER | SALESMAN | 7698 | 1981/9/8 | 1500 | 0 | 30 |
7876 | ADAMS | CLERK | 7788 | 1987/5/23 | 1100 | 0 | 20 |
7900 | JAMES | CLERK | 7698 | 1981/12/3 | 950 | 0 | 30 |
7902 | FORD | ANALYST | 7566 | 1981/12/3 | 3000 | 0 | 20 |
7934 | MILLER | CLERK | 7782 | 1982/1/23 | 1300 | 0 | 10 |
部门表 dept.csv
10 | ACCOUNTING | NEW YORK |
---|---|---|
20 | RESEARCH | DALLAS |
30 | SALES | CHICAGO |
40 | OPERATIONS | BOSTON |
1、加载数据到表
table = load 'hdfsfile';
加载 /scott/emp.csv 到 emp 表,并查看表结构
emp = load '/scott/emp.csv';
desc emp;
由于没有指定表结构(字段和字段类型),查询表结构返回 unknow。
指定表的列,默认数据类型是 bytearray
emp = load '/scott/emp.csv' as(empno,ename,job,mgr,hiredate,sal,comm,deptno);
加载员工数据到表,并且指定每个tuple的schema和数据类型
emp = load '/scott/emp.csv' as(empno:int,ename:chararray,job:chararray,mgr:int,hiredate:chararray,sal:int,comm:int,deptno:int);
通过 dump emp; 可以查询表中数据,效果相当于 select * from emp; 并且会触发一个 MapReduce 作业的执行,但是查询到数据却是空的。
原因是 Pig 加载数据到表时,默认的列值分隔符是 tab(这点跟hive一样),需要指定分隔符。
emp = load '/scott/emp.csv' using PigStorage(',') as(empno:int,ename:chararray,job:chararray,mgr:int,hiredate:chararray,sal:int,comm:int,deptno:int);
dept = load '/scott/dept.csv' using PigStorage(',') as(deptno:int,dname:chararray,loc:chararray);
再执行 dump emp; 查询,正常返回数据。
2、查询某些列(投影)
# 语法
tempTable = foreach table generate col1,col2,col3;
# 相当于 SQL
select col1,col2,col3 from table;
查询员工表的员工号、姓名、薪水。
emp3 = foreach emp generate empno,ename,sal;
3、排序
# 语法
tempTable = order table by col;
# 相当于 SQL
select * from table order by col;
查询员工信息,按照月薪排序
emp4 = order emp by sal;
4、分组
# 语法
tempTable = group table by col;
# 相当于 SQL
from table group by col;
# 获得投影
foreach tempTable generate col,MAX(col2),MIN(col3),SUM(col4),...;
# 两条 Pig Latin 合起来才相当于一条完整的 SQL
select col,MAX(col2) from table group by col;
求每个部门的最高工资
# SQL
select deptno,max(sal) from emp group by deptno;
# Pig Latin
emp51 = group emp by deptno;
emp52 = foreach emp51 generate group,MAX(emp.sal);
执行分组后,dump emp51; 得到的数据
grunt> describe emp51;
emp51: {group: int,emp: {(empno: int,ename: chararray,job: chararray,mgr: int,hiredate: chararray,sal: int,comm: int,deptno: int)}}
grunt> dump emp51;
(10,{(7934,MILLER,CLERK,7782,1982/1/23,1300,0,10),
(7839,KING,PRESIDENT,-1,1981/11/17,5000,0,10),
(7782,CLARK,MANAGER,7839,1981/6/9,2450,0,10)})
(20,{(7876,ADAMS,CLERK,7788,1987/5/23,1100,0,20),
(7788,SCOTT,ANALYST,7566,1987/4/19,3000,0,20),
(7369,SMITH,CLERK,7902,1980/12/17,800,0,20),
(7566,JONES,MANAGER,7839,1981/4/2,2975,0,20),
(7902,FORD,ANALYST,7566,1981/12/3,3000,0,20)})
(30,{(7844,TURNER,SALESMAN,7698,1981/9/8,1500,0,30),
(7499,ALLEN,SALESMAN,7698,1981/2/20,1600,300,30),
(7698,BLAKE,MANAGER,7839,1981/5/1,2850,0,30),
(7654,MARTIN,SALESMAN,7698,1981/9/28,1250,1400,30),
(7521,WARD,SALESMAN,7698,1981/2/22,1250,500,30),
(7900,JAMES,CLERK,7698,1981/12/3,950,0,30)})
最后 dump emp52; 结果为
5、过滤
# 语法
filter table by col == value;
# 相当于 SQL
select * from table where col = value;
查询 10 号部门的所有员工
# SQL
select * from emp where deptno = 10;
# Pig Latin
emp6 = filter emp by deptno==10;
dump emp6;
6、多表查询
# 语法
tempTable1 = join table1 by col, table2 by col;
tempTable2 = foreach tempTable1 generate table1::col1,table2::col2
# 等价 SQL
select table1.col1, table2.col2
from table1,table2
where table1.col = table2.col;
查询员工姓名和部门名称
# SQL
select ename,dname from emp,dept where emp.deptno = dept.deptno;
# Pig Latin
emp71 = join emp by deptno, dept by deptno;
emp72 = foreach emp71 generate emp::ename,dept::dname;
执行结果
7、集合运算
# 求和集
tempTable = union table1,table2;
# 求交集
tempTable = intersect table1,table2;
查询10号和20号部门的员工
# SQL
select * from emp where deptno = 10
union
select * from emp where deptno = 20;
# Pig Latin
emp10 = filter emp by deptno == 10;
emp20 = filter emp by deptno == 20;
emp10_20 = union emp10,emp20;
执行结果
六、自定义函数
需要的依赖
$PIG_HOME/pig-0.17.0-core-h2.jar
$PIG_HOME/lib
$PIG_HOME/lib/h2
$HADOOP_HOME/share/hadoop/common
$HADOOP_HOME/share/hadoop/common/lib
$HADOOP_HOME/share/hadoop/mapreduce
$HADOOP_HOME/share/hadoop/mapreduce/lib
1、自定义运算函数
自定义运算函数需要继承 org.apache.pig.EvalFunc
类。
现在要实现一个需求:根据员工薪水判断薪水的级别,不大于1000是 Grade A,大于1000不大于3000是 Grade B,大于3000是 Grade C。
/**
* 根据员工薪水判断薪水的级别
*
* 1、sal <= 1000, 返回Grade A
* 2、sal > 1000 && sal <= 3000, 返回Grade B
* 3、sal > 3000, 返回Grade C
*/
public class CheckSalaryGrade extends EvalFunc<String> {
/**
* 调用
* emp = load ***
* emp2 = foreach emp generate empno,ename,sal,demo.CheckSalaryGrade(sal);
*/
@Override
public String exec(Tuple tuple) throws IOException {
// 获取员工薪水
int sal = (Integer) tuple.get(0);
if (sal <= 1000) return "Grade A";
else if (sal <= 3000) return "Grade B";
else return "Grade C";
}
}
打包 jar,并在 pig shell 上注册
grunt> register /root/temp/program/mypig.jar;
grunt> emp2 = foreach emp generate empno,ename,sal,demo.CheckSalaryGrade(sal);
grunt> dump emp2;
2、自定义过滤函数
自定义过滤函数需要继承 org.apache.pig.FilterFunc
。
现在用自定义过滤函数实现一个简单的需求,过滤掉薪水低于3000的员工数据。
// 如果员工薪水大于等于3000块钱,就选择出来
public class IsSalaryTooHigh extends FilterFunc {
/**
* 调用
* emp = load ***
* emp1 = filter emp by demo.IsSalaryTooHigh(sal);
*/
@Override
public Boolean exec(Tuple tuple) throws IOException {
// 获取当前员工的薪水
int sal = (Integer) tuple.get(0);
return sal >= 3000 ? true : false;
}
}
打包注册,调用自定义过滤函数。
grunt> register /root/temp/program/mypig.jar;
grunt> emp1 = filter emp by demo.IsSalaryTooHigh(sal);
grunt> dump emp1;
3、自定义加载函数
默认情况下,一行数据加载时会被拆解为一个 Tuple,但是某些情况下,希望有特殊的处理,这时就需要使用自定义加载函数来加载,自定义加载函数需要继承 org.apache.pig.LoadFunc
。
现在实现一个简单的需求,将数据文件中的每个单词解析成一个 Tuple,一行数据可能有多个行,所以每行最终会被解析为一个 bag,代码如下:
public class MyLoadFunc extends LoadFunc {
// 定义变量来代表输入流
private RecordReader reader;
@Override
public InputFormat getInputFormat() throws IOException {
// 处理的数据类型是什么(输入的类型)
return new TextInputFormat();
}
@Override
public Tuple getNext() throws IOException {
// 从输入流中读取一行,如何解析该行数据
// 数据: I love Beijing
Tuple result = null;
try {
// 判断是否读取了数据
if (!this.reader.nextKeyValue()) {
// 没有数据
return result;
}
// 得到数据
String data = this.reader.getCurrentValue().toString();
// 分词
String[] words = data.split(" ");
// 构造返回的结果
result = TupleFactory.getInstance().newTuple();
// 每一个单词单独生成一个Tuple(s),再把这些tuple放入一个bag
// 在这个bag放入result中
// 创建表
DataBag bag = BagFactory.getInstance().newDefaultBag();
for(String w : words) {
//为每个单词生成一个tuple
Tuple aTuple = TupleFactory.getInstance().newTuple();
aTuple.append(w);
//再把aTuple放入表
bag.add(aTuple);
}
// 最后,把bag表放入result
result.append(bag);
} catch (Exception e) {
e.printStackTrace();
}
return result;
}
@Override
public void prepareToRead(RecordReader reader, PigSplit arg1) throws IOException {
// 初始化加载程序
this.reader = reader;
}
@Override
public void setLocation(String path, Job job) throws IOException {
// HDFS的输入路径
FileInputFormat.setInputPaths(job, new Path(path));
}
}
继承 LoadFunc 抽象类,需要实现几个方法。首先,需要实现 prepareToRead() 方法,该方法会传递读取一行数据的迭代器 RecordReader 对象,该对象被自定义加载函数类内定义的成员接收;setLocation() 方法会设置 HDFS 的输出路径;getInputFormat() 方法设置了读取数据的格式,这里读取的是文本数据,所以设置为 TextInputFormat;真正加载数据时,每次读取数据都会调用 getNext() 方法,这里是自定义加载函数的核心逻辑,代码中会先去读取一个数据化并进行分词,对每个单词生成一个 Tuple 对象,最终一行数据对应的所有 Tuple 对象都会添加到 DataBag 对象中,DataBag 对象又被嵌套添加到返回的 Tuple 对象中。
通过以下命令加载和使用自定义加载函数,并打印结果:
grunt> register /root/temp/program/mypig.jar;
grunt> record = load '/input/data.txt' using demo.MyLoadFunc();
grunt> dump record;