Sqoop学习
sqoop1和sqoop2的区别
sqoop1和sqoop2完全就是两个东西,互相不兼容,sqoop1版本从1.4.1开始,sqoop2从1.99.1开始。sqoop1就是一个客户端工具,用脚本实现数据的抽取。sqoop2相对于sqoop1来说多了一个server端,客户端可以通过Rest API、JAVA API等方式访问server,提交请求。服务端连接hadoop生成mr,实现数据的同步。我们最常用的还是sqoop1,sqoop2并不稳定,使用也比较麻烦。
1、sqoop安装
下载
$wget http://archive.cloudera.com/cdh5/cdh/5/sqoop-1.4.6-cdh5.14.0.tar.gz
解压
$tar -xzvf sqoop-1.4.6-cdh5.14.0.tar.gz -C /home/hadoop/app/
复制jar包
需要复制三个jar包,一个是mysql的驱动(hive使用的即可),另外两个是hive的包
$cp /home/hadoop/app/hive-1.1.0-cdh5.14.0/lib/hive-common-1.1.0-cdh5.14.0.jar /home/hadoop/app/hive-1.1.0-cdh5.14.0/lib/hive-exec-1.1.0-cdh5.14.0.jar /home/hadoop/app/hive-1.1.0-cdh5.14.0/lib/mysql-connector-java.jar /home/hadoop/app/sqoop-1.4.6-cdh5.14.0/lib
配置环境变量和配置文件
$vi /etc/profile
export SQOOP_HOME=/home/hadoop/app/sqoop-1.4.6-cdh5.7.0
export PATH=$SQOOP_HOME/bin:$PATH
$source /etc/profile
$cd /home/hadoop/app/sqoop-1.4.6-cdh5.14.0/conf
$cp sqoop-env-template.sh sqoop-env.sh
$vi sqoop-env.sh
2、sqoop help
查看帮助
3、sqoop list-databases
4、sqoop list-tables
5、sqoop import
--connect:指定源数据库的连接信息
--username:指定连接数据库名称
--password:指定连接数据库密码
--table:指定读取的目标表
-m:指定运行的MapReduce数量
--mapreduce-job-name:指定sqoop运行MapReduce名称
--delete-target-dir:导入前先删除目标HDFS目录,防止重复报错
--hive-import:导入到hive的标识
--hive-table:指定导入的hive表名称
--fields-terminated-by:指定导入数据的分隔符,一定要和建表时指定的保持一致,否则会出现全是NULL的情况
--hive-overwrite:覆盖导入到hive,默认是追加导入
6、sqoop export
--export-dir:指定hive表存储的hdfs文件路径
--columns:指定导出的列,要和数据表对应,只能少不能多
--fields-terminated-by:指定hive表存储的文件的列分割符
7、业务场景作业
描述:MySQL里存储着城市表和产品表,Hadoop里存储着用户点击表,需要计算每个地区(城市归属于地区,例如上海属于华东区)热度Top3的产品名称和点击数量等信息,然后要把计算的结果导出到MySQL中作为可视化使用。
过程:
创建城市表
create table city_info(city_id int,city_name string,area string)
row format delimited fields terminated by '\t';
导入城市数据
sqoop import \
--connect jdbc:mysql://localhost:3306/business \
--username root --password root \
--table city_info -m 1 \
--mapreduce-job-name city_info_imp \
--delete-target-dir \
--hive-table test.city_info \
--hive-import \
--fields-terminated-by '\t' \
--hive-overwrite;
创建产品表
create table product_info(product_id int,product_name string,extend_info string)
row format delimited fields terminated by '\t';
导入产品数据
sqoop import \
--connect jdbc:mysql://localhost:3306/business \
--username root --password root \
--table product_info -m 1 \
--mapreduce-job-name product_info_imp \
--delete-target-dir \
--hive-table test.product_info \
--hive-import \
--fields-terminated-by '\t' \
--hive-overwrite;
创建用户点击临时表,用来给实际表动态分区
create table user_click_temp(user_id int,session_id string,action_time string,city_id int,product_id int)
row format delimited fields terminated by ',';
load data local inpath '/home/hadoop/data/user_click.txt' into table user_click_temp;
创建用户点击表
create table user_click(user_id int,session_id string,action_time string,city_id int,product_id int)
partitioned by (create_day string)
row format delimited fields terminated by ',';
load data local inpath '/home/hadoop/data/user_click.txt' into table user_click partition(create_day='2016-05-05');
从临时表抽取数据到实际表,动态分区
set hive.exec.dynamic.partition.mode=nonstrict;
insert overwrite table user_click partition(create_day) select user_id,session_id,action_time,city_id,product_id,date(action_time) from user_click_temp;
创建计算结果表
create table top3(product_id int,product_name string,area string,click_count int,rank int,day1 string)
partitioned by (day string)
row format delimited fields terminated by '\t';
临时版:数据只有一天,所以没有时间过滤
insert overwrite table top3 partition(day)
select * from (
select t.product_id,t.product_name,t.area,t.click_count,
row_number() over(partition by area order by click_count desc) rank,t.day day1,t.day
from (select u.product_id,p.product_name,c.area,count(1) click_count,date(action_time) day
from user_click u left join product_info p on u.product_id=p.product_id
left join city_info c on c.city_id=u.city_id group by u.product_id,p.product_name,c.area,date(action_time)) t where t.area is not null) tt where tt.rank<4
正式版:每天执行一次,计算前一天的top3并写到top3表中
insert overwrite table test.top3 partition(day)
select * from (
select t.product_id,t.product_name,t.area,t.click_count,
row_number() over(partition by area order by click_count desc) rank,t.day day1,t.day
from (select u.product_id,p.product_name,c.area,count(1) click_count,date(action_time) day
from (select * from test.user_click where date(action_time)=date(date_sub(current_date,1))) u left join test.product_info p on u.product_id=p.product_id
left join test.city_info c on c.city_id=u.city_id group by u.product_id,p.product_name,c.area,date(action_time)) t where t.area is not null) tt where tt.rank<4
导出top3到mysql
sqoop export \
--connect jdbc:mysql://localhost:3306/business \
--username root --password root \
--table top3 -m 1 \
--mapreduce-job-name top3_imp \
--export-dir /user/hive/warehouse/test.db/top3/day=2016-05-05 \
--columns "product_id,product_name,area,click_count,rank,day" \
--fields-terminated-by '\t'
调度执行,每天凌晨1点执行计算前一天的数据并导出到mysql
crontab -e * 1 * * * /home/hadoop/data/test.sh
test.sh
#!/bin/bash
yesterday=`date -d last-day +%Y-%m-%d`
echo $yesterday
echo "hive begin........."
hive -e "set hive.exec.dynamic.partition.mode=nonstrict;insert overwrite table test.top3 partition(day)
select * from (
select t.product_id,t.product_name,t.area,t.click_count,
row_number() over(partition by area order by click_count desc) rank,t.day day1,t.day
from (select u.product_id,p.product_name,c.area,count(1) click_count,date(action_time) day
from (select * from test.user_click where date(action_time)=date(date_sub(current_date,1))) u left join test.product_info p on u.product_id=p.product_id
left join test.city_info c on c.city_id=u.city_id group by u.product_id,p.product_name,c.area,date(action_time)) t where t.area is not null) tt where tt.rank<4;"
echo "hive end."
echo "sqoop export begin......"
sqoop export \
--connect jdbc:mysql://localhost:3306/business \
--username root --password root \
--table top3 -m 1 \
--mapreduce-job-name top3_imp \
--export-dir /user/hive/warehouse/test.db/top3/day=$yesterday \
--columns "product_id,product_name,area,click_count,rank,day" \
--fields-terminated-by '\t';
echo "sqoop export end."