golang学习篇章

并发小工具整理

2021-01-25  本文已影响0人  Best博客

小工具

并发场景用一些经历过生产检验的小工具,比自己临时磨刀要安全,go-zero的core里面很多他们自己写好的小工具,没啥依赖,你要觉得好完全可以copy到你自己的项目里面使用,真香。

并发下多个 goroutine ,任意一个返回error,则拿到error并finish不在阻塞

更多使用魔法https://github.com/tal-tech/zero-doc/blob/main/doc/mapreduce.md

package mr

import (
    "errors"
    "io/ioutil"
    "log"
    "runtime"
    "sync/atomic"
    "testing"
    "time"

    "github.com/stretchr/testify/assert"
    "github.com/tal-tech/go-zero/core/stringx"
    "github.com/tal-tech/go-zero/core/syncx"
)

//调用链路 Finish -> MapReduceVoid-> MapReduce -> MapReduceWithSource->executeMappers
// 多个goroutine 有一个返回error,如果还没有执行的func就会不执行了,已经执行了的,业务就不管它了,也不会去等待它返回
func TestFinish(t *testing.T) {
    var total uint32
    err := Finish(func() error {
        atomic.AddUint32(&total, 2)
        err := errors.New("aaaa")
        return err
    }, func() error {
        atomic.AddUint32(&total, 3)
        return nil
    }, func() error {
        atomic.AddUint32(&total, 5)
        return nil
    })

    assert.Equal(t, uint32(10), atomic.LoadUint32(&total))
    assert.Nil(t, err)
}

并发场景下的func 串的方式执行

  1. 当一个func被并发调用,只会有一个goroutine拿到锁执行,该func在执行期间,并发的调用行为都不会真正执行,直接返回结果,但该func执行完毕后,又来了并发调用则又走以上流程。 适用与突增并发类型,用于保证该func同时执行的次数为1
    举例:func执行花了1秒,期间该函数被并发调用了1000次,则该func在这1S调用的1000次里面实际只执行1次;但是如果你这1000次是每隔1s调用一次,则该函数就会被真正执行1000次。 内部用到了lock
    https://github.com/tal-tech/zero-doc/blob/main/doc/sharedcalls.md
package syncx

import (
    "errors"
    "fmt"
    "sync"
    "sync/atomic"
    "testing"
    "time"
)
//值得注意的是,不是分布式控制的,只能达到去重目的,并不能保证真的只执行一次,因为源代码里面有个delete掉key的行为
// 可能是作者的思想是只保证fn()在执行期间来的并发就不需要执行,直接返回结果
// 但一开始我还以为是只执行一次
func TestExclusiveCallDo(t *testing.T) {
    g := NewSharedCalls()
    v, err := g.Do("key", func() (interface{}, error) {
        return "bar", nil
    })
    if got, want := fmt.Sprintf("%v (%T)", v, v), "bar (string)"; got != want {
        t.Errorf("Do = %v; want %v", got, want)
    }
    if err != nil {
        t.Errorf("Do error = %v", err)
    }
}

// 这个这样子写  能保证只执行一次,但其实时伪保证,是在故意加大fn执行时间
func TestExclusiveCallDoDupSuppress(t *testing.T) {
    g := NewSharedCalls()
    c := make(chan string)
    var calls int32
    fn := func() (interface{}, error) {
        atomic.AddInt32(&calls, 1)
        return <-c, nil
    }

    const n = 10
    var wg sync.WaitGroup
    for i := 0; i < n; i++ {
        wg.Add(1)
        go func() {
            v, err := g.Do("key", fn)
            if err != nil {
                t.Errorf("Do error: %v", err)
            }
            if v.(string) != "bar" {
                t.Errorf("got %q; want %q", v, "bar")
            }
            wg.Done()
        }()
    }
    time.Sleep(100 * time.Millisecond) // let goroutines above block
    c <- "bar"
    wg.Wait()
    if got := atomic.LoadInt32(&calls); got != 1 {
        t.Errorf("number of calls = %d; want 1", got)
    }
}

生成只执行一次的函数

package syncx

import (
    "fmt"
    "github.com/stretchr/testify/assert"
    "testing"
    "time"
    "sync"
)

func Once(fn func()) func() {
    once := new(sync.Once)
    return func() {
        once.Do(fn)
    }
}

//只打印一条内容,因为add只执行一次
func TestOnce(t *testing.T) {
    var v int
    add := Once(func() {
        v++
    })

    for i := 0; i < 5; i++ {
        add()
    }
    fmt.Printf("--- 执行次数 ---")
    time.Sleep(time.Second)
}

让函数触发多次只执行一次,并能拿到返回结果,非并发安全

package syncx

import (
    "sync"
    "time"
    "sync/atomic"

    "github.com/tal-tech/go-zero/core/timex"
)

const defaultRefreshInterval = time.Second

type (
    ImmutableResourceOption func(resource *ImmutableResource)

    ImmutableResource struct {
        fetch           func() (interface{}, error)
        resource        interface{}
        err             error
        lock            sync.RWMutex
        refreshInterval time.Duration
        lastTime        *AtomicDuration
    }
)

func NewImmutableResource(fn func() (interface{}, error), opts ...ImmutableResourceOption) *ImmutableResource {
    // cannot use executors.LessExecutor because of cycle imports
    ir := ImmutableResource{
        fetch:           fn,
        refreshInterval: defaultRefreshInterval,
        lastTime:        NewAtomicDuration(),
    }
    for _, opt := range opts {
        opt(&ir)
    }
    return &ir
}

func (ir *ImmutableResource) Get() (interface{}, error) {
    ir.lock.RLock()
    resource := ir.resource
    ir.lock.RUnlock()
    if resource != nil {
        return resource, nil
    }

    ir.maybeRefresh(func() {
        res, err := ir.fetch()
        ir.lock.Lock()
        if err != nil {
            ir.err = err
        } else {
            ir.resource, ir.err = res, nil
        }
        ir.lock.Unlock()
    })

    ir.lock.RLock()
    resource, err := ir.resource, ir.err
    ir.lock.RUnlock()
    return resource, err
}

func (ir *ImmutableResource) maybeRefresh(execute func()) {
    now := timex.Now()
    lastTime := ir.lastTime.Load()
    if lastTime == 0 || lastTime+ir.refreshInterval < now {
        ir.lastTime.Set(now)
        execute()
    }
}

// Set interval to 0 to enforce refresh every time if not succeeded. default is time.Second.
func WithRefreshIntervalOnFailure(interval time.Duration) ImmutableResourceOption {
    return func(resource *ImmutableResource) {
        resource.refreshInterval = interval
    }
}



type AtomicDuration int64

func NewAtomicDuration() *AtomicDuration {
    return new(AtomicDuration)
}

func ForAtomicDuration(val time.Duration) *AtomicDuration {
    d := NewAtomicDuration()
    d.Set(val)
    return d
}

func (d *AtomicDuration) CompareAndSwap(old, val time.Duration) bool {
    return atomic.CompareAndSwapInt64((*int64)(d), int64(old), int64(val))
}

func (d *AtomicDuration) Load() time.Duration {
    return time.Duration(atomic.LoadInt64((*int64)(d)))
}

func (d *AtomicDuration) Set(val time.Duration) {
    atomic.StoreInt64((*int64)(d), int64(val))
}


// 
func main(){
    var count int
    r := NewImmutableResource(func() (interface{}, error) {
        count++
        time.Sleep(time.Millisecond*1) //这里sleep一下并发就不能保证只执行一次了
        return "hello", nil
    })
    for i:=0;i<100;i++{
        go func() {
            res, err := r.Get()  //就需要你在这里的Get加锁后再执行了
            assert.Equal(t, "hello", res)
            assert.Equal(t, 1, count)
            assert.Nil(t, err)
        }()
    }

    time.Sleep(time.Second * 1)

    // again
    res, err := r.Get()
}

上一篇 下一篇

猜你喜欢

热点阅读