大数据大数据

数据分析引擎 —— Pig

2022-03-28  本文已影响0人  小胡_鸭

一、Pig

1、简介

  Pig是一个基于Apache Hadoop的大规模数据分析平台,它提供的SQL-LIKE语言叫Pig Latin,该语言的编译器会把类SQL的数据分析请求转换为一系列经过优化处理的MapReduce运算。Pig为复杂的海量数据并行计算提供了一个简单的操作和编程接口,使用者可以透过Python或者JavaScript编写Java,之后再重新转写。

  Pig 的特点总结如下:

2、与 Hive 的对比

  Pig与Hive作为一种高级数据语言,均运行于HDFS之上,是hadoop上层的衍生架构,用于简化hadoop任务,并对MapReduce进行一个更高层次的封装。Pig与Hive的区别如下:


二、安装配置

  解压

tar -zxvf pig-0.17.0.tar.gz -C ~/training/

  配置环境变量

PIG_HOME=/root/training/pig-0.17.0
export PIG_HOME

# 本地模式不需要,但是集群模式需要的变量
PIG_CLASSPATH=$HADOOP_HOME/etc/hadoop
export PIG_CLASSPATH

1、本地模式

  启动

pig -x local

  可以看到配置好环境变量之后,在命令行中输入 pig 按 tab 键会自动提示可执行的命令或脚本,以本地模式启动后,可以看到 Pig 连接到的是本地文件系统。

2、集群模式

  集群模式需要先启动 hadoop,再启动 pig

pig

  可以看到集群模式启动后,pig 连接到的是 hadoop 文件系统。


三、常用命令

pig 常用命令非常简单,并且对 hdfs 执行一些操作时会发现比 hdfs 自带的命令执行要快。
# 查看文件系统当前目录下的所有文件目录
ls
# 切换到某个目录下
cd <dir>
# 输出某个文件的内容
cat <file>
# 查看当前目录
pwd
# 创建目录
mkdir <dir>
# 从本地文件系统拷贝到 HDFS 上
copyFromLocal <localfile> <hdfsDir>
# 将 HDFS 文件拷贝到本地文件系统
copyToLocal <hdfsFile> <localDir>
# 通过 sh cmd 可将命令操作的目标从HDFS改为本地文件系统
sh命令:调用操作系统的命令,操作本地的文件系统

# pig 自定义函数的定义和注册
define
register


四、数据模型

  pig 的表不是矩形的(即每一行都有相同的列),pig 的表被称为包(bag),包中存在行(Tuple)准确地说叫元组,每个元组中存在多个列,表允许不同的元组有完全不相同的列。

  列可以是基本类型int,long,float,double,chararray(string:注意以单引号),bytearray(byte[]),也可以嵌套 map,type,bag 等复杂类型。

  如果人为把每一行都设置成具有相同的列,则叫做一个关系;Pig 的物理存储结构是 JSON 格式。


五、PigLatin 处理数据

  首先需要启动 hadoop 的 historyserver

mr-jobhistory-daemon.sh start historyserver

  常用的 Pig Latin 语句有:

1、load:加载数据到表(Bag)
2、foreach:相当于循环,对bag中的每一条数据(Tuple)进行处理
3、filter:相当于where
4、group by:分组
5、join:连接
6、generate:提取列
7、union、intersect:集合计算
8、输出:dump 直接打印在屏幕上
         store 保存到HDFS中

  数据准备,在 HDFS 的 /scott 目录下存在两个 csv 文件,分别是员工表 emp.csv

empno ename job mgr hiredate sal comm deptno
7369 SMITH CLERK 7902 1980/12/17 800 0 20
7499 ALLEN SALESMAN 7698 1981/2/20 1600 300 30
7521 WARD SALESMAN 7698 1981/2/22 1250 500 30
7566 JONES MANAGER 7839 1981/4/2 2975 0 20
7654 MARTIN SALESMAN 7698 1981/9/28 1250 1400 30
7698 BLAKE MANAGER 7839 1981/5/1 2850 0 30
7782 CLARK MANAGER 7839 1981/6/9 2450 0 10
7788 SCOTT ANALYST 7566 1987/4/19 3000 0 20
7839 KING PRESIDENT -1 1981/11/17 5000 0 10
7844 TURNER SALESMAN 7698 1981/9/8 1500 0 30
7876 ADAMS CLERK 7788 1987/5/23 1100 0 20
7900 JAMES CLERK 7698 1981/12/3 950 0 30
7902 FORD ANALYST 7566 1981/12/3 3000 0 20
7934 MILLER CLERK 7782 1982/1/23 1300 0 10

  部门表 dept.csv

10 ACCOUNTING NEW YORK
20 RESEARCH DALLAS
30 SALES CHICAGO
40 OPERATIONS BOSTON

1、加载数据到表

table = load 'hdfsfile';

  加载 /scott/emp.csv 到 emp 表,并查看表结构

emp = load '/scott/emp.csv';
desc emp;

  由于没有指定表结构(字段和字段类型),查询表结构返回 unknow。

  指定表的列,默认数据类型是 bytearray

emp = load '/scott/emp.csv' as(empno,ename,job,mgr,hiredate,sal,comm,deptno);

  加载员工数据到表,并且指定每个tuple的schema和数据类型

emp = load '/scott/emp.csv' as(empno:int,ename:chararray,job:chararray,mgr:int,hiredate:chararray,sal:int,comm:int,deptno:int);

  通过 dump emp; 可以查询表中数据,效果相当于 select * from emp; 并且会触发一个 MapReduce 作业的执行,但是查询到数据却是空的。

  原因是 Pig 加载数据到表时,默认的列值分隔符是 tab(这点跟hive一样),需要指定分隔符。

emp = load '/scott/emp.csv' using PigStorage(',') as(empno:int,ename:chararray,job:chararray,mgr:int,hiredate:chararray,sal:int,comm:int,deptno:int);
dept = load '/scott/dept.csv' using PigStorage(',') as(deptno:int,dname:chararray,loc:chararray);

  再执行 dump emp; 查询,正常返回数据。

2、查询某些列(投影)

# 语法
tempTable = foreach table generate col1,col2,col3;

# 相当于 SQL
select col1,col2,col3 from table;

  查询员工表的员工号、姓名、薪水。

emp3 = foreach emp generate empno,ename,sal;


3、排序

# 语法
tempTable = order table by col;

# 相当于 SQL
select * from table order by col;

  查询员工信息,按照月薪排序

emp4 = order emp by sal;


4、分组

# 语法
tempTable = group table by col;

# 相当于 SQL
from table group by col;

# 获得投影
foreach tempTable generate col,MAX(col2),MIN(col3),SUM(col4),...;

# 两条 Pig Latin 合起来才相当于一条完整的 SQL
select col,MAX(col2) from table group by col;

  求每个部门的最高工资

# SQL
select deptno,max(sal) from emp group by deptno;

# Pig Latin
emp51 = group emp by deptno;
emp52 = foreach emp51 generate group,MAX(emp.sal);

  执行分组后,dump emp51; 得到的数据

grunt> describe emp51;
emp51: {group: int,emp: {(empno: int,ename: chararray,job: chararray,mgr: int,hiredate: chararray,sal: int,comm: int,deptno: int)}}
grunt> dump emp51;
(10,{(7934,MILLER,CLERK,7782,1982/1/23,1300,0,10),
   (7839,KING,PRESIDENT,-1,1981/11/17,5000,0,10),
   (7782,CLARK,MANAGER,7839,1981/6/9,2450,0,10)})
   
(20,{(7876,ADAMS,CLERK,7788,1987/5/23,1100,0,20),
   (7788,SCOTT,ANALYST,7566,1987/4/19,3000,0,20),
   (7369,SMITH,CLERK,7902,1980/12/17,800,0,20),
   (7566,JONES,MANAGER,7839,1981/4/2,2975,0,20),
   (7902,FORD,ANALYST,7566,1981/12/3,3000,0,20)})
   
(30,{(7844,TURNER,SALESMAN,7698,1981/9/8,1500,0,30),
   (7499,ALLEN,SALESMAN,7698,1981/2/20,1600,300,30),
   (7698,BLAKE,MANAGER,7839,1981/5/1,2850,0,30),
   (7654,MARTIN,SALESMAN,7698,1981/9/28,1250,1400,30),
   (7521,WARD,SALESMAN,7698,1981/2/22,1250,500,30),
   (7900,JAMES,CLERK,7698,1981/12/3,950,0,30)})

  最后 dump emp52; 结果为



5、过滤

# 语法
filter table by col == value;

# 相当于 SQL
select * from table where col = value;

  查询 10 号部门的所有员工

# SQL
select * from emp where deptno = 10;

# Pig Latin
emp6 = filter emp by deptno==10;
dump emp6;


6、多表查询

# 语法
tempTable1 = join table1 by col, table2 by col;
tempTable2 = foreach tempTable1 generate table1::col1,table2::col2

# 等价 SQL
select table1.col1, table2.col2 
from table1,table2 
where table1.col = table2.col;

  查询员工姓名和部门名称

# SQL
select ename,dname from emp,dept where emp.deptno = dept.deptno;

# Pig Latin
emp71 = join emp by deptno, dept by deptno;
emp72 = foreach emp71 generate emp::ename,dept::dname;

  执行结果



7、集合运算

# 求和集
tempTable = union table1,table2;

# 求交集
tempTable = intersect table1,table2;

  查询10号和20号部门的员工

# SQL
select * from emp where deptno = 10
    union
select * from emp where deptno = 20;

# Pig Latin
emp10 = filter emp by deptno == 10;
emp20 = filter emp by deptno == 20;
emp10_20 = union emp10,emp20;

  执行结果




六、自定义函数

  需要的依赖

$PIG_HOME/pig-0.17.0-core-h2.jar
$PIG_HOME/lib
$PIG_HOME/lib/h2
$HADOOP_HOME/share/hadoop/common
$HADOOP_HOME/share/hadoop/common/lib
$HADOOP_HOME/share/hadoop/mapreduce
$HADOOP_HOME/share/hadoop/mapreduce/lib

1、自定义运算函数

  自定义运算函数需要继承 org.apache.pig.EvalFunc 类。

  现在要实现一个需求:根据员工薪水判断薪水的级别,不大于1000是 Grade A,大于1000不大于3000是 Grade B,大于3000是 Grade C。

/**
 * 根据员工薪水判断薪水的级别
 * 
 * 1、sal <= 1000, 返回Grade A
 * 2、sal > 1000 && sal <= 3000, 返回Grade B
 * 3、sal > 3000, 返回Grade C
 */
public class CheckSalaryGrade extends EvalFunc<String> {

    /**
     * 调用
     * emp = load ***
     * emp2 = foreach emp generate empno,ename,sal,demo.CheckSalaryGrade(sal);
     */
    @Override
    public String exec(Tuple tuple) throws IOException {
        // 获取员工薪水
        int sal = (Integer) tuple.get(0);
        if (sal <= 1000)        return "Grade A";
        else if (sal <= 3000)   return "Grade B";
        else                    return "Grade C";       
    }
}

  打包 jar,并在 pig shell 上注册

grunt> register /root/temp/program/mypig.jar;
grunt> emp2 = foreach emp generate empno,ename,sal,demo.CheckSalaryGrade(sal);
grunt> dump emp2;

2、自定义过滤函数

  自定义过滤函数需要继承 org.apache.pig.FilterFunc

  现在用自定义过滤函数实现一个简单的需求,过滤掉薪水低于3000的员工数据。

// 如果员工薪水大于等于3000块钱,就选择出来
public class IsSalaryTooHigh extends FilterFunc {

    /**
     * 调用
     * emp = load ***
     * emp1 = filter emp by demo.IsSalaryTooHigh(sal);
     */
    @Override
    public Boolean exec(Tuple tuple) throws IOException {
        // 获取当前员工的薪水
        int sal = (Integer) tuple.get(0);
        
        return sal >= 3000 ? true : false;
    }
}

  打包注册,调用自定义过滤函数。

grunt> register /root/temp/program/mypig.jar;
grunt> emp1 = filter emp by demo.IsSalaryTooHigh(sal);
grunt> dump emp1;


3、自定义加载函数

  默认情况下,一行数据加载时会被拆解为一个 Tuple,但是某些情况下,希望有特殊的处理,这时就需要使用自定义加载函数来加载,自定义加载函数需要继承 org.apache.pig.LoadFunc

  现在实现一个简单的需求,将数据文件中的每个单词解析成一个 Tuple,一行数据可能有多个行,所以每行最终会被解析为一个 bag,代码如下:

public class MyLoadFunc extends LoadFunc {

    // 定义变量来代表输入流
    private RecordReader reader;
    
    @Override
    public InputFormat getInputFormat() throws IOException {
        // 处理的数据类型是什么(输入的类型)
        return new TextInputFormat();
    }

    @Override
    public Tuple getNext() throws IOException {
        // 从输入流中读取一行,如何解析该行数据
        // 数据: I love Beijing
        Tuple result = null;
        try {
            // 判断是否读取了数据
            if (!this.reader.nextKeyValue()) {
                // 没有数据
                return result;                          
            }
            
            // 得到数据
            String data = this.reader.getCurrentValue().toString();
            // 分词
            String[] words = data.split(" ");
            // 构造返回的结果
            result = TupleFactory.getInstance().newTuple();
            
            // 每一个单词单独生成一个Tuple(s),再把这些tuple放入一个bag
            // 在这个bag放入result中
            
            // 创建表
            DataBag bag = BagFactory.getInstance().newDefaultBag();
            for(String w : words) {
                //为每个单词生成一个tuple
                Tuple aTuple = TupleFactory.getInstance().newTuple();
                aTuple.append(w);
                
                //再把aTuple放入表
                bag.add(aTuple);
            }
            
            // 最后,把bag表放入result
            result.append(bag);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return result;
    }

    @Override
    public void prepareToRead(RecordReader reader, PigSplit arg1) throws IOException {
        // 初始化加载程序
        this.reader = reader;       
    }

    @Override
    public void setLocation(String path, Job job) throws IOException {
        // HDFS的输入路径
        FileInputFormat.setInputPaths(job, new Path(path));
    }
}

  继承 LoadFunc 抽象类,需要实现几个方法。首先,需要实现 prepareToRead() 方法,该方法会传递读取一行数据的迭代器 RecordReader 对象,该对象被自定义加载函数类内定义的成员接收;setLocation() 方法会设置 HDFS 的输出路径;getInputFormat() 方法设置了读取数据的格式,这里读取的是文本数据,所以设置为 TextInputFormat;真正加载数据时,每次读取数据都会调用 getNext() 方法,这里是自定义加载函数的核心逻辑,代码中会先去读取一个数据化并进行分词,对每个单词生成一个 Tuple 对象,最终一行数据对应的所有 Tuple 对象都会添加到 DataBag 对象中,DataBag 对象又被嵌套添加到返回的 Tuple 对象中。

  通过以下命令加载和使用自定义加载函数,并打印结果:

grunt> register /root/temp/program/mypig.jar;
grunt> record = load '/input/data.txt' using demo.MyLoadFunc();
grunt> dump record;
上一篇下一篇

猜你喜欢

热点阅读