streetscape.gl Study Notes (4)

2020-05-20  questionuncle

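I have wanted to write this note for a long time, but several major life events got in the way. This post does not pick up where streetscape.gl Study Notes (3) left off; the unfinished parts of that post will be added to it directly. This one covers the streetscape.gl loaders.
Why the sudden change of topic? It comes from a small project I have been tinkering with recently: a fishing vessel dashboard built with React + TypeScript. It is a very typical IoT display application, and in an earlier post about a project refactoring I described how to push backend messages to the frontend in real time with Kafka + WebSocket. So I wanted to see how streetscape.gl, which is itself a dashboard for vehicle signals, implements this.
Let's start with the most common scenario, XVIZLiveLoader, which displays a vehicle's position in real time. The official documentation says: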

XVIZLiveLoader

Connects to a live XVIZ system using a WebSocket. Implements XVIZLoaderInterface.
That is, XVIZLiveLoader is a class that connects over WebSocket to a system streaming XVIZ data in real time, and it implements XVIZLoaderInterface.
A live XVIZ system describes a running system does not have a start and end time available in the metadata and will send XVIZ data immediately upon connection.
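In other words, a live XVIZ system is one that is currently running: its metadata carries no start or end time, and it begins sending XVIZ data as soon as the connection is established.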
The XVIZLiveLoader will also immediately update the scene to the timestamp of the latest received XVIZ message.
So whenever a newer XVIZ message arrives, the loader immediately moves the scene to that message's timestamp.

Constructor

import {XVIZLiveLoader} from 'streetscape.gl';

new XVIZLiveLoader({
  serverConfig: {
    serverUrl: 'ws://localhost:8081'
  }
});
Options

serverConfig (Object)
    serverConfig.serverUrl (String) - url of the WebSocket server
    serverConfig.queryParams (Object, optional) - additional query parameters to use when connecting to the server
    serverConfig.defaultLogLength (Number, optional) - fallback value if the duration option is not specified.
    serverConfig.retryAttempts (Number, optional) - number of retries if a connection error is encountered. Default 3.
logProfile (String, optional) - Name of the profile to load the log with
bufferLength (Number, optional) - the length of the buffer to keep in memory. Uses the same unit as timestamp. If specified, older frames may be discarded during playback, to avoid crashes due to excessive memory usage. Default 30 seconds.
worker (String|Boolean, optional) - Use a worker for message processing. Default true.
    Type Boolean: enable/disable default worker
    Type String: the worker URL to use
maxConcurrency (Number, optional) - the maximum number of worker threads to spawn. Default 3.

How to use it

Take a look at examples/get-started.
In app.js:

// __IS_STREAMING__ and __IS_LIVE__ are defined in webpack.config.js
const exampleLog = require(__IS_STREAMING__
  ? './log-from-stream'
  : __IS_LIVE__
    ? './log-from-live'
    : './log-from-file').default;

The __IS_STREAMING__ and __IS_LIVE__ flags defined in webpack.config.js determine which loader is used for a given build. Here we look at log-from-live.
log-from-live.js

import {XVIZLiveLoader} from 'streetscape.gl';

export default new XVIZLiveLoader({
  logGuid: 'mock',
  bufferLength: 10,
  serverConfig: {
    defaultLogLength: 30,
    serverUrl: 'ws://localhost:8081'
  },
  worker: true,
  maxConcurrency: 4
});

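This example shows how XVIZLiveLoader is used: it needs a few options.
logGuid: the GUID of this data source
bufferLength: how much data to keep buffered in memory, in the same unit as the timestamp. If specified, older frames may be discarded during playback to avoid crashes from excessive memory usage. The default is 30 seconds.
serverConfig.defaultLogLength: the fallback value used when the duration option is not specified
serverConfig.serverUrl: the URL of the WebSocket server
worker: whether to use worker threads for message processing
maxConcurrency: the maximum number of worker threads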


Once exampleLog holds an XVIZLiveLoader instance, how is it used? In app.js we can see:

class Example extends PureComponent {
  state = {
    log: exampleLog,
    settings: {
      viewMode: 'PERSPECTIVE',
      showTooltip: false
    }
  };

  componentDidMount() {
    this.state.log.on('error', console.error).connect();
  }

 ...
  render() {
    const {log, settings} = this.state;

    return (
      <div id="container">
        <div id="control-panel">
          <XVIZPanel log={log} name="Metrics" />
          <hr />
          <XVIZPanel log={log} name="Camera" />
          <hr />
          <Form
            data={APP_SETTINGS}
            values={this.state.settings}
            onChange={this._onSettingsChange}
          />
          <StreamSettingsPanel log={log} />
        </div>
    ...
    </div>
  )}
}

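The loader is stored in state. When the component has mounted, log registers an error listener and then tries to connect.
In render it is then passed as a prop to other components. What do those components do with log? Let's look at how the core component, LogViewer, treats it.
In modules/core/src/components/log-viewer/index.js we can see: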

const getLogState = log => ({
  frame: log.getCurrentFrame(),
  metadata: log.getMetadata(),
  streamsMetadata: log.getStreamsMetadata()
});

export default connectToLog({getLogState, Component: LogViewer});

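It reads the current frame, the metadata, and the streams metadata from log and binds them to the component, producing a higher-order component.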
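
To make the binding idea concrete, here is a minimal sketch in plain JavaScript, without React, of what such a wrapper does conceptually: subscribe to the log, recompute the derived state with getLogState on every change, and hand the result to a render callback. FakeLog, bindToLog, pushFrame, and the subscribe/unsubscribe methods used here are hypothetical stand-ins for illustration, not streetscape.gl's actual implementation.

```javascript
// Hypothetical stand-in for a loader: it stores one frame and notifies subscribers.
class FakeLog {
  constructor() {
    this.frame = null;
    this.subscribers = new Set();
  }
  getCurrentFrame() { return this.frame; }
  getMetadata() { return {version: '2.0.0'}; }
  getStreamsMetadata() { return {}; }
  subscribe(cb) { this.subscribers.add(cb); }
  unsubscribe(cb) { this.subscribers.delete(cb); }
  // Simulate a new frame arriving from the server.
  pushFrame(frame) {
    this.frame = frame;
    this.subscribers.forEach(cb => cb());
  }
}

// Same shape as the getLogState shown above.
const getLogState = log => ({
  frame: log.getCurrentFrame(),
  metadata: log.getMetadata(),
  streamsMetadata: log.getStreamsMetadata()
});

// Hypothetical binder: calls render(props) once up front and again on every log update.
function bindToLog({log, getLogState, render}) {
  const update = () => render(getLogState(log));
  log.subscribe(update);
  update();
  return () => log.unsubscribe(update); // call this to stop listening
}

const log = new FakeLog();
const rendered = [];
const unbind = bindToLog({log, getLogState, render: props => rendered.push(props.frame)});
log.pushFrame({timestamp: 1});
unbind();
log.pushFrame({timestamp: 2}); // ignored: no longer subscribed
console.log(rendered); // two entries: null, then {timestamp: 1}
```

The wrapped component never talks to the WebSocket itself; it only sees the derived props, which is the main benefit of this kind of binding.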

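How it works

Having seen how it is used, the natural next question is how it works. We will get to fetching the current frame, but first we need to understand what happens when an XVIZLiveLoader instance is created and when connect is called after the component mounts.

The XVIZLiveLoader instance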

new XVIZLiveLoader({
  logGuid: 'mock',
  bufferLength: 10,
  serverConfig: {
    defaultLogLength: 30,
    serverUrl: 'ws://localhost:8081'
  },
  worker: true,
  maxConcurrency: 4
});

Open xviz-live-loader.js:

/*
 * Handle connecting to XVIZ socket and negotiation of the XVIZ protocol version
 *
 * This loader is used when connecting to a "live" XVIZ websocket.
 * This implies that the metadata does not have a start or end time
 * and that we want to display the latest message as soon as it arrives.
 */
export default class XVIZLiveLoader extends XVIZWebsocketLoader {
  /**
   * constructor
   * @params serverConfig {object}
   *   - serverConfig.serverUrl {string}
   *   - serverConfig.defaultLogLength {number, optional} - default 30
   *   - serverConfig.queryParams {object, optional}
   *   - serverConfig.retryAttempts {number, optional} - default 3
   * @params worker {string|function, optional}
   * @params maxConcurrency {number, optional} - default 3
   * @params logProfile {string, optional}
   * @params bufferLength {number, optional}
   */
  constructor(options = {}) {
    super(options);

    // Construct websocket connection details from parameters
    this.requestParams = getSocketRequestParams(options);
    assert(this.requestParams.bufferLength, 'bufferLength must be provided');

    this.retrySettings = {
      retries: this.requestParams.retryAttempts,
      minTimeout: 500,
      randomize: true
    };

    // Setup relative stream buffer storage by splitting bufferLength 1/3 : 2/3
    const bufferChunk = this.requestParams.bufferLength / 3;

    // Replace base class object
    this.streamBuffer = new XVIZStreamBuffer({
      startOffset: -2 * bufferChunk,
      endOffset: bufferChunk
    });
  }
...
}

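As we can see, XVIZLiveLoader extends XVIZWebsocketLoader. Its constructor also replaces the base class's streamBuffer with an XVIZStreamBuffer: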

XVIZStreamBuffer

The XVIZStreamBuffer class manages loaded XVIZ timeslices in memory for easy access.

Constructor

const streamBuffer = new XVIZStreamBuffer();

Parameters:

There are three types of buffer: unlimited, offset, and fixed. Use the constructor options to set
an offset buffer (relative to playhead). To set a fixed buffer with absolute timestamps, see
setFixedBuffer.

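So XVIZStreamBuffer keeps loaded XVIZ timeslices in memory for easy access, and startOffset and endOffset here control how long a span of timeslices stays resident in memory, relative to the playhead.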
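
As a rough illustration of the 1/3 : 2/3 split from the constructor above, the sketch below computes the offsets from bufferLength and the resulting absolute time window around a playhead. getBufferWindow and pruneTimeslices are hypothetical helpers, and the inclusive bounds are an assumption; how XVIZStreamBuffer actually discards data is not shown in this post.

```javascript
// Split bufferLength into offsets around the playhead, as the constructor above does:
// two thirds behind the playhead, one third ahead of it.
function getBufferOffsets(bufferLength) {
  const bufferChunk = bufferLength / 3;
  return {startOffset: -2 * bufferChunk, endOffset: bufferChunk};
}

// Hypothetical helper: absolute time range kept in memory for a given playhead time.
function getBufferWindow(playhead, bufferLength) {
  const {startOffset, endOffset} = getBufferOffsets(bufferLength);
  return {start: playhead + startOffset, end: playhead + endOffset};
}

// Hypothetical helper: drop timeslices that fall outside the window (bounds inclusive).
function pruneTimeslices(timeslices, playhead, bufferLength) {
  const {start, end} = getBufferWindow(playhead, bufferLength);
  return timeslices.filter(t => t.timestamp >= start && t.timestamp <= end);
}

const slices = [90, 95, 100, 103, 105].map(timestamp => ({timestamp}));
const kept = pruneTimeslices(slices, 100, 9); // window is [94, 103]
console.log(kept.map(t => t.timestamp)); // 95, 100, 103
```

With bufferLength: 10 as in log-from-live.js, the same arithmetic keeps roughly 6.67 time units behind the playhead and 3.33 ahead of it.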
Next, open xviz-websocket-loader.js to see how XVIZWebsocketLoader is defined:

/**
 * Connect to XVIZ 2 websocket manage storage of XVIZ data into a XVIZStreamBuffer
 *
 * This class is a Websocket base class and is expected to be subclassed with
 * the following methods overridden:
 *
 * - _onOpen()
 */
export default class XVIZWebsocketLoader extends XVIZLoaderInterface {
  /**
   * constructor
   * @params serverConfig {object}
   *   - serverConfig.serverUrl {string}
   *   - serverConfig.defaultLogLength {number, optional} - default 30
   *   - serverConfig.queryParams {object, optional}
   *   - serverConfig.retryAttempts {number, optional} - default 3
   * @params worker {string|function, optional}
   * @params maxConcurrency {number, optional} - default 3
   * @params debug {function} - Debug callback for the XVIZ parser.
   * @params logGuid {string}
   * @params logProfile {string, optional}
   * @params duration {number, optional}
   * @params timestamp {number, optional}
   * @params bufferLength {number, optional}
   */
  constructor(options = {}) {
    super(options);

    this.socket = null;

    this.retrySettings = {
      retries: 3,
      minTimeout: 500,
      randomize: true
    };

    this.streamBuffer = new XVIZStreamBuffer();

    // Handler object for the websocket events
    // Note: needs to be last due to member dependencies
    this.WebSocketClass = options.WebSocketClass || WebSocket;
  }
...
}

As we can see, XVIZWebsocketLoader extends XVIZLoaderInterface:

export default class XVIZLoaderInterface {
  constructor(options = {}) {
    this.options = options;
    this._debug = options.debug || (() => {});
    this.callbacks = {};

    this.listeners = [];
    this.state = {};
    this._updates = 0;
    this._version = 0;
    this._updateTimer = null;
  }
...
}

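This constructor is comparatively simple. It mainly initializes a few fields, such as the event callbacks, the listener array, and the data version.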

connect

In the component's mount hook, the on and connect functions are called:

  componentDidMount() {
    this.state.log.on('error', console.error).connect();
  }
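
The chained call works only if on returns the loader itself. The sketch below shows that pattern with a callbacks map like the one initialized in the XVIZLoaderInterface constructor. TinyLoader is a hypothetical class written for this note, and the handler signature (payload only) is an assumption rather than the library's actual one.

```javascript
// Hypothetical minimal emitter; the real loader carries much more state.
class TinyLoader {
  constructor() {
    this.callbacks = {}; // event name -> array of handlers
    this.connected = false;
  }
  on(eventType, cb) {
    this.callbacks[eventType] = this.callbacks[eventType] || [];
    this.callbacks[eventType].push(cb);
    return this; // returning `this` is what allows .on(...).connect()
  }
  emit(eventType, payload) {
    const handlers = this.callbacks[eventType] || [];
    handlers.forEach(cb => cb(payload));
    return this;
  }
  connect() {
    this.connected = true; // a real loader would open the socket here
    return this;
  }
}

const errors = [];
const loader = new TinyLoader().on('error', err => errors.push(err.message)).connect();
loader.emit('error', new Error('socket closed'));
console.log(loader.connected, errors); // true, and one message: 'socket closed'
```

Registering the error handler before calling connect matters: a failure during the very first connection attempt would otherwise have no listener.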

Here we open xviz-websocket-loader.js and look at the definition of connect:

  /**
   * Open an XVIZ socket connection with automatic retry
   *
   * @returns {Promise} WebSocket connection
   */
  connect() {
    assert(this.socket === null, 'Socket Manager still connected');

    this._debug('stream_start');
    const {url} = this.requestParams;

    // Wrap retry logic around connection
    return PromiseRetry(retry => {
      return new Promise((resolve, reject) => {
        try {
          const ws = new this.WebSocketClass(url);
          ws.binaryType = 'arraybuffer';

          ws.onmessage = message => {
            const hasMetadata = Boolean(this.getMetadata());

            return parseStreamMessage({
              message: message.data,
              onResult: this.onXVIZMessage,
              onError: this.onError,
              debug: this._debug.bind(this, 'parse_message'),
              worker: hasMetadata && this.options.worker,
              maxConcurrency: this.options.maxConcurrency
            });
          };

          ws.onerror = this.onError;
          ws.onclose = event => {
            this._onWSClose(event);
            reject(event);
          };

          // On success, resolve the promise with the now ready socket
          ws.onopen = () => {
            this.socket = ws;
            this._onWSOpen();
            resolve(ws);
          };
        } catch (err) {
          reject(err);
        }
      }).catch(event => {
        this._onWSError(event);
        const isAbnormalClosure = event.code > 1000 && event.code !== 1005;

        // Retry if abnormal or connection never established
        if (isAbnormalClosure || !this.socket) {
          retry();
        }
      });
    }, this.retrySettings).catch(this._onWSError);
  }

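The comment above the function says: open an XVIZ socket connection with automatic retry, returning a Promise for the WebSocket connection.
It uses the promise-retry package, which describes itself as: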
Retries a function that returns a promise, leveraging the power of the retry module to the promises world.
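This is a very good practice. IoT applications lose network connectivity from time to time, and allowing a bounded number of reconnection attempts lets the connection recover on its own.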
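
To show the idea without pulling in the promise-retry package, here is a simplified sketch of retry with exponential backoff, plus the close-code check used in connect above. backoffDelay and withRetry are hypothetical helpers written for this note; unlike the retrySettings in the source, this sketch does not randomize the delays.

```javascript
// Mirrors the close-code check in the connect() code above:
// 1000 is a normal closure and 1005 means no status code was present.
function isAbnormalClosure(code) {
  return code > 1000 && code !== 1005;
}

// Hypothetical helper: exponential backoff delay for the n-th retry (1-based).
function backoffDelay(attempt, {minTimeout = 500, factor = 2} = {}) {
  return minTimeout * Math.pow(factor, attempt - 1);
}

// Hypothetical helper: run `task` until it resolves or the retries are exhausted.
async function withRetry(task, {retries = 3, minTimeout = 500, factor = 2, sleep} = {}) {
  const wait = sleep || (ms => new Promise(resolve => setTimeout(resolve, ms)));
  let lastError;
  for (let attempt = 0; attempt <= retries; attempt++) {
    try {
      return await task(attempt);
    } catch (err) {
      lastError = err;
      if (attempt < retries) {
        await wait(backoffDelay(attempt + 1, {minTimeout, factor}));
      }
    }
  }
  throw lastError;
}

// Usage: a task that fails twice before succeeding; sleep is stubbed to avoid real delays.
let calls = 0;
withRetry(
  () => (++calls < 3 ? Promise.reject(new Error('refused')) : Promise.resolve('connected')),
  {sleep: () => Promise.resolve()}
).then(result => console.log(result, calls)); // connected 3
```

With minTimeout 500 and factor 2, the waits before the three retries would be 500 ms, 1000 ms, and 2000 ms.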
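For how WebSocket itself is used, see the WebSocket reference documentation. The part of connect that handles incoming messages is: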

          ws.onmessage = message => {
            const hasMetadata = Boolean(this.getMetadata());

            return parseStreamMessage({
              message: message.data,
              onResult: this.onXVIZMessage,
              onError: this.onError,
              debug: this._debug.bind(this, 'parse_message'),
              worker: hasMetadata && this.options.worker,
              maxConcurrency: this.options.maxConcurrency
            });
          };

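When the websocket receives a message, it first checks whether metadata has already been received, and then calls parseStreamMessage to handle the message. Note that the worker option is only passed on once metadata is available (hasMetadata && this.options.worker).
The documentation describes parseStreamMessage as follows: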
parseStreamMessage will parse the data and handle GLB encoded
XVIZ as well as other formats of the data.

XVIZ parsing functions will decode the binary container, parse the JSON and resolve binary
references. The application will get a "patched" JSON structure, with the difference from the basic
JSON protocol format being that certain arrays will be compact typed arrays instead of classic
JavaScript arrays.

If an attribute has been hydrated from binary then it will be transformed into the corresponding
TypeArray. Typed arrays do not support nesting so all numbers will be laid out flat and the
application needs to know how many values represent one element, for instance 3 values represent the
x, y, z coordinates of a point.
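
Because of this flat layout, reading a point back means indexing with a stride. The following is a small sketch of that; getPoint, countPoints, and the sample vertices are illustrative names and data, not part of @xviz/parser.

```javascript
// Every 3 consecutive values in the flat typed array form one (x, y, z) point.
const VALUES_PER_POINT = 3;

// Read the point at the given index from the flat array.
function getPoint(flat, index) {
  const offset = index * VALUES_PER_POINT;
  return [flat[offset], flat[offset + 1], flat[offset + 2]];
}

// Number of points stored in the flat array.
function countPoints(flat) {
  return flat.length / VALUES_PER_POINT;
}

// Two points laid out flat: (1, 2, 3) and (4, 5, 6)
const vertices = new Float32Array([1, 2, 3, 4, 5, 6]);
console.log(countPoints(vertices)); // 2
console.log(getPoint(vertices, 1)); // [ 4, 5, 6 ]
```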

parseXVIZMessage

import {parseXVIZMessage, XVIZ_MESSAGE} from '@xviz/parser';

parseXVIZMessage({
  message,
  onResult: data => {
    switch (data.type) {
      case XVIZ_MESSAGE.METADATA: // do something
        break;
      case XVIZ_MESSAGE.TIMESLICE: // do something
        break;
      case XVIZ_MESSAGE.INCOMPLETE: // do something
        break;
      default:
        break;
    }
  },
  onError: console.error,
  worker: true,
  maxConcurrency: 4
});

Parameters:

XVIZ_MESSAGE

Enum of stream message types.

  onXVIZMessage = message => {
    switch (message.type) {
      case LOG_STREAM_MESSAGE.METADATA:
        this._onXVIZMetadata(message);
        this.emit('ready', message);
        break;

      case LOG_STREAM_MESSAGE.TIMESLICE:
        this._onXVIZTimeslice(message);
        this.emit('update', message);
        break;

      case LOG_STREAM_MESSAGE.DONE:
        this.emit('finish', message);
        break;

      default:
        this.emit('error', message);
    }
  };

When the parsed message type is METADATA, the loader calls the internal function _onXVIZMetadata and emits a ready event:

  _onXVIZMetadata(metadata) {
    this.set('metadata', metadata);
    if (metadata.streams && Object.keys(metadata.streams).length > 0) {
      this.set('streamSettings', metadata.streams);
    }

    if (!this.streamBuffer) {
      throw new Error('streamBuffer is missing');
    }
    this.logSynchronizer = this.logSynchronizer || new StreamSynchronizer(this.streamBuffer);

    const timestamp = this.get('timestamp');
    const newTimestamp = Number.isFinite(timestamp) ? timestamp : metadata.start_time;
    if (Number.isFinite(newTimestamp)) {
      this.seek(newTimestamp);
    }
  }

This mostly stores a few values and creates logSynchronizer, an instance of StreamSynchronizer, which is used to retrieve the actual stream data.

StreamSynchronizer

The StreamSynchronizer class looks into a XVIZStreamBuffer and retrieves the most relevant datum from each stream that "matches" the current timestamp.
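
The idea of picking the datum that "matches" the current timestamp can be sketched as follows. This is a deliberately simplified model and an assumption on my part: getFrameAt, the data shape, and the timeWindow default are hypothetical, and the real StreamSynchronizer works against an XVIZStreamBuffer rather than plain arrays.

```javascript
// Hypothetical simplification: each stream is an array of {timestamp, value}
// sorted by timestamp. For every stream, pick the latest datum that is not
// newer than `time` and not older than `time - timeWindow`.
function getFrameAt(streams, time, timeWindow = 0.4) {
  const frame = {};
  for (const [name, data] of Object.entries(streams)) {
    let match = null;
    for (const datum of data) {
      if (datum.timestamp > time) break; // data is sorted, nothing later can match
      match = datum;
    }
    if (match && match.timestamp >= time - timeWindow) {
      frame[name] = match;
    }
  }
  return frame;
}

const streams = {
  '/vehicle_pose': [{timestamp: 10.0, value: 'a'}, {timestamp: 10.1, value: 'b'}],
  '/camera': [{timestamp: 9.0, value: 'old'}]
};
const frame = getFrameAt(streams, 10.15);
// Only '/vehicle_pose' is present, with value 'b'; the camera datum is too old.
console.log(Object.keys(frame), frame['/vehicle_pose'].value);
```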

When the parsed message type is TIMESLICE, the loader calls the internal function _onXVIZTimeslice and emits an update event:

  _onXVIZTimeslice(timeslice) {
    const oldStreamCount = this.streamBuffer.streamCount;
    const bufferUpdated = this.streamBuffer.insert(timeslice);
    if (bufferUpdated) {
      this._bumpDataVersion();
    }

    if (getXVIZConfig().DYNAMIC_STREAM_METADATA && this.streamBuffer.streamCount > oldStreamCount) {
      const streamsMetadata = {};
      const streamSettings = this.get('streamSettings');

      for (const streamName in timeslice.streams) {
        streamsMetadata[streamName] = timeslice.streams[streamName].__metadata;

        // Add new stream name to stream settings (default on)
        if (!(streamName in streamSettings)) {
          streamSettings[streamName] = true;
        }
      }
      this.set('streamsMetadata', streamsMetadata);
    }

    return bufferUpdated;
  }

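Its main job is to insert the timeslice into the current streamBuffer and bump the data version if the buffer changed. In addition, when the DYNAMIC_STREAM_METADATA config flag is enabled and the insert increased the stream count, it collects the __metadata of each stream in the timeslice, turns any new stream on in streamSettings, and updates streamsMetadata.

At this point an XVIZLiveLoader has been instantiated and connected.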

getCurrentFrame

Next, let's see how the current frame is obtained. The source of getCurrentFrame is in xviz-loader-interface.js:

  getCurrentFrame = createSelector(
    this,
    [this.getStreamSettings, this.getCurrentTime, this.getLookAhead, this._getDataVersion],
    // `dataVersion` is only needed to trigger recomputation.
    // The logSynchronizer has access to the timeslices.
    (streamSettings, timestamp, lookAhead) => {
      const {logSynchronizer} = this;
      if (logSynchronizer && Number.isFinite(timestamp)) {
        logSynchronizer.setTime(timestamp);
        logSynchronizer.setLookAheadTimeOffset(lookAhead);
        return logSynchronizer.getCurrentFrame(streamSettings);
      }
      return null;
    }
  );

There is a nice practice here: createSelector, which is defined in /modules/src/utils/create-selector.js

import {createSelector} from 'reselect';

// reselect selectors do not update if called with the same arguments
// to support calling them without arguments, pass logLoader version
export default function createLogSelector(logLoader, ...args) {
  const selector = createSelector(...args);
  return () => selector(logLoader._version);
}

This relies on the reselect library:

Reselect

Simple “selector” library for Redux (and others) inspired by getters in NuclearJS, subscriptions in re-frame and this proposal from speedskater.
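
The comment in create-selector.js says selectors do not update when called with the same arguments, which is why the loader's version is passed in. The sketch below models that behavior with a hand-written two-level memoizer. createMemoSelector is a hypothetical stand-in for reselect's createSelector, simplified to a single cache slot, and the loader object is made up for the demo.

```javascript
// Hypothetical stand-in for reselect's createSelector (single cache slot).
function createMemoSelector(inputSelectors, resultFunc) {
  let lastArgs = null;
  let lastInputs = null;
  let lastResult;
  const same = (a, b) => a !== null && a.length === b.length && b.every((v, i) => v === a[i]);
  return (...args) => {
    // Level 1: identical arguments short-circuit everything; inputs are not even read.
    if (same(lastArgs, args)) return lastResult;
    lastArgs = args;
    // Level 2: recompute only if some input value actually changed.
    const inputs = inputSelectors.map(fn => fn(...args));
    if (!same(lastInputs, inputs)) {
      lastResult = resultFunc(...inputs);
      lastInputs = inputs;
    }
    return lastResult;
  };
}

// Same idea as createLogSelector above: callers pass no arguments,
// and the wrapper feeds the loader's version in as the only argument.
function createLogSelector(logLoader, inputSelectors, resultFunc) {
  const selector = createMemoSelector(inputSelectors, resultFunc);
  return () => selector(logLoader._version);
}

let computeCount = 0;
const loader = {_version: 0, timestamp: 5};
const getFrame = createLogSelector(loader, [() => loader.timestamp], timestamp => {
  computeCount++;
  return {timestamp};
});

getFrame(); // first call: computes
loader.timestamp = 6;
getFrame(); // version unchanged: the cached result is returned as-is
loader._version++;
getFrame(); // version bumped: inputs are re-read and the result is recomputed
console.log(computeCount); // 2
```

This is why every state change in the loader has to bump the version: without it, a selector called with no arguments would keep returning its first result.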
