领域驱动+CQRS+AXON实践
一、概念理解
1、领域驱动设计(DDD)
传统方式的不足
过度耦合
一般业务初期,系统的功能大都非常简单,普通的CRUD就能满足,此时系统是清晰的。然而随着迭代的不断演化,业务逻辑变得越来越复杂,系统也越来越冗杂。模块彼此关联,谁都很难说清模块的具体功能意图是啥。修改一个功能时,往往光回溯该功能需要的修改点就需要很长时间,更别提修改带来的不可预知的影响面。
用DDD则可以很好地解决领域模型到设计模型的同步、演化,最后再将反映了领域的设计模型转为实际的代码。
注:模型是我们解决实际问题所抽象出来的概念模型,领域模型则表达与业务相关的事实;设计模型则描述了所要构建的系统。
贫血症和失忆症
贫血领域对象(Anemic Domain Object)是指仅用作数据载体,而没有行为和动作的领域对象。
当前大多开发模式,都是以数据为中心,以数据库ER设计作驱动,对象只是数据的载体,没有行为。即便对架构进行了分层,但是分层架构在这种开发模式下,也只是对数据的移动、处理和实现的过程。
更好的开发方式是采用领域模型,将数据和行为封装在一起,并与现实世界中的业务对象相映射。各类具备明确的职责划分,将领域逻辑分散到领域对象中。
值对象
当一个对象用于对事务进行描述而没有唯一标识时,它被称作值对象(Value Object)。
例:比如性别信息,我们只需要知道{"sex":"男"}这样的值信息就能够满足要求了,这避免了我们对标识追踪带来的系统复杂性。
它具有不变性、相等性和可替换性。在实践中,需要保证值对象创建后就不能被修改,即不允许外部再修改其属性。
聚合根
Aggregate(聚合)是一组相关对象的集合,作为一个整体被外界访问,聚合由根实体,值对象和实体组成,聚合根(Aggregate Root)是这个聚合的根节点。
如何创建好的聚合?
- 边界内的内容具有一致性:在一个事务中只修改一个聚合实例。如果你发现边界内很难接受强一致,不管是出于性能或产品需求的考虑,应该考虑剥离出独立的聚合,采用最终一致的方式。
- 设计小聚合:大部分的聚合都可以只包含根实体,而无需包含其他实体。即使一定要包含,可以考虑将其创建为值对象。
- 通过唯一标识来引用其他聚合或实体:当存在对象之间的关联时,建议引用其唯一标识而非引用其整体对象。如果是外部上下文中的实体,引用其唯一标识或将需要的属性构造值对象。如果聚合创建复杂,推荐使用工厂方法来屏蔽内部复杂的创建逻辑。
聚合内部多个组成对象的关系可以用来指导数据库创建,但不可避免存在一定的抗阻。如聚合中存在List<值对象>,那么在数据库中建立1:N的关联需要将值对象单独建表,此时是有id的,建议不要将该id暴露到资源库外部,对外隐蔽。
领域服务
一些重要的领域行为或操作,可以归类为领域服务。它既不是实体,也不是值对象的范畴。
领域事件
领域事件是对领域内发生的活动进行的建模。
参考资料:
领域驱动设计在互联网业务开发中的实践
领域驱动设计实现之路
觉得美团技术团队写的《领域驱动设计在互联网业务开发中的实践》这篇文章挺好的,建议看看!
2、CQRS
CQRS理解
CQRS使用分离的接口将数据查询操作(Queries)和数据修改操作(Commands)分离开来,这也意味着在查询和更新过程中使用的数据模型也是不一样的。这样读和写逻辑就隔离开来了。
CQRS Model1
使用CQRS分离了读写职责之后,可以对数据进行读写分离操作来改进性能,可扩展性和安全。如下图:
CQRS Model2
主数据库处理CUD,从库处理R,从库的的结构可以和主库的结构完全一样,也可以不一样,从库主要用来进行只读的查询操作。在数量上从库的个数也可以根据查询的规模进行扩展,在业务逻辑上,也可以根据专题从主库中划分出不同的从库。从库也可以实现成ReportingDatabase,根据查询的业务需求,从主库中抽取一些必要的数据生成一系列查询报表来存储。
CQRS模式的优点如下:
1)分工明确,可以负责不同的部分
2)将业务上的命令和查询的职责分离能够提高系统的性能、可扩展性和安全性。并且在系统的演化中能够保持高度的灵活性,能够防止出现CRUD模式中,对查询或者修改中的某一方进行改动,导致另一方出现问题的情况。
3)可以从数据驱动(Data-Driven) 转到任务驱动(Task-Driven)以及事件驱动(Event-Driven).
但是在以下场景中,可能不适宜使用CQRS:
1)领域模型或者业务逻辑比较简单,这种情况下使用CQRS会把系统搞复杂。
2)对于简单的,CRUD模式的用户界面以及与之相关的数据访问操作已经足够的话,没必要使用CQRS,这些都是一个简单的对数据进行增删改查。
3)不适合在整个系统中到处使用该模式。在整个数据管理场景中的特定模块中CQRS可能比较有用。但是在有些地方使用CQRS会增加系统不必要的复杂性。
Event Sourcing
事件溯源不是直接存储对象状态,而是存储一系列事件,这些事件描述了过去在对象上发生的所有变化。对象当前的状态是通过在一个“空”实例上重放所有发生过的事件重新计算后得出。
举个预定座位的例子如下:
事件溯源举例
详细介绍,请看:[Introducing Event Sourcing](https://docs.microsoft.com/en-us/previous-versions/msp-n-p/jj591559(v=pandp.10)
事件溯源只能保证最终一致性。也就是说,在一个事件发生了之后,其他系统不会立即感知到它,在它们收到事件之前会有一定的延迟(比如 100 毫秒),所以你所投射的数据可能不是最新的。不过,塞翁失马,焉知非福。最终一致性的系统具有容错能力,可以解决服务中断问题。
CQRS实现
CQRS的实现,采用的就是Event Sourcing 机制,结构如下:
CQRS
从图中可以看到,操作通过Command发送到CommandBus上,然后特定的CommandHandler处理请求,产生对应的Event,event通过repository持久化。
事件持久化完成后,接下来就是会把这些事件发布出去(发送到分布式消息队列),给消费者消费了,也就是给所有的Event Handler处理。这些Event Handler可能是更新Query端的ReadDB,也可能是发送邮件,也可能是调用外部系统的接口。
详细请看:Command and Query Responsibility Segregation (CQRS) pattern
3、AXON
Axon Framework 通过支持开发者应用命令查询职责分离(CQRS)架构模式,来帮助构建可伸缩、可扩展和可维护的应用程序。
Axon Framework相关资料:
1)reference-guide
2)AxonFramework-Github
二、Demo
实践项目背景是一个银行账户的管理,主要包含创建账户,以及取款,以及查询存款这几个功能。
项目背景比较简单,因为我们并不是为了探索DDD的设计问题,这个需要长期的经验积累,才能完成一个好的业务领域设计,本项目主要是实践下CQRS架构和AXON框架。
本项目分为Command端和Query端,服务使用的是Spring Boot实现。
1、Command端实现
咱们的业务是对银行账户进行操作,业务场景中的聚合如下:
@Aggregate
public class BankAccountAggregate {
@AggregateIdentifier
private AccountId accountId;
private String accountName;
private long balance;
......
}
我们采用的是Axon-Spring 集成包,因此创建聚合时,加上注解Aggregate即可。
对这个聚合做的操作有:创建账户,取款操作。CQRS模式下,一个操作即发送一个命令,对应本项目就是CreateAccountCommand和WithdrawMoneyCommand这两个命令。
Command 经过总线发出来,Command Handler接受Commnd并进行处理:
@Component
public class BankAcountCommandHandler {
private static final Logger logger = getLogger(BankAcountCommandHandler.class);
@Autowired
private Repository<BankAccountAggregate> repository;
public BankAcountCommandHandler(Repository<BankAccountAggregate> repository) {
this.repository = repository;
}
@CommandHandler
public String handle(CreateAccountCommand command) throws Exception {
logger.debug("create account command handler");
Aggregate<BankAccountAggregate> aggregate = repository.newInstance(() -> new BankAccountAggregate(command.getAccountId(),
command.getAccountName(), command.getAmount()));
return aggregate.identifier().toString();
}
@CommandHandler
public void handle(WithdrawMoneyCommand command) {
logger.debug("withdraw money command handler");
Aggregate<BankAccountAggregate> aggregate = repository.load(command.getAccountId().toString());
aggregate.execute(aggregateRoot -> aggregateRoot.withDrawMoney(command.getAccountId(), command.getAmount()));
}
}
注意ComandHandler写在Aggregate内部和外部是有区别的,内部AXON会自动帮我们Load Instance;而在外部,我们则需要手动Load。
Axon是事件驱动模式的,任何对聚合状态的修改操作,都会生成事件。对应本例,事件有:AccountCreatedEvent和MoneyWithdrawnEvent这两个事件。首先AXON会存储相应的事件,代码中的仓储为
@Autowired
private Repository<BankAccountAggregate> repository;
仓储的定义如下:
@Configuration
public class AxonConfig {
@Autowired
private EventStore eventStore;
@Bean
public AggregateFactory<BankAccountAggregate> bankAccountAggregateAggregateFactory() {
SpringPrototypeAggregateFactory<BankAccountAggregate> aggregateFactory = new SpringPrototypeAggregateFactory<>();
aggregateFactory.setPrototypeBeanName("bankAccountAggregate");
return aggregateFactory;
}
@Bean
public Repository<BankAccountAggregate> bankAccountAggregateRepository() {
EventSourcingRepository<BankAccountAggregate> repository = new EventSourcingRepository<BankAccountAggregate>(
bankAccountAggregateAggregateFactory(),
eventStore
);
return repository;
}
}
注意定义Repository Bean,有如下两种定义方式:
1)在聚合的@Aggregate注解里,指定Repository的名字;
2)reporsitory 的bean的名字遵循:聚合的名字(注意首字母小写)+‘Repository'。比如本例中聚合名为BankAccountAggregate,那么Repository的Bean名就为bankAccountAggregateRepository。
如果上述两个bean名都没有找到,那么AXON会定义一个EventSourcingRepository,不过前提是EventStore Available。
将事件保存后,会将事件发出,对事件感兴趣者接受事件并进行相应处理,command端处理如下:
@Aggregate
public class BankAccountAggregate {
private static final Logger logger = getLogger(BankAccountAggregate.class);
@AggregateIdentifier
private AccountId accountId;
private String accountName;
private long balance;
......
/**
* 创建账户事件处理.
*/
@EventHandler
public void on(AccountCreatedEvent event) {
this.accountId = event.getAccountId();
this.accountName = event.getAccountName();
this.balance = event.getAmount();
logger.info("Account {} is created with balance {}", accountId, this.balance);
}
/**
* 取款事件处理.
*/
@EventHandler
public void on(MoneyWithdrawnEvent event) {
long result = this.balance - event.getAmount();
if (result < 0)
logger.error("Cannot withdraw more money than the balance!");
else {
this.balance = result;
logger.info("Withdraw {} from account {}, balance result: {}", event.getAmount(), accountId, balance);
}
}
}
2、Query端实现
Query端,存储Aggregate的快照用于查询。具体实现为订阅Command产生的事件,然后更新Query端的数据库,Query端依然使用Jpa存储到mysql数据库中。
首先定义一个BankAccountEntry。
@Entity
@Data
public class BankAccountEntry {
@Id
@GeneratedValue
private long id;
private String accountId;
private long balance;
public BankAccountEntry(String accountId, long balance) {
this.accountId = accountId;
this.balance = balance;
}
}
订阅事件并进行数据库更新:
@Component
public class BankAccountEventListener {
private BankAccountRepository repository;
@Autowired
public BankAccountEventListener(BankAccountRepository repository) {
this.repository = repository;
}
@EventHandler
public void on(AccountCreatedEvent event) {
repository.save(new BankAccountEntry(event.getAccountId().toString(), event.getAmount()));
}
@EventHandler
public void on(MoneyWithdrawnEvent event) {
BankAccountEntry bankAccountEntry = repository.findOneByAccountId(event.getAccountId().toString());
bankAccountEntry.setBalance(bankAccountEntry.getBalance() - event.getAmount());
repository.save(bankAccountEntry);
}
}
Jpa的Repository为:
@Repository
public interface BankAccountRepository extends JpaRepository<BankAccountEntry, String> {
BankAccountEntry findOneByAccountId(String accountId);
}
更多关于Jpa的资料:Spring Data JPA 入门系列
3、Spring Boot Ctronller
@RestController
@RequestMapping("/bank")
public class BankAccountController {
private static final Logger logger = getLogger(BankAccountController.class);
@Autowired
private BankAccountRepository repository;
@Autowired
private CommandGateway commandGateway;
@RequestMapping(value = "/create", method = RequestMethod.GET)
public void create() {
AccountId id = new AccountId();
logger.debug("Create account,account id: {}", id.toString());
commandGateway.send(new CreateAccountCommand(id, "MyAccount", 1000));
}
@RequestMapping(value = "/withdraw/{accountId}", method = RequestMethod.GET)
public void withdraw(@PathVariable String accountId) {
logger.debug("Withdraw,account id: {}", accountId);
commandGateway.send(new WithdrawMoneyCommand(new AccountId(accountId), 500));
}
@RequestMapping(value = "/query/{accountId}", method = RequestMethod.GET)
public void query(@PathVariable String accountId) {
logger.debug("query, account id: {}", accountId);
BankAccountEntry bankAccountEntry = repository.findOneByAccountId(accountId);
logger.info("query result is {}", new Gson().toJson(bankAccountEntry));
}
}
CommandGateway提供了四种发送Comman的方法:
- send(command, CommandCallback) 发送command,根据执行结果调用CommandCallback中的onSuccess或onFailure方法
- sendAndWait(command) 发送完command,等待执行完成并返回结果
- sendAndWait(command, timeout, TimeUnit) 这个好理解,比上面多了一个超时
- send(command) 该方法返回一个CompletableFuture,不用等待command的执行,立刻返回。结果通过future获取。
4、AXON配置介绍
Axon启动最少要指定如下几个模块:
1) CommandBus
CommandBus是用来分发Command到对应CommandHandler的机制。每一个Command只会发送到一个CommandHandler去,当有多个CommandHandler去订阅一个CommandMessage时, 最后一个覆盖前面所有。Axon内置了四种CommandBus:
- SimpleCommandBus ,默认直接在发送线程里去执行command handler,执行后保存Aggregate状态和发送事件也都在同一个线程上,适用于大多数情况。
- AsynchrounousCommandBus,默认使用一个CachedThreadPool来起一个新线程去处理command。CachedThreadPool线程调用时,会检查是否有可用的线程,没有则创建。闲置线程60s后自动关闭。也可以通过config指定其他的线程池来采用不同的线程调度策略。
- DisruptorCommandBus,适用于多线程场景。SimpleCommandBus在遇到多线程调用时,为了保证aggregate的状态,必须要加锁,这样就降低了效率。DisruptorCommandBus用了开源的并发处理框架Disruptor,用两组线程来处理多线程场景,一组用于执行command handler去更新aggregate的状态,一组用于存储和发送所产生的event到EventStore。
- DistributedCommandBus,不像其他CommandBus,DistributedCommandBus并不调用任何command handler,它只是在不同JVM的commandbus之间建立一个“桥梁”。
** 2)EventBus**
EventBus用于把event发送到subscribe它的各个handler去。Axon提供了两种EventBus的实现,都支持订阅和跟踪:
- SimpleEventBus,默认的EventBus,不持久化event,一旦发送到消费者去,就会销毁。
- EmbeddedEventStore,可以持久化event,以便以后replay。
** 3)Repository**
即Aggregate的持久化方式。Axon内置了两种
- Standard Repositories,代表是GenericJpaRepository,直接把Aggregate的最新状态存到db去。
- Event Sourcing Repositories,并不直接保存Aggregate的最新状态,而是保存对Aggregate造成影响的所有Event,通过Event回溯来恢复Aggregate状态。
我们也可以自己实现Repository,,此时最好继承抽象类LockingRepository,对于aggregate wrapper type,建议使用AnnotatedAggregate。
** 4)EventStorageEngine**
提供event在底层storage读写的机制,内置了若干种:
- InMemoryEventStorageEngine,存储到内存中
- JpaEventStorageEngine,使用JPA进行存储
- JdbcEventStorageEngine,使用jdbc
- MongoEventStorageEngine,使用Mongodb存储event。
** 5)Serializer**
由于是事件驱动框架,序列化器必不可少。Axon内置了三种:XStreamSerializer, JavaSerializer, JacksonSerializer,默认是XStreamSerializer,使用XStream来做序列化,理论上比Java自带的序列化器要快。
核心maven依赖:
<!--axon jar包-->
<dependency>
<groupId>org.axonframework</groupId>
<artifactId>axon-core</artifactId>
<version>${axon.version}</version>
</dependency>
<dependency>
<groupId>org.axonframework</groupId>
<artifactId>axon-spring</artifactId>
<version>${axon.version}</version>
</dependency>
<dependency>
<groupId>org.axonframework</groupId>
<artifactId>axon-spring-boot-autoconfigure</artifactId>
<version>${axon.version}</version>
</dependency>
<!--Spring boot jar包-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>