使用spring boot和axon实现saga模式-- 中

2021-03-22  本文已影响0人  water_lang

在这个系统之前的文章我们介绍了什么是Saga模式。也描述了Saga的几种类型。最后,我们还描述了我们将会在一些场景中使用Saga模式来实现。

下面是我们关于Saga模式实现的本系列的提纲:
第一部分: 我们学习了《Saga》的基本内容。如果你不确定什么是Saga模式,我强烈建议你去看看那篇文章,然后再回到这篇文章。

第二部分(本文):我们将首先为一个常见场景用Saga模式 解决。

第三部分: 我们将继续未完成的工作,并用Axon Server来串联起各个服务。

第四部分:我们将测试我们的Saga应用程序。

这整个应用程序可以在Github上找到。

现在让我们开始动手吧。

Saga模式实现类型问题

正如我们前面所讨论的,我们将使用基于编排方式的Saga方法来实现Saga模式。当然基本协调方式的Saga也可以实现。

换句话说,编排器就是一个管理器,它编排参与的服务执行一个或多个本地事务。

下图显示了这种方法(基于编排)的一个典型示例:


图片.png

在这里,我们着眼于基于编排的Saga来解决订单管理的问题。这个订单管理问题可以应用于任何电子商务商店、食品配送应用程序或任何其他类似的用例。

当然,出于演示目的,我们将这个问题简化了很多,它比实际生产级别的复杂性要简单得多。

顶层组件

关于我们的Saga模式实现,这个应用程序有5个主要部分。各部分内容如下:

该服务暴露api,帮助在系统中创建订单。此外,该服务还管理订单聚合。订单聚合只是一个维护订单相关信息的实体。然而,订单服务还是订单管理Saga的一个子服务(维护订单内部的事务)。

支付服务根据订单管理Saga发出的创建发票命令进行操作。一旦它完成了它的工作,它就发布一个事件。这一事件将这个Saga推向了下一步流程。

该服务负责在系统中创建与订单相关的发货。它根据Saga管理器发出的命令执行对应的动作。一旦它完成了它的任务,它还会发布一个推动Saga推向下一步流程。

这不是一个服务。然而,核心api充当了各种服务之间的集成粘合剂,这些服务构成了这个Saga的一部分。在我们的示例中,核心api将包含Saga实现运行所需的各种命令和事件定义。

Axon服务器是Axon平台的一部分。我们将使用Axon框架来管理我们的聚合,如订单、支付、发货。此外,我们将使用Axon服务器来处理这三个服务之间的通信。如果你想了解更多关于Axon服务器的细节,可以查看我的文章

让我们从具体的细节开始吧

整个应用架构

下面是我们的应用程序的总体工程结构图。

.
├── core-apis
├── order-service
├── payment-service
├── pom.xml
├── saga-axon-server-spring-boot.iml
└── shipping-service

如您所见,它是一个多maven模块结构。每个服务都是maven子模块,是整个项目的一部分。

这个pom.xml文件将他们合并在一起。

<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.progressivecoder.saga-pattern</groupId>
    <artifactId>saga-axon-server-spring-boot</artifactId>
    <packaging>pom</packaging>
    <version>1.0-SNAPSHOT</version>

    <modules>
        <module>order-service</module>
        <module>payment-service</module>
        <module>core-apis</module>
        <module>shipping-service</module>
    </modules>

</project>

订单服务的实现

Order Service(以及所有其他服务),我们将用Spring Boot来构建应用程序。如果您不知道Spring Boot或想要更新,请参阅我关于Spring Boot微服务的文章

下面是这个应用的依赖:

<dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-jpa</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <!-- Axon -->
        <dependency>
            <groupId>org.axonframework</groupId>
            <artifactId>axon-spring-boot-starter</artifactId>
            <version>4.0.3</version>
        </dependency>

        <dependency>
            <groupId>com.h2database</groupId>
            <artifactId>h2</artifactId>
            <scope>runtime</scope>
        </dependency>

        <!-- Swagger -->
        <dependency>
            <groupId>io.springfox</groupId>
            <artifactId>springfox-swagger2</artifactId>
            <version>2.9.2</version>
        </dependency>

        <dependency>
            <groupId>io.springfox</groupId>
            <artifactId>springfox-swagger-ui</artifactId>
            <version>2.9.2</version>
        </dependency>

        <dependency>
            <groupId>javax.inject</groupId>
            <artifactId>javax.inject</artifactId>
            <version>1</version>
        </dependency>

        <dependency>
            <groupId>com.progressivecoder.saga-pattern</groupId>
            <artifactId>core-apis</artifactId>
            <version>${project.version}</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
</dependencies>

以下是一些需要考虑的重要问题:

订单聚合根

订单聚合是Saga模式实现中最重要的部分之一。它是订单管理Saga工作的基础。让我们看看它是怎样的:

@Aggregate
public class OrderAggregate {

    @AggregateIdentifier
    private String orderId;

    private ItemType itemType;

    private BigDecimal price;

    private String currency;

    private OrderStatus orderStatus;

    public OrderAggregate() {
    }

    @CommandHandler
    public OrderAggregate(CreateOrderCommand createOrderCommand){
        AggregateLifecycle.apply(new OrderCreatedEvent(createOrderCommand.orderId, createOrderCommand.itemType,
                createOrderCommand.price, createOrderCommand.currency, createOrderCommand.orderStatus));
    }

    @EventSourcingHandler
    protected void on(OrderCreatedEvent orderCreatedEvent){
        this.orderId = orderCreatedEvent.orderId;
        this.itemType = ItemType.valueOf(orderCreatedEvent.itemType);
        this.price = orderCreatedEvent.price;
        this.currency = orderCreatedEvent.currency;
        this.orderStatus = OrderStatus.valueOf(orderCreatedEvent.orderStatus);
    }

    @CommandHandler
    protected void on(UpdateOrderStatusCommand updateOrderStatusCommand){
        AggregateLifecycle.apply(new OrderUpdatedEvent(updateOrderStatusCommand.orderId, updateOrderStatusCommand.orderStatus));
    }

    @EventSourcingHandler
    protected void on(OrderUpdatedEvent orderUpdatedEvent){
        this.orderId = orderId;
        this.orderStatus = OrderStatus.valueOf(orderUpdatedEvent.orderStatus);
    }
}

如您所见,这是一个典型的实体类。这里需要注意的主要事项是这里有axon自己的的注解@Aggregate和@AggregateIdentifier。这些注解允许Axon框架管理订单聚合实例。

此外,我们正在使用事件溯源来存储在此聚合上发生的事件。事件溯源是另一种微服务体系结构模式,它们存储领域事件然后构建聚合信息。如果你想了解更多的话,我有一个关于事件来源实现的详细文章

订单Service层

为了方便创建订单,我们还定义了一个订单服务接口及其相应的实现。

public interface OrderCommandService {

    public CompletableFuture<String> createOrder(OrderCreateDTO orderCreateDTO);

}
@Service
public class OrderCommandServiceImpl implements OrderCommandService {

    private final CommandGateway commandGateway;

    public OrderCommandServiceImpl(CommandGateway commandGateway) {
        this.commandGateway = commandGateway;
    }

    @Override
    public CompletableFuture<String> createOrder(OrderCreateDTO orderCreateDTO) {
        return commandGateway.send(new CreateOrderCommand(UUID.randomUUID().toString(), orderCreateDTO.getItemType(),
                orderCreateDTO.getPrice(), orderCreateDTO.getCurrency(), String.valueOf(OrderStatus.CREATED)));
    }
}

该service服务层实现使用Axon框架的命令网关向聚合发出命令。该命令在我们前面声明的聚合类中处理。

订单Controller层

Order Controller类是我们创建API端点的地方。此时,出于演示的目的,我们只有一个端点(一个api接口)。

@RestController
@RequestMapping(value = "/api/orders")
@Api(value = "Order Commands", description = "Order Commands Related Endpoints", tags = "Order Commands")
public class OrderCommandController {

    private OrderCommandService orderCommandService;

    public OrderCommandController(OrderCommandService orderCommandService) {
        this.orderCommandService = orderCommandService;
    }

    @PostMapping
    public CompletableFuture<String> createOrder(@RequestBody OrderCreateDTO orderCreateDTO){
        return orderCommandService.createOrder(orderCreateDTO);
    }
}

为了帮助Swagger找到这些端点并暴露swagger api,我们还得配置Swagger。

@Configuration
@EnableSwagger2
public class SwaggerConfig {

    @Bean
    public Docket apiDocket(){
        return new Docket(DocumentationType.SWAGGER_2)
                .select()
                .apis(RequestHandlerSelectors.basePackage("com.progressivecoder.ordermanagement"))
                .paths(PathSelectors.any())
                .build()
                .apiInfo(getApiInfo());
    }

    private ApiInfo getApiInfo(){
        return new ApiInfo(
                "Saga Pattern Implementation using Axon and Spring Boot",
                "App to demonstrate Saga Pattern using Axon and Spring Boot",
                "1.0.0",
                "Terms of Service",
                new Contact("Saurabh Dashora", "progressivecoder.com", "coder.progressive@gmail.com"),
                "",
                "",
                Collections.emptyList());
    }

}

订单管理Saga

Saga模式实现的核心是订单管理Saga。简而言之,这也是一个典型的Java类,它就是一个流程管理器,用于处理业务的流程。

@Saga
public class OrderManagementSaga {

    @Inject
    private transient CommandGateway commandGateway;

    @StartSaga
    @SagaEventHandler(associationProperty = "orderId")
    public void handle(OrderCreatedEvent orderCreatedEvent){
        String paymentId = UUID.randomUUID().toString();
        System.out.println("Saga invoked");

        //associate Saga
        SagaLifecycle.associateWith("paymentId", paymentId);

        System.out.println("order id" + orderCreatedEvent.orderId);

        //send the commands
        commandGateway.send(new CreateInvoiceCommand(paymentId, orderCreatedEvent.orderId));
    }

    @SagaEventHandler(associationProperty = "paymentId")
    public void handle(InvoiceCreatedEvent invoiceCreatedEvent){
        String shippingId = UUID.randomUUID().toString();

        System.out.println("Saga continued");

        //associate Saga with shipping
        SagaLifecycle.associateWith("shipping", shippingId);

        //send the create shipping command
        commandGateway.send(new CreateShippingCommand(shippingId, invoiceCreatedEvent.orderId, invoiceCreatedEvent.paymentId));
    }

    @SagaEventHandler(associationProperty = "orderId")
    public void handle(OrderShippedEvent orderShippedEvent){
        commandGateway.send(new UpdateOrderStatusCommand(orderShippedEvent.orderId, String.valueOf(OrderStatus.SHIPPED)));
    }

    @SagaEventHandler(associationProperty = "orderId")
    public void handle(OrderUpdatedEvent orderUpdatedEvent){
        SagaLifecycle.end();
    }
}

Core-APIs

现在是了解core - api模块的好时机(command,event没有setter方法)。

核心api只是一堆命令和事件的类,这些命令和事件将成为Saga模式实现的一部分。

命令(Commands)

在我们的应用程序中创建新订单时,将触发Create Order Command。此命令由订单聚合处理。

public class CreateOrderCommand {

    @TargetAggregateIdentifier
    public final String orderId;

    public final String itemType;

    public final BigDecimal price;

    public final String currency;

    public final String orderStatus;

    public CreateOrderCommand(String orderId, String itemType, BigDecimal price, String currency, String orderStatus) {
        this.orderId = orderId;
        this.itemType = itemType;
        this.price = price;
        this.currency = currency;
        this.orderStatus = orderStatus;
    }
}

接下来,创建订单时,订单管理Saga将触发Create Invoice Command。

public class CreateInvoiceCommand{

    @TargetAggregateIdentifier
    public final String paymentId;

    public final String orderId;

    public CreateInvoiceCommand(String paymentId, String orderId) {
        this.paymentId = paymentId;
        this.orderId = orderId;
    }
}

当发票创建和付款处理完成时,订单管理Saga也会触发Create Shipping Command。

public class CreateShippingCommand {

    @TargetAggregateIdentifier
    public final String shippingId;

    public final String orderId;

    public final String paymentId;

    public CreateShippingCommand(String shippingId, String orderId, String paymentId) {
        this.shippingId = shippingId;
        this.orderId = orderId;
        this.paymentId = paymentId;
    }
}

最后,我们有Update Order Status Command。当运输完成后,将触发此命令。

public class UpdateOrderStatusCommand {

    @TargetAggregateIdentifier
    public final String orderId;

    public final String orderStatus;

    public UpdateOrderStatusCommand(String orderId, String orderStatus) {
        this.orderId = orderId;
        this.orderStatus = orderStatus;
    }
}

事件(Events)

整个过程中的第一个事件是订单创建事件(Order Created Event.)。就像我们之前看到的,这个事件也是开始这个Saga的原因。

public class OrderCreatedEvent {

    public final String orderId;

    public final String itemType;

    public final BigDecimal price;

    public final String currency;

    public final String orderStatus;

    public OrderCreatedEvent(String orderId, String itemType, BigDecimal price, String currency, String orderStatus) {
        this.orderId = orderId;
        this.itemType = itemType;
        this.price = price;
        this.currency = currency;
        this.orderStatus = orderStatus;
    }
}

下一个事件是Invoice Created Event。Invoice服务发布此事件。我们将在下一篇文章中看到它的实现。

public class InvoiceCreatedEvent  {

    public final String paymentId;

    public final String orderId;

    public InvoiceCreatedEvent(String paymentId, String orderId) {
        this.paymentId = paymentId;
        this.orderId = orderId;
    }
}

在那之后,我们有Order Shipped Event。该事件由Shipping服务在完成必要的操作后发布。

public class OrderShippedEvent {

    public final String shippingId;

    public final String orderId;

    public final String paymentId;

    public OrderShippedEvent(String shippingId, String orderId, String paymentId) {
        this.shippingId = shippingId;
        this.orderId = orderId;
        this.paymentId = paymentId;
    }
}

最后,我们有Order Updated Event。此事件由订单聚合在更新订单状态后发布。

public class OrderUpdatedEvent {

    public final String orderId;

    public final String orderStatus;

    public OrderUpdatedEvent(String orderId, String orderStatus) {
        this.orderId = orderId;
        this.orderStatus = orderStatus;
    }
}

结论:

至此,我们已经成功实现了应用程序的两个主要部分——订单服务和核心api。

Order服务包含了我们的主要Saga模式实现代码。另一方面,核心api是我们订单管理Saga的支柱。

我们将在这里结束这篇文章,因为它已经变得相当长。然而,整个Saga实现的代码可以在Github上获得以供参考。

在下一篇文章中,我们将开始实现其他服务并将它们连接到Axon服务器。所以请继续关注。

上一篇 下一篇

猜你喜欢

热点阅读