Ovirt程序员

【Ovirt 笔记】Reactive Streams 的实现原理

2018-05-29  本文已影响19人  58bc06151329

文前说明

作为码农中的一员,需要不断的学习,我工作之余将一些分析总结和学习笔记写成博客与大家一起交流,也希望采用这种方式记录自己的学习之旅。

本文仅供学习交流使用,侵权必删。
不用于商业目的,转载请注明出处。

分析整理的版本为 Ovirt 4.2.3 版本。

1. 简介

1.1 流

1.1.1 发布订阅模式与观察者模式的区别

╭─────────────╮  Fire Event  ╭──────────────╮
│               │────────────>│                 │
│   Subject     │             │   Observer      │
│               │<────────────│                 │
╰─────────────╯  Subscribe  ╰────────────────╯
╭─────────────╮                 ╭───────────────╮   Fire Event  ╭──────────────╮
│               │  Publish Event │                  │─────────────>│                │
│  Publisher    │───────────────>│ Event Channel    │              │  Subscriber    │
│               │                │                  │<─────────────│                │
╰─────────────╯                ╰─────────────────╯    Subscribe ╰──────────────╯

1.2 特殊情况处理

1.2.1 发布者与订阅者不按照同样的速率工作

1.2.2 订阅者请求发布者的元素不可用

1.3 响应式流(Reactive Streams)

2. 实现分析与整理

响应式流模型

2.1 发布者与订阅者的交互

发布者与订阅者的交互交互步骤,以虚拟机迁移功能模块代码为例(发布者 EventPublisher 与订阅者 VmMigrationProgressMonitoring 的交互,订阅令牌为 Subscription)。

  1. 创建发布者和订阅者,它们分别是 PublisherSubscriber 接口的实例。
    • EventSubscriber 是订阅者的基类。
    • VmMigrationProgressMonitoring 是订阅者的其中一个子类。
    • EventPublisher 是发布者实现类。
  2. 订阅者通过调用发布者的 subscribe() 方法来尝试订阅(绑定)发布者。
@PostConstruct
private void subscribe() {
     resourceManager.subscribe(this);
}
public void subscribe(EventSubscriber subscriber) {
     log.debug("subscribe called with subscription id: {}", subscriber.getSubscriptionId());
     ReactorFactory.getWorker(this.parallelism).getPublisher().subscribe(subscriber);
}
  1. 订阅成功,发布者 EventPublisher 使用 Subscription 异步调用订阅者的 onSubscribe(Subscription s) 方法。
@Override
public void subscribe(final EventSubscriber subscriber) {
    final AtomicInteger count = new AtomicInteger();
    final SubscriptionHolder holder = new SubscriptionHolder(subscriber, count);
    Subscription subscription = new Subscription() {

            @Override
            public void request(int n) {
                count.addAndGet(n);
                process(holder);
            }

            @Override
            public void cancel() {
                clean(holder);
                subscriber.onComplete();
            }
     };
     subscriber.onSubscribe(subscription);
     this.matcher.add(holder);
}
  1. 订阅者可以通过调用 Subscriptionrequest(int n) 方法向发布者发送多个元素的请求。订阅者可以向发布者发送更多元素的多个请求,而不必等待其先前请求是否完成。
@Override
public void onSubscribe(Subscription sub) {
      subscription = sub;
      subscription.request(1);
}
  1. 发布者在所有先前的请求中调用订阅者的 onNext(T t) 方法,直到订阅者请求的元素数量上限,在每次调用中向订阅者发送一个元素(这里的元素为 Map<String, Object>)。
    • 如果发布者没有更多的元素要发送给订阅者,则发布者调用订阅者的 onComplete() 方法来发信号通知流,从而结束发布者与订阅者交互。
    • 如果订阅者请求 Long.MAX_VALUE 元素,则实际上是无限制的请求,并且流实际上是推送流。
    • 如果请求处理失败,则使用调用订阅者的 onError(Throwable t) 方法,并且发布者与订阅者交互结束。
Map<String, Object> map = this.decomposer.decompose(event);
......
subscriber.onNext(map);
if (map.containsKey(JsonRpcEvent.ERROR_KEY)) {
    subscriber.onError(new ClientConnectionException((String) map.get(JsonRpcEvent.ERROR_KEY)));
......
  1. 如果发布者随时遇到错误,会调用订阅者的 onError() 方法。
  2. 订阅者可以通过调用其 Subscriptioncancel() 方法来取消订阅。
    • 一旦订阅被取消,发布者与订阅者交互结束。然而,如果在请求取消之前存在未决请求,订阅者可以在取消订阅之后接收元素。
@Override
public void cancel() {
   clean(holder);
   subscriber.onComplete();
}

2.2 响应式流总结

用例的响应式流模型
onSubscribe onNext* (onError | onComplete)?

3. 结合业务实现分析

public StatusOnlyReturn migrate(Map<String, Object> migrationInfo) {
    JsonRpcRequest request = new RequestBuilder("VM.migrate").withParameter("vmID", getVmId(migrationInfo)).withParameter("params", migrationInfo).build();
    Map<String, Object> response = new FutureMap(this.client, request);
    return new StatusOnlyReturn(response);
}
def migrate(self, params):
        self._acquireCpuLockWithTimeout()
        try:
            # It is unlikely, but we could receive migrate()
            # request right after a VM was started or right
            # after a VM just went down
            if self._lastStatus in (vmstatus.WAIT_FOR_LAUNCH,
                                    vmstatus.DOWN):
                raise exception.NoSuchVM()
            if self.hasTransientDisks():
                return response.error('transientErr')
            self._migration_downtime = None
            self._migrationSourceThread = migration.SourceThread(
                self, **params)
            self._migrationSourceThread.start()
            self._migrationSourceThread.getStat()
            self.send_status_event()
            return self._migrationSourceThread.status
        finally:
            self._guestCpuLock.release()
def monitor_migration(self):
......
progress = Progress.from_job_stats(job_stats)
self._vm.send_migration_status_event()
def send_migration_status_event(self):
        migrate_status = self.migrateStatus()
        postcopy = self._post_copy == migration.PostCopyPhase.RUNNING
        status = {
            'progress': migrate_status['progress'],
            'postcopy': postcopy,
        }
        if 'downtime' in migrate_status:
            status['downtime'] = migrate_status['downtime']
        self._notify('VM_migration_status', status)
def _notify(self, operation, params):
        sub_id = '|virt|%s|%s' % (operation, self.id)
        self.cif.notify(sub_id, {self.id: params})
def notify(self, event_id, params=None):
        """
        Send notification using provided subscription id as
        event_id and a dictionary as event body. Before sending
        there is notify_time added on top level to the dictionary.

        Please consult event-schema.yml in order to build an appropriate event.
        https://github.com/oVirt/vdsm/blob/master/lib/api/vdsm-events.yml

        Args:
            event_id (string): unique event name
            params (dict): event content
        """
        if not params:
            params = {}

        if not self.ready:
            self.log.warning('Not ready yet, ignoring event %r args=%r',
                             event_id, params)
            return

        json_binding = self.servers['jsonrpc']

        def _send_notification(message):
            json_binding.reactor.server.send(
                message, config.get('addresses', 'event_queue'))

        try:
            notification = Notification(event_id, _send_notification,
                                        json_binding.bridge.event_schema)
            notification.emit(params)
            self.log.debug("Sending notification %s with params %s ",
                           event_id, params)
        except KeyError:
            self.log.warning("Attempt to send an event when jsonrpc binding"
                             " not available")
class Notification(object):
    """
    Represents jsonrpc notification message. It builds proper jsonrpc
    notification and pass it a callback which is responsible for
    sending it.
    """
    log = logging.getLogger("jsonrpc.Notification")

    def __init__(self, event_id, cb, event_schema):
        self._event_id = event_id
        self._cb = cb
        self._event_schema = event_schema

    def emit(self, params):
        """
        emit method, builds notification message and sends it.

        Args:
            params(dict): event content

        Returns: None
        """
        self._add_notify_time(params)
        self._event_schema.verify_event_params(self._event_id, params)
        notification = json.dumps({'jsonrpc': '2.0',
                                   'method': self._event_id,
                                   'params': params})

        self.log.debug("Sending event %s", notification)
        self._cb(notification)

    def _add_notify_time(self, body):
        body['notify_time'] = int(monotonic_time() * 1000)
{"jsonrpc": "2.0", "method": "|virt|VM_migration_status|<vmId>", "params": {<vmId>: {"progress": "0", "postcopy": "2"}}}
if (id == null || NullNode.class.isInstance(id)) {
      JsonRpcEvent event = JsonRpcEvent.fromJsonNode(node);
      String method = client.getHostname() + event.getMethod();
      event.setMethod(method);
      if (log.isDebugEnabled()) {
          log.debug("Event arrived from " + client.getHostname() + " containing " + event.getParams());
      }
      processNotifications(event);
      return;
}
public void process(JsonRpcEvent event) {
        Set<SubscriptionHolder> holders = matcher.match(event);
        holders.stream()
                .peek(holder -> holder.putEvent(event))
                .filter(holder -> holder.canProcess())
                .forEach(holder -> this.executorService.submit(new EventCallable(holder, this.decomposer)));
}
public Set<SubscriptionHolder> match(JsonRpcEvent event) {
        String[] ids = parse(event.getMethod());
        Set<SubscriptionHolder> subscriptions = new HashSet<>();
        SubscriptionHolder holder = this.unique_id.get(ids[3]);
        if (holder != null) {
            subscriptions.add(holder);
        }
        Predicate predicate = new Predicate() {

            @Override
            public boolean apply(int one, int two) {
                return one == two;
            }
        };
        addHolders(subscriptions, this.operation, 2, ids, predicate);
        addHolders(subscriptions, this.component, 1, ids, predicate);
        addHolders(subscriptions, this.receiver, 0, ids, predicate);
        return subscriptions;
}
public Void call() throws Exception {
            Subscriber<Map<String, Object>> subscriber = this.holder.getSubscriber();
            JsonRpcEvent event = null;
            while ((event = this.holder.canProcessMore()) != null) {
                Map<String, Object> map = this.decomposer.decompose(event);
                if (map.containsKey(JsonRpcEvent.ERROR_KEY)) {
                    subscriber.onError(new ClientConnectionException((String) map.get(JsonRpcEvent.ERROR_KEY)));
                } else {
                    subscriber.onNext(map);
                }
            }
            return null;
}
public Map<String, Object> decompose(JsonRpcEvent event) {
        try {
            return mapper.readValue(event.getParams(),
                    new TypeReference<Map<String, Object>>() {
                    });
        } catch (IOException e) {
            logException(log, "Event decomposition failed", e);
            return null;
        }
}
@Override
    public void onNext(Map<String, Object> map) {
        try {
            map.remove(VdsProperties.notify_time);
            map.entrySet().forEach(vmInfo -> {
                Guid vmId = new Guid(vmInfo.getKey());
                Map<?, ?> properties = (Map<?, ?>) vmInfo.getValue();
                int progress = Integer.valueOf(properties.get(VdsProperties.vm_migration_progress).toString());
                VmStatistics vmStatistics = resourceManager.getVmManager(vmId).getStatistics();
                vmStatistics.setMigrationProgressPercent(progress);
                resourceManager.getEventListener().migrationProgressReported(vmId, progress);
                Integer actualDowntime = (Integer) properties.get(VdsProperties.MIGRATION_DOWNTIME);
                if (actualDowntime != null) {
                    resourceManager.getEventListener().actualDowntimeReported(vmId, actualDowntime);
                }
            });
        } finally {
            subscription.request(1);
        }
}
响应流执行过程
上一篇 下一篇

猜你喜欢

热点阅读