微服务开发中的问题和解决方法

2019-10-10  本文已影响0人  xuyaozuo

问题

目标:

归纳一些常见的微服务开发问题, 分享一些通用设计方案, 并希望使用一些通用库解决微服务开发中的问题。

微服务中遇到的问题:

缓存
异步任务

场景:特定条件触发爬虫。定时聚合各个服务的数据,轻量的计算任务。跨多个服务的分布式服务的事务。

异步任务需要满足的条件:

消息推送:

保证数据入库和消息发送的一致性:

客户端(外部服务调用)

通用的简单的监控方式

goroutine 管理

go func() {
    // long time run code ....
}
分区管理

在很多情况下需要分区管理

代码式例

过程监控

对应需要监控一个特定的过程,使用一个通用的接口提供 log tracing metric 三种方式的监控。

// 对操作行为的记录,在操作之前生成一个未提交的记录,完成后提交记录
func ExampleBasicUse() {
    factory := EasyRecorders("test-record")

    func(ctx context.Context) {
        var err error
        recorder, ctx := factory.ActionRecorder(ctx, "do-something") // 生成一个记录
        defer func() {
            recorder.Commit(err, BoolField("remote err", true)) // 提交这个记录
        }()
        // err = doSomething(ctx)
    }(context.Background())
}

更加易于在拦截器中使用

func ExampleWrap() {
    factory := EasyRecorders("test-record")

    type operation func (ctx context.Context) error

    wrap := func(oper operation) operation {
        return func (ctx context.Context) error {
            var err error
            recorder, ctx := factory.ActionRecorder(ctx, "test-function") // 生成一个记录
            defer func() {
                recorder.Commit(err) // 提交这个记录
            }()
            err = oper(ctx)
            return err
        }
    }

    wrapped := wrap(func(ctx context.Context) error {
        // 真正的业务逻辑
        return errors.New("err occur")
    })

    wrapped(context.Background())
    return
}
缓存

缓存主要是围绕防止缓存击穿设计的,

func ExampleBasicUse() {
    var m = &sync.Map{}

    handler := ResourceHandler{
        ThroughLimit: rate.NewLimiter(rate.Every(100 *time.Millisecond), 10), // 全局的缓存击穿限流
        // 主要的功能
        FindFromCacheFunc: func(ctx context.Context, request Request) (*Resource, error) {
            // 从缓存中获取数据
            v, ok := m.Load(request.ID)
            if !ok { // 如果数据不存在则返回空
                return nil, nil
            }
            return &Resource{
                Data: v,
            }, nil
        },
        FetchFromStoreFunc: func(ctx context.Context, request Request) (*Resource, error) {
            // 从数据库(上游获取数据)
            // data = database.query()
            var data interface{}
            return &Resource{
                Data: data,
            }, nil
        },
        UpdateCacheFunc: func(ctx context.Context, request Request, res Resource) (*Resource, error) {
            // 跟新缓存数据
            m.Store(request.ID, res.Data)
            return &res, nil
        },

        // 辅助的工作函数
        ThroughOnCacheErrFunc: func(ctx context.Context, resource Request, err error) (goThrough bool) {
            // 缓存失效的时候是否需要击穿,全局的击穿限流依然保持
            if err == errors.New("good err") {
                return true
            }
            return false
        },
        Downgrade: func(ctx context.Context, resource Request, err error) (*Resource, error) {
            // 缓存失效的时候是否启用降级的数据
            if err == errors.New("good err") {
                return &Resource{
                    Data: "default value",
                }, nil
            }
            return nil, err
        },
    }

    factory := record.EasyRecorders("test-cache-record") // 记录器

    mid := NewRecorderMid(factory) // 插件 就是一个拦截器

    repository := NewRepository(handler, zap.L(), mid) // 生成一个repository对象

    ctx, _ := context.WithTimeout(context.Background(), time.Second)

    res, err := rep.Find(ctx, Request{
        ID: "1",
    })

    if err != nil {
        log.Println("find res is err")
        return
    }

    if res == nil {
        log.Println("can not found resource")
        return
    }

    log.Println("did get data:", res.Data)
}

任务管理
func ExampleBasicUse() {
    // 任务调度器,负责触发任务,任务存储
    scheduler := NewMemoScheduler(time.Minute) 

    // 执行器,执行具体的业务逻辑
    executor := ExecutorFunc(func(ctx Context, res *Result) {
        // 执行业务逻辑的代码
        var err error
        // doSomething async
        if err == nil {
            res.Finish() //标记为结束, 不会再执行。
        } else {
            res.WaitAndReDo(3 * time.Minute) // 可能执行失败,等三分钟再执行。
        }
        return
    })

    mid := NewRecorderMiddle(record.EasyRecorders("task-record")) // 插件,用于监控任务变化。

    // 构建一个taskManager 任务管理器
    taskManager := NewManager(scheduler, executor, ManagerOption{
        MaxBusterTask: 10, // 同时并发执行的任务
        DefaultExecMaxDuration: time.Minute, //最大执行的任务的时间
        DefaultRecoverCount: 10, // 任务可以从失败中恢复的次数
        Middle: []Middle{mid}, // record mid
    })

    //具体如何执行一个任务的流程

    opt := Option{}. // 一个任务具体的选项
        SetOverLap(true). // 如果任务已经存在则可以覆盖这个任务. 会先尽量停止已有的任务,再开始新的任务
        SetExpectStartTime(time.Now().Add(time.Minute)) //定时执行,这个任务可以被延后执行

    // 声明一个新的任务
    err := taskManager.ApplyNewTask(context.Background(), "task-key-1", opt)
    if err != nil {
        log.Println("apply new task err:", err)
    }
}

消息投递

    func (rep *Repository) saveResult(data Data, events []Event) error {}
func ExampleNotifierBasicUse() {
    // publisher 是一个推送的抽象接口
    var publisher Publisher = MockPublisher{
        PublishFunc: func(ctx context.Context, message []OutputMessage) error {
            log.Println("did push message")
            return nil
        },
    }

    // stream 可以中断并且重续,并从一个流节点开始,这个是内存版本的stream 也有基于mongodb change stream的
    var stream OutputStream = NewMemoMessageStream() 

    notifier := New(stream, publisher, Option{
        MaxBlockCount: 1, // 可以合并请求的发送数量
        MaxBlockDuration: time.Second, // 可以合并请求的发送时间
    })

    notifier.Start() //start 之后就会不断从 stream 对象中获取数据
    // add data into stream
    notifier.Close()
}

分区

func ExampleLeaderBasicUse() {
    var parts []*Partition
    for i := 0 ; i < 10; i ++ {
        member := newSimpleTestMember("node192.168.0.1")
        partition := NewPartition(PartitionID(i), member, nil)
        parts = append(parts, partition)
    }

    leaderGroup := NewLeaders(parts...)

    // SyncLeader 是同步leader, 当member 成为某一个分区leader的时候,会调用func匿名函数,如果失去leader 资格,context 会被关闭
    leaderGroup.SyncLeader(context.Background(), func(ctx context.Context, part *Partition) {
        // start notify kafka topic
        // start task manager
        // start push part message
    })
}

func newSimpleTestMember(nodeID string) *Member {
    // Elector 是一个选举裁决者的接口
    var elector Elector = NewMemoElector()

    // 生成选举数据
    var electionFactory ElectionFactory = &ConstElectionFactory{
        ConstID: nodeID, // 唯一表示当前节点的唯一id, 可以用ip container_id 随机数等等。
    }

    member := NewMember(elector, electionFactory, Option{
        MaxElectDuration: 2 * time.Second,
        MaxKeepLiveDuration: time.Second,
    })

    member.Start()

    // SyncLeader 是同步leader, 当member 成为leader的时候,会调用func匿名函数,如果失去leader 资格,context 会被关闭
    /*member.SyncLeader(context.Background(), func(ctx context.Context) {

    })*/
}

客户端代理
func ExampleBasicUse() {
    // agent 是一个接口,其对应的即是真实的client。
    var agent RecoverableAgent // = .....

    // 监控用插件
    recordMid := NewRecorderMiddle(record.EasyRecorders("client-test"))

    // 基础的限流插件
    breakerMid := NewBasicBreakerMiddle(
        rate.NewLimiter(rate.Every(time.Second), 10),
        agent,
        time.Second, // 错误限流后等待的最少时间
        3 * time.Second, // 错误限流后最长的等待时间
    )

    opt := Option{}.
        SetParallelCount(10). // 并发数量
        AddMiddle(breakerMid, recordMid)

    client := New(agent, opt)

    err := client.Do(context.Background(), func(ctx context.Context, agent Agent) error {
        var err error
        // dbClient := agent.(*DbClient)
        // dbClient.query() ...
        // dbClient.Update() ...
        // return err
        return err
    }, ActionOption{}.
        SetPartition(1)) // 设置分区id, 分区能保证的是相同的分区id同时只有一个在执行。

    if err != nil {
        log.Println("client do err:", err)
    }
}

未解决以上问题提供了一些基础库

https://github.com/feynman-go/workshop

野gorounte管理

使用一个探针对象去控制和管理 goroutine

func ExampleBasicUse() {
    // 创建一个探针
    pb := New(func(ctx context.Context) {
        // do something cost long time
        select {
        case <- ctx.Done():
        case <- time.After(time.Minute):
        }
    })

    pb.Start()
    // do something else

    // stop will cancel context
    pb.Stop()

    // wait goroutine stop
    <- pb.Stopped()
}

可以使用 context 推出的互斥量
func ExampleBasicUse() {
    mx := &Mutex{}
    ctx, _ := context.WithTimeout(context.Background(), time.Second)
    if mx.Hold(ctx) { // other invoke will block in this method
        // do something
        mx.Release()
    }
}
上一篇 下一篇

猜你喜欢

热点阅读