Java面经

大数据面试题(一)

2019-06-16  本文已影响1人  阿涛哥
  1. 讲一下第一个项目
  1. hive中shuffle的优化

    1. 压缩
      压缩可以使磁盘上存储的数据量变小,通过降低I/O来提高查询速度。

      对hive产生的一系列MR中间过程启用压缩

      set hive.exec.compress.intermediate=true;
      set mapred.map.output.compression.codec=org.apache.hadoop.io.compress.SnappyCodec;
      

      对最终输出结果压缩(写到hdfs、本地磁盘的文件)

      set hive.exec.compress.output=true;
      set mapred.output.compression.codec=org.apache.hadoop.io.compress.SnappyCodec;
      
    2. join优化

      1. map join
        如果关联查询两张表中有一张小表默认map join,将小表加入内存
        hive.mapjoin.smalltable.filesize=25000000 默认大小
        hive.auto.convert.join=true 默认开启
        如果没有开启使用mapjoin,使用语句制定小表使用mapjoin
      select /*+ MAPJOIN(time_dim) */ count(1) from
      store_sales join time_dim on (ss_sold_time_sk = t_time_sk)
      
      1. smb join
        Sort-Merge-Bucket join
        解决大表与大表join速度慢问题
        通过分桶字段的的hash值对桶的个数取余进行分桶
    3. 倾斜连接
    ```xml
    <!-- hive.optimize.skewjoin:是否为连接表中的倾斜键创建单独的执行计划。它基于存储在元数据中的倾斜键。在编译时,Hive为倾斜键和其他键值生成各自的查询计 划。 -->
    <property> <name>hive.optimize.skewjoin</name> 
    <value>true</value>
    </property> <property>

    <!-- hive.skewjoin.key:决定如何确定连接中的倾斜键。在连接操作中,如果同一键值所对应的数据行数超过该参数值,则认为该键是一个倾斜连接键。 -->
    <name>hive.skewjoin.key</name>
    <value>100000</value> 
    </property>

    <!-- hive.skewjoin.mapjoin.map.tasks:指定倾斜连接中,用于Map连接作业的任务数。该参数应该与hive.skewjoin.mapjoin.min.split一起使用,执行细粒度的控制。 -->
    <property> <name>hive.skewjoin.mapjoin.map.tasks</name> 
    <value>10000</value>
    </property> 

    <!-- hive.skewjoin.mapjoin.min.split:通过指定最小split的大小,确定Map连接作业的任务数。该参数应该与hive.skewjoin.mapjoin.map.tasks一起使用,执行细粒度的控制。 -->
    <property>
    <name>hive.skewjoin.mapjoin.min.split</name>
    <value>33554432</value> 
    </property>
    ```
  1. Hive在集群过程中怎么解决数据倾斜
    本质原因:key的分布不均导致的

Map 端部分聚合,相当于Combiner

hive.map.aggr=true

有数据倾斜的时候进行负载均衡

hive.groupby.skewindata=true

当选项设定为 true,生成的查询计划会有两个 MR Job。第一个 MR Job 中,Map 的输出结果集合会随机分布到 Reduce 中,每个 Reduce 做部分聚合操作,并输出结果,这样处理的结果是相同的 Group By Key 有可能被分发到不同的 Reduce 中,从而达到负载均衡的目的;第二个 MR Job 再根据预处理的数据结果按照 Group By Key 分布到 Reduce 中(这个过程可以保证相同的 Group By Key 被分布到同一个 Reduce 中),最后完成最终的聚合操作。

  1. sqoop要将数据库中的所有表执行导入,怎么操作?哪些参数?增量导入?

全量导入

[hadoop@linux03 sqoop-1.4.5-cdh5.3.6]$ bin/sqoop import \
> --connect jdbc:mysql://linux03.ibf.com:3306/mydb \
> --username root \
> --password 123456 \
> --table user

增量导入

bin/sqoop import \
--connect jdbc:mysql://linux03.ibf.com:3306/mydb \
--username root \
--password 123456 \
--table user \
--fields-terminated-by '\t' \
--target-dir /sqoop/incremental \
-m 1 \
--direct \
--check-column id \
--incremental append \
--last-value 3
  1. hive导致数据倾斜的可能性(哪些操作会导致) -->分桶 join key分布不均匀 大量空值导致如何解决?

    根据key操作到时结果分布不均都可能导致数据倾斜,如group by key

    order by 使用全局排序最终只会在一个reducer上运行所有数据,导致数据倾斜

    大量NULL

    hive的NULL有时候是必须的:

    1)hive中insert语句必须列数匹配,不支持不写入,没有值的列必须使用null占位。

    2)hive表的数据文件中按分隔符区分各个列。空列会保存NULL(\n)来保留列位置。但外部表加载某些数据时如果列不够,如表13列,文件数据只有2列,则在表查询时表中的末尾剩余列无数据对应,自动显示为NULL。

    所以,NULL转化为空字符串,可以节省磁盘空间,实现方法有几种
    1)建表时直接指定(两种方式)
    a、用语句
    ROW FORMAT SERDE ‘org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe’
    with serdeproperties('serialization.null.format' = '')
    实现,注意两者必须一起使用,如

    CREATE TABLE hive_tb (id int,name STRING)
    PARTITIONED BY ( `day` string,`type` tinyint COMMENT '0 as bid, 1 as win, 2 as ck', `hour` tinyint)
    ROW FORMAT SERDE ‘org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe’
    WITH SERDEPROPERTIES (
            ‘field.delim’='/t’,
            ‘escape.delim’='//’,
            ‘serialization.null.format'=''
    ) STORED AS TEXTFILE;
    
         b、或者通过ROW FORMAT DELIMITED NULL DEFINED AS '' 如
    
    CREATE TABLE hive_tb (id int,name STRING)
    PARTITIONED BY ( `day` string,`type` tinyint COMMENT '0 as bid, 1 as win, 2 as ck', `hour` tinyint)
    ROW FORMAT DELIMITED 
            NULL DEFINED AS '' 
    STORED AS TEXTFILE;
    

    2)修改已存在的表

    alter table hive_tb set serdeproperties('serialization.null.format' = '');
    
  1. hive中如何增加一列数据?
    新增一列

    hive > alter table log_messages add coloumns(
    app_name string comment 'Application name',
    session_id long comment 'The current session id'
    );
    -- 增加列的表的最后一个字段之后,在分区字段之前添加。
    

    如果在表中新增一列new_column,则在原表上直插入new_column这一列数据不可行

    如果新增一列是分区,则可以新增数据到该分区下

    insert into table clear partition(date='20150828',hour='18')   select id,url,guid from tracklogs where date='20150828' and hour='18'; 
    
  2. 运行spark

  1. 有没有hive处理过json?有哪些函数?
    1. 建表时制定jar包处理json数据

      1. 首先添加jar包
        ADD JAR hcatalog/share/hcatalog/hive-hcatalog-core-1.1.0-cdh5.14.2.jar;

      2. 建表

      hive (default)> ADD JAR hcatalog/share/hcatalog/hive-hcatalog-core-1.1.0-cdh5.14.2.jar;
      Added [hcatalog/share/hcatalog/hive-hcatalog-core-1.1.0-cdh5.14.2.jar] to class path
      Added resources: [hcatalog/share/hcatalog/hive-hcatalog-core-1.1.0-cdh5.14.2.jar]
      hive (default)> create table spark_people_json( 
                  > 
                  > `name` string,
                  > 
                  > `age`   int)
                  > 
                  > ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe'
                  > 
                  > STORED AS TEXTFILE;
      OK
      Time taken: 4.445 seconds        
      
    2. 记录下如果只是某个字段为json,想要获取里面的某个值怎么操作?

      1. get_json_object()

      只能获取一个字段

      select get_json_object('{"shop":{"book":[{"price":43.3,"type":"art"},{"price":30,"type":"technology"}],"clothes":{"price":19.951,"type":"shirt"}},"name":"jane","age":"23"}', '$.shop.book[0].type');
      
      1. json_tuple()

      可以获取多个字段

      select json_tuple('{"name":"jack","server":"www.qq.com"}','server','name')
      
      1. 自行编写UDF
  1. sparkstreaming正在运行的程序如何去中止?怎么安全停止?代码做了更新,如何让正在运行的和更新后的代码做一个交替?

    升级应用程序代码

    如果需要使用新的应用程序代码升级正在运行的Spark Streaming应用程序,则有两种可能的机制。

    1. 升级的Spark Streaming应用程序启动并与现有应用程序并行运行。一旦新的(接收与旧的数据相同的数据)已经预热并准备好黄金时间,旧的可以被放下。请注意,这可以用于支持将数据发送到两个目标(即早期和升级的应用程序)的数据源。

    2. 现有应用程序优雅关闭(请参阅 StreamingContext.stop(...) 或JavaStreamingContext.stop(...) 用于优雅关闭选项),确保在关闭之前完全处理已接收的数据。然后可以启动升级的应用程序,该应用程序将从早期应用程序停止的同一点开始处理。请注意,这只能通过支持源端缓冲的输入源(如Kafka和Flume)来完成,因为在前一个应用程序关闭且升级的应用程序尚未启动时需要缓冲数据。并且无法从早期检查点重新启动升级前代码的信息。检查点信息基本上包含序列化的Scala / Java / Python对象,并且尝试使用新的修改类反序列化对象可能会导致错误。在这种情况下,要么使用不同的检查点目录启动升级的应用程序,要么删除以前的检查点目录。

  1. sparkstreaming和kafka集成中 精确一次的数据消费如何实现?

    使用直接连接方式

  1. 消息语义有几种?

    1. at least once -- 消息绝不会丢,但可能会重复传输

    2. at most once -- 消息可能会丢,但绝不会重复传输

    3. exactly once -- 每条消息肯定会被传输一次且仅传输一次,很多时候这是用户所想要的

  2. kafka的消费者怎么去保证精确一次的消费?

  1. sparkstreaming和kafka集成有几种方式?

    1. Receiver-based Approach

    2. Direct Approach (No Receivers) native Offsets

  2. 怎么实现sparkstreaming?

    初始化 StreamingContext

    通过创建输入DStreams来定义输入源。

    通过将转换和输出操作应用于DStream来定义流式计算。

    开始接收数据并使用它进行处理streamingContext.start()。

    等待处理停止(手动或由于任何错误)使用streamingContext.awaitTermination()。

    可以使用手动停止处理streamingContext.stop()。

  1. 项目中有几个人 如何分配?

  2. 所在项目中的存储架构?

  3. 开发工具用的是什么?(什么情况用什么工具/xshell/idea)

  4. 代码怎么去做的管理(git)?

  5. hive中的分析函数?

    1. 窗口函数
      LEAD
      可以选择指定要引导的行数。如果未指定要引导的行数,则前导是一行。
      当当前行的前导超出窗口末尾时返回null。
      hive (default)> desc function lead;
      OK
      tab_name
      LEAD (scalar_expression [,offset] [,default]) OVER ([query_partition_clause] order_by_clause); The LEAD function is used to return data from the next row. 
      

    LAG
    可以选择指定滞后的行数。如果未指定滞后行数,则滞后为一行。
    当当前行的延迟在窗口开始之前延伸时,返回null。
    hive (default)> desc function lag; OK tab_name LAG (scalar_expression [,offset] [,default]) OVER ([query_partition_clause] order_by_clause); The LAG function is used to access data from a previous row.
    FIRST_VALUE
    这最多需要两个参数。第一个参数是您想要第一个值的列,第二个(可选)参数必须是false默认的布尔值。如果设置为true,则跳过空值。
    LAST_VALUE
    这最多需要两个参数。第一个参数是您想要最后一个值的列,第二个(可选)参数必须是false默认的布尔值。如果设置为true,则跳过空值。

    1. OVER字句
      OVER标准聚合:

    COUNT、SUM、MIN、MAX、AVG

    使用带有任何原始数据类型的一个或多个分区列的PARTITION BY语句。

    使用PARTITION BY和ORDER BY 与任何数据类型的一个或多个分区和/或排序列。

    带有窗口的over具体说明。Windows可以在WINDOW子句中单独定义。窗口规范支持以下格式:

    (ROWS | RANGE) BETWEEN (UNBOUNDED | [num]) PRECEDING AND ([num] PRECEDING | CURRENT ROW | (UNBOUNDED | [num]) FOLLOWING)
    (ROWS | RANGE) BETWEEN CURRENT ROW AND (CURRENT ROW | (UNBOUNDED | [num]) FOLLOWING)
    (ROWS | RANGE) BETWEEN [num] FOLLOWING AND (UNBOUNDED | [num]) FOLLOWING
    

    当指定ORDER BY时缺少WINDOW子句,WINDOW规范默认为 RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW.

    当缺少ORDER BY和WINDOW子句时,WINDOW规范默认为 ROW BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING.

    OVER子句支持以下函数,但它不支持带有它们的窗口(参见HIVE-4797):

    Ranking functions: Rank, NTile, DenseRank, CumeDist, PercentRank.

    Lead and Lag functions.

    1. 分析函数
      RANK
      ROW_NUMBER
      DENSE_RANK
      CUME_DIST:CUME_DIST 小于等于当前值的行数/分组内总行数
      PERCENT_RANK
      NTILE
    select 
    s_id,  
    NTILE(2) over(partition by c_id order by s_score)
    from score
    
    1. Hive 2.1.0及更高版本中支持Distinct (参见HIVE-9534)

    聚合函数支持Distinct,包括SUM,COUNT和AVG,它们聚合在每个分区内的不同值上。当前实现具有以下限制:出于性能原因,在分区子句中不能支持ORDER BY或窗口规范。支持的语法如下。

    COUNT(DISTINCT a) OVER (PARTITION BY c)
    

    Hive 2.2.0中支持ORDER BY和窗口规范(参见HIVE-13453)。一个例子如下。

    COUNT(DISTINCT a) OVER (PARTITION BY c ORDER BY d ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING)
    
    1. Hive 2.1.0及更高版本中OVER子句支持中的聚合函数(参见 HIVE-13475)

    添加了对OVER子句中引用聚合函数的支持。例如,目前我们可以在OVER子句中使用SUM聚合函数,如下所示。

    SELECT rank() OVER (ORDER BY sum(b))
    FROM T
    GROUP BY a;
    
  1. 常见的字符串用哪些函数?

    1. 字符串长度函数:length

    2. 字符串反转函数:reverse

    3. 字符串连接函数:concat

    4. 带分隔符字符串连接函数:concat_ws

    hive (practice)> select concat_ws('|','abc','def','gh');
    abc|def|gh
    
    1. 字符串截取函数:substr,substring

      substr(string A, int start),substring(string A, int start)

    2. 字符串截取函数:substr,substring

      substr(string A, int start, int len),substring(string A, int start, int len)

    3. 字符串转大写函数:upper,ucase

    4. 字符串转小写函数:lower,lcase

    5. 去空格函数:trim

    6. 左边去空格函数:ltrim

    7. 右边去空格函数:rtrim

    8. 正则表达式替换函数:regexp_replace

      语法: regexp_replace(string A, string B, string C)

      返回值: string

      说明:将字符串A中的符合java正则表达式B的部分替换为C。注意,在有些情况下要使用转义字符,类似oracle中的regexp_replace函数。

    9. 正则表达式解析函数:regexp_extract

      语法: regexp_extract(string subject, string pattern, int index)

      返回值: string

      说明:将字符串subject按照pattern正则表达式的规则拆分,返回index指定的字符。

    10. URL解析函数:parse_url

    11. json解析函数:get_json_object

    12. 空格字符串函数:space

    13. 重复字符串函数:repeat

    hive (practice)> select repeat('abc',5);
    abcabcabcabcabc
    
    1. 首字符ascii函数:ascii

    2. 左补足函数:lpad

    3. 右补足函数:rpad

    4. 分割字符串函数: split

    5. 集合查找函数: find_in_set

  1. hive中如何去统计每周一,每个月的第一天的pv?

    获取指定日期月份的第一天、年份的第一天

    select trunc('2019-02-24', 'YYYY');
    
    select trunc('2019-02-24', 'MM');
    

    指定日期下周的指定周几

    select next_day('2019-02-24', 'TU');
    

    按指定格式返回指定日期增加几个月后的日期

    select add_months('2019-02-28', 1);
    
    select add_months('2019-02-24 21:15:16', 2, 'YYYY-MM-dd HH:mm:ss');
    

    select count(guid) from table group by trunc(date, 'MM')

    select count(guid) from table group by next_day('2019-06-08', 'MONDAY');

  1. dataset和dataframe区别?

    Spark2.x之后,官方已经将 ( DataFrame ) /Dataset (数据集)API的进行了 统一 ,DataFrame仅 是Dataset中每个元素为Row类型的时候

    不同之处在于 Dataset 是 strongly typed (强类型的) ,而dataframe则是 untypedrel (弱类型的)

  2. 项目中hive的元数据在哪儿保存?

    1. 若没有制定将metastore保存到指定的数据库,则metastore默认存放在hive自带的deybe数据库中,这就是安装hive的嵌入模式

    2. 若在设置中制定外部数据库,则保存在该数据库中,本地模式和远程模式使用这种方式保存metastore

  3. 元数据怎么保证他的安全性?

    修改元数据所用的用户名和密码

    <property>
        <name>javax.jdo.option.ConnectionUserName</name>
        <value>root</value>
    </property>
    <property>
        <name>javax.jdo.option.ConnectionPassword</name>
        <value>123456</value>
    </property> 
    

    在mysql端设置metastore数据库的访问权限

  4. sqoop导入导出有几种方式?增量导出?

    导入
    全量导入
    [hadoop@linux03 sqoop-1.4.5-cdh5.3.6]$ bin/sqoop import \ > --connect jdbc:mysql://linux03.ibf.com:3306/test_db \ > --username root \ > --password root \ > --table toHdfs \ > --target-dir /toHdfs \ > --direct \ > --delete-target-dir \ > --fields-terminated-by '\t' \ > -m 1

    增量导入append
    ```sh
    bin/sqoop import \
    --connect jdbc:mysql://linux03.ibf.com:3306/mydb \
    --username root \
    --password 123456 \
    --table user \
    --fields-terminated-by '\t' \
    --target-dir /sqoop/incremental \
    -m 1 \
    --direct \
    --check-column id \
    --incremental append \
    --last-value 3
    ```
    
    增量导入lastmodified
    表中必须有一列指示时间
    ```
    sqoop import \
    --connect jdbc:mysql://master:3306/test \
    --username hive \
    --password 123456 \
    --table customertest \
    --check-column last_mod \
    --incremental lastmodified \
    --last-value "2016-12-15 15:47:29" \
    -m 1 \
    --append     
    ```
    

    导出
    插入
    默认情况下,sqoop-export将新行添加到表中;每行输入记录都被转换成一条INSERT语句,将此行记录添加到目标数据库表中。如果数据库中的表具有约束条件(例如,其值必须唯一的主键列)并且已有数据存在,则必须注意避免插入违反这些约束条件的记录。如果INSERT语句失败,导出过程将失败。此模式主要用于将记录导出到可以接收这些结果的空表中。

    更新
    如果指定了--update-key参数,则Sqoop将改为修改数据库中表中现有的数据。每个输入记录都将转化为UPDATE语句修改现有数据。语句修改的行取决于--update-key指定的列名,如果数据库中的表中不存在的数据,那么也不会插入。
    根据目标数据库,如果要更新已存在于数据库中的行,或者如果行尚不存在则插入行,则还可以--update-mode 使用allowinsert模式指定参数
    
  5. sparkstreaming按批处理 hdfs做保存 小文件过多问题?

    使用窗口函数,指定足够长的窗口处理数据,总而使数据量足够大(最好在一个block大小左右),完成后使用foreachRDD将数据写出到HDFS

  1. hive中的负责数据类型有哪些?

    1. 数值型

      TINYINT

      SMALLINT

      INT/INTEGER

      BIGINT

      FLOAT

      DOUBLE

      DECIMAL

    2. 字符型

      string

      varchar

      char

    3. 日期型

      TIMESTAMP

      DATE

    4. 复杂类型

      ARRAY<data_type>

      create table hive_array_test (name string, stu_id_list array<INT>)
      ROW FORMAT DELIMITED
      FIELDS TERMINATED BY ','
      COLLECTION ITEMS TERMINATED BY ':' ;
      
      -- 'FIELDS TERMINATED BY' :字段与字段之间的分隔符
      -- 'COLLECTION ITEMS TERMINATED BY' :一个字段各个 item 的分隔符
      
      [chen@centos01 ~]$ vi hive_array.txt 
      0601,1:2:3:4
      0602,5:6
      0603,7:8:9:10
      0604,11:12
      
      load data local inpath '/home/chen/hive_array.txt' into table hive_array_test;
      
      hive (default)> select * from hive_array_test;
      OK
      hive_array_test.name    hive_array_test.stu_id_list
      0601    [1,2,3,4]
      0602    [5,6]
      0603    [7,8,9,10]
      0604    [11,12]
      Time taken: 0.9 seconds, Fetched: 4 row(s)
      

      MAP<primitive_type, data_type>

      create table hive_map_test (id int, unit map<string, int>)
      ROW FORMAT DELIMITED
      FIELDS TERMINATED BY '\t'
      COLLECTION ITEMS TERMINATED BY ','
      MAP KEYS TERMINATED BY ':';
      
      ‘MAP KEYS TERMINATED BY’: key value 分隔符
      
      [chen@centos01 ~]$ vi hive_map.txt
      0       Chinese:100,English:80,math:59
      1       Chinese:80,English:90
      2       Chinese:100,English:100,math:60
      
      load data local inpath '/home/chen/hive_map.txt' into table hive_map_test;
      
      hive (default)> select * from hive_map_test;
      OK
      hive_map_test.id        hive_map_test.unit
      0       {"Chinese":100,"English":80,"math":59}
      1       {"Chinese":80,"English":90}
      2       {"Chinese":100,"English":100,"math":60}
      Time taken: 0.204 seconds, Fetched: 3 row(s)
      
      hive (default)> select id, unit['math'] from hive_map_test;
      OK
      id      _c1
      0       59
      1       NULL
      2       60
      Time taken: 0.554 seconds, Fetched: 3 row(s)
      

      STRUCT<col_name : data_type [COMMENT col_comment], ...>

      create table hive_struct_test(id int, info struct<name:string, age:int, height:float>)
      ROW FORMAT DELIMITED
      FIELDS TERMINATED BY ','
      COLLECTION ITEMS TERMINATED BY ':';
      
      [chen@centos01 ~]$ vi hive_struct.txt
      0,zhao:18:178
      1,qian:30:173
      2,sun:20:180
      3,li:23:183
      
      load data local inpath '/home/chen/hive_struct.txt' into table hive_struct_test;
      
      
      hive (default)> select * from hive_struct_test;
      OK
      hive_struct_test.id     hive_struct_test.info
      0       {"name":"zhao","age":18,"height":178.0}
      1       {"name":"qian","age":30,"height":173.0}
      2       {"name":"sun","age":20,"height":180.0}
      3       {"name":"li","age":23,"height":183.0}
      Time taken: 0.153 seconds, Fetched: 4 row(s)
      
      hive (default)> select id, info.name from hive_struct_test;
      OK
      id      name
      0       zhao
      1       qian
      2       sun
      3       li
      Time taken: 0.133 seconds, Fetched: 4 row(s)
      
  1. hive中数据的导入导出,如何加载?保存在哪儿?

    导入:load data [local] inpath '路径' overwrite into table 表名

    导出:insert overwrite [local] directory '/home/hadoop/data' select * from emp_p;

    local:加local是从本地加载,不加local是从hdfs加载

  2. RDD的创建方式?

    1. 并行化在driver端已有的数据集,不能parallelize executor端的数据
    scala> var data = Array(1, 2, 3, 4, 5)
    data: Array[Int] = Array(1, 2, 3, 4, 5)
    
    scala> val rdd = sc.parallelize(data)
    rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[1] at parallelize at <console>:26
    
    1. 引用或者读取一个外部存储系统的数据集,比如像 HDFS , Hbase ,或者任何 Hadoop inputFormat 的子类的数据集
    scala> sc.textFile("student.log")
    res0: org.apache.spark.rdd.RDD[String] = student.log MapPartitionsRDD[1] at textFile at <console>:25
    
    1. 从已经存在的 RDD ,调用 transformation 算子,产生新的子 RDD
  3. 运行中driver的内存溢出 怎么处理?

上一篇下一篇

猜你喜欢

热点阅读