WebClient进阶之合并多个Mono为一个Flux
2022-09-24 本文已影响0人
Hiper
在项目中遇到一个问题,需要请求多个地址并将结果收集起来,借助WebClient实现了这个功能。
主要思路是通过WebClient创建多个Mono
,然后通过Flux.merge()
方法将List<Mono<String>>
转化为Mono<List<String>>
,然后调用block()
请求全部完成。
首先编写WebClient配置类
@Configuration
public class WebClientConfig {
@Bean
public WebClient webClient() {
//配置固定大小连接池
ConnectionProvider provider = ConnectionProvider
.builder("tax-core")
// 等待超时时间
.pendingAcquireTimeout(Duration.ofSeconds(500))
// 最大连接数
.maxConnections(5)
// 等待队列大小
.pendingAcquireMaxCount(1000)
.build();
HttpClient httpClient = HttpClient.create(provider);
// 使用Reactor
return WebClient.builder()
.clientConnector(new ReactorClientHttpConnector(httpClient))
.build();
}
}
然后是测试类,两个Get映射,一个模拟网络接口,一个为测试。
@RestController
public class TestController {
@Autowired
WebClient webClient;
/**
* 测试合并多个结果
* @return
* @throws InterruptedException
*/
@GetMapping("/test/mono/merge")
public List<String> testMerge() throws InterruptedException {
List<Mono<String>> monoList = new ArrayList<>();
for(int i=0; i<30; i++) {
Mono<String> mono = webClient.get()
.uri("http://localhost:8080/test/{num}", i)
.retrieve()
.bodyToMono(String.class);
monoList.add(mono);
}
Flux<String> flux = Flux.merge(monoList);
//阻塞当前线程直到所有请求完成
return return flux.collectList().block();
}
/**
* 模拟网络响应
* @param i 参数
* @return 字符串
* @throws InterruptedException
*/
@GetMapping("/test/{i}")
public String testMerge(@PathVariable int i) throws InterruptedException {
Thread.sleep(1000L);
return "test is " + i;
}
}
结果如下:
[test is 1, test is 2, test is 3, test is 4, test is 0, test is 6, test is 7, test is 5, test is 9, test is 8, test is 10, test is 11, test is 12, test is 13, test is 14, test is 15, test is 16, test is 17, test is 18, test is 19, test is 23, test is 24, test is 20, test is 22, test is 21, test is 25, test is 29, test is 28, test is 26, test is 27]