Flink on K8s Application Mode: Problems and Fixes

2021-07-13  邵红晓

Problem 1

Caused by: org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'kafka' that implements 'org.apache.flink.table.factories.DynamicTableFactory' in the classpath.
Caused by: org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Table sink 'default_catalog.default_database.fs_table' doesn't support consuming update changes which is produced by node GroupAggregate(groupBy=[id, channel], select=[id, channel, COUNT(*) AS cnt])
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372) ~[flink-dist_2.12-1.13.0.jar:1.13.0]
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222) ~[flink-dist_2.12-1.13.0.jar:1.13.0]
at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114) ~[flink-dist_2.12-1.13.0.jar:1.13.0]
at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:242) ~[flink-dist_2.12-1.13.0.jar:1.13.0]

Solution

1. Check the dependencies: flink-connector-kafka_2.12 and flink-sql-connector-kafka_2.12 must not both appear in pom.xml.

      <!-- used by Flink DataStream operators -->
       <dependency>
           <groupId>org.apache.flink</groupId>
           <artifactId>flink-connector-kafka_2.12</artifactId>
           <version>${flink.version}</version>
       </dependency>
       <!-- used by Flink SQL -->
       <dependency>
           <groupId>org.apache.flink</groupId>
           <artifactId>flink-sql-connector-kafka_2.12</artifactId>
           <version>${flink.version}</version>
       </dependency>

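2. Flink loads table factories through the Java SPI mechanism, but the Flink job jar we built did not contain the META-INF/services directory.

Create the directory yourself and make sure it is packaged into the jar.
The Maven packaging plugin must include the files under META-INF/services in the jar; the plugin configuration is shown below: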

         <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.2.0</version>
                <configuration>
                    <filters>
                        <filter>
                            <artifact>*:*</artifact>
                            <excludes>
                                <exclude>META-INF/*.SF</exclude>
                                <exclude>META-INF/*.DSA</exclude>
                                <exclude>META-INF/*.RSA</exclude>
                            </excludes>
                        </filter>
                    </filters>
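                    <transformers>
                        <!-- merges META-INF/services files from all dependencies so that SPI factory entries are not overwritten -->
                        <transformer
                                implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>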
                        <transformer
                                implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                            <resource>META-INF/spring.handlers</resource>
                        </transformer>
                        <transformer
                                implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                            <mainClass>com.fxc.rpc.impl.member.MemberProvider</mainClass>
                        </transformer>
                        <transformer
                                implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                            <resource>META-INF/spring.schemas</resource>
                        </transformer>
                    </transformers>
                </configuration>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>

Contents of the file org.apache.flink.table.factories.TableFactory:

org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory

Contents of the file org.apache.flink.table.factories.Factory:

org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicTableFactory
org.apache.flink.streaming.connectors.kafka.table.UpsertKafkaDynamicTableFactory
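
One way to confirm that the service files really ended up in the shaded jar is to inspect its entry listing. The following is a minimal sketch and not part of the original setup: missing_service_files is a hypothetical helper, and the two required paths are simply the file names listed above placed under META-INF/services.

```shell
# Service files read by Flink's table factory discovery (names taken from the text above).
REQUIRED_SERVICE_FILES="META-INF/services/org.apache.flink.table.factories.Factory
META-INF/services/org.apache.flink.table.factories.TableFactory"

# Reads a jar entry listing (one path per line) on stdin and prints every
# required service file that is absent. Returns non-zero if any is missing.
missing_service_files() {
  local listing missing=0 required
  listing="$(cat)"
  while IFS= read -r required; do
    if ! printf '%s\n' "$listing" | grep -Fxq -- "$required"; then
      printf 'missing: %s\n' "$required"
      missing=1
    fi
  done <<EOF
$REQUIRED_SERVICE_FILES
EOF
  return "$missing"
}
```

For example, `jar tf target/my-job.jar | missing_service_files` (the jar path is a placeholder) prints nothing and returns 0 when both files are present.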

Problem 2

Could not find a file system implementation for scheme 'hdfs'. The scheme is not directly supported by Flink and no Hadoop file system to support this scheme could be loaded. For a full list of supported file systems, please see https://ci.apache.org/projects/flink/flink-docs-stable/ops/filesystems/.
Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Hadoop is not in the classpath/dependencies.

Unable to mount volumes for pod "my-second-cluster-taskmanager-1-2_flink(e6164522-e2f6-11eb-b173-eeeeeeeeeeee)": timeout expired waiting for volumes to attach or mount for pod "flink"/"my-second-cluster-taskmanager-1-2". list of unmounted volumes=[hadoop-config-volume]. list of unattached volumes=[hadoop-config-volume flink-config-volume default-token-qbpfh]
MountVolume.SetUp failed for volume "flink-config-volume" : configmap "flink-config-my-second-cluster" not found

Solution

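1. Since Flink 1.11, you can export HADOOP_CONF_DIR on the Flink client machine (the machine that submits the job; it needs both a kubectl client and a Flink installation) and then run flink run-application to start the job. The Flink client then ships the Hadoop configuration to the JobManager and TaskManager pods through a ConfigMap and adds it to the classpath.
Add one line to flink-1.13.0/bin/config.sh: export HADOOP_CONF_DIR=/var/lib/jenkins/flink-k8s/file/hadoopconf
The directory contains:
$ ls hadoopconf/
core-site.xml hdfs-site.xml yarn-site.xml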

2. The Dockerfile used to build the image must set it as well:
COPY hadoopconf/*.xml $FLINK_HOME/hadoopconf/
ENV HADOOP_CONF_DIR=$FLINK_HOME/hadoopconf/
3. The Hadoop jar dependency is the problem.
Download (or build yourself) the Flink Pre-bundled Hadoop 2.7.5 jar from https://flink.apache.org/downloads.html
and copy it into $FLINK_HOME/lib.
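
Before exporting HADOOP_CONF_DIR on the client (step 1), it can help to check that the directory actually holds the expected files, since a wrong path only shows up later as a pod mount or classpath error. A minimal sketch, assuming that core-site.xml and hdfs-site.xml are the files that matter for HDFS access; check_hadoop_conf_dir is a hypothetical helper name.

```shell
# Returns 0 if the given directory exists and contains core-site.xml and
# hdfs-site.xml; prints the first problem found to stderr otherwise.
check_hadoop_conf_dir() {
  local dir="$1" f
  if [ ! -d "$dir" ]; then
    echo "not a directory: $dir" >&2
    return 1
  fi
  for f in core-site.xml hdfs-site.xml; do
    if [ ! -f "$dir/$f" ]; then
      echo "missing: $dir/$f" >&2
      return 1
    fi
  done
  return 0
}
```

With the path from this post, the check and the export can be chained: `check_hadoop_conf_dir /var/lib/jenkins/flink-k8s/file/hadoopconf && export HADOOP_CONF_DIR=/var/lib/jenkins/flink-k8s/file/hadoopconf`.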

Problem 3

Caused by: org.apache.flink.runtime.taskexecutor.exceptions.TaskSubmissionException: No task slot allocated for job ID 7b1c33d4ebbe8d47623c52d33b838f4a and allocation ID 56306de50eed492c961f884c131b2f9e.

Solution

Increasing the TaskManager memory fixed it: -Dtaskmanager.memory.process.size=1024m
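
To show where the memory option goes, here is a sketch of a dry-run helper that only prints a submit command. The namespace flink and the cluster id my-second-cluster come from the pod names in the logs above; the image name, the jar path, and the helper name flink_submit_cmd are placeholders, not values from the original setup.

```shell
# Prints (does not run) a flink run-application command for native K8s
# application mode. The first argument overrides the TaskManager process memory.
flink_submit_cmd() {
  local tm_mem="${1:-1024m}"
  printf '%s ' \
    ./bin/flink run-application \
    --target kubernetes-application \
    -Dkubernetes.namespace=flink \
    -Dkubernetes.cluster-id=my-second-cluster \
    -Dkubernetes.container.image=my-registry/my-flink-job:latest \
    "-Dtaskmanager.memory.process.size=${tm_mem}" \
    local:///opt/flink/usrlib/my-job.jar
  printf '\n'
}
```

Calling `flink_submit_cmd 2048m` prints the same command with a larger TaskManager size, which makes it easy to review the options before running them for real.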

Problem 4

java.net.UnknownHostException: xxx-hadoop

Solution

Edit the ConfigMap of the CoreDNS component in K8s to add host entries, then redeploy the pods.

.:53 {
errors
health
hosts {
ip0 hostname0
ip1 hostname1
...
}
...
}
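
The hosts block can be generated from a list of IP and hostname pairs instead of being typed by hand. This is only a sketch: render_hosts_block is a hypothetical helper and the addresses in the usage example are made up. It also emits a fallthrough line, which the CoreDNS hosts plugin needs if names not listed in the block should still be handled by the rest of the plugin chain; the original snippet elides that part, so treat it as an assumption.

```shell
# Prints a CoreDNS hosts block from alternating "ip hostname" arguments.
render_hosts_block() {
  echo "hosts {"
  while [ "$#" -ge 2 ]; do
    printf '    %s %s\n' "$1" "$2"
    shift 2
  done
  # Pass unmatched names on to the next plugin (assumption, see lead-in).
  echo "    fallthrough"
  echo "}"
}
```

For example, `render_hosts_block 10.0.0.11 xxx-hadoop` prints a block that can be pasted into the Corefile via `kubectl -n kube-system edit configmap coredns`, assuming the ConfigMap has the common name coredns in the kube-system namespace.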

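Problem 5
An issue with the flink command-line option -C
Pointing -C directly at the Maven repository URL http://maven.aliyun.com/nexus/content/groups/public/org/apache/flink/flink-sql-connector-kafka_2.12/1.13.0/flink-sql-connector-kafka_2.12-1.13.0.jar
failed to fetch the jar, and we verified that this was not a network connectivity problem.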

Solution
Serve the jar from a Linux httpd server instead: once the jar is placed in /var/www/html/jar/, it is fetched automatically.
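
The URL passed to -C then follows from the file name and the httpd host. A small sketch, assuming httpd serves /var/www/html as its document root so that /var/www/html/jar/ maps to the /jar/ path; jar_classpath_url and the host name in the example are hypothetical.

```shell
# Prints the HTTP URL under which a jar copied to /var/www/html/jar/ is served.
jar_classpath_url() {
  local host="$1" jar_path="$2"
  printf 'http://%s/jar/%s\n' "$host" "$(basename "$jar_path")"
}
```

For example, `jar_classpath_url my-httpd-host /var/www/html/jar/flink-sql-connector-kafka_2.12-1.13.0.jar` prints the value to pass after -C, and `curl -fsI` on that URL is a quick way to confirm the jar is reachable before submitting.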
