Flutter throttle 和 debounce的实现

2021-08-11  本文已影响0人  Codepgq

以下代码截取自 RxDart。

新建一个 Dart 文件,把下面的代码复制进去即可使用。

import 'dart:async';
import 'dart:collection';

/// The strategy that is used to determine how and when a new window is created.
///
/// A "window" is the period during which incoming events are collected into
/// a buffer before the buffer is handed to the window-end handler.
enum WindowStrategy {
  /// Every source event cancels the currently open window (if any) and
  /// immediately opens a fresh one. This is the strategy behind debounce.
  everyEvent,

  /// A window is opened by a source event only when no window is currently
  /// open; events arriving while a window is open do not open another one.
  /// This is the strategy behind throttle.
  eventAfterLastWindow,

  /// A single recurring window is opened by the very first source event;
  /// no further windows are opened by later events.
  firstEventOnly,

  /// No window stream is ever opened. Events are only buffered, and the
  /// buffer is flushed when the close predicate fires (or when the source
  /// closes); the buffer is cleared after each flush.
  onHandler
}

/// The [ForwardingSink] that carries out the windowing logic configured on
/// a [BackpressureStreamTransformer].
///
/// Incoming events are collected in [queue]. Depending on the
/// [WindowStrategy], an event may open a "window" - a subscription to the
/// [Stream] produced by the window factory. When a window ends, the buffered
/// events are passed to the window-end handler and its result is forwarded
/// to the output sink.
class _BackpressureStreamSink<S, T> implements ForwardingSink<S, T> {
  final WindowStrategy _strategy;
  final Stream<dynamic> Function(S event) _windowStreamFactory;
  final T Function(S event) _onWindowStart;
  final T Function(List<S> queue) _onWindowEnd;
  final int _startBufferEvery;
  final bool Function(List<S> queue) _closeWindowWhen;
  final bool _ignoreEmptyWindows;
  final bool _dispatchOnClose;

  /// The events buffered for the current window.
  final Queue<S> queue = DoubleLinkedQueue<S>();

  /// The maximum number of events kept in [queue]; `null` means unbounded.
  final int maxLengthQueue;

  /// The number of upcoming source events that are not buffered.
  var skip = 0;

  // Whether at least one source event has been received.
  var _hasData = false;

  // Whether the source stream has signalled done.
  var _mainClosed = false;

  // The subscription on the currently open window, or null when none is open.
  StreamSubscription<dynamic> _windowSubscription;

  _BackpressureStreamSink(
    this._strategy,
    this._windowStreamFactory,
    this._onWindowStart,
    this._onWindowEnd,
    this._startBufferEvery,
    this._closeWindowWhen,
    this._ignoreEmptyWindows,
    this._dispatchOnClose,
    this.maxLengthQueue,
  );

  /// Handles a source event: possibly opens a window, buffers [data] (unless
  /// it is being skipped) and finally evaluates the close predicate.
  @override
  void add(EventSink<T> sink, S data) {
    _hasData = true;
    maybeCreateWindow(data, sink);

    if (skip == 0) {
      queue.add(data);

      // Drop the oldest events once the buffer exceeds its maximum length.
      if (maxLengthQueue != null && queue.length > maxLengthQueue) {
        queue.removeFirstElements(queue.length - maxLengthQueue);
      }
    } else if (skip > 0) {
      skip--;
    }

    maybeCloseWindow(sink);
  }

  /// Forwards source errors to [sink] unchanged.
  @override
  void addError(EventSink<T> sink, Object e, StackTrace st) =>
      sink.addError(e, st);

  /// Handles the done event of the source stream.
  @override
  void close(EventSink<T> sink) {
    _mainClosed = true;

    if (_strategy == WindowStrategy.eventAfterLastWindow) {
      // While a window is open, closing is deferred: the window's onDone
      // callback (see [singleWindow]) observes [_mainClosed] and closes
      // [sink] itself. When no window is open nothing else would ever
      // close [sink], so it has to happen right here.
      if (_windowSubscription == null) {
        resolveWindowEnd(sink, true);
      }
      return;
    }

    // treat the final event as a Window that opens
    // and immediately closes again
    if (_dispatchOnClose && queue.isNotEmpty) {
      resolveWindowStart(queue.last, sink);
    }

    resolveWindowEnd(sink, true);

    queue.clear();

    _windowSubscription?.cancel();
    sink.close();
  }

  /// Cancels the open window (if any) when the listener cancels.
  @override
  FutureOr onCancel(EventSink<T> sink) => _windowSubscription?.cancel();

  @override
  void onListen(EventSink<T> sink) {}

  /// Pauses the open window (if any) together with the listener.
  @override
  void onPause(EventSink<T> sink) => _windowSubscription?.pause();

  /// Resumes the open window (if any) together with the listener.
  @override
  void onResume(EventSink<T> sink) => _windowSubscription?.resume();

  /// Opens a window for [event] when the configured strategy asks for one,
  /// and reports the window start to [sink].
  void maybeCreateWindow(S event, EventSink<T> sink) {
    switch (_strategy) {
      // for example throttle
      case WindowStrategy.eventAfterLastWindow:
        if (_windowSubscription != null) return;

        _windowSubscription = singleWindow(event, sink);

        resolveWindowStart(event, sink);

        break;
      // for example scan
      case WindowStrategy.firstEventOnly:
        if (_windowSubscription != null) return;

        _windowSubscription = multiWindow(event, sink);

        resolveWindowStart(event, sink);

        break;
      // for example debounce
      case WindowStrategy.everyEvent:
        _windowSubscription?.cancel();

        _windowSubscription = singleWindow(event, sink);

        resolveWindowStart(event, sink);

        break;
      case WindowStrategy.onHandler:
        break;
    }
  }

  /// Ends the current window when the close predicate (if any) accepts the
  /// current buffer.
  void maybeCloseWindow(EventSink<T> sink) {
    if (_closeWindowWhen != null && _closeWindowWhen(unmodifiableQueue)) {
      resolveWindowEnd(sink);
    }
  }

  /// Opens a window that ends once the window stream has produced its first
  /// event (or completed without one).
  StreamSubscription<dynamic> singleWindow(S event, EventSink<T> sink) =>
      buildStream(event, sink).take(1).listen(
            null,
            onError: sink.addError,
            onDone: () => resolveWindowEnd(sink, _mainClosed),
          );

  /// Opens a window stream that stays subscribed; each of its events, as
  /// well as its completion, ends the current window.
  StreamSubscription<dynamic> multiWindow(S event, EventSink<T> sink) =>
      buildStream(event, sink).listen(
        (dynamic _) => resolveWindowEnd(sink),
        onError: sink.addError,
        onDone: () => resolveWindowEnd(sink),
      );

  /// Cancels any window that is still open and creates the window stream
  /// for [event].
  Stream<dynamic> buildStream(S event, EventSink<T> sink) {
    _windowSubscription?.cancel();

    return _windowStreamFactory(event);
  }

  /// Emits the window-start value for [event], if a start handler is set.
  void resolveWindowStart(S event, EventSink<T> sink) {
    if (_onWindowStart != null) {
      sink.add(_onWindowStart(event));
    }
  }

  /// Ends the current window, emitting the window-end value when applicable.
  ///
  /// [isControllerClosing] is true when this is the final window, i.e. the
  /// source stream is done.
  void resolveWindowEnd(EventSink<T> sink, [bool isControllerClosing = false]) {
    if (isControllerClosing &&
        _strategy == WindowStrategy.eventAfterLastWindow) {
      if (_dispatchOnClose &&
          _hasData &&
          queue.length > 1 &&
          _onWindowEnd != null) {
        sink.add(_onWindowEnd(unmodifiableQueue));
      }

      queue.clear();
      _windowSubscription?.cancel();
      _windowSubscription = null;

      sink.close();
      return;
    }

    if (isControllerClosing ||
        _strategy == WindowStrategy.eventAfterLastWindow ||
        _strategy == WindowStrategy.everyEvent) {
      _windowSubscription?.cancel();
      _windowSubscription = null;
    }

    if (isControllerClosing && !_dispatchOnClose) {
      return;
    }

    if (_hasData && (queue.isNotEmpty || !_ignoreEmptyWindows)) {
      if (_onWindowEnd != null) {
        sink.add(_onWindowEnd(unmodifiableQueue));
      }

      // prepare the buffer for the next window.
      // by default, this is just a cleared buffer
      if (!isControllerClosing && _startBufferEvery > 0) {
        skip = _startBufferEvery > queue.length
            ? _startBufferEvery - queue.length
            : 0;

        // ...unless startBufferEvery is provided.
        // here we backtrack to the first event of the last buffer
        // and count forward using startBufferEvery until we reach
        // the next event.
        //
        // if the next event is found inside the current buffer,
        // then this event and any later events in the buffer
        // become the starting values of the next buffer.
        // if the next event is not yet available, then a skip
        // count is calculated.
        // this count will skip the next n future events.
        // when skip is reset to 0, then we start adding events
        // again into the new buffer.
        //
        // example:
        // startBufferEvery = 2
        // last buffer: [0, 1, 2, 3, 4]
        // 0 is the first event,
        // 2 is the n-th event
        // new buffer starts with [2, 3, 4]
        //
        // example:
        // startBufferEvery = 3
        // last buffer: [0, 1]
        // 0 is the first event,
        // the n-th event is not yet dispatched at this point
        // skip becomes 1
        // event 2 is skipped, skip becomes 0
        // event 3 is now added to the buffer
        if (_startBufferEvery < queue.length) {
          queue.removeFirstElements(_startBufferEvery);
        } else {
          queue.clear();
        }
      } else {
        queue.clear();
      }
    }
  }

  /// An unmodifiable snapshot (copy) of the current buffer.
  List<S> get unmodifiableQueue => List<S>.unmodifiable(queue);
}

/// A configurable [StreamTransformer] that can act as any of the common
/// rx backpressure operators.
///
/// The transformer buffers source events in a [Queue] while a "window" is
/// open. A [WindowStrategy] decides how and when windows are opened.
///
/// [onWindowStart] and [onWindowEnd] are optional handlers. When set, their
/// return value is emitted on the output stream when a window opens and
/// closes respectively.
///
/// [startBufferEvery], when greater than zero, controls how far the start of
/// the next buffer is shifted after a window ends: events of the previous
/// buffer may be carried over, or upcoming events may be skipped.
///
/// [ignoreEmptyWindows] (default `true`) suppresses the window-end emission
/// when the buffer is empty. Set it to `false` to have [onWindowEnd] invoked
/// with an empty [List] in that case.
///
/// [dispatchOnClose] (default `true`) emits what is left in the buffer when
/// the source [Stream] closes. When `false`, the remaining buffer is
/// discarded on close.
class BackpressureStreamTransformer<S, T> extends StreamTransformerBase<S, T> {
  /// The strategy that decides how and when a window is opened.
  final WindowStrategy strategy;

  /// Creates the [Stream] that represents a window, given the event that
  /// opened it.
  final Stream<dynamic> Function(S event) windowStreamFactory;

  /// Optional handler invoked with the opening event when a window opens;
  /// its result is emitted.
  final T Function(S event) onWindowStart;

  /// Optional handler invoked with the buffered events when a window closes;
  /// its result is emitted.
  final T Function(List<S> queue) onWindowEnd;

  /// The maximum length of the buffer.
  ///
  /// Once exceeded, the oldest events are dropped, which keeps memory
  /// bounded when many events arrive within one window.
  /// When `null`, the buffer length is unlimited.
  final int maxLengthQueue;

  /// The offset, in events, between the starts of two consecutive buffers.
  /// Zero (the default) means every buffer starts empty.
  final int startBufferEvery;

  /// Optional predicate, evaluated with the current buffer after every
  /// source event, that ends the current window when it returns `true`.
  final bool Function(List<S> queue) closeWindowWhen;

  /// Whether a window that ends with an empty buffer emits nothing.
  final bool ignoreEmptyWindows;

  /// Whether the remaining buffered events are dispatched when the source
  /// [Stream] closes.
  final bool dispatchOnClose;

  /// Constructs a [StreamTransformer] which buffers source events while the
  /// [Stream] created by [windowStreamFactory] keeps a window open.
  ///
  /// The optional named parameters fine-tune how and when the buffer is
  /// created, trimmed and emitted; see the class documentation of
  /// [BackpressureStreamTransformer] for their meaning.
  BackpressureStreamTransformer(
      this.strategy,
      this.windowStreamFactory, {
        this.onWindowStart,
        this.onWindowEnd,
        this.startBufferEvery = 0,
        this.closeWindowWhen,
        this.ignoreEmptyWindows = true,
        this.dispatchOnClose = true,
        this.maxLengthQueue,
      });

  /// Wraps [stream] so that its events pass through a freshly created sink
  /// carrying this transformer's configuration.
  @override
  Stream<T> bind(Stream<S> stream) => forwardStream<S, T>(
        stream,
        _BackpressureStreamSink<S, T>(
          strategy,
          windowStreamFactory,
          onWindowStart,
          onWindowEnd,
          startBufferEvery,
          closeWindowWhen,
          ignoreEmptyWindows,
          dispatchOnClose,
          maxLengthQueue,
        ),
      );
}

extension _RemoveFirstNQueueExtension<T> on Queue<T> {
  /// Drops [count] elements from the front of this queue, one at a time.
  ///
  /// Does nothing when [count] is zero or negative. If the queue runs out
  /// of elements first, the error raised by [Queue.removeFirst] propagates.
  void removeFirstElements(int count) {
    var remaining = count;
    while (remaining > 0) {
      removeFirst();
      remaining--;
    }
  }
}


/// A [BackpressureStreamTransformer] preset that throttles a [Stream] using
/// [WindowStrategy.eventAfterLastWindow].
class ThrottleStreamTransformer<T> extends BackpressureStreamTransformer<T, T> {
  /// Constructs a [StreamTransformer] that lets a source event open a
  /// window, holds back the events that arrive while that window [Stream]
  /// is open, and starts over with the next event after the window closed.
  ///
  /// If [leading] is true, the event that opens a window is emitted.
  /// If [trailing] is true, the last event buffered during a window is
  /// emitted when the window closes; the buffer is then capped at two
  /// events, otherwise nothing is buffered at all.
  ThrottleStreamTransformer(
    Stream Function(T event) window, {
    bool trailing = false,
    bool leading = true,
  }) : super(
          WindowStrategy.eventAfterLastWindow,
          window,
          onWindowStart: leading ? (T opening) => opening : null,
          onWindowEnd: trailing ? (List<T> buffered) => buffered.last : null,
          dispatchOnClose: trailing,
          maxLengthQueue: trailing ? 2 : 0,
        );
}

/// A single-subscription [Stream] that emits one value after a delay and
/// then closes.
///
/// The countdown starts when the stream is listened to. Pausing the
/// subscription suspends the countdown; resuming continues it with the time
/// that was still remaining.
class TimerStream<T> extends Stream<T> {
  final StreamController<T> _controller;

  /// Constructs a [Stream] which emits [value] after the specified [Duration].
  TimerStream(T value, Duration duration)
      : _controller = _buildController(value, duration);

  @override
  StreamSubscription<T> listen(void Function(T event) onData,
      {Function onError, void Function() onDone, bool cancelOnError}) {
    return _controller.stream.listen(
      onData,
      onError: onError,
      onDone: onDone,
      cancelOnError: cancelOnError,
    );
  }

  /// Builds the controller that drives the countdown for [value].
  static StreamController<T> _buildController<T>(T value, Duration duration) {
    // Measures only the time during which the subscription was not paused:
    // it is stopped on pause and started again on resume, never reset.
    final watch = Stopwatch();
    Timer timer;
    StreamController<T> controller;
    var isCancelled = false;

    // Schedules the single emission followed by the close event.
    void schedule(Duration delay) {
      timer = Timer(delay, () {
        controller.add(value);
        controller.close();
      });
    }

    void onResume() {
      // Already cancelled or is not paused.
      if (isCancelled || timer != null) return;

      // [watch] accumulates across pauses, so its elapsed time already is
      // the total time spent counting down. Adding it to a running total
      // would count earlier intervals twice and fire the timer too early.
      final remaining = duration - watch.elapsed;
      watch.start();

      // A zero or negative delay makes the timer fire as soon as possible.
      schedule(remaining);
    }

    controller = StreamController<T>(
      sync: true,
      onListen: () {
        watch.start();
        schedule(duration);
      },
      onPause: () {
        timer?.cancel();
        timer = null;
        watch.stop();
      },
      onResume: onResume,
      onCancel: () {
        timer?.cancel();
        timer = null;
        isCancelled = true;
      },
    );
    return controller;
  }
}

/// The set of callbacks that [forwardStream] invokes while it relays a
/// source stream of [T] into an output [EventSink] of [R].
///
/// Every callback receives the output sink, so an implementation decides
/// itself what - if anything - is written to it.
abstract class ForwardingSink<T, R> {
  /// Handles a data event [data] coming from the source stream.
  void add(EventSink<R> sink, T data);

  /// Handles an error event coming from the source stream.
  void addError(EventSink<R> sink, Object error, StackTrace st);

  /// Handles the done event of the source stream.
  void close(EventSink<R> sink);

  /// Fires when a listener subscribes on the underlying [Stream].
  void onListen(EventSink<R> sink);

  /// Fires when a subscriber pauses.
  void onPause(EventSink<R> sink);

  /// Fires when a subscriber resumes after a pause.
  void onResume(EventSink<R> sink);

  /// Fires when a subscriber cancels.
  ///
  /// May return a [Future]; [forwardStream] includes it in the future it
  /// returns from its own cancel handler.
  FutureOr onCancel(EventSink<R> sink);
}

/// @private
/// Relays the events of [stream] through [connectedSink] into a newly
/// created [StreamController] and returns that controller's stream.
///
/// The controller's onListen, onPause, onResume and onCancel callbacks are
/// reported to [connectedSink] as well. The returned stream is a broadcast
/// stream exactly when [stream] is one; pause and resume are only wired up
/// for the non-broadcast case.
///
/// Anything thrown by a [connectedSink] callback other than `onCancel` is
/// caught and handed to [ForwardingSink.addError].
Stream<R> forwardStream<T, R>(
    Stream<T> stream, ForwardingSink<T, R> connectedSink) {
  ArgumentError.checkNotNull(stream, 'stream');
  ArgumentError.checkNotNull(connectedSink, 'connectedSink');

  StreamController<R> controller;
  StreamSubscription<T> subscription;

  // Runs [action] and turns anything it throws into an error event.
  @pragma('vm:prefer-inline')
  @pragma('dart2js:tryInline')
  void guarded(void Function() action) {
    try {
      action();
    } catch (error, stackTrace) {
      connectedSink.addError(controller, error, stackTrace);
    }
  }

  // Notifies the sink first, then starts listening to the source.
  void handleListen() {
    guarded(() => connectedSink.onListen(controller));

    subscription = stream.listen(
      (T data) => guarded(() => connectedSink.add(controller, data)),
      onError: (Object error, StackTrace stackTrace) =>
          guarded(() => connectedSink.addError(controller, error, stackTrace)),
      onDone: () => guarded(() => connectedSink.close(controller)),
    );
  }

  // Cancels the source subscription, then the sink, and completes once
  // every future obtained from those two cancellations has completed.
  Future<List<dynamic>> handleCancel() {
    final sourceCancellation = subscription.cancel();
    final sinkCancellation = connectedSink.onCancel(controller);

    final pending = <Future>[];
    if (sourceCancellation is Future) pending.add(sourceCancellation);
    if (sinkCancellation is Future) pending.add(sinkCancellation);

    return Future.wait<dynamic>(pending);
  }

  void handlePause() {
    subscription.pause();
    guarded(() => connectedSink.onPause(controller));
  }

  void handleResume() {
    subscription.resume();
    guarded(() => connectedSink.onResume(controller));
  }

  // The controller acts as a trampoline for the forwarded events.
  controller = stream.isBroadcast
      ? StreamController<R>.broadcast(
          onListen: handleListen,
          onCancel: handleCancel,
          sync: true,
        )
      : StreamController<R>(
          onListen: handleListen,
          onPause: handlePause,
          onResume: handleResume,
          onCancel: handleCancel,
          sync: true,
        );

  return controller.stream;
}

/// A [BackpressureStreamTransformer] preset that debounces a [Stream] using
/// [WindowStrategy.everyEvent].
class DebounceStreamTransformer<T> extends BackpressureStreamTransformer<T, T> {
  /// Constructs a [StreamTransformer] which emits the most recent source
  /// event whenever the current [window] fires.
  ///
  /// The [window] is reset whenever the [Stream] that is being transformed
  /// emits an event. Only the latest event is ever emitted, so the buffer
  /// is capped at a single element.
  DebounceStreamTransformer(Stream Function(T event) window)
      : super(
          WindowStrategy.everyEvent,
          window,
          onWindowEnd: (List<T> buffered) => buffered.last,
          maxLengthQueue: 1,
        );
}

/// Time-based `throttle` and `debounce` operators for any [Stream].
extension Throttle<T> on Stream<T> {
  /// Transforms this stream with a [ThrottleStreamTransformer] whose window
  /// is a [TimerStream] lasting [duration].
  ///
  /// [leading] and [trailing] are passed on to the transformer and select
  /// whether the first and/or the last event of each window is emitted.
  Stream<T> throttle(
    Duration duration, {
    bool trailing = false,
    bool leading = true,
  }) {
    final throttler = ThrottleStreamTransformer<T>(
      (_) => TimerStream<bool>(true, duration),
      trailing: trailing,
      leading: leading,
    );
    return transform(throttler);
  }

  /// Transforms this stream with a [DebounceStreamTransformer] whose window
  /// is a [TimerStream] lasting [duration].
  Stream<T> debounce(Duration duration) {
    Stream<void> quietPeriod(T _) => TimerStream<void>(null, duration);

    return transform(DebounceStreamTransformer<T>(quietPeriod));
  }
}

上一篇下一篇

猜你喜欢

热点阅读