DDDJava 杂谈技术架构

领域驱动+CQRS+AXON实践

2019-07-16  本文已影响65人  PioneerYi

一、概念理解

1、领域驱动设计(DDD)

传统方式的不足

过度耦合

一般业务初期,系统的功能大都非常简单,普通的CRUD就能满足,此时系统是清晰的。然而随着迭代的不断演化,业务逻辑变得越来越复杂,系统也越来越冗杂。模块彼此关联,谁都很难说清模块的具体功能意图是啥。修改一个功能时,往往光回溯该功能需要的修改点就需要很长时间,更别提修改带来的不可预知的影响面。

用DDD则可以很好地解决领域模型到设计模型的同步、演化,最后再将反映了领域的设计模型转为实际的代码。

注:模型是我们解决实际问题所抽象出来的概念模型,领域模型则表达与业务相关的事实;设计模型则描述了所要构建的系统。

贫血症和失忆症

贫血领域对象(Anemic Domain Object)是指仅用作数据载体,而没有行为和动作的领域对象。

当前大多开发模式,都是以数据为中心,以数据库ER设计作驱动,对象只是数据的载体,没有行为。即便对架构进行了分层,但是分层架构在这种开发模式下,也只是对数据的移动、处理和实现的过程。

更好的开发方式是采用领域模型,将数据和行为封装在一起,并与现实世界中的业务对象相映射。各类具备明确的职责划分,将领域逻辑分散到领域对象中。

值对象

当一个对象用于对事务进行描述而没有唯一标识时,它被称作值对象(Value Object)。
例:比如性别信息,我们只需要知道{"sex":"男"}这样的值信息就能够满足要求了,这避免了我们对标识追踪带来的系统复杂性。

它具有不变性、相等性和可替换性。在实践中,需要保证值对象创建后就不能被修改,即不允许外部再修改其属性。

聚合根

Aggregate(聚合)是一组相关对象的集合,作为一个整体被外界访问,聚合由根实体,值对象和实体组成,聚合根(Aggregate Root)是这个聚合的根节点。

如何创建好的聚合?

聚合内部多个组成对象的关系可以用来指导数据库创建,但不可避免存在一定的抗阻。如聚合中存在List<值对象>,那么在数据库中建立1:N的关联需要将值对象单独建表,此时是有id的,建议不要将该id暴露到资源库外部,对外隐蔽。

领域服务

一些重要的领域行为或操作,可以归类为领域服务。它既不是实体,也不是值对象的范畴。

领域事件

领域事件是对领域内发生的活动进行的建模。

参考资料:
领域驱动设计在互联网业务开发中的实践
领域驱动设计实现之路

觉得美团技术团队写的《领域驱动设计在互联网业务开发中的实践》这篇文章挺好的,建议看看!

2、CQRS

CQRS理解

CQRS使用分离的接口将数据查询操作(Queries)和数据修改操作(Commands)分离开来,这也意味着在查询和更新过程中使用的数据模型也是不一样的。这样读和写逻辑就隔离开来了。


CQRS Model1

使用CQRS分离了读写职责之后,可以对数据进行读写分离操作来改进性能,可扩展性和安全。如下图:


CQRS Model2

主数据库处理CUD,从库处理R,从库的的结构可以和主库的结构完全一样,也可以不一样,从库主要用来进行只读的查询操作。在数量上从库的个数也可以根据查询的规模进行扩展,在业务逻辑上,也可以根据专题从主库中划分出不同的从库。从库也可以实现成ReportingDatabase,根据查询的业务需求,从主库中抽取一些必要的数据生成一系列查询报表来存储。

CQRS模式的优点如下:
1)分工明确,可以负责不同的部分
2)将业务上的命令和查询的职责分离能够提高系统的性能、可扩展性和安全性。并且在系统的演化中能够保持高度的灵活性,能够防止出现CRUD模式中,对查询或者修改中的某一方进行改动,导致另一方出现问题的情况。
3)可以从数据驱动(Data-Driven) 转到任务驱动(Task-Driven)以及事件驱动(Event-Driven).

但是在以下场景中,可能不适宜使用CQRS:
1)领域模型或者业务逻辑比较简单,这种情况下使用CQRS会把系统搞复杂。
2)对于简单的,CRUD模式的用户界面以及与之相关的数据访问操作已经足够的话,没必要使用CQRS,这些都是一个简单的对数据进行增删改查。
3)不适合在整个系统中到处使用该模式。在整个数据管理场景中的特定模块中CQRS可能比较有用。但是在有些地方使用CQRS会增加系统不必要的复杂性。

Event Sourcing

事件溯源不是直接存储对象状态,而是存储一系列事件,这些事件描述了过去在对象上发生的所有变化。对象当前的状态是通过在一个“空”实例上重放所有发生过的事件重新计算后得出。

举个预定座位的例子如下:


事件溯源举例

详细介绍,请看:[Introducing Event Sourcing](https://docs.microsoft.com/en-us/previous-versions/msp-n-p/jj591559(v=pandp.10)

事件溯源只能保证最终一致性。也就是说,在一个事件发生了之后,其他系统不会立即感知到它,在它们收到事件之前会有一定的延迟(比如 100 毫秒),所以你所投射的数据可能不是最新的。不过,塞翁失马,焉知非福。最终一致性的系统具有容错能力,可以解决服务中断问题。

CQRS实现

CQRS的实现,采用的就是Event Sourcing 机制,结构如下:


CQRS

从图中可以看到,操作通过Command发送到CommandBus上,然后特定的CommandHandler处理请求,产生对应的Event,event通过repository持久化。

事件持久化完成后,接下来就是会把这些事件发布出去(发送到分布式消息队列),给消费者消费了,也就是给所有的Event Handler处理。这些Event Handler可能是更新Query端的ReadDB,也可能是发送邮件,也可能是调用外部系统的接口。

详细请看:Command and Query Responsibility Segregation (CQRS) pattern

3、AXON

Axon Framework 通过支持开发者应用命令查询职责分离(CQRS)架构模式,来帮助构建可伸缩、可扩展和可维护的应用程序。

Axon Framework

相关资料:
1)reference-guide
2)AxonFramework-Github

二、Demo

实践项目背景是一个银行账户的管理,主要包含创建账户,以及取款,以及查询存款这几个功能。

项目背景比较简单,因为我们并不是为了探索DDD的设计问题,这个需要长期的经验积累,才能完成一个好的业务领域设计,本项目主要是实践下CQRS架构和AXON框架。

本项目分为Command端和Query端,服务使用的是Spring Boot实现。

1、Command端实现

咱们的业务是对银行账户进行操作,业务场景中的聚合如下:

@Aggregate
public class BankAccountAggregate {
    @AggregateIdentifier
    private AccountId accountId;
    private String accountName;
    private long balance;
    ......
}

我们采用的是Axon-Spring 集成包,因此创建聚合时,加上注解Aggregate即可。

对这个聚合做的操作有:创建账户,取款操作。CQRS模式下,一个操作即发送一个命令,对应本项目就是CreateAccountCommand和WithdrawMoneyCommand这两个命令。

Command 经过总线发出来,Command Handler接受Commnd并进行处理:

@Component
public class BankAcountCommandHandler {
    private static final Logger logger = getLogger(BankAcountCommandHandler.class);

    @Autowired
    private Repository<BankAccountAggregate> repository;

    public BankAcountCommandHandler(Repository<BankAccountAggregate> repository) {
        this.repository = repository;
    }

    @CommandHandler
    public String handle(CreateAccountCommand command) throws Exception {
        logger.debug("create account command handler");
        Aggregate<BankAccountAggregate> aggregate = repository.newInstance(() -> new BankAccountAggregate(command.getAccountId(),
            command.getAccountName(), command.getAmount()));
        return aggregate.identifier().toString();
    }

    @CommandHandler
    public void handle(WithdrawMoneyCommand command) {
        logger.debug("withdraw money command handler");
        Aggregate<BankAccountAggregate> aggregate = repository.load(command.getAccountId().toString());
        aggregate.execute(aggregateRoot -> aggregateRoot.withDrawMoney(command.getAccountId(), command.getAmount()));
    }
}

注意ComandHandler写在Aggregate内部和外部是有区别的,内部AXON会自动帮我们Load Instance;而在外部,我们则需要手动Load。

Axon是事件驱动模式的,任何对聚合状态的修改操作,都会生成事件。对应本例,事件有:AccountCreatedEvent和MoneyWithdrawnEvent这两个事件。首先AXON会存储相应的事件,代码中的仓储为

@Autowired
private Repository<BankAccountAggregate> repository;

仓储的定义如下:

@Configuration
public class AxonConfig {

    @Autowired
    private EventStore eventStore;

    @Bean
    public AggregateFactory<BankAccountAggregate> bankAccountAggregateAggregateFactory() {
        SpringPrototypeAggregateFactory<BankAccountAggregate> aggregateFactory = new SpringPrototypeAggregateFactory<>();
        aggregateFactory.setPrototypeBeanName("bankAccountAggregate");
        return aggregateFactory;
    }

    @Bean
    public Repository<BankAccountAggregate> bankAccountAggregateRepository() {
        EventSourcingRepository<BankAccountAggregate> repository = new EventSourcingRepository<BankAccountAggregate>(
            bankAccountAggregateAggregateFactory(),
            eventStore
        );
        return repository;
    }
}

注意定义Repository Bean,有如下两种定义方式:
1)在聚合的@Aggregate注解里,指定Repository的名字;
2)reporsitory 的bean的名字遵循:聚合的名字(注意首字母小写)+‘Repository'。比如本例中聚合名为BankAccountAggregate,那么Repository的Bean名就为bankAccountAggregateRepository。

如果上述两个bean名都没有找到,那么AXON会定义一个EventSourcingRepository,不过前提是EventStore Available。

将事件保存后,会将事件发出,对事件感兴趣者接受事件并进行相应处理,command端处理如下:

@Aggregate
public class BankAccountAggregate {

    private static final Logger logger = getLogger(BankAccountAggregate.class);

    @AggregateIdentifier
    private AccountId accountId;
    private String accountName;
    private long balance;
    ......

    /**
     * 创建账户事件处理.
     */
    @EventHandler
    public void on(AccountCreatedEvent event) {
        this.accountId = event.getAccountId();
        this.accountName = event.getAccountName();
        this.balance = event.getAmount();
        logger.info("Account {} is created with balance {}", accountId, this.balance);
    }

    /**
     * 取款事件处理.
     */
    @EventHandler
    public void on(MoneyWithdrawnEvent event) {
        long result = this.balance - event.getAmount();
        if (result < 0)
            logger.error("Cannot withdraw more money than the balance!");
        else {
            this.balance = result;
            logger.info("Withdraw {} from account {}, balance result: {}", event.getAmount(), accountId, balance);
        }
    }
}

2、Query端实现

Query端,存储Aggregate的快照用于查询。具体实现为订阅Command产生的事件,然后更新Query端的数据库,Query端依然使用Jpa存储到mysql数据库中。

首先定义一个BankAccountEntry。

@Entity
@Data
public class BankAccountEntry {
    @Id
    @GeneratedValue
    private long id;
    private String accountId;
    private long balance;

    public BankAccountEntry(String accountId, long balance) {
        this.accountId = accountId;
        this.balance = balance;
    }
}

订阅事件并进行数据库更新:

@Component
public class BankAccountEventListener {
    private BankAccountRepository repository;

    @Autowired
    public BankAccountEventListener(BankAccountRepository repository) {
        this.repository = repository;
    }

    @EventHandler
    public void on(AccountCreatedEvent event) {
        repository.save(new BankAccountEntry(event.getAccountId().toString(), event.getAmount()));
    }

    @EventHandler
    public void on(MoneyWithdrawnEvent event) {
        BankAccountEntry bankAccountEntry = repository.findOneByAccountId(event.getAccountId().toString());
        bankAccountEntry.setBalance(bankAccountEntry.getBalance() - event.getAmount());
        repository.save(bankAccountEntry);
    }
}

Jpa的Repository为:

@Repository
public interface BankAccountRepository extends JpaRepository<BankAccountEntry, String> {
    BankAccountEntry findOneByAccountId(String accountId);
}

更多关于Jpa的资料:Spring Data JPA 入门系列

3、Spring Boot Ctronller

@RestController
@RequestMapping("/bank")
public class BankAccountController {

    private static final Logger logger = getLogger(BankAccountController.class);

    @Autowired
    private BankAccountRepository repository;

    @Autowired
    private CommandGateway commandGateway;

    @RequestMapping(value = "/create", method = RequestMethod.GET)
    public void create() {
        AccountId id = new AccountId();
        logger.debug("Create account,account id: {}", id.toString());
        commandGateway.send(new CreateAccountCommand(id, "MyAccount", 1000));
    }

    @RequestMapping(value = "/withdraw/{accountId}", method = RequestMethod.GET)
    public void withdraw(@PathVariable String accountId) {
        logger.debug("Withdraw,account id: {}", accountId);
        commandGateway.send(new WithdrawMoneyCommand(new AccountId(accountId), 500));
    }

    @RequestMapping(value = "/query/{accountId}", method = RequestMethod.GET)
    public void query(@PathVariable String accountId) {
        logger.debug("query, account id: {}", accountId);
        BankAccountEntry bankAccountEntry = repository.findOneByAccountId(accountId);
        logger.info("query result is {}", new Gson().toJson(bankAccountEntry));
    }
}

CommandGateway提供了四种发送Comman的方法:

4、AXON配置介绍

Axon启动最少要指定如下几个模块:

1) CommandBus
CommandBus是用来分发Command到对应CommandHandler的机制。每一个Command只会发送到一个CommandHandler去,当有多个CommandHandler去订阅一个CommandMessage时, 最后一个覆盖前面所有。Axon内置了四种CommandBus:

** 2)EventBus**
EventBus用于把event发送到subscribe它的各个handler去。Axon提供了两种EventBus的实现,都支持订阅和跟踪:

** 3)Repository**
即Aggregate的持久化方式。Axon内置了两种

我们也可以自己实现Repository,,此时最好继承抽象类LockingRepository,对于aggregate wrapper type,建议使用AnnotatedAggregate。

** 4)EventStorageEngine**
提供event在底层storage读写的机制,内置了若干种:

** 5)Serializer**
由于是事件驱动框架,序列化器必不可少。Axon内置了三种:XStreamSerializer, JavaSerializer, JacksonSerializer,默认是XStreamSerializer,使用XStream来做序列化,理论上比Java自带的序列化器要快。

核心maven依赖:

 <!--axon jar包-->
 <dependency>
     <groupId>org.axonframework</groupId>
     <artifactId>axon-core</artifactId>
     <version>${axon.version}</version>
 </dependency>
 <dependency>
     <groupId>org.axonframework</groupId>
     <artifactId>axon-spring</artifactId>
     <version>${axon.version}</version>
 </dependency>
 <dependency>
     <groupId>org.axonframework</groupId>
     <artifactId>axon-spring-boot-autoconfigure</artifactId>
     <version>${axon.version}</version>
 </dependency>

<!--Spring boot jar包-->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
上一篇下一篇

猜你喜欢

热点阅读