2. Integrating Spring Cloud Stream with Kafka

2022-04-05  一介书生独醉江湖
Create two modules in IDEA: kafka-producer and kafka-consumer.
Jianshu link: https://www.jianshu.com/p/d7771682688b
1) Parent (kafka-test) pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.3.1.RELEASE</version>
        <!--<relativePath/> &lt;!&ndash; lookup parent from repository &ndash;&gt;-->
    </parent>
    <groupId>com.example.test</groupId>
    <artifactId>kafka-test</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>pom</packaging>
    <name>kafka-test</name>
    <description>Demo project for Spring Boot</description>
    <properties>
        <java.version>1.8</java.version>
        <spring-cloud.version>Hoxton.SR5</spring-cloud.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>


        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <!--<version>2.6.4</version>  remove this line -->
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-kafka</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-binder-kafka</artifactId>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
    </dependencies>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>${spring-cloud.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <configuration>
                    <excludes>
                        <exclude>
                            <groupId>org.projectlombok</groupId>
                            <artifactId>lombok</artifactId>
                        </exclude>
                    </excludes>
                </configuration>
            </plugin>
        </plugins>
    </build>
    <modules>
        <module>kafka-producer</module>
        <module>kafka-consumer</module>
    </modules>
</project>

Child (producer, kafka-producer) pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>com.example.test</groupId>
        <artifactId>kafka-test</artifactId>
        <version>0.0.1-SNAPSHOT</version>
        <!--<relativePath/> &lt;!&ndash; lookup parent from repository &ndash;&gt;-->
    </parent>
    <groupId>com.example.test</groupId>
    <artifactId>kafka-producer</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>kafka-producer</name>
    <description>Demo project for Spring Boot</description>
    <properties>
        <java.version>1.8</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>
Child (consumer, kafka-consumer) pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>com.example.test</groupId>
        <artifactId>kafka-test</artifactId>
        <version>0.0.1-SNAPSHOT</version>
        <!--<relativePath/> &lt;!&ndash; lookup parent from repository &ndash;&gt;-->
    </parent>
    <groupId>com.example.test</groupId>
    <artifactId>kafka-consumer</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>kafka-consumer</name>
    <description>Demo project for Spring Boot</description>
    <properties>
        <java.version>1.8</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

Child (producer, kafka-producer) application.yml
server:
  port: 8181

spring:
  application:
    name: kafka_producer
  cloud:
    stream:
      kafka:
        binder:
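          brokers: localhost:9092   # Kafka broker address
          zk-nodes: localhost:2181  # ZooKeeper nodes; for a cluster, separate entries with commas
          auto-create-topics: true  # if false, topics are not created automatically, so a message may be sent to a topic that does not exist yet
      bindings:
        stream-demo:                          # arbitrary name; the consumer must use the same one
          destination: custom-message-topic   # arbitrary name; the consumer must use the same one; the destination (topic) the messages are sent to
          content-type: application/json      # payload format; the sender must set it, the receiver need not; use text/plain for plain text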

Child (consumer, kafka-consumer) application.yml
server:
  port: 8081

spring:
  application:
    name: kafka_consumer
  cloud:
    stream:
      kafka:
        binder:
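          brokers: localhost:9092   # Kafka broker address
          zk-nodes: localhost:2181  # ZooKeeper nodes; for a cluster, separate entries with commas
          auto-create-topics: true  # if false, topics are not created automatically, so a message may be sent to a topic that does not exist yet
      bindings:
        stream-demo:                          # arbitrary name; the producer must use the same one
          destination: custom-message-topic   # arbitrary name; the producer must use the same one; the destination (topic) the messages are sent to
          content-type: application/json      # payload format; the sender must set it, the receiver need not; use text/plain for plain text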
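
Both files have to point the stream-demo binding at the same destination, otherwise the consumer ends up listening on a different topic than the one the producer writes to. As a rough sketch only, the plain-Java check below compares the two configurations in their flattened properties form. The class BindingCheck and its methods are made up for illustration and are not part of Spring Cloud Stream; only the key pattern spring.cloud.stream.bindings.<name>.destination comes from the configuration above.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Hypothetical helper for illustration; not part of Spring Cloud Stream.
public class BindingCheck {

    // Flattened form of the YAML path spring.cloud.stream.bindings.<binding>.destination
    public static String destinationKey(String binding) {
        return "spring.cloud.stream.bindings." + binding + ".destination";
    }

    // Parses text written in .properties syntax.
    public static Properties parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    // True when both sides define the binding and name the same destination.
    public static boolean sameDestination(Properties producer, Properties consumer, String binding) {
        String key = destinationKey(binding);
        String producerDestination = producer.getProperty(key);
        return producerDestination != null && producerDestination.equals(consumer.getProperty(key));
    }

    public static void main(String[] args) {
        String line = "spring.cloud.stream.bindings.stream-demo.destination=custom-message-topic\n";
        Properties producer = parse(line);
        Properties consumer = parse(line);
        System.out.println(sameDestination(producer, consumer, "stream-demo"));
    }
}
```

With the values used in this article, both sides resolve to custom-message-topic, so the check passes.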

Child (producer, kafka-producer): StreamClient.java and TestController.java
package com.example.test.kafkaproducer.test;

import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;

/**
 * @Author ds
 * @Date 2022-04-02
 */
public interface StreamClient {

    String STREAM_DEMO = "stream-demo";

    @Output(StreamClient.STREAM_DEMO)
    MessageChannel streamDataOut();


}

package com.example.test.kafkaproducer.test;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * @Author ds
 * @Date 2022-04-02
 */
@RestController
@EnableBinding(StreamClient.class)
public class TestController {

    @Autowired
    private StreamClient streamClient;

    @GetMapping("/produce")
    public String produce(){
        for(int i = 0; i < 100 ; i++){
            streamClient.streamDataOut().send(MessageBuilder.withPayload("message " + i).build());
        }
        return "success";
    }
}

Child (consumer, kafka-consumer): StreamClient.java and ReceiveData.java
package com.example.test.kafkaconsumer.test;

import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.SubscribableChannel;

/**
 * @Author ds
 * @Date 2022-04-02
 */
public interface StreamClient {

    String STREAM_DEMO = "stream-demo";

    @Input(StreamClient.STREAM_DEMO)
    SubscribableChannel streamDataInput();
}
package com.example.test.kafkaconsumer.test;

import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;

/**
 * @Author ds
 * @Date 2022-04-02
 */
@Slf4j
@EnableBinding(StreamClient.class)
public class ReceiveData {

    @StreamListener(StreamClient.STREAM_DEMO)
    public void consume(String message){
        log.info("Received message: {}", message);
    }
}
Call the endpoint http://localhost:8181/produce
Problems encountered during setup and how they were resolved:
Problem 1: running clean with IDEA's bundled Maven tool produced the following errors

[ERROR] [ERROR] Some problems were encountered while processing the POMs:
[FATAL] Non-resolvable parent POM for com.example.test:kafka-producer:0.0.1-SNAPSHOT: Could not find artifact com.example.test:kafka-test:pom:0.0.1-SNAPSHOT and 'parent.relativePath' points at no local POM @ line 5, column 10
[FATAL] Non-resolvable parent POM for com.example.test:kafka-consumer:0.0.1-SNAPSHOT: Could not find artifact com.example.test:kafka-test:pom:0.0.1-SNAPSHOT and 'parent.relativePath' points at no local POM @ line 5, column 13
 @ 
[ERROR] The build could not read 2 projects -> [Help 1]
[ERROR]   
[ERROR]   The project com.example.test:kafka-producer:0.0.1-SNAPSHOT (/Users/ds/Documents/gate/code/kafka-test/kafka-producer/pom.xml) has 1 error
[ERROR]     Non-resolvable parent POM for com.example.test:kafka-producer:0.0.1-SNAPSHOT: Could not find artifact com.example.test:kafka-test:pom:0.0.1-SNAPSHOT and 'parent.relativePath' points at no local POM @ line 5, column 10 -> [Help 2]
[ERROR]   
[ERROR]   The project com.example.test:kafka-consumer:0.0.1-SNAPSHOT (/Users/ds/Documents/gate/code/kafka-test/kafka-consumer/pom.xml) has 1 error
[ERROR]     Non-resolvable parent POM for com.example.test:kafka-consumer:0.0.1-SNAPSHOT: Could not find artifact com.example.test:kafka-test:pom:0.0.1-SNAPSHOT and 'parent.relativePath' points at no local POM @ line 5, column 13 -> [Help 2]
[ERROR] 
[ERROR] To see the full stack trace of the errors, re-run Maven with the -e switch.
[ERROR] Re-run Maven using the -X switch to enable full debug logging.
[ERROR] 
[ERROR] For more information about the errors and possible solutions, please read the following articles:
[ERROR] [Help 1] http://cwiki.apache.org/confluence/display/MAVEN/ProjectBuildingException
[ERROR] [Help 2] http://cwiki.apache.org/confluence/display/MAVEN/UnresolvableModelException
Resolution step 1:
Run mvn clean install in the terminal to clean, compile, and package again; this prints the specific errors

[ERROR] COMPILATION ERROR : 
[INFO] -------------------------------------------------------------
[ERROR] /Users/ds/Documents/gate/code/kafka-test/kafka-producer/src/main/java/com/example/test/kafkaproducer/test/StreamClient.java:[3,51] package org.springframework.cloud.stream.annotation does not exist
[ERROR] /Users/ds/Documents/gate/code/kafka-test/kafka-producer/src/main/java/com/example/test/kafkaproducer/test/StreamClient.java:[4,37] package org.springframework.messaging does not exist
[ERROR] /Users/ds/Documents/gate/code/kafka-test/kafka-producer/src/main/java/com/example/test/kafkaproducer/test/StreamClient.java:[15,5] cannot find symbol
  symbol:   class MessageChannel
  location: interface com.example.test.kafkaproducer.test.StreamClient
[ERROR] /Users/ds/Documents/gate/code/kafka-test/kafka-producer/src/main/java/com/example/test/kafkaproducer/test/TestController.java:[3,45] package org.springframework.messaging.support does not exist
[ERROR] /Users/ds/Documents/gate/code/kafka-test/kafka-producer/src/main/java/com/example/test/kafkaproducer/test/StreamClient.java:[14,6] cannot find symbol
  symbol:   class Output
  location: interface com.example.test.kafkaproducer.test.StreamClient
[ERROR] /Users/ds/Documents/gate/code/kafka-test/kafka-producer/src/main/java/com/example/test/kafkaproducer/test/TestController.java:[22,47] cannot find symbol
...... (some errors omitted; too long)

Resolution step 2:
Retype the <artifactId>spring-cloud-dependencies</artifactId> entry in pom.xml and click Import Changes (not needed if auto-import is enabled)

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>${spring-cloud.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

Run mvn clean install again

...... (some exceptions omitted; too long)
Caused by: java.lang.NoClassDefFoundError: Lorg/springframework/kafka/listener/CommonErrorHandler;
        at java.lang.Class.getDeclaredFields0(Native Method) ~[na:1.8.0_201]
        at java.lang.Class.privateGetDeclaredFields(Class.java:2583) ~[na:1.8.0_201]
        at java.lang.Class.getDeclaredFields(Class.java:1916) ~[na:1.8.0_201]
        at org.springframework.util.ReflectionUtils.getDeclaredFields(ReflectionUtils.java:738) ~[spring-core-5.3.18.jar:5.3.18]
        ... 82 common frames omitted
Caused by: java.lang.ClassNotFoundException: org.springframework.kafka.listener.CommonErrorHandler
        at java.net.URLClassLoader.findClass(URLClassLoader.java:382) ~[na:1.8.0_201]
        at java.lang.ClassLoader.loadClass(ClassLoader.java:424) ~[na:1.8.0_201]
        at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349) ~[na:1.8.0_201]
        at java.lang.ClassLoader.loadClass(ClassLoader.java:357) ~[na:1.8.0_201]
        ... 86 common frames omitted


Resolution step 3:
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <!--<version>2.6.4</version>  remove this line -->
        </dependency>

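  Spring Boot manages dependency versions itself; pinning a version on top of that caused a jar conflict. CommonErrorHandler only exists in newer spring-kafka releases (it was added in 2.8), so the pinned 2.6.4 jar did not contain the class the rest of the classpath expected.
  Remove the version from the pom and use the version managed by Spring Boot.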
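
When a NoClassDefFoundError like the one above appears, it can help to check whether the class is on the classpath at all and which jar supplies it. The following is a minimal sketch using only the JDK; ClasspathProbe is a hypothetical helper name, and it would have to run with the same classpath as the application to be meaningful.

```java
import java.security.CodeSource;

// Hypothetical diagnostic helper for NoClassDefFoundError / ClassNotFoundException.
public class ClasspathProbe {

    // True if the current class loader can find the class (the class is not initialized).
    public static boolean isPresent(String className) {
        try {
            Class.forName(className, false, ClasspathProbe.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    // Describes where the class was loaded from, e.g. the jar location.
    public static String origin(String className) {
        try {
            Class<?> type = Class.forName(className, false, ClasspathProbe.class.getClassLoader());
            CodeSource source = type.getProtectionDomain().getCodeSource();
            // Classes loaded by the bootstrap class loader have no code source.
            return source == null ? "JDK / bootstrap class loader" : String.valueOf(source.getLocation());
        } catch (ClassNotFoundException | LinkageError e) {
            return "not on the classpath";
        }
    }

    public static void main(String[] args) {
        String name = "org.springframework.kafka.listener.CommonErrorHandler";
        System.out.println(name + " -> " + origin(name));
    }
}
```

If the class is found, the printed location names the spring-kafka jar that was actually resolved, which makes a pinned or conflicting version easy to spot.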

Run mvn clean install again

...... (some exceptions omitted; too long)
Description:

A component required a bean of type 'com.example.test.kafkaproducer.test.StreamClient' that could not be found.
Action:
Consider defining a bean of type 'com.example.test.kafkaproducer.test.StreamClient' in your configuration.
...... (some exceptions omitted; too long)
java.lang.IllegalStateException: Failed to load ApplicationContext
Caused by: org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'testController': Injection of resource dependencies failed; nested exception is org.springframework.beans.factory.NoSuchBeanDefinitionException: No qualifying bean of type 'com.example.test.kafkaproducer.test.StreamClient' available: expected at least 1 bean which qualifies as autowire candidate. Dependency annotations: {@javax.annotation.Resource(shareable=true, lookup=, name=, description=, authenticationType=CONTAINER, type=class java.lang.Object, mappedName=)}
Caused by: org.springframework.beans.factory.NoSuchBeanDefinitionException: No qualifying bean of type 'com.example.test.kafkaproducer.test.StreamClient' available: expected at least 1 bean which qualifies as autowire candidate. Dependency annotations: {@javax.annotation.Resource(shareable=true, lookup=, name=, description=, authenticationType=CONTAINER, type=class java.lang.Object, mappedName=)}

Resolution step 4:
Add the following annotation to TestController
@EnableBinding(StreamClient.class)
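
StreamClient is only an interface and has no hand-written implementation, so Spring has nothing to inject until something creates an instance of it. @EnableBinding asks Spring Cloud Stream to generate that instance and register it as a bean. The sketch below is a plain-JDK analogy built on java.lang.reflect.Proxy, not the framework's actual code; Channel, Client, and bind are hypothetical stand-ins.

```java
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.List;

// Analogy only: shows how an interface can get a runtime-generated implementation.
public class BindingProxySketch {

    // Stand-in for a message channel (hypothetical, not Spring's MessageChannel).
    public interface Channel {
        boolean send(String payload);
    }

    // Stand-in for StreamClient: declared, but never implemented by hand.
    public interface Client {
        Channel streamDataOut();
    }

    // Creates a runtime implementation of Client whose method returns the given channel.
    public static Client bind(Channel channel) {
        return (Client) Proxy.newProxyInstance(
                Client.class.getClassLoader(),
                new Class<?>[] {Client.class},
                (proxy, method, args) -> {
                    if (method.getName().equals("streamDataOut")) {
                        return channel;
                    }
                    throw new UnsupportedOperationException(method.getName());
                });
    }

    public static void main(String[] args) {
        List<String> sent = new ArrayList<>();
        Client client = bind(sent::add); // this "channel" simply records payloads
        client.streamDataOut().send("message 0");
        System.out.println(sent);
    }
}
```

Without the call to bind there is no Client object at all, which mirrors the "No qualifying bean of type StreamClient" error seen before the annotation was added.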

Run mvn clean install again; this time it succeeds

[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 9.349 s
References:
https://start.spring.io/
https://blog.csdn.net/Liyq_19/article/details/123740533
https://www.cnblogs.com/owenma/p/15463237.html
https://blog.csdn.net/qq_40708522/article/details/123842483
https://blog.csdn.net/cckevincyh/article/details/107645351?utm_medium=distribute.pc_aggpage_search_result.none-task-blog-2~aggregatepage~first_rank_ecpm_v1~rank_v31_ecpm-1-107645351.pc_agg_new_rank&utm_term=stream%E6%B3%A8%E5%85%A5%E4%B8%8D%E4%BA%86MessageChannel%E7%9A%84bean&spm=1000.2123.3001.4430
