邵红晓

Flink1.10.0与Hive3.1.2集成详细步骤

2020-05-15  本文已影响0人  耳边的火

本文记录了flink1.10.0与hive3.1.2集成时遇到的问题,以及解决方案。有的同学与我的版本不同,可以参考下踩坑的过程,以及要解决哪些问题,怎么解决。总的体验下来,官方文档写的不是很详细,尤其这么多hive的版本,大家只能自己实际操作下踩坑再分享。

背景

在flink1.9之前,想要将数据写入hive,只能将数据写入HDFS的文件,然后在hive中创建外表。
这需要自定义开发HDFS File Sink 写入不同文件类型,且需要在flink中管理数据所需写入的分区。
在数据写完后,还需要在hive中操作,增加分区,才可以在hive中使用。
基础设施版本:

Flink Hive 集成介绍

在Flink1.9时,flink官方支持了与hive的集成,但是支持的Hive版本较少(2.3.41.2.1)。
在Flink1.10版本中,flink支持的hive版本扩大很多,我们所使用的Hive3.1.2也支持。
Flink之所以在1.9中,开始支持hive的集成,是因为1.9版本开始,blink分支merged到flink的master中。
blink分支中的 table-planner-blink 项目对Table & SQL API 进行了优化与扩展。因此flink1.10.0集成Hive3.1.2时,需要使用Table/SQL API 才可以使用。

Flink集群设置

虽然官方文档指出,flink1.10.0可以与hive3.1.2进行集成,但是仅给出了与hive3.1.0集成时,flink集群需要的额外依赖
并没有指出集成hive3.1.2如何操作。
因此,首先按照集成hive3.1.0的操作进行处理:
将hive3.1.2的lib中的 hive-exec-3.1.2.jar,libfb303-0.9.3.jar 以及 flink-connector-hive_2.11-1.10.0.jar 放入 lib 目录中
启动集群
./bin/yarn-session -jm 1024 -tm 2048 -s 2 -nm flink-hive-test -d

题外话:正确启动flink on yarn集群,需要让flink集群启动时知道hadoop的相关配置,因此需要配置 HADOOP_CONF_DIR 环境变量,一般在 $HADOOP_HOME/etc/hadoop。此外,1.9之后lib目录中不自带 flink-hadoop2-uber.jar了,因此需要让flink知道hadoop的依赖,要么将 hadoop-uber.jar 放入lib目录,要么设置环境变量 HADOOP_CLASSPATH,推荐的做法是设置 HADOOP_CLASSPATH 环境变量)。

启动报错:

------------------------------------------------------------
The program finished with the following exception:

org.apache.hadoop.yarn.exceptions.YarnRuntimeException: java.lang.reflect.InvocationTargetException
at org.apache.hadoop.yarn.factories.impl.pb.RpcClientFactoryPBImpl.getClient(RpcClientFactoryPBImpl.java:81)
at org.apache.hadoop.yarn.ipc.HadoopYarnProtoRPC.getProxy(HadoopYarnProtoRPC.java:48)
at org.apache.hadoop.yarn.client.RMProxy$1.run(RMProxy.java:151)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:360)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1710)
at org.apache.hadoop.yarn.client.RMProxy.getProxy(RMProxy.java:147)
at org.apache.hadoop.yarn.client.RMProxy.newProxyInstance(RMProxy.java:134)
at org.apache.hadoop.yarn.client.RMProxy.createRMProxy(RMProxy.java:102)
at org.apache.hadoop.yarn.client.ClientRMProxy.createRMProxy(ClientRMProxy.java:72)
at org.apache.hadoop.yarn.client.api.impl.YarnClientImpl.serviceStart(YarnClientImpl.java:233)
at org.apache.hadoop.service.AbstractService.start(AbstractService.java:194)
at org.apache.flink.yarn.YarnClusterClientFactory.getClusterDescriptor(YarnClusterClientFactory.java:72)
at org.apache.flink.yarn.YarnClusterClientFactory.createClusterDescriptor(YarnClusterClientFactory.java:56)
at org.apache.flink.yarn.YarnClusterClientFactory.createClusterDescriptor(YarnClusterClientFactory.java:42)
at org.apache.flink.yarn.cli.FlinkYarnSessionCli.run(FlinkYarnSessionCli.java:529)
at org.apache.flink.yarn.cli.FlinkYarnSessionCli.lambda$main$5(FlinkYarnSessionCli.java:785)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1730)
at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at org.apache.flink.yarn.cli.FlinkYarnSessionCli.main(FlinkYarnSessionCli.java:785)
Caused by: java.lang.reflect.InvocationTargetException
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at org.apache.hadoop.yarn.factories.impl.pb.RpcClientFactoryPBImpl.getClient(RpcClientFactoryPBImpl.java:78)
... 21 more
Caused by: java.lang.NoSuchMethodError: com.google.common.base.Preconditions.checkArgument(ZLjava/lang/String;Ljava/lang/Object;)V
at org.apache.hadoop.conf.Configuration.set(Configuration.java:1357)
at org.apache.hadoop.conf.Configuration.set(Configuration.java:1338)
at org.apache.hadoop.conf.Configuration.setClass(Configuration.java:2713)
at org.apache.hadoop.ipc.RPC.setProtocolEngine(RPC.java:205)
at org.apache.hadoop.yarn.api.impl.pb.client.ApplicationClientProtocolPBClientImpl.<init>(ApplicationClientProtocolPBClientImpl.java:209)

核心错误为:NoSuchMethodError: com.google.common.base.Preconditions.checkArgument(ZLjava/lang/String;Ljava/lang/Object;)V
经过排查与查找资料发现,应该是 hive-exec-3.1.2.jar 中含有 com.google.common.base.Preconditions 类,而 hadoop 中也依赖了该类,且版本不一致导致。
到hive的lib中查看,发现hive的lib中使用了hadoop的guava来避免版本问题


hive guava

因此,解决方案为:将 guava-27.0-jre.jar 放入flink的lib目录中,即可正常启动 flink-on-yarn 集群。
此时,将根据官网demo写的项目打包后,在flink集群上运行,发现报错:

------------------------------------------------------------
The program finished with the following exception:

org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Failed to create Hive Metastore client
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138)
at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:664)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:895)
at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1730)
at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)
Caused by: org.apache.flink.table.catalog.exceptions.CatalogException: Failed to create Hive Metastore client
at org.apache.flink.table.catalog.hive.client.HiveShimV310.getHiveMetastoreClient(HiveShimV310.java:100)
at org.apache.flink.table.catalog.hive.client.HiveMetastoreClientWrapper.createMetastoreClient(HiveMetastoreClientWrapper.java:240)
at org.apache.flink.table.catalog.hive.client.HiveMetastoreClientWrapper.<init>(HiveMetastoreClientWrapper.java:71)
at org.apache.flink.table.catalog.hive.client.HiveMetastoreClientFactory.create(HiveMetastoreClientFactory.java:35)
at org.apache.flink.table.catalog.hive.HiveCatalog.open(HiveCatalog.java:188)
at org.apache.flink.table.catalog.CatalogManager.registerCatalog(CatalogManager.java:102)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.registerCatalog(TableEnvironmentImpl.java:235)
at com.example.flink.HiveWriteDemo.main(HiveWriteDemo.java:24)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:321)
... 11 more
Caused by: java.lang.reflect.InvocationTargetException
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.table.catalog.hive.client.HiveShimV310.getHiveMetastoreClient(HiveShimV310.java:98)
... 23 more
Caused by: java.lang.RuntimeException: Unable to instantiate org.apache.hadoop.hive.metastore.HiveMetaStoreClient
at org.apache.hadoop.hive.metastore.utils.JavaUtils.newInstance(JavaUtils.java:86)
at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.<init>(RetryingMetaStoreClient.java:95)
at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.getProxy(RetryingMetaStoreClient.java:148)
at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.getProxy(RetryingMetaStoreClient.java:104)
... 28 more
Caused by: java.lang.reflect.InvocationTargetException
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at org.apache.hadoop.hive.metastore.utils.JavaUtils.newInstance(JavaUtils.java:84)
... 31 more
Caused by: MetaException(message:org/datanucleus/NucleusContext)
at org.apache.hadoop.hive.metastore.RetryingHMSHandler.<init>(RetryingHMSHandler.java:84)
at org.apache.hadoop.hive.metastore.RetryingHMSHandler.getProxy(RetryingHMSHandler.java:93)
at org.apache.hadoop.hive.metastore.HiveMetaStore.newRetryingHMSHandler(HiveMetaStore.java:8667)
at org.apache.hadoop.hive.metastore.HiveMetaStoreClient.<init>(HiveMetaStoreClient.java:169)
... 36 more
Caused by: java.lang.NoClassDefFoundError: org/datanucleus/NucleusContext
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:348)
at org.apache.hadoop.hive.metastore.utils.JavaUtils.getClass(JavaUtils.java:52)
at org.apache.hadoop.hive.metastore.RawStoreProxy.getProxy(RawStoreProxy.java:65)
at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.newRawStoreForConf(HiveMetaStore.java:718)
at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.getMSForConf(HiveMetaStore.java:696)
at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.getMS(HiveMetaStore.java:690)
at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.createDefaultDB(HiveMetaStore.java:767)
at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.init(HiveMetaStore.java:538)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.hadoop.hive.metastore.RetryingHMSHandler.invokeInternal(RetryingHMSHandler.java:147)
at org.apache.hadoop.hive.metastore.RetryingHMSHandler.invoke(RetryingHMSHandler.java:108)
at org.apache.hadoop.hive.metastore.RetryingHMSHandler.<init>(RetryingHMSHandler.java:80)
... 39 more
Caused by: java.lang.ClassNotFoundException: org.datanucleus.NucleusContext
at [java.net](http://java.net/).URLClassLoader.findClass(URLClassLoader.java:382)
at java.lang.ClassLoader.loadClass(ClassLoader.java:419)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)
at java.lang.ClassLoader.loadClass(ClassLoader.java:352)
... 55 more

经排查,是缺少 datanucleus 相关的jar包,而在hive的lib中发现是有相关jar包的,直接cp到flink的lib中即可


hive datanucleus

再次运行demo,仍然报错:

2020-05-15 11:05:25,659 ERROR org.apache.hadoop.hive.metastore.RetryingHMSHandler - Retrying HMSHandler after 2000 ms (attempt 2 of 10) with error: javax.jdo.JDOFatalInternalException: Error creating transactional connection factory
at org.datanucleus.api.jdo.NucleusJDOHelper.getJDOExceptionForNucleusException(NucleusJDOHelper.java:671)
at org.datanucleus.api.jdo.JDOPersistenceManagerFactory.freezeConfiguration(JDOPersistenceManagerFactory.java:830)
at org.datanucleus.api.jdo.JDOPersistenceManagerFactory.createPersistenceManagerFactory(JDOPersistenceManagerFactory.java:334)
at org.datanucleus.api.jdo.JDOPersistenceManagerFactory.getPersistenceManagerFactory(JDOPersistenceManagerFactory.java:213)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at javax.jdo.JDOHelper$16.run(JDOHelper.java:1965)
at java.security.AccessController.doPrivileged(Native Method)
at javax.jdo.JDOHelper.invoke(JDOHelper.java:1960)
at javax.jdo.JDOHelper.invokeGetPersistenceManagerFactoryOnImplementation(JDOHelper.java:1166)
at javax.jdo.JDOHelper.getPersistenceManagerFactory(JDOHelper.java:808)
at javax.jdo.JDOHelper.getPersistenceManagerFactory(JDOHelper.java:701)
at org.apache.hadoop.hive.metastore.ObjectStore.getPMF(ObjectStore.java:650)
at org.apache.hadoop.hive.metastore.ObjectStore.getPersistenceManager(ObjectStore.java:693)
at org.apache.hadoop.hive.metastore.ObjectStore.initializeHelper(ObjectStore.java:483)
at org.apache.hadoop.hive.metastore.ObjectStore.initialize(ObjectStore.java:420)
at org.apache.hadoop.hive.metastore.ObjectStore.setConf(ObjectStore.java:375)
at org.apache.hadoop.util.ReflectionUtils.setConf(ReflectionUtils.java:77)
at org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:137)
at org.apache.hadoop.hive.metastore.RawStoreProxy.<init>(RawStoreProxy.java:59)
at org.apache.hadoop.hive.metastore.RawStoreProxy.getProxy(RawStoreProxy.java:67)
at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.newRawStoreForConf(HiveMetaStore.java:718)
at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.getMSForConf(HiveMetaStore.java:696)
at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.getMS(HiveMetaStore.java:690)
at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.createDefaultDB(HiveMetaStore.java:773)
at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.init(HiveMetaStore.java:538)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.hadoop.hive.metastore.RetryingHMSHandler.invokeInternal(RetryingHMSHandler.java:147)
at org.apache.hadoop.hive.metastore.RetryingHMSHandler.invoke(RetryingHMSHandler.java:108)
at org.apache.hadoop.hive.metastore.RetryingHMSHandler.<init>(RetryingHMSHandler.java:80)
at org.apache.hadoop.hive.metastore.RetryingHMSHandler.getProxy(RetryingHMSHandler.java:93)
at org.apache.hadoop.hive.metastore.HiveMetaStore.newRetryingHMSHandler(HiveMetaStore.java:8667)
at org.apache.hadoop.hive.metastore.HiveMetaStoreClient.<init>(HiveMetaStoreClient.java:169)
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at org.apache.hadoop.hive.metastore.utils.JavaUtils.newInstance(JavaUtils.java:84)
at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.<init>(RetryingMetaStoreClient.java:95)
at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.getProxy(RetryingMetaStoreClient.java:148)
at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.getProxy(RetryingMetaStoreClient.java:104)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.table.catalog.hive.client.HiveShimV310.getHiveMetastoreClient(HiveShimV310.java:98)
at org.apache.flink.table.catalog.hive.client.HiveMetastoreClientWrapper.createMetastoreClient(HiveMetastoreClientWrapper.java:240)
at org.apache.flink.table.catalog.hive.client.HiveMetastoreClientWrapper.<init>(HiveMetastoreClientWrapper.java:71)
at org.apache.flink.table.catalog.hive.client.HiveMetastoreClientFactory.create(HiveMetastoreClientFactory.java:35)
at org.apache.flink.table.catalog.hive.HiveCatalog.open(HiveCatalog.java:188)
at org.apache.flink.table.catalog.CatalogManager.registerCatalog(CatalogManager.java:102)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.registerCatalog(TableEnvironmentImpl.java:235)
at com.example.flink.HiveWriteDemo.main(HiveWriteDemo.java:24)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:321)
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138)
at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:664)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:895)
at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1730)
at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)
NestedThrowablesStackTrace:
java.lang.reflect.InvocationTargetException
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at org.datanucleus.plugin.NonManagedPluginRegistry.createExecutableExtension(NonManagedPluginRegistry.java:606)
at org.datanucleus.plugin.PluginManager.createExecutableExtension(PluginManager.java:330)
at org.datanucleus.store.AbstractStoreManager.registerConnectionFactory(AbstractStoreManager.java:203)
at org.datanucleus.store.AbstractStoreManager.<init>(AbstractStoreManager.java:162)
at org.datanucleus.store.rdbms.RDBMSStoreManager.<init>(RDBMSStoreManager.java:285)
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at org.datanucleus.plugin.NonManagedPluginRegistry.createExecutableExtension(NonManagedPluginRegistry.java:606)
at org.datanucleus.plugin.PluginManager.createExecutableExtension(PluginManager.java:301)
at org.datanucleus.NucleusContextHelper.createStoreManagerForProperties(NucleusContextHelper.java:133)
at org.datanucleus.PersistenceNucleusContextImpl.initialise(PersistenceNucleusContextImpl.java:422)
at org.datanucleus.api.jdo.JDOPersistenceManagerFactory.freezeConfiguration(JDOPersistenceManagerFactory.java:817)
at org.datanucleus.api.jdo.JDOPersistenceManagerFactory.createPersistenceManagerFactory(JDOPersistenceManagerFactory.java:334)
at org.datanucleus.api.jdo.JDOPersistenceManagerFactory.getPersistenceManagerFactory(JDOPersistenceManagerFactory.java:213)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at javax.jdo.JDOHelper$16.run(JDOHelper.java:1965)
at java.security.AccessController.doPrivileged(Native Method)
at javax.jdo.JDOHelper.invoke(JDOHelper.java:1960)
at javax.jdo.JDOHelper.invokeGetPersistenceManagerFactoryOnImplementation(JDOHelper.java:1166)
at javax.jdo.JDOHelper.getPersistenceManagerFactory(JDOHelper.java:808)
at javax.jdo.JDOHelper.getPersistenceManagerFactory(JDOHelper.java:701)
at org.apache.hadoop.hive.metastore.ObjectStore.getPMF(ObjectStore.java:650)
at org.apache.hadoop.hive.metastore.ObjectStore.getPersistenceManager(ObjectStore.java:693)
at org.apache.hadoop.hive.metastore.ObjectStore.initializeHelper(ObjectStore.java:483)
at org.apache.hadoop.hive.metastore.ObjectStore.initialize(ObjectStore.java:420)
at org.apache.hadoop.hive.metastore.ObjectStore.setConf(ObjectStore.java:375)
at org.apache.hadoop.util.ReflectionUtils.setConf(ReflectionUtils.java:77)
at org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:137)
at org.apache.hadoop.hive.metastore.RawStoreProxy.<init>(RawStoreProxy.java:59)
at org.apache.hadoop.hive.metastore.RawStoreProxy.getProxy(RawStoreProxy.java:67)
at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.newRawStoreForConf(HiveMetaStore.java:718)
at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.getMSForConf(HiveMetaStore.java:696)
at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.getMS(HiveMetaStore.java:690)
at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.createDefaultDB(HiveMetaStore.java:773)
at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.init(HiveMetaStore.java:538)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.hadoop.hive.metastore.RetryingHMSHandler.invokeInternal(RetryingHMSHandler.java:147)
at org.apache.hadoop.hive.metastore.RetryingHMSHandler.invoke(RetryingHMSHandler.java:108)
at org.apache.hadoop.hive.metastore.RetryingHMSHandler.<init>(RetryingHMSHandler.java:80)
at org.apache.hadoop.hive.metastore.RetryingHMSHandler.getProxy(RetryingHMSHandler.java:93)
at org.apache.hadoop.hive.metastore.HiveMetaStore.newRetryingHMSHandler(HiveMetaStore.java:8667)
at org.apache.hadoop.hive.metastore.HiveMetaStoreClient.<init>(HiveMetaStoreClient.java:169)
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at org.apache.hadoop.hive.metastore.utils.JavaUtils.newInstance(JavaUtils.java:84)
at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.<init>(RetryingMetaStoreClient.java:95)
at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.getProxy(RetryingMetaStoreClient.java:148)
at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.getProxy(RetryingMetaStoreClient.java:104)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.table.catalog.hive.client.HiveShimV310.getHiveMetastoreClient(HiveShimV310.java:98)
at org.apache.flink.table.catalog.hive.client.HiveMetastoreClientWrapper.createMetastoreClient(HiveMetastoreClientWrapper.java:240)
at org.apache.flink.table.catalog.hive.client.HiveMetastoreClientWrapper.<init>(HiveMetastoreClientWrapper.java:71)
at org.apache.flink.table.catalog.hive.client.HiveMetastoreClientFactory.create(HiveMetastoreClientFactory.java:35)
at org.apache.flink.table.catalog.hive.HiveCatalog.open(HiveCatalog.java:188)
at org.apache.flink.table.catalog.CatalogManager.registerCatalog(CatalogManager.java:102)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.registerCatalog(TableEnvironmentImpl.java:235)
at com.example.flink.HiveWriteDemo.main(HiveWriteDemo.java:24)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:321)
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138)
at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:664)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:895)
at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1730)
at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)
Caused by: org.datanucleus.exceptions.NucleusException: Attempt to invoke the "HikariCP" plugin to create a ConnectionPool gave an error : The specified datastore driver ("com.mysql.jdbc.Driver") was not found in the CLASSPATH. Please check your CLASSPATH specification, and the name of the driver.
at org.datanucleus.store.rdbms.ConnectionFactoryImpl.generateDataSources(ConnectionFactoryImpl.java:232)
at org.datanucleus.store.rdbms.ConnectionFactoryImpl.initialiseDataSources(ConnectionFactoryImpl.java:117)
at org.datanucleus.store.rdbms.ConnectionFactoryImpl.<init>(ConnectionFactoryImpl.java:82)
... 90 more
Caused by: org.datanucleus.store.rdbms.connectionpool.DatastoreDriverNotFoundException: The specified datastore driver ("com.mysql.jdbc.Driver") was not found in the CLASSPATH. Please check your CLASSPATH specification, and the name of the driver.
at org.datanucleus.store.rdbms.connectionpool.AbstractConnectionPoolFactory.loadDriver(AbstractConnectionPoolFactory.java:58)
at org.datanucleus.store.rdbms.connectionpool.HikariCPConnectionPoolFactory.createConnectionPool(HikariCPConnectionPoolFactory.java:66)
at org.datanucleus.store.rdbms.ConnectionFactoryImpl.generateDataSources(ConnectionFactoryImpl.java:213)
... 92 more

发现是缺少mysql的jar包,因为hive的metastore需要去msyql获取表结构等信息。
同样,在hive的lib中可以找到mysql的jar包,将它cp到flink的lib目录中。


hive mysql

再次运行demo,发现仍然报错:

------------------------------------------------------------
The program finished with the following exception:

org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Failed to create Hive Metastore client
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138)
at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:664)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:895)
at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1730)
at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)
Caused by: org.apache.flink.table.catalog.exceptions.CatalogException: Failed to create Hive Metastore client
at org.apache.flink.table.catalog.hive.client.HiveShimV310.getHiveMetastoreClient(HiveShimV310.java:100)
at org.apache.flink.table.catalog.hive.client.HiveMetastoreClientWrapper.createMetastoreClient(HiveMetastoreClientWrapper.java:240)
at org.apache.flink.table.catalog.hive.client.HiveMetastoreClientWrapper.<init>(HiveMetastoreClientWrapper.java:71)
at org.apache.flink.table.catalog.hive.client.HiveMetastoreClientFactory.create(HiveMetastoreClientFactory.java:35)
at org.apache.flink.table.catalog.hive.HiveCatalog.open(HiveCatalog.java:188)
at org.apache.flink.table.catalog.CatalogManager.registerCatalog(CatalogManager.java:102)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.registerCatalog(TableEnvironmentImpl.java:235)
at com.example.flink.HiveWriteDemo.main(HiveWriteDemo.java:24)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:321)
... 11 more
Caused by: java.lang.reflect.InvocationTargetException
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.table.catalog.hive.client.HiveShimV310.getHiveMetastoreClient(HiveShimV310.java:98)
... 23 more
Caused by: java.lang.RuntimeException: Unable to instantiate org.apache.hadoop.hive.metastore.HiveMetaStoreClient
at org.apache.hadoop.hive.metastore.utils.JavaUtils.newInstance(JavaUtils.java:86)
at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.<init>(RetryingMetaStoreClient.java:95)
at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.getProxy(RetryingMetaStoreClient.java:148)
at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.getProxy(RetryingMetaStoreClient.java:104)
... 28 more
Caused by: java.lang.reflect.InvocationTargetException
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at org.apache.hadoop.hive.metastore.utils.JavaUtils.newInstance(JavaUtils.java:84)
... 31 more
Caused by: MetaException(message:javax/jdo/JDOQLTypedQuery)
at org.apache.hadoop.hive.metastore.RetryingHMSHandler.<init>(RetryingHMSHandler.java:84)
at org.apache.hadoop.hive.metastore.RetryingHMSHandler.getProxy(RetryingHMSHandler.java:93)
at org.apache.hadoop.hive.metastore.HiveMetaStore.newRetryingHMSHandler(HiveMetaStore.java:8667)
at org.apache.hadoop.hive.metastore.HiveMetaStoreClient.<init>(HiveMetaStoreClient.java:169)
... 36 more
Caused by: java.lang.NoClassDefFoundError: javax/jdo/JDOQLTypedQuery
at org.datanucleus.api.jdo.JDOPersistenceManagerFactory.newPM(JDOPersistenceManagerFactory.java:881)
at org.datanucleus.api.jdo.JDOPersistenceManagerFactory.getPersistenceManager(JDOPersistenceManagerFactory.java:863)
at org.datanucleus.api.jdo.JDOPersistenceManagerFactory.getPersistenceManager(JDOPersistenceManagerFactory.java:844)
at org.apache.hadoop.hive.metastore.ObjectStore.getPersistenceManager(ObjectStore.java:693)
at org.apache.hadoop.hive.metastore.ObjectStore.initializeHelper(ObjectStore.java:483)
at org.apache.hadoop.hive.metastore.ObjectStore.initialize(ObjectStore.java:420)
at org.apache.hadoop.hive.metastore.ObjectStore.setConf(ObjectStore.java:375)
at org.apache.hadoop.util.ReflectionUtils.setConf(ReflectionUtils.java:77)
at org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:137)
at org.apache.hadoop.hive.metastore.RawStoreProxy.<init>(RawStoreProxy.java:59)
at org.apache.hadoop.hive.metastore.RawStoreProxy.getProxy(RawStoreProxy.java:67)
at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.newRawStoreForConf(HiveMetaStore.java:718)
at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.getMSForConf(HiveMetaStore.java:696)
at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.getMS(HiveMetaStore.java:690)
at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.createDefaultDB(HiveMetaStore.java:767)
at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.init(HiveMetaStore.java:538)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.hadoop.hive.metastore.RetryingHMSHandler.invokeInternal(RetryingHMSHandler.java:147)
at org.apache.hadoop.hive.metastore.RetryingHMSHandler.invoke(RetryingHMSHandler.java:108)
at org.apache.hadoop.hive.metastore.RetryingHMSHandler.<init>(RetryingHMSHandler.java:80)
... 39 more
Caused by: java.lang.ClassNotFoundException: javax.jdo.JDOQLTypedQuery
at [java.net](http://java.net/).URLClassLoader.findClass(URLClassLoader.java:382)
at java.lang.ClassLoader.loadClass(ClassLoader.java:419)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)
at java.lang.ClassLoader.loadClass(ClassLoader.java:352)
... 62 more

经过排查,发现是缺少 javax.jdo 相关的jar
同样在hive的lib中可以找到,我们cp到flink的lib中。


hive javax.jdo

再次运行demo,报错:

------------------------------------------------------------
The program finished with the following exception:

org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Failed to create Hive Metastore client
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138)
at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:664)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:895)
at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1730)
at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)
Caused by: org.apache.flink.table.catalog.exceptions.CatalogException: Failed to create Hive Metastore client
at org.apache.flink.table.catalog.hive.client.HiveShimV310.getHiveMetastoreClient(HiveShimV310.java:100)
at org.apache.flink.table.catalog.hive.client.HiveMetastoreClientWrapper.createMetastoreClient(HiveMetastoreClientWrapper.java:240)
at org.apache.flink.table.catalog.hive.client.HiveMetastoreClientWrapper.<init>(HiveMetastoreClientWrapper.java:71)
at org.apache.flink.table.catalog.hive.client.HiveMetastoreClientFactory.create(HiveMetastoreClientFactory.java:35)
at org.apache.flink.table.catalog.hive.HiveCatalog.open(HiveCatalog.java:188)
at org.apache.flink.table.catalog.CatalogManager.registerCatalog(CatalogManager.java:102)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.registerCatalog(TableEnvironmentImpl.java:235)
at com.example.flink.HiveWriteDemo.main(HiveWriteDemo.java:24)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:321)
... 11 more
Caused by: java.lang.reflect.InvocationTargetException
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.table.catalog.hive.client.HiveShimV310.getHiveMetastoreClient(HiveShimV310.java:98)
... 23 more
Caused by: java.lang.RuntimeException: Unable to instantiate org.apache.hadoop.hive.metastore.HiveMetaStoreClient
at org.apache.hadoop.hive.metastore.utils.JavaUtils.newInstance(JavaUtils.java:86)
at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.<init>(RetryingMetaStoreClient.java:95)
at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.getProxy(RetryingMetaStoreClient.java:148)
at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.getProxy(RetryingMetaStoreClient.java:104)
... 28 more
Caused by: java.lang.reflect.InvocationTargetException
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at org.apache.hadoop.hive.metastore.utils.JavaUtils.newInstance(JavaUtils.java:84)
... 31 more
Caused by: MetaException(message:org/antlr/runtime/RecognitionException)
at org.apache.hadoop.hive.metastore.RetryingHMSHandler.<init>(RetryingHMSHandler.java:84)
at org.apache.hadoop.hive.metastore.RetryingHMSHandler.getProxy(RetryingHMSHandler.java:93)
at org.apache.hadoop.hive.metastore.HiveMetaStore.newRetryingHMSHandler(HiveMetaStore.java:8667)
at org.apache.hadoop.hive.metastore.HiveMetaStoreClient.<init>(HiveMetaStoreClient.java:169)
... 36 more
Caused by: java.lang.NoClassDefFoundError: org/antlr/runtime/RecognitionException
at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.init(HiveMetaStore.java:597)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.hadoop.hive.metastore.RetryingHMSHandler.invokeInternal(RetryingHMSHandler.java:147)
at org.apache.hadoop.hive.metastore.RetryingHMSHandler.invoke(RetryingHMSHandler.java:108)
at org.apache.hadoop.hive.metastore.RetryingHMSHandler.<init>(RetryingHMSHandler.java:80)
... 39 more
Caused by: java.lang.ClassNotFoundException: org.antlr.runtime.RecognitionException
at [java.net](http://java.net/).URLClassLoader.findClass(URLClassLoader.java:382)
at java.lang.ClassLoader.loadClass(ClassLoader.java:419)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)
at java.lang.ClassLoader.loadClass(ClassLoader.java:352)
... 47 more

经过排查,是缺少 antlr 相关jar包。同样在hive的lib下找到相关jar包,cp到flink的lib中。


hive antlr

再次运行demo,报错:

------------------------------------------------------------
The program finished with the following exception:

org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: java.util.concurrent.ExecutionException: org.apache.flink.runtime.client.JobSubmissionException: Failed to submit JobGraph.
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138)
at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:664)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:895)
at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1730)
at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)
Caused by: java.lang.RuntimeException: java.util.concurrent.ExecutionException: org.apache.flink.runtime.client.JobSubmissionException: Failed to submit JobGraph.
at org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:199)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1741)
at org.apache.flink.streaming.api.environment.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:94)
at org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:63)
at org.apache.flink.table.planner.delegation.BatchExecutor.execute(BatchExecutor.java:44)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:643)
at com.example.flink.HiveWriteDemo.main(HiveWriteDemo.java:33)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:321)
... 11 more
Caused by: java.util.concurrent.ExecutionException: org.apache.flink.runtime.client.JobSubmissionException: Failed to submit JobGraph.
at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1736)
... 21 more
Caused by: org.apache.flink.runtime.client.JobSubmissionException: Failed to submit JobGraph.
at org.apache.flink.client.program.rest.RestClusterClient.lambda$submitJob$7(RestClusterClient.java:359)
at java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:884)
at java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:866)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990)
at org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$8(FutureUtils.java:274)
at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:575)
at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:943)
at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.runtime.rest.util.RestClientException: [Internal server error., <Exception on server side:
org.apache.flink.runtime.client.JobSubmissionException: Failed to submit job.
at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$internalSubmitJob$3(Dispatcher.java:336)
at java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:836)
at java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:811)
at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.RuntimeException: org.apache.flink.runtime.client.JobExecutionException: Could not set up JobManager
at org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:36)
at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)
... 6 more
Caused by: org.apache.flink.runtime.client.JobExecutionException: Could not set up JobManager
at org.apache.flink.runtime.jobmaster.JobManagerRunnerImpl.<init>(JobManagerRunnerImpl.java:152)
at org.apache.flink.runtime.dispatcher.DefaultJobManagerRunnerFactory.createJobManagerRunner(DefaultJobManagerRunnerFactory.java:84)
at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$6(Dispatcher.java:379)
at org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:34)
... 7 more
Caused by: org.apache.flink.runtime.client.JobExecutionException: Cannot initialize task 'Sort(orderBy=[dt ASC]) -> SinkConversionToRow -> Sink: Unnamed': Loading the input/output formats failed:
at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:216)
at org.apache.flink.runtime.scheduler.SchedulerBase.createExecutionGraph(SchedulerBase.java:255)
at org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:227)
at org.apache.flink.runtime.scheduler.SchedulerBase.<init>(SchedulerBase.java:215)
at org.apache.flink.runtime.scheduler.DefaultScheduler.<init>(DefaultScheduler.java:120)
at org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:105)
at org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:278)
at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:266)
at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:98)
at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:40)
at org.apache.flink.runtime.jobmaster.JobManagerRunnerImpl.<init>(JobManagerRunnerImpl.java:146)
... 10 more
Caused by: java.lang.Exception: Loading the input/output formats failed:
at org.apache.flink.runtime.jobgraph.InputOutputFormatVertex.initInputOutputformatContainer(InputOutputFormatVertex.java:156)
at org.apache.flink.runtime.jobgraph.InputOutputFormatVertex.initializeOnMaster(InputOutputFormatVertex.java:60)
at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:212)
... 20 more
Caused by: java.lang.RuntimeException: Deserializing the input/output formats failed: org/apache/hadoop/mapred/JobConf
at org.apache.flink.runtime.jobgraph.InputOutputFormatContainer.<init>(InputOutputFormatContainer.java:68)
at org.apache.flink.runtime.jobgraph.InputOutputFormatVertex.initInputOutputformatContainer(InputOutputFormatVertex.java:153)
... 22 more
Caused by: java.lang.NoClassDefFoundError: org/apache/hadoop/mapred/JobConf
at java.lang.Class.getDeclaredFields0(Native Method)
at java.lang.Class.privateGetDeclaredFields(Class.java:2583)
at java.lang.Class.getDeclaredField(Class.java:2068)
at [java.io](http://java.io/).ObjectStreamClass.getDeclaredSUID(ObjectStreamClass.java:1857)
at [java.io](http://java.io/).ObjectStreamClass.access$700(ObjectStreamClass.java:79)
at [java.io](http://java.io/).ObjectStreamClass$3.run(ObjectStreamClass.java:506)
at [java.io](http://java.io/).ObjectStreamClass$3.run(ObjectStreamClass.java:494)
at java.security.AccessController.doPrivileged(Native Method)
at [java.io](http://java.io/).ObjectStreamClass.<init>(ObjectStreamClass.java:494)
at [java.io](http://java.io/).ObjectStreamClass.lookup(ObjectStreamClass.java:391)
at [java.io](http://java.io/).ObjectStreamClass.initNonProxy(ObjectStreamClass.java:681)
at [java.io](http://java.io/).ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1939)
at [java.io](http://java.io/).ObjectInputStream.readClassDesc(ObjectInputStream.java:1805)
at [java.io](http://java.io/).ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2096)
at [java.io](http://java.io/).ObjectInputStream.readObject0(ObjectInputStream.java:1624)
at [java.io](http://java.io/).ObjectInputStream.defaultReadFields(ObjectInputStream.java:2341)
at [java.io](http://java.io/).ObjectInputStream.readSerialData(ObjectInputStream.java:2265)
at [java.io](http://java.io/).ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2123)
at [java.io](http://java.io/).ObjectInputStream.readObject0(ObjectInputStream.java:1624)
at [java.io](http://java.io/).ObjectInputStream.defaultReadFields(ObjectInputStream.java:2341)
at [java.io](http://java.io/).ObjectInputStream.readSerialData(ObjectInputStream.java:2265)
at [java.io](http://java.io/).ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2123)
at [java.io](http://java.io/).ObjectInputStream.readObject0(ObjectInputStream.java:1624)
at [java.io](http://java.io/).ObjectInputStream.readObject(ObjectInputStream.java:464)
at [java.io](http://java.io/).ObjectInputStream.readObject(ObjectInputStream.java:422)
at java.util.HashMap.readObject(HashMap.java:1412)
at sun.reflect.GeneratedMethodAccessor5.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at [java.io](http://java.io/).ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1170)
at [java.io](http://java.io/).ObjectInputStream.readSerialData(ObjectInputStream.java:2232)
at [java.io](http://java.io/).ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2123)
at [java.io](http://java.io/).ObjectInputStream.readObject0(ObjectInputStream.java:1624)
at [java.io](http://java.io/).ObjectInputStream.defaultReadFields(ObjectInputStream.java:2341)
at [java.io](http://java.io/).ObjectInputStream.readSerialData(ObjectInputStream.java:2265)
at [java.io](http://java.io/).ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2123)
at [java.io](http://java.io/).ObjectInputStream.readObject0(ObjectInputStream.java:1624)
at [java.io](http://java.io/).ObjectInputStream.defaultReadFields(ObjectInputStream.java:2341)
at [java.io](http://java.io/).ObjectInputStream.readSerialData(ObjectInputStream.java:2265)
at [java.io](http://java.io/).ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2123)
at [java.io](http://java.io/).ObjectInputStream.readObject0(ObjectInputStream.java:1624)
at [java.io](http://java.io/).ObjectInputStream.readObject(ObjectInputStream.java:464)
at [java.io](http://java.io/).ObjectInputStream.readObject(ObjectInputStream.java:422)
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:576)
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:562)
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:550)
at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:511)
at org.apache.flink.runtime.operators.util.TaskConfig.getStubWrapper(TaskConfig.java:288)
at org.apache.flink.runtime.jobgraph.InputOutputFormatContainer.<init>(InputOutputFormatContainer.java:66)
... 23 more
Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.mapred.JobConf
at [java.net](http://java.net/).URLClassLoader.findClass(URLClassLoader.java:382)
at java.lang.ClassLoader.loadClass(ClassLoader.java:419)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)
at java.lang.ClassLoader.loadClass(ClassLoader.java:352)
... 72 more

End of exception on server side>]
at org.apache.flink.runtime.rest.RestClient.parseResponse(RestClient.java:390)
at org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$3(RestClient.java:374)
at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:966)
at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:940)
... 4 more

发现是缺少 hadoop 的mapreduce相关的jar包。因为我没有将官网写的的 flink-shaded-hadoop-2-uber-2.7.5-8.0.jar 放入lib中。
因为我们用的是hadoop3.2.1版本,flink官方没有提供这个版本的 uber jar。经过测试,将该 hadoop2 的uber jar放入或者将 mapreduce 的相关jar包放入都是是可以的

说明:这里比较奇怪的是,我们启动的flink集群是 on-yarn 模式,在hadoop所有机器都配置了 HADOOP_CLASSPATH 环境变量,按理说hadoop的依赖都应该加载了的,无论任务启动在哪台机器,都可以找到hadoop的依赖,但是这里还是报错找不到mapreduce的相关类。

加入 hadoop2 的uber jar后,再次重启flink集群,运行demo,就可以正常运行了。
最终,flink的lib中的jar包有:


flink lib

本地开发调试

关于依赖,与上文类似,只是情况还有一些复杂,先附上可以使用的pom文件的依赖

<dependencies>
        <!-- 这里是跟项目/业务相关的依赖 -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-jdbc_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>


        <!-- 下面是整理好的依赖 -->

        <!-- Apache Flink dependencies -->
        <!-- These dependencies are provided, because they should not be packaged into the JAR file. -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>


        <!-- Add logging framework, to produce console output when running in the IDE. -->
        <!-- These dependencies are excluded from the application JAR by default. -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.7</version>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
            <scope>runtime</scope>
        </dependency>

        <!-- 以下是本地运行时,需要的依赖,因为服务器的lib目录中存在对应的jar包,
        因此下面的所有依赖都标上了 provided 的标志,打包时不会被打进去,对项目无影响 -->
        <!-- 因为有些jar包的类有冲突,因此运行前IDEA中操作如下:
         1.在项目名称上右键
         2.选择 Open Module Setting (在倒数四五个)
         3.点击左侧 Libraries
         4.删除 org.apache.hive:hive-exec 依赖
         5.运行项目即可-->
        <!-- flink-hive -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-hive_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.calcite.avatica</groupId>
                    <artifactId>avatica-core</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <!-- Hive Dependency -->
        <dependency>
            <groupId>com.example.demo</groupId>
            <artifactId>hive-exec-shaded</artifactId>
            <version>1.0-SNAPSHOT</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.thrift</groupId>
            <artifactId>libfb303</artifactId>
            <version>0.9.3</version>
            <type>pom</type>
        </dependency>
        <!-- 添加服务器上lib目录中的jar -->
        <dependency>
            <groupId>org.datanucleus</groupId>
            <artifactId>datanucleus-api-jdo</artifactId>
            <version>4.2.4</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.datanucleus</groupId>
            <artifactId>datanucleus-core</artifactId>
            <version>4.1.17</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.datanucleus</groupId>
            <artifactId>datanucleus-rdbms</artifactId>
            <version>4.1.19</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.datanucleus</groupId>
            <artifactId>javax.jdo</artifactId>
            <version>3.2.0-m12</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.48</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.antlr</groupId>
            <artifactId>antlr-runtime</artifactId>
            <version>3.5.2</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.antlr</groupId>
            <artifactId>antlr4-runtime</artifactId>
            <version>4.5</version>
            <scope>provided</scope>
        </dependency>

        <!-- mapreduce -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-core</artifactId>
            <version>3.2.1</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-common</artifactId>
            <version>3.2.1</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
            <version>3.2.1</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>3.2.1</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>3.2.1</version>
            <scope>provided</scope>
        </dependency>
    </dependencies>

1.flink的lib目录中 hadoop2-uber.jar 包替换为了 hadoop3.2.1以及mapreduce相关的jar
2.根据官网示例,pom中需要依赖 flink-table-planner-blink 以及 flink-table-api-java-bridge
3.可以看到 hive-exec.jar 的依赖被替换了
如果直接使用

<dependency>
    <groupId>org.apache.hive</groupId>
    <artifactId>hive-exec</artifactId>
    <version>${hive-version}</version>
</dependency>

的话,本地开发启动时,会报错,与上文启动集群时类似

Exception in thread "main" java.lang.NoSuchMethodError: com.google.common.base.Preconditions.checkArgument(ZLjava/lang/String;Ljava/lang/Object;)V
at org.apache.hadoop.conf.Configuration.set(Configuration.java:1357)
at org.apache.hadoop.conf.Configuration.set(Configuration.java:1338)
at org.apache.hadoop.mapred.JobConf.setJar(JobConf.java:536)
at org.apache.hadoop.mapred.JobConf.setJarByClass(JobConf.java:554)
at org.apache.hadoop.mapred.JobConf.<init>(JobConf.java:448)
at org.apache.hadoop.hive.conf.HiveConf.initialize(HiveConf.java:5141)
at org.apache.hadoop.hive.conf.HiveConf.<init>(HiveConf.java:5109)
at org.apache.flink.table.catalog.hive.HiveCatalog.createHiveConf(HiveCatalog.java:171)
at org.apache.flink.table.catalog.hive.HiveCatalog.<init>(HiveCatalog.java:140)

这是因为 hive-exec.jar 中含有 com.google.common.base.xxx 的类,如下图:


hive-exec.jar

这些类,来自 com.google.guava:guava:19.0 版本,该版本的 Preconditions 类没有 checkArgument(String,Object) 方法,所以报错。
而这些类被打包到 hive-exec.jar 包里面去了,只要我们在pom文件中引入 hive-exec 的依赖,就会自动下载 hive-exec.jar,即便我们使用 exclusions 标签剔除 com.google.guava:guava 这个依赖也没作用,因为相关的类存在于 hive-exec.jar ,而我们又不能剔除 hive-exec.jar ,因为我们本地调试运行程序,都需这个jar包的功能。
此时,需要我们对jar包做下改动,剔除 hive-exec.jar 中的 guava 相关类,或者重命名,使得程序在本地运行时,通过 com.google.common.base.Preconditions 路径无法在 hive-exec.jar 中找到相关的类。
因此,可以创建一个新项目,不包含任何代码,只是对 hive-exec 进行重新打包,项目的pom如下:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.example.demo</groupId>
    <artifactId>hive-exec-shaded</artifactId>
    <version>1.0-SNAPSHOT</version>
    <packaging>jar</packaging>

    <properties>
        <hive-version>3.1.2</hive-version>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <flink.version>1.10.0</flink.version>
        <java.version>1.8</java.version>
        <scala.binary.version>2.11</scala.binary.version>
        <maven.compiler.source>${java.version}</maven.compiler.source>
        <maven.compiler.target>${java.version}</maven.compiler.target>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-exec</artifactId>
            <version>${hive-version}</version>
        </dependency>

    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.2.1</version>
                <configuration>
                    <createDependencyReducedPom>false</createDependencyReducedPom>
                </configuration>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <includes>
                                        <include>hive-exec-log4j2.properties</include>
                                        <include>hive-log4j2.properties</include>
                                        <include>package.jdo</include>
                                        <include>parquet-logging.properties</include>
                                        <include>parquet.thrift</include>
                                        <include>tez-container-log4j2.properties</include>

                                        <include>au/com/bytecode/opencsv/**</include>
                                        <include>com/**</include>
                                        <include>io/airlift/**</include>
                                        <include>javaewah/**</include>
                                        <include>javax/jdo/**</include>
                                        <include>javax/realtime/**</include>
                                        <include>META-INF/maven/**</include>
                                        <include>META-INF/org/**</include>
                                        <include>META-INF/services/**</include>
                                        <include>META-INF/ASL2.0</include>
                                        <include>META-INF/DEPENDENCIES</include>
                                        <include>META-INF/LICENSE</include>
                                        <include>META-INF/LICENSE.txt</include>
                                        <include>META-INF/MANIFEST.MF</include>
                                        <include>META-INF/NOTICE</include>
                                        <include>META-INF/NOTICE.txt</include>

                                        <include>org/json/**</include>
                                        <include>org/joda/**</include>
                                        <include>org/apache/avro/**</include>
                                        <include>org/apache/hive/**</include>
                                        <include>org/apache/orc/**</include>
                                        <include>org/apache/parquet/**</include>
                                        <include>org/apache/tez/**</include>
                                        <include>org/apache/thrift/**</include>
                                        <include>org/apache/commons/lang/**</include>
                                        <include>org/apache/commons/lang3/**</include>
                                        <include>org/apache/hadoop/mapred/**</include>
                                        <include>org/apache/hadoop/hive/**</include>
                                        <include>org/apache/hadoop/fs/DefaultFileAccess.class</include>
                                        <include>org/apache/hadoop/fs/ProxyFileSystem.class</include>
                                        <include>org/apache/hadoop/fs/ProxyLocalFileSystem$PFileChecksum.class</include>
                                        <include>org/apache/hadoop/fs/ProxyLocalFileSystem.class</include>
                                        <include>
                                            org/apache/hadoop/security/token/delegation/HiveDelegationTokenSupport.class
                                        </include>
                                        <include>
                                            org/apache/hadoop/security/token/delegation/MetastoreDelegationTokenSupport.class
                                        </include>
                                        <include>org/codehaus/jackson/**</include>

                                        <include>shaded/**</include>
                                        <include>javolution/**</include>
                                        <include>jodd/**</include>
                                    </includes>
                                    
                                </filter>
                            </filters>
                            <relocations>
                                <relocation>
                                    <pattern>com.google.common</pattern>
                                    <shadedPattern>com.cerence.shaded.common</shadedPattern>
                                </relocation>
                                <relocation>
                                    <pattern>com.google.protobuf</pattern>
                                    <shadedPattern>com.cerence.shaded.protobuf</shadedPattern>
                                </relocation>
                                <relocation>
                                    <pattern>com.google.thirdparty</pattern>
                                    <shadedPattern>com.cerence.shaded.thirdparty</shadedPattern>
                                </relocation>

                            </relocations>
                            
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

</project>

主要是使用mavne的 maven-shade-plugin 插件,将 com.google.common.* 路径进行重命名(或剔除)即可。
然后在应用时,引入的不是 hive-exec 依赖,而是依赖我们shaded后的项目即可。
依赖shaded的项目后,会自动下载 hive-exec-shaded.jar 以及 hive-exec.jar, 而我们不需要 hive-exec.jar,因为我们已经对 hive-exec.jar 做了修改,因此,需要在启动前,将 hive-exec.jar 剔除,不出现在classpath中。步骤如下:
1.项目名称上右键,选择 “Open Module Settings” (倒数第6个选项)


step1
  1. 删除 org.apache.hive:hive-exec 依赖


    step2

3.确定

本地开发调试提示

1.因为写入hive是要将内容写到HDFS上,因此本地开发时,可能会遇到写入HDFS的权限问题,可以在IDEA中启动前,设置HADOOP_USER_NAME的环境变量


IDEA Configuration

2.我们的依赖的scope大都是 provided ,而我们在调试时需要这些依赖存在于classpath中,因此,上图中,也设置 "Include dependencies with "Provided" scope"

3.目前flink集成hive是使用的Blink的table-planner,且只能使用 Batch 模式。这会使得我们使用时存在局限。

以上三个局限,第1个问题是可以理解的。如果有需要可以改写下源码,即可解决2,3两个问题。

org.apache.flink.connectors.hive.HiveTableSink 类的声明如下:


HiveTableSink

这就是为什么,当前默认的flink与hive集成,只能用于Batch模式,因为 HiveTabelSink 只继承了 OutputFormatTableSink 接口,该接口表示这是一个用于 Batch 模式的sink表。

如果简单改写下,使 HiveTableSink 实现 AppendStreamTableSink/ RetractStreamTableSink/ UpsertStreamTableSink 其中一个接口,即可在 BlinkPlanner 的 Stream 模式下工作。

而在 BlinkPlanner 的 Stream 模式下,就可以将 Table 转为 DataStream API ,对数据进行更灵活的操作了。这样第2点与第3点的局限性就不存在了。

4.本地开发调试代码时,根据目前api的要求,需要指定 hive-site.xml 文件的位置,我们将服务器上的 hive-site.xml 下载下来后,记得查看下里面的连接信息,尤其是ip等,将 0.0.0.0 替换为真实ip,以便我们在本地可以远程访问。

Git相关项目

hive-exec-shaded 项目:
https://github.com/935205406/hive-exec-shaded

flink-hive-demo 项目:
https://github.com/935205406/flink-hive-integration-demo

上一篇 下一篇

猜你喜欢

热点阅读