Go Observer

2021-02-18  本文已影响0人  JunChow520

生产者消费者模式

并发编程中最常见的是生产者消费者模式,该模式主要是通过平衡生产线程和消费线程的工作能力来提高程序的整体处理数据的速度。简单来说,生产者产出数据后放入成果队列中,同时消费者从成果队列中取出数据。这样就让生产消费变成了异步的两个过程。当成果队列中没有数据时,消费者就会进入饥饿的等待中,当成果队列中数据已满时,生产者则面临因产品挤压导致CPU被剥夺的下岗问题。

生产消费模式

生产消费模式的特点在于多个消费者监听消息队列,一旦消息到达消费者会马上消费,谁先抢到会算谁的。若队列中没有消息则消费者会继续监听。

package main

import (
    "fmt"
    "os"
    "os/signal"
    "syscall"
)

//生产者
func Producer(factor int, out chan<- int) {
    for i := 0; ; i++ {
        out <- i * factor
    }
}

//消费者
func Consumer(in <-chan int) {
    for item := range in {
        fmt.Println(item)
    }
}

func main() {
    //成果队列
    ch := make(chan int, 64)
    //生产3倍数序列
    go Producer(2, ch)
    //消费生成的队列
    go Consumer(ch)
    //延迟退出
    //time.Sleep(time.Second * time.Duration(2))
    //CTRL+C退出
    sig := make(chan os.Signal, 1)
    signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM)
    fmt.Println("quit %v", <-sig)
}

生产者消费者问题

生产者消费者问题是线程模型中的经典问题,即生产者和消费者在同一时间段内公用同一内存空间,生产者向空间中生产数据,而消费者取走数据。简单来说,就是N个线程同时进行生成,N个线程同时进行消费,两种角色通过内存缓冲区进行通信。

生产者消费者模式

软件开发过程中经常会遇到,某个模块负责产生数据,产生的数据交由另一个模块来负责处理,这里的模块是广义的,可以是类、函数、线程、进程等。产生数据的模块会被形象地称之为生产者,处理数据的模块则称之为消费者。在生产者消费者模式中,还需要一个缓冲区来作为中介,即生产者将数据放入缓存区,消费者从缓冲区取出数据。

生产者消费者模式是通过一个容器来解决生产者和消费者之间的强耦合关系,生产者和消费者彼此之间无需直接通讯,而是通过阻塞队列来进行通讯。因此生产者生产完数据后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列中获取。阻塞队列相当于一个缓冲区(数据仓库),用来平衡生产者和消费者的处理能力。阻塞队列实际上是用来给生产者和消费者解耦的,让生产者和消费者只依赖于缓冲区而非对方。因此生产者消费者模式可视为一个多线程并发写作的模式。

堵塞队列

若不采用生产者消费者模式,消费者直接调用生产者的某个方法会存在一个弊端,由于函数调用是同步的,因此在生产者方法没有返回之前,消费者只能一直处于等待状态,即发生阻塞。如果生产者生产数据很慢则消费者一直会浪费资源。

使用了生产者消费者模式后,生产者和消费者可以是两个独立的并发主体。生产者将制造出来的数据往缓存区一丢,就可以再去生产下一个数据,不用依赖于消费者的处理速度。

使用缓冲区的好处在于如果制造数据的速度时快时慢,当数据制造快时消费者会来不及处理,未处理的数据可以暂存在缓冲区中,等生产者制造速度慢下来的时候,消费者在慢慢处理掉。

堵塞队列

生产者消费者模式也可采用生产者产出任务直接交给消费者多线程来处理,若消费者能处理就直接处理,如果处理不过来则放入缓冲区。

生产者消费者模式

发布订阅模式

发布订阅模式是一种特殊的生产消费模式,多个观察者订阅消息频道,只要发布者发布消息,所有订阅者都能接收到消息,因此订阅者之间都是平等的。

不同之处在于生产消费模式中主动权在消费者手中,发布订阅主动权则在发布者手中。另外生产消费中所有消费者会抢占消息,发布订阅则是所有订阅者共享消息。

发布订阅(publish-and-subscribe)模型简写为pub/sub模型,该模型中消息生产者称之为发布者(publisher),消息消费者则称之为订阅者(subscriber)。生产者和消费者之间是M:N的关系。在传统生产者消费者模型中,会将消息发布到一个队列中,而发布订阅模型则会将消息发布给一个主题。

发布订阅模型中,每条消息都会传送给多个订阅者,发布者不知道也不关心哪一个订阅者正在接收主题消息。订阅者和发布者可以在运行时动态添加,因此是一种松耦合关系。

另外发布订阅模式中,发布者并不会直接通知订阅者,也就是说发布者和订阅者彼此之间互不相识。一般会采用第三者即在消息队列中的经纪人(Broker)来交流沟通,亦或是代理或消息代理或事件总线。换句话说,发布订阅模式中发布者和订阅者之间不是松耦合而是完全解耦的。

Broker

发布订阅模式是用于不同系统组件之间传递消息的模式,这些组件不知道彼此身份的任何信息,经纪人是如何通过所有消息的呢?实际上,最常用的方式是基于主题和基于内容。

发布订阅模式属于广义的观察者模式,发布订阅模式是观察者模式中最常见的一种实现,从解耦合和重用的角度上来看,更优于观察者模式。

观察者模式中,观察者需要直接订阅目标事件,在目标发出内容改变的事件后会直接接收事件并做出响应。而在发布订阅模式中,发布者和订阅者之间多了一个发布通道,以方便从发布者接收事件,另一方面向订阅者发布事件,订阅者需要从事件通道中订阅事件。以此避免发布者和订阅者之间产生依赖。

观察者模式

观察者模式(Observer Pattern)定义了对象之间一对多依赖,当某个对象改变状态时,它所有的依赖着都会接收到通知并自动更新。发生改变的对象称之为观察者,一个观察目标可以对应多个观察者,多个观察者之间没有任何联系,可根据需要增加或删除观察者,使系统更易于扩展。当对象间存在一对多的关系时,则可使用观察者模式。

//Observer 观察者接口
type Observer interface {
    Notify(param interface{})
}

//Notifier 通知者
type Notifier struct {
    observers []Observer //观察者
}

//Register 添加观察者
func (n *Notifier) Register(observer Observer) {
    n.observers = append(n.observers, observer)
}

//Remove 退出观察者
func (n *Notifier) Remove(observer Observer) {
    observers := make([]Observer, 0)
    for _, item := range n.observers {
        if item != observer {
            observers = append(observers, observer)
        }
    }
    n.observers = observers
}

//Notify 分发通知
func (n *Notifier) Notify(param interface{}) {
    for _, observer := range n.observers {
        observer.Notify(param)
    }
}

//NewNotifier 创建
func NewNotifier() *Notifier {
    return &Notifier{
        observers: make([]Observer, 0),
    }
}

观察者模式又称为发布-订阅模式(Publish/Subscribe)、模型-视图模式(Model/View)、源-监听器模式(Source/Listener)、从属模式(Dependents)。

观察者模式属于行为模式,行为模式关注的是对象之间的通讯,观察者模式是观察者和被观察者之间通讯。

观察者模式
type Observer interface {
    Update()
}

type Subject interface {
    Notify()
    AddObserver(observer Observer)
    RemoveObserver(observer Observer)
}

例如:使用观察者模式实现对天气预报的观察

type Weather struct {
    observers []Observer
}

func (w *Weather) AddObserver(observer Observer) {
    w.observers = append(w.observers, observer)
}
func (w *Weather) RemoveObserver(observer Observer) {
    for index, item := range w.observers {
        if item == observer {
            w.observers = append(w.observers[:index], w.observers[index+1:]...)
        }
    }
}
func (w *Weather) Notify() {
    for _, item := range w.observers {
        item.Update()
    }
}

type Observer1 struct{}

func (o *Observer1) Update() {
    fmt.Println("observer1 update")
}

var (
    subject   Subject
    observer1 Observer
)

func main() {
    subject = &Weather{}
    observer1 = &Observer1{}
    subject.AddObserver(observer1)
    subject.Notify()
}

观察者模式的优点

观察者模式的缺点

//Observer 观察者接口
type Observer interface {
    Notify(param interface{})
}

//Notifier 通知者
type Notifier struct {
    observers []Observer //观察者
}

//Register 添加观察者
func (n *Notifier) Register(observer Observer) {
    n.observers = append(n.observers, observer)
}

//Remove 退出观察者
func (n *Notifier) Remove(observer Observer) {
    observers := make([]Observer, 0)
    for _, item := range n.observers {
        if item != observer {
            observers = append(observers, observer)
        }
    }
    n.observers = observers
}

//Notify 分发通知
func (n *Notifier) Notify(param interface{}) {
    for _, observer := range n.observers {
        observer.Notify(param)
    }
}

//NewNotifier 创建
func NewNotifier() *Notifier {
    return &Notifier{
        observers: make([]Observer, 0),
    }
}

观察者模式和发布订阅模式之间的区别

区别
上一篇下一篇

猜你喜欢

热点阅读