The Difference Between Spark on Hive and Hive on Spark

2019-10-15  alexlee666

1. Background

1.1 Why was Hive introduced?

Hive was originally introduced mainly to lower the technical barrier to running query workloads on MapReduce.
In an RDBMS, developers and users query data with SQL, a language most developers already know. In the early days of big data, query jobs had to be written as MapReduce programs, which is complex and tedious. Hive translates familiar SQL statements into the corresponding MapReduce programs, so that even developers unfamiliar with MapReduce can quickly get query jobs running on it. This is why Hive's query language is usually described as HQL, a SQL-like dialect.

(Figure: schematic of how a Hive query is executed)

1.2 Why was Spark introduced?

Hive's underlying computation uses Hadoop MapReduce, and because MapReduce requires frequent disk I/O, its performance is only suitable for non-real-time batch processing of large files. Spark computes in memory: thanks to its DAG execution model and RDDs (whose lineage allows lost intermediate data to be recomputed), intermediate results can be kept in memory as RDDs rather than repeatedly written to and read back from disk, which makes Spark well suited to interactive and iterative workloads. Its performance is far higher than that of Hadoop MapReduce.
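
As a minimal illustration (the file path and filter strings below are made up), Spark can cache an intermediate Dataset in memory so that later actions reuse it instead of recomputing it from disk:

import static org.apache.spark.sql.functions.col;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class SparkCacheDemo {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder()
        .appName("spark-cache-demo")
        .master("local[*]")
        .getOrCreate();

    // Read a (made-up) log file and keep the filtered result in memory,
    // so the second action reuses it instead of re-reading the file from disk.
    Dataset<Row> logs = spark.read().textFile("hdfs:///tmp/app.log").toDF("line");
    Dataset<Row> errors = logs.filter(col("line").contains("ERROR")).cache();

    long total = errors.count();                                        // computes and caches
    long oct = errors.filter(col("line").contains("2019-10")).count();  // reuses the cached result

    System.out.println("total errors = " + total + ", errors in 2019-10 = " + oct);
    spark.stop();
  }
}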

1.3 Hive internal tables, external tables, and metadata

Metadata of a Hive table includes the table name, the columns and their types, the partitions, and the location where the table's data is stored; Hive keeps this metadata in the metastore, typically backed by a relational database such as MySQL or Derby.

Internal table vs. external table (a short sketch follows the table):

| Aspect | Internal (managed) table | External table |
| --- | --- | --- |
| Who manages the table data | Hive itself | HDFS |
| Where the table data is stored | hive.metastore.warehouse.dir (default: /user/hive/warehouse) | A location specified by the user |
| Effect of dropping the table | Both the metadata and the stored data are deleted | Only the metadata is deleted |
| Effect of changing the table schema | The change is synced to the metadata | The external table must be repaired (MSCK REPAIR TABLE table_name;) |
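
The difference is easy to see with two DDL statements. Below is a minimal sketch that issues them through Spark SQL with Hive support; the table names and the HDFS location are made up:

import org.apache.spark.sql.SparkSession;

public class TableTypeDemo {
  public static void main(String[] args) {
    // Assumes a Hive metastore is reachable via hive-site.xml on the classpath.
    SparkSession spark = SparkSession.builder()
        .appName("table-type-demo")
        .master("local[*]")
        .enableHiveSupport()
        .getOrCreate();

    // Managed (internal) table: data lives under hive.metastore.warehouse.dir.
    spark.sql("CREATE TABLE IF NOT EXISTS demo_internal (id INT, name STRING)");

    // External table: the user picks the location; Hive only tracks the metadata.
    spark.sql("CREATE EXTERNAL TABLE IF NOT EXISTS demo_external (id INT, name STRING) "
        + "LOCATION 'hdfs:///data/demo_external'");

    // Dropping the external table removes only the metadata; the files under
    // /data/demo_external remain. Dropping demo_internal would also delete its data.
    spark.sql("DROP TABLE demo_external");

    spark.stop();
  }
}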

With this background covered, let's compare Spark on Hive and Hive on Spark.


2. The Difference Between Spark on Hive and Hive on Spark

2.1 Spark on Hive

As the name suggests, this means building Spark on top of Hive: Spark depends on Hive, concretely in that Spark SQL connects to the Hive metastore to obtain table metadata (databases, table and partition definitions, storage locations), while reading the data and executing the query are handled by Spark's own engine.

In short, Spark uses Hive to provide the tables' metadata.
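
A minimal sketch of Spark on Hive, assuming hive-site.xml (pointing at the Hive metastore) is on Spark's classpath and that a Hive table named default.tbl1 exists:

import org.apache.spark.sql.SparkSession;

public class SparkOnHiveDemo {
  public static void main(String[] args) {
    // enableHiveSupport() makes Spark SQL use the Hive metastore for
    // database/table metadata.
    SparkSession spark = SparkSession.builder()
        .appName("spark-on-hive-demo")
        .enableHiveSupport()
        .getOrCreate();

    // Metadata comes from Hive; the query itself runs as Spark jobs, not MapReduce.
    spark.sql("SELECT count(*) FROM default.tbl1").show();
    spark.stop();
  }
}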

2.2 Hive on Spark

As the name suggests, this means building Hive on top of Spark (by default Hive's underlying execution engine is Hadoop MapReduce): Hive depends on Spark, concretely in that Hive's execution engine is switched to Spark, for example by setting hive.execution.engine in hive-site.xml as shown below, and the Spark dependencies must be available on Hive's classpath:

<property>
    <name>hive.execution.engine</name>
    <value>spark</value>
    <description>
      Expects one of [mr, tez, spark].
      Chooses execution engine. Options are: mr (Map reduce, default), tez, spark. While MR
      remains the default engine for historical reasons, it is itself a historical engine
      and is deprecated in Hive 2 line. It may be removed without further warning.
    </description>
  </property>

For example, if Hive cannot load the Spark client's dependencies (here a netty class shipped with Spark is missing from Hive's classpath), even a simple INSERT fails:

hive> insert into tbl1 values(2,'a', 'f', 2);

Query ID = ......
Total jobs = 1
Launching Job 1 out of 1
In order to change the average load for a reducer (in bytes):
  set hive.exec.reducers.bytes.per.reducer=<number>
In order to limit the maximum number of reducers:
  set hive.exec.reducers.max=<number>
In order to set a constant number of reducers:
  set mapreduce.job.reduces=<number>
java.lang.NoClassDefFoundError: io/netty/channel/EventLoopGroup
    at org.apache.hive.spark.client.SparkClientFactory.initialize(SparkClientFactory.java:56)
    at org.apache.hadoop.hive.ql.exec.spark.session.SparkSessionManagerImpl.setup(SparkSessionManagerImpl.java:83)
    at org.apache.hadoop.hive.ql.exec.spark.session.SparkSessionManagerImpl.getSession(SparkSessionManagerImpl.java:102)
    at org.apache.hadoop.hive.ql.exec.spark.SparkUtilities.getSparkSession(SparkUtilities.java:125)
    .....
Caused by: java.lang.ClassNotFoundException: io.netty.channel.EventLoopGroup
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    ... 24 more
FAILED: Execution Error, return code -101 from org.apache.hadoop.hive.ql.exec.spark.SparkTask. io/netty/channel/EventLoopGroup

In development, the Spark on Hive approach is the one most commonly adopted. The next section shows how to create external query tables with Spark on Hive.


3. Creating External Query Tables with Spark on Hive


import java.util.List;

import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.conf.Configuration;
import org.apache.spark.launcher.SparkAppHandle;
import org.apache.spark.launcher.SparkLauncher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


public class HiveUtil {

  private static final Logger logger = LoggerFactory.getLogger(HiveUtil.class);

  // Note: collaborators used below (metaConfigDao, dbHelper, handler, kerberosUtil),
  // the getDatabaseName helper and the failedflag field are project-specific;
  // their declarations are omitted from this excerpt.

  public void createSparkDatabase(String instanceName, String schemaName, String tableName)
          throws Exception {
    String databaseName = getDatabaseName(instanceName, schemaName);
    String ddl = String.format("CREATE DATABASE IF NOT EXISTS %s", databaseName+"___s");
    launchSparkTask(instanceName, schemaName, tableName, ddl);
  }

  public void createSparkTable(String instanceName, String schemaName,
                              String tableName, String tablePrefix) throws Exception {
    String sourcePK = metaConfigDao.getSoucePk(instanceName, schemaName, tableName);
    dropSparkTable(instanceName, schemaName, tableName);
    List<Pair<String, String>> columnList = dbHelper.getColumnList(instanceName,
            schemaName, tableName);
    // Build the comma-separated column definition list, e.g. "id int, name string".
    StringBuilder dbColumns = new StringBuilder();
    for (int i = 0; i < columnList.size(); i++) {
      Pair<String, String> p = columnList.get(i);
      dbColumns.append(p.getLeft()).append(" ").append(p.getRight());
      if (i < columnList.size() - 1) {
        dbColumns.append(",");
      }
    }

    String databaseName = getDatabaseName(instanceName, schemaName);  // same helper as in dropSparkTable()
    String tenantName = "";  // the original assignment was elided; supply the tenant name here

    String ddl = String.format("CREATE TABLE if not exists %s.%s (%s) "
                    + " using org.apache.spark.sql.execution.datasources.parquet"
                    + " options(\"tenant\" \'%s\',"
                    + "\"instance\" \"%s\", "
                    + "\"schema\" \"%s\", "
                    + "\"table\" \"%s\", "
                    + "\"prefix\" \"%s\", "
                    + "\"sourcepk\" \"%s\") ",
            databaseName+"___s", tableName, dbColumns, tenantName, instanceName,
            schemaName, tableName,tablePrefix, sourcePK);

    logger.info("Execute spark ddl, ddl is: [{}]", ddl);
    launchSparkTask(instanceName, schemaName, tableName, ddl);
    logger.info("Spark table created [{}.{}]", schemaName, tableName);
  }

  public void dropSparkTable(String instanceName, String schemaName, String tableName)
          throws Exception {
    String databaseName = getDatabaseName(instanceName, schemaName);
    String ddl = String.format("DROP TABLE IF EXISTS %s.%s", databaseName+"___s", tableName);
    launchSparkTask(instanceName, schemaName, tableName, ddl);
    logger.info("Dropped spark table [{}.{}]", databaseName+"___s", tableName);
  }


  // Launch a Spark application (via SparkLauncher) that executes the given DDL statement.
  public void launchSparkTask(String instanceName, String schemaName,
                              String tableName, String sqlddl) throws Exception{
    String sparkTaskJarName = handler.getParquetJarName().trim();
    String createSparkTableClassPath = handler.getCreateSparkTableClassPath().trim();
    String sparkMaster = handler.getSparkMaster().trim();
    System.out.println("sparkTaskJarName: " + sparkTaskJarName);
    String keytabPath = (kerberosUtil.getKeytabPath() == null) ?
            "" : kerberosUtil.getKeytabPath();
    String principal = (kerberosUtil.getPrincipal() == null) ?
            "" : kerberosUtil.getPrincipal();
    SparkLauncher launcher = new SparkLauncher();
    // To be launched spark-application jar
    launcher.setAppResource(sparkTaskJarName);
    launcher.setMainClass(createSparkTableClassPath);
    launcher.addAppArgs(sqlddl, keytabPath, principal );
    // master could be yarn or local[*]
    //launcher.setMaster("local[*]");
    launcher.setMaster(sparkMaster);
    SparkAppHandle handle = launcher.startApplication();
    int retryTimes = 0;
    // Poll the launcher state until the job finishes; give up if the job fails to
    // start (no application id and state FAILED) after roughly 40 seconds.
    while (handle.getState() != SparkAppHandle.State.FINISHED) {
      retryTimes++;
      Thread.sleep(5000L);
      System.out.println("applicationId is: " + handle.getAppId());
      System.out.println("current state: " + handle.getState());
      boolean failedToStart = handle.getAppId() == null
              && handle.getState() == SparkAppHandle.State.FAILED
              && retryTimes > 8;
      if (failedToStart) {
        logger.info("Cannot start the Spark job for creating the Spark table. Creating the Spark table failed.");
        metaConfigDao.updateMetaFlag(instanceName, schemaName, tableName, Config.META_FLAG_TASK_FAILED);
        failedflag = true;
        break;
      }
    }
    System.out.println("Launcher over");
  }
}
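
A hypothetical caller might use HiveUtil roughly as follows; all argument values are made up, and the class's project-specific collaborators are assumed to be wired up elsewhere:

public class HiveUtilExample {
  public static void main(String[] args) throws Exception {
    // Hypothetical instance/schema/table names.
    HiveUtil hiveUtil = new HiveUtil();
    hiveUtil.createSparkDatabase("inst1", "sales", "orders");
    hiveUtil.createSparkTable("inst1", "sales", "orders", "prefix_2019");
    // ... and to remove the table later:
    hiveUtil.dropSparkTable("inst1", "sales", "orders");
  }
}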
