分布式事务的21种武器 - 3

2023-05-01  本文已影响0人  DeepNoMind

在分布式系统中,事务的处理分布在不同组件、服务中,因此分布式事务的ACID保障面临着一些特殊难点。本系列文章介绍了21种分布式事务设计模式,并分析其实现原理和优缺点,在面对具体分布式事务问题时,可以选择合适的模式进行处理。原文: Exploring Solutions for Distributed Transactions (3)

David Dvořáček @Unsplash

在不同业务场景下,可以有不同的解决方案,常见方法有:

  1. 阻塞重试(Blocking Retry)
  2. 二阶段和三阶段提交(Two-Phase Commit (2PC) and Three-Phase Commit (3PC))
  3. 基于后台队列的异步处理(Using Queues to Process Asynchronously in the Background)
  4. TCC补偿(TCC Compensation Matters)
  5. 本地消息表(异步保证)/发件箱模式(Local Message Table (Asynchronously Ensured)/Outbox Pattern)
  6. MQ事务(MQ Transaction)
  7. Saga模式(Saga Pattern)
  8. 事件驱动(Event Sourcing)
  9. 命令查询职责分离(Command Query Responsibility Segregation, CQRS)
  10. 原子提交(Atomic Commitment)
  11. 并行提交(Parallel Commits)
  12. 事务复制(Transactional Replication)
  13. 一致性算法(Consensus Algorithms)
  14. 时间戳排序(Timestamp Ordering)
  15. 乐观并发控制(Optimistic Concurrency Control)
  16. 拜占庭容错(Byzantine Fault Tolerance, BFT)
  17. 分布式锁(Distributed Locking)
  18. 分片(Sharding)
  19. 多版本并发控制(Multi-Version Concurrency Control, MVCC)
  20. 分布式快照(Distributed Snapshots)
  21. 主从复制(Leader-Follower Replication)

本文将介绍Saga、事件驱动以及CQRS三种模式。

7. Saga模式(Saga Pattern)
图片来源: https://docs.aws.amazon.com/prescriptive-guidance/latest/modernization-data-persistence/saga-pattern.html
import pika
import json

# Define the RabbitMQ connection parameters
credentials = pika.PlainCredentials('guest', 'guest')
parameters = pika.ConnectionParameters('localhost', 5672, '/', credentials)

# Define the messages to be sent between services
start_order_message = {'order_id': '12345', 'items': [{'id': '1', 'name': 'item1'}, {'id': '2', 'name': 'item2'}]}
payment_message = {'order_id': '12345', 'amount': 100.0}
shipping_message = {'order_id': '12345', 'items': [{'id': '1', 'name': 'item1'}, {'id': '2', 'name': 'item2'}]}

# Define the compensation messages to be sent in case of failure
cancel_payment_message = {'order_id': '12345', 'amount': 100.0}
cancel_shipping_message = {'order_id': '12345', 'items': [{'id': '1', 'name': 'item1'}, {'id': '2', 'name': 'item2'}]}

# Define the function to send messages to the RabbitMQ broker
def send_message(queue_name, message):
    connection = pika.BlockingConnection(parameters)
    channel = connection.channel()
    channel.queue_declare(queue=queue_name)
    channel.basic_publish(exchange='', routing_key=queue_name, body=json.dumps(message))
    connection.close()

# Define the function to handle the start of the order
def start_order():
    # Send the start order message to the Order service
    send_message('start_order', start_order_message)

# Define the function to handle the payment of the order
def payment():
    try:
        # Send the payment message to the Payment service
        send_message('payment', payment_message)
    except Exception as e:
        # Send the cancel payment message to the Payment service in case of failure
        send_message('cancel_payment', cancel_payment_message)
        raise e

# Define the function to handle the shipping of the order
def shipping():
    try:
        # Send the shipping message to the Shipping service
        send_message('shipping', shipping_message)
    except Exception as e:
        # Send the cancel shipping message to the Shipping service in case of failure
        send_message('cancel_shipping', cancel_shipping_message)
        raise e

# Define the function to handle the cancellation of the order
def cancel_order():
    # Send the cancel payment message to the Payment service
    send_message('cancel_payment', cancel_payment_message)

    # Send the cancel shipping message to the Shipping service
    send_message('cancel_shipping', cancel_shipping_message)

# Define the main function to execute the Saga
def execute_saga():
    try:
        # Start the order
        start_order()

        # Perform the payment
        payment()

        # Perform the shipping
        shipping()
    except Exception as e:
        # Cancel the order in case of failure
        cancel_order()
        raise e

# Call the main function to execute the Saga
execute_saga()

示例代码

优点

缺点

适用场景


尝试-确认-取消(TCC)模式与Saga模式的相似之处

尝试-确认-取消(TCC)模式与Saga模式的不同之处


8. 事件驱动(Event Sourcing)
图片来源: https://eventuate.io/whyeventsourcing.html 图片来源: https://eventuate.io/whyeventsourcing.html
import uuid
import json
import time

class BankAccount:
    def __init__(self):
        self.balance = 0
        self.event_sourcing = EventSourcing()

    def deposit(self, amount):
        event = Event('deposit', {'amount': amount})
        self.event_sourcing.add_event(event)
        self.balance += amount

    def withdraw(self, amount):
        if self.balance < amount:
            raise ValueError('Insufficient balance')
        event = Event('withdraw', {'amount': amount})
        self.event_sourcing.add_event(event)
        self.balance -= amount

    def get_balance(self):
        return self.balance

    def get_events(self):
        return self.event_sourcing.get_events()

    def get_event_by_id(self, event_id):
        return self.event_sourcing.get_event_by_id(event_id)

    def replay_events(self):
        self.balance = 0
        for event in self.event_sourcing.get_events():
            if event.type == 'deposit':
                self.balance += event.data['amount']
            elif event.type == 'withdraw':
                self.balance -= event.data['amount']

class Event:
    def __init__(self, type, data):
        self.id = uuid.uuid4()
        self.timestamp = int(time.time())
        self.type = type
        self.data = data

class EventSourcing:
    def __init__(self):
        self.event_store = EventStore()

    def add_event(self, event):
        self.event_store.store_event(event)

    def get_events(self):
        return self.event_store.get_events()

    def get_event_by_id(self, event_id):
        return self.event_store.get_event_by_id(event_id)

class EventStore:
    def __init__(self):
        self.events = []

    def store_event(self, event):
        self.events.append(event)

    def get_events(self):
        return self.events

    def get_event_by_id(self, event_id):
        for event in self.events:
            if event.id == event_id:
                return event
        raise ValueError('Event not found')

class Auditor:
    def __init__(self, event_store):
        self.event_store = event_store

    def log_events(self):
        for event in self.event_store.get_events():
            print(json.dumps({'id': str(event.id), 'type': event.type, 'data': event.data}))

account = BankAccount()
auditor = Auditor(account.event_sourcing.event_store)

account.deposit(100)
account.withdraw(50)
account.deposit(75)

print('Current balance:', account.get_balance())

print('All events:')
auditor.log_events()

event_id = account.get_events()[1].id
event = account.get_event_by_id(event_id)
print('Event details:', json.dumps({'id': str(event.id), 'type': event.type, 'data': event.data}))

示例代码

优点

缺点

适用场景

注意事项


9. 命令查询职责分离(Command Query Responsibility Segregation, CQRS)
图片来源: https://www.codeproject.com/Articles/555855/Introduction-to-CQRS
from abc import ABC, abstractmethod
from typing import Any, Dict, Optional

class Command(ABC):
    pass

class CreateProductCommand(Command):
    def __init__(self, name: str, price: float):
        self.name = name
        self.price = price

class UpdateProductCommand(Command):
    def __init__(self, product_id: str, name: Optional[str] = None, price: Optional[float] = None):
        self.product_id = product_id
        self.name = name
        self.price = price

class DeleteProductCommand(Command):
    def __init__(self, product_id: str):
        self.product_id = product_id

class Query(ABC):
    pass

class GetProductQuery(Query):
    def __init__(self, product_id: str):
        self.product_id = product_id

class GetAllProductsQuery(Query):
    pass

class Product:
    def __init__(self, id: str, name: str, price: float):
        self.id = id
        self.name = name
        self.price = price

class ProductRepository:
    def __init__(self):
        self.products = []

    def create(self, name: str, price: float) -> Product:
        product = Product(str(len(self.products) + 1), name, price)
        self.products.append(product)
        return product

    def get(self, id: str) -> Optional[Product]:
        for product in self.products:
            if product.id == id:
                return product
        return None

    def get_all(self) -> List[Product]:
        return self.products

    def update(self, id: str, name: Optional[str] = None, price: Optional[float] = None) -> Optional[Product]:
        for product in self.products:
            if product.id == id:
                if name is not None:
                    product.name = name
                if price is not None:
                    product.price = price
                return product
        return None

    def delete(self, id: str) -> bool:
        for product in self.products:
            if product.id == id:
                self.products.remove(product)
                return True
        return False

class ProductCommandHandler:
    def __init__(self, repository: ProductRepository):
        self.repository = repository

    def handle(self, command: Command) -> Optional[Product]:
        if isinstance(command, CreateProductCommand):
            return self.repository.create(command.name, command.price)
        elif isinstance(command, UpdateProductCommand):
            return self.repository.update(command.product_id, command.name, command.price)
        elif isinstance(command, DeleteProductCommand):
            success = self.repository.delete(command.product_id)
            return success

class ProductQueryHandler:
    def __init__(self, repository: ProductRepository):
        self.repository = repository

    def handle(self, query: Query) -> Optional[Any]:
        if isinstance(query, GetProductQuery):
            return self.repository.get(query.product_id)
        elif isinstance(query, GetAllProductsQuery):
            return self.repository.get_all()

class ProductService:
    def __init__(self, command_handler: ProductCommandHandler, query_handler: ProductQueryHandler):
        self.command_handler = command_handler
        self.query_handler = query_handler

    def create_product(self, name: str, price: float) -> Product:
        command = CreateProductCommand(name, price)
        return self.command_handler.handle(command)

    def get_product(self, id: str) -> Optional[Product]:
        query = GetProductQuery(id)
        return self.query_handler.handle(query)

    def get_all_products(self) -> List[Product]:
        query = GetAllProductsQuery()
        return self

示例代码

优点

缺点

适用场景

CQRS和事件驱动的结合

图片来源: https://stackoverflow.com/questions/56728979/event-sourcing-why-a-dedicated-event-store
参考文献

Saga pattern

Microservices Pattern: Sagas

Saga Pattern

Saga Pattern Microservices

Saga Pattern for Microservices Distributed Transactions

Microservice Design Pattern - Saga

Event Driven Saga Pattern

How to Use Saga Pattern in Microservices

Saga Orchestration for Microservices Using the Outbox Pattern

Saga Without the Headaches

Event Sourcing

Event Sourcing - why a dedicated event store?

Beginner's Guide to Event Sourcing

Microservices Pattern: Event Sourcing

Event Sourcing pattern

Event Sourcing

Event Sourcing explained

CQRS Event Sourcing JAVA

Introduction to CQRS

CQRS Pattern

bliki: CQRS

Microservices Pattern: Command Query Responsibility Segregation (CQRS)

A Beginner's Guide to CQRS

CQRS Desgin Pattern in Microservices Architecture

The Command and Query Responsibility Segregation(CQRS)

Event Driven CQRS Pattern

CQRS Pattern

CQRS Software Architecture Pattern: The Good, the Bad, and the Ugly


你好,我是俞凡,在Motorola做过研发,现在在Mavenir做技术工作,对通信、网络、后端架构、云原生、DevOps、CICD、区块链、AI等技术始终保持着浓厚的兴趣,平时喜欢阅读、思考,相信持续学习、终身成长,欢迎一起交流学习。
微信公众号:DeepNoMind

上一篇 下一篇

猜你喜欢

热点阅读