7 - 并发编程

2020-07-05  本文已影响0人  天命_风流

只执行一次

package singletion

import (
    "fmt"
    "sync"
    "testing"
    "unsafe"
)

// Singleton is the type whose single shared instance is handed out by
// GetSingletionObj.
type Singleton struct{}

var (
	// singleInstance holds the shared instance once it has been created.
	singleInstance *Singleton
	// once guards the one-time creation of singleInstance.
	once sync.Once
)

// createSingleton announces the creation on stdout and stores a fresh
// Singleton in singleInstance. It is only meant to be run through once.
func createSingleton() {
	fmt.Println("Create Obj")
	singleInstance = &Singleton{}
}

// GetSingletionObj returns the shared *Singleton, creating it on first use.
// Creation is routed through sync.Once, so "Create Obj" is printed and the
// instance is allocated a single time even when callers race.
func GetSingletionObj() *Singleton {
	once.Do(createSingleton)
	return singleInstance
}

// TestGetSingleonObj calls GetSingletionObj from ten goroutines at once,
// prints the address each caller received, and verifies that every caller
// got the same non-nil instance.
//
// NOTE(review): if Singleton is a zero-size type, distinct allocations may
// share one address, so equal addresses are necessary but not sufficient
// evidence that only one instance was created.
func TestGetSingleonObj(t *testing.T) {
	const numOfCallers = 10
	// One slot per goroutine, so the writes below never touch the same element.
	ptrs := make([]unsafe.Pointer, numOfCallers)

	var wg sync.WaitGroup
	for i := 0; i < numOfCallers; i++ {
		wg.Add(1)
		go func(slot int) {
			defer wg.Done()
			obj := GetSingletionObj()
			fmt.Printf("%x\n", unsafe.Pointer(obj))
			ptrs[slot] = unsafe.Pointer(obj)
		}(i)
	}
	wg.Wait()

	for i, p := range ptrs {
		if p == nil {
			t.Fatalf("caller %d received a nil instance", i)
		}
		if p != ptrs[0] {
			t.Errorf("caller %d received %p, want the shared instance %p", i, p, ptrs[0])
		}
	}
}

仅需任意任务完成

package first_response

import (
    "fmt"
    "runtime"
    "testing"
    "time"
)

// runTask simulates a unit of work: it blocks for 15ms and then returns a
// message identifying the task by id.
func runTask(id int) string {
	const workTime = 15 * time.Millisecond
	time.Sleep(workTime)
	msg := fmt.Sprintf("the result is from %d", id)
	return msg
}

// FirstResponse starts ten runTask goroutines and returns whichever result
// is delivered first.
//
// The result channel is unbuffered, so only the single send that pairs with
// the receive at the end of this function can complete. Every other runner
// stays blocked on its send after FirstResponse returns, never reaches its
// deferred "over" print, and is leaked.
func FirstResponse() string {
	const numOfRunner = 10
	results := make(chan string)
	for id := 0; id < numOfRunner; id++ {
		go func(id int) {
			// Printed only when this goroutine actually finishes.
			defer fmt.Println("over: ", id)
			results <- runTask(id) // blocks until a receiver takes the value
		}(id)
	}
	first := <-results
	return first
}

// TestFirstResponse logs the goroutine count before and after calling
// FirstResponse so that goroutines left behind by the call become visible.
// The values in the trailing comments are what the author observed.
func TestFirstResponse(t *testing.T) {
	before := runtime.NumGoroutine()
	t.Log("Before:", before) // observed: 2

	first := FirstResponse()
	t.Log(first) // observed: "the result is from 4"; the winning id varies

	// Wait long enough for every runner to have finished its task.
	time.Sleep(time.Second)
	after := runtime.NumGoroutine()
	t.Log("After:", after) // observed: 11, i.e. nine more than before
}
package first_response

import (
    "fmt"
    "runtime"
    "testing"
    "time"
)

// runTask simulates a unit of work: it blocks for 15ms and then returns a
// message identifying the task by id.
func runTask(id int) string {
	const workTime = 15 * time.Millisecond
	time.Sleep(workTime)
	msg := fmt.Sprintf("the result is from %d", id)
	return msg
}

// FirstResponse starts ten runTask goroutines and returns whichever result
// is delivered first.
//
// The result channel is buffered with one slot per runner, so every send
// completes without a waiting receiver and each goroutine can run to its
// deferred "over" print even though only the first result is consumed.
func FirstResponse() string {
	const numOfRunner = 10
	results := make(chan string, numOfRunner) // buffered: capacity == number of senders
	for id := 0; id < numOfRunner; id++ {
		go func(id int) {
			defer fmt.Println("over: ", id)
			results <- runTask(id)
		}(id)
	}
	first := <-results
	return first
}

// TestFirstResponse logs the goroutine count before and after calling
// FirstResponse so that goroutines left behind by the call become visible.
// The values in the trailing comments are what the author observed.
func TestFirstResponse(t *testing.T) {
	before := runtime.NumGoroutine()
	t.Log("Before:", before) // observed: 2

	first := FirstResponse()
	t.Log(first) // observed: "the result is from 4"; the winning id varies

	// Wait long enough for every runner to have finished its task.
	time.Sleep(time.Second)
	after := runtime.NumGoroutine()
	t.Log("After:", after) // observed: 2, the same as before the call
}

等待所有任务完成

package all_response

import (
    "fmt"
    "runtime"
    "testing"
    "time"
)

// runTask simulates a unit of work: it blocks for 15ms and then returns a
// message identifying the task by id.
func runTask(id int) string {
	const workTime = 15 * time.Millisecond
	time.Sleep(workTime)
	msg := fmt.Sprintf("the result is from %d", id)
	return msg
}

// AllResponse starts ten runTask goroutines, waits for every one of them to
// report, and returns all results joined into one string with a newline
// after each result. Results appear in the order they arrive on the channel.
func AllResponse() string {
	const numOfRunner = 10
	results := make(chan string, numOfRunner)
	for id := 0; id < numOfRunner; id++ {
		go func(id int) {
			defer fmt.Println("over: ", id)
			results <- runTask(id)
		}(id)
	}

	// The runners work concurrently, but collection is serial: exactly one
	// receive per runner, each blocking until the next result is available.
	var combined string
	for pending := numOfRunner; pending > 0; pending-- {
		line := <-results
		combined = combined + line + "\n"
	}
	return combined
}

// TestFirstResponse, despite its name, exercises AllResponse: it logs the
// goroutine count before and after the call along with the combined result.
// The counts in the trailing comments are what the author observed.
func TestFirstResponse(t *testing.T) {
	before := runtime.NumGoroutine()
	t.Log("Before:", before) // observed: 2

	all := AllResponse()
	t.Log(all) // one "the result is from N" line per runner

	// Wait long enough for every runner goroutine to have exited.
	time.Sleep(time.Second)
	after := runtime.NumGoroutine()
	t.Log("After:", after) // observed: 2, the same as before the call
}

资源池

package obj_pool

import (
	"errors"
	"fmt"
	"testing"
	"time"
)

// ErrTimeout is returned by GetObj when no object becomes available before
// the timeout elapses.
var ErrTimeout = errors.New("time out")

// ErrOverflow is returned by ReleaseObj when the pool is already full.
var ErrOverflow = errors.New("overflow")

// ReusableObj is the kind of object kept in an ObjPool.
type ReusableObj struct {
	// Define the fields of the pooled object here.
}

// ObjPool is a fixed-capacity pool of *ReusableObj backed by a buffered
// channel: taking an object is a receive, returning one is a send.
type ObjPool struct {
	bufChan chan *ReusableObj // holds the objects currently available
}

// NewObjPool returns a pool with capacity numOfObj that is pre-filled with
// numOfObj freshly allocated objects.
func NewObjPool(numOfObj int) *ObjPool {
	pool := &ObjPool{bufChan: make(chan *ReusableObj, numOfObj)}
	for i := 0; i < numOfObj; i++ {
		pool.bufChan <- &ReusableObj{}
	}
	return pool
}

// GetObj takes an object out of the pool, waiting up to timeout for one to
// become available. It returns ErrTimeout if none arrives in time.
//
// An object that is already available is always returned, even when timeout
// is zero or negative.
func (p *ObjPool) GetObj(timeout time.Duration) (*ReusableObj, error) {
	// Fast path: without it, a select over the pool and an already-expired
	// timer may pick the timer even though an object is ready.
	select {
	case obj := <-p.bufChan:
		return obj, nil
	default:
	}
	if timeout <= 0 {
		return nil, ErrTimeout
	}

	// Use an explicit timer so it can be stopped as soon as an object
	// arrives instead of lingering until it fires.
	timer := time.NewTimer(timeout)
	defer timer.Stop()

	select {
	case obj := <-p.bufChan:
		return obj, nil
	case <-timer.C:
		return nil, ErrTimeout
	}
}

// ReleaseObj puts obj back into the pool without blocking. It returns
// ErrOverflow if the pool is already at capacity.
func (p *ObjPool) ReleaseObj(obj *ReusableObj) error {
	select {
	case p.bufChan <- obj:
		return nil
	default:
		return ErrOverflow
	}
}

// TestObjPool checks that a full pool refuses an extra object and that an
// object can be taken and returned repeatedly, more times than the pool has
// objects.
func TestObjPool(t *testing.T) {
	pool := NewObjPool(10) // a pool pre-filled with 10 objects

	// The pool is already full, so releasing one more object must be
	// rejected; an error here is the expected outcome, not a failure.
	if err := pool.ReleaseObj(&ReusableObj{}); err == nil {
		t.Error("ReleaseObj on a full pool succeeded, want an overflow error")
	} else {
		t.Log(err)
	}

	// Each iteration takes one object and hands it straight back, so 11
	// rounds succeed even though the pool only holds 10 objects.
	for i := 0; i < 11; i++ {
		v, err := pool.GetObj(time.Second * 1)
		if err != nil {
			t.Error(err)
			continue
		}
		fmt.Printf("%T,%d\n", v, i)
		if err := pool.ReleaseObj(v); err != nil {
			t.Error(err)
		}
	}
	fmt.Println("Done")
}

sync.Pool(对象缓存)

1. 对象获取

2. 对象放回

3. 使用方法

image.png

4. 生命周期

5. 代码实例

package sync_pool

import (
    "fmt"
    "sync"
    "testing"
)

// TestSyncPool walks through the basic sync.Pool calls on one goroutine:
// Get on an empty pool, Put, then two more Gets, printing each value.
func TestSyncPool(t *testing.T) {
	// newObj is the pool's factory; Get calls it when it has nothing cached.
	newObj := func() interface{} {
		fmt.Println("Create a new object.")
		return 100
	}
	pool := &sync.Pool{New: newObj}

	first := pool.Get().(int) // nothing has been Put yet, so New supplies 100
	fmt.Println(first)

	pool.Put(3) // hand a 3 to the pool
	// runtime.GC() // NOTE(review): presumably left here to show that a
	// collection may drop the cached 3; needs the runtime import to enable.

	second, _ := pool.Get().(int) // author observed 3; sync.Pool does not promise to retain it
	fmt.Println(second)

	third, _ := pool.Get().(int) // author observed 100, i.e. a fresh value from New
	fmt.Println(third)
}

// TestSyncPoolInMultiGroutine seeds a sync.Pool with three values and then
// has ten goroutines each Get one value and print it.
func TestSyncPoolInMultiGroutine(t *testing.T) {
	pool := &sync.Pool{
		New: func() interface{} {
			fmt.Println("Creat a new object.")
			return 10
		},
	}

	// Seed the pool with three 100s.
	for n := 0; n < 3; n++ {
		pool.Put(100)
	}

	const numOfGetters = 10
	var wg sync.WaitGroup
	wg.Add(numOfGetters)
	for id := 0; id < numOfGetters; id++ {
		go func(id int) {
			defer wg.Done()
			// Author observed three 100s and then 10s from New; which
			// goroutine gets which value is not guaranteed by sync.Pool.
			fmt.Println(pool.Get())
		}(id)
	}
	wg.Wait()
}

6. 总结

上一篇 下一篇

猜你喜欢

热点阅读