我爱编程程序员知识共享

Spring Integration Java DSL初识

2018-09-03  本文已影响2人  老老戟

1。简介

在本教程中,我们将了解Java for creating the spring整合integrations DSL应用。
我们把文件移动整合我们built in介绍春Integration and use the DSL相反。

2。依赖关系

Spring集成Java DSL是Spring集成内核的一部分。
因此,我们可以添加依赖:

<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-core</artifactId>
    <version>5.0.6.RELEASE</version>
</dependency>

为了处理我们的文件移动应用程序,我们还需要Spring集成文件:

<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-file</artifactId>
    <version>5.0.6.RELEASE</version>
</dependency>

3。Spring集成Java DSL

在Java DSL之前,用户将在XML中配置Spring集成组件。DSL引入了一些FLUENT建设者,从中我们可以很容易地创建一个完整的Spring集成管道,完全是在Java中。所以,假设我们想要创建一个通道,它可以通过管道传输任何数据。在过去,我们的做法:

<int:channel id="input"/>

<int:transformer input-channel="input" expression="payload.toUpperCase()" />

现在我们可以这样做:

@Bean
public IntegrationFlow upcaseFlow() {
    return IntegrationFlows.from("input")
      .transform(String::toUpperCase)
      .get();
}

4。文件移动应用程序

为了开始我们的文件移动集成,我们需要一些简单的构建块。

4.1。集成流程我们需要的第一个构建块是集成流,我们可以从集成流生成器中得到:

IntegrationFlows.from(...)

可以从几个类型,但在本教程中,我们将描述三种方法:

    MessageSources
    MessageChannels, and
    Strings

我们很快就会讨论这三个问题。

在我们调用之后,我们现在可以使用一些定制方法:

IntegrationFlow flow = IntegrationFlows.from(sourceDirectory())
  .filter(onlyJpgs())
  .handle(targetDirectory())
  // add more components
  .get();

最后,IntegrationFlows将始终生成一个IntegrationFlow实例,它是任何Spring Integration应用程序的最终产品。

这种获取输入、执行适当的转换和发出结果的模式是所有Spring Integration应用程序的基础。

4.2。描述输入源

首先,为了移动文件,我们需要向集成流指示应该在哪里查找文件,为此,我们需要MessageSource:

@Bean
public MessageSource<File> sourceDirectory() {
  // .. create a message source
}

简单地说,MasaSeCURCE是一个可以在应用程序外部产生消息的地方。

更具体地说,我们需要一些能够将外部源适配到Spring消息表示的东西。由于这种适应集中于输入,这些通常被称为输入通道适配器。

spring-integration-file依赖关系为我们提供了一个输入通道适配器,这对我们的用例非常有用:FileReadMessageSource:

@Bean
public MessageSource<File> sourceDirectory() {
    FileReadingMessageSource messageSource = new FileReadingMessageSource();
    messageSource.setDirectory(new File(INPUT_DIR));
    return messageSource;
}

这里,我们的FieleAdIdMeasGeSueCE将读取由IpPudidir提供的目录,并从中创建一个消息源。

让我们将它指定为一个集成流中的源。

4.3。配置输入源

现在,如果我们认为这是一个长寿的应用程序,我们可能希望能够在文件进入时注意到它们,而不仅仅是在启动时移动已经存在的文件。

为了便于这一点,还可以采取额外的配置器作为输入源的进一步定制:

IntegrationFlows.from(sourceDirectory(), configurer -> configurer.poller(Pollers.fixedDelay(10000)));

在这种情况下,我们可以通过告诉Spring Integration每10秒轮询该源(本例中为文件系统)来增强输入源的弹性。

当然,这不适用于我们的文件输入源,我们可以将这个查询器添加到任何消息源中。

4.4。从输入源过滤消息

接下来,假设我们希望我们的文件移动应用程序只移动特定的文件,比如说有JPG扩展的图像文件。

为此,我们可以使用GenericSelector:

@Bean
public GenericSelector<File> onlyJpgs() {
    return new GenericSelector<File>() {
 
        @Override
        public boolean accept(File source) {
          return source.getName().endsWith(".jpg");
        }
    };
}

因此,让我们再次更新我们的集成流程:

IntegrationFlows.from(sourceDirectory())
  .filter(onlyJpgs());

或者,因为这个过滤器非常简单,我们可以用lambda定义它

IntegrationFlows.from(sourceDirectory())
  .filter(source -> ((File) source).getName().endsWith(".jpg"));

4.5。使用服务激活器处理消息

现在我们有了一个过滤的文件列表,我们需要把它们写入一个新的位置。

当我们考虑Spring集成中的输出时,服务激活器就是我们要做的。

让我们从Spring集成文件中使用FrimWrreMexAgServices服务激活器:

@Bean
public MessageHandler targetDirectory() {
    FileWritingMessageHandler handler = new FileWritingMessageHandler(new File(OUTPUT_DIR));
    handler.setFileExistsMode(FileExistsMode.REPLACE);
    handler.setExpectReply(false);
    return handler;
}

这里,我们的文件写入消息处理程序将把它接收到的每个消息有效载荷写入OutPuxDIR。

再次,让我们更新:

IntegrationFlows.from(sourceDirectory())
  .filter(onlyJpgs())
  .handle(targetDirectory());

顺便说一下,StEngEnTurPress的用法。因为集成流可以是双向的,所以这个调用指示这个特定的管道是一种方式。

4.6。激活我们的集成流程

当我们添加了所有的组件时,我们需要将集成流注册为bean来激活它:

@Bean
public IntegrationFlow fileMover() {
    return IntegrationFlows.from(sourceDirectory(), c -> c.poller(Pollers.fixedDelay(10000)))
      .filter(onlyJpgs())
      .handle(targetDirectory())
      .get();
}

get方法提取一个需要注册为Spring bean的集成流实例。

一旦我们的应用程序上下文加载,我们的集成流中包含的所有组件就被激活。

现在,我们的应用程序将开始将文件从源目录移动到目标目录。

5。附加组件

在基于DSL的文件移动应用程序中,我们创建了一个入站通道适配器、一个消息过滤器和一个服务激活器。

让我们看看其他常见的Spring集成组件,看看我们如何使用它们。

5.1。消息通道

如前所述,消息通道是初始化流的另一种方式:

IntegrationFlows.from("anyChannel")

我们可以把它叫做“请查找或创建一个叫做任意通道的频道bean”。然后,读取从其他流输入到任何通道的任何数据。

但是,实际上它比这更通用。

简单地说,一个频道抽象出消费者的生产者,我们可以把它看作一个Java队列。可以在流中的任意点插入通道。

比如说,当文件从一个目录移动到下一个目录时,我们希望对文件进行优先级排序:

@Bean
public PriorityChannel alphabetically() {
    return new PriorityChannel(1000, (left, right) -> 
      ((File)left.getPayload()).getName().compareTo(
        ((File)right.getPayload()).getName()));
}

然后,我们可以在流之间插入一个调用:

@Bean
public IntegrationFlow fileMover() {
    return IntegrationFlows.from(sourceDirectory())
      .filter(onlyJpgs())
      .channel("alphabetically")
      .handle(targetDirectory())
      .get();
}

有几十个通道可供选择,其中一些比较方便的通道用于并发、审计或中间持久性(比如Kafka或JMS缓冲区)。

此外,当与桥结合时,信道可以是强大的。

5.2。桥

当我们想要组合两个通道时,我们使用一个桥。

让我们设想一下,不是直接写入输出目录,而是将文件移动应用程序写入另一个通道:

@Bean
public IntegrationFlow fileReader() {
    return IntegrationFlows.from(sourceDirectory())
      .filter(onlyJpgs())
      .channel("holdingTank")
      .get();
}

现在,因为我们只是把它写在一个通道上,我们可以从那里桥到其他流。

让我们创建一个桥来查询我们的存储箱,并将它们写入目的地:

@Bean
public IntegrationFlow fileWriter() {
    return IntegrationFlows.from("holdingTank")
      .bridge(e -> e.poller(Pollers.fixedRate(1, TimeUnit.SECONDS, 20)))
      .handle(targetDirectory())
      .get();
}

同样,因为我们写入了中间通道,现在我们可以添加另一个流,该流接受这些相同的文件并以不同的速率写入它们:

@Bean
public IntegrationFlow anotherFileWriter() {
    return IntegrationFlows.from("holdingTank")
      .bridge(e -> e.poller(Pollers.fixedRate(2, TimeUnit.SECONDS, 10)))
      .handle(anotherTargetDirectory())
      .get();
}

正如我们所看到的,各个网桥可以控制不同处理程序的轮询配置。

一旦我们的应用程序上下文被加载,我们现在就有了一个更复杂的应用程序,它将开始将文件从源目录移动到两个目标目录。

6。结论

在本文中,我们看到了使用Spring集成Java DSL构建不同的集成流水线的各种方法。

本质上,我们能够从以前的教程中重新创建文件移动应用程序,这次使用纯Java。

此外,我们还研究了一些其他的组件,如通道和桥。

本教程中使用的完整源代码可在GITHUB上使用。

上一篇 下一篇

猜你喜欢

热点阅读