Android进阶之路Android开发经验谈Android技术知识

Fresco-图片加载之线程切换与多路复用

2018-05-21  本文已影响20人  susion哒哒

接上节,看一下我们没有看的两个Producer: ThreadHandoffProducerBitmapMemoryCacheKeyMultiplexProducer

  /**
   * Bitmap cache get -> thread hand off -> multiplex -> bitmap cache
   * @param inputProducer producer providing the input to the bitmap cache
   * @return bitmap cache get to bitmap cache sequence
   */
  private Producer<CloseableReference<CloseableImage>> newBitmapCacheGetToBitmapCacheSequence(
      Producer<CloseableReference<CloseableImage>> inputProducer) {

    BitmapMemoryCacheProducer bitmapMemoryCacheProducer =
        mProducerFactory.newBitmapMemoryCacheProducer(inputProducer);

    BitmapMemoryCacheKeyMultiplexProducer bitmapKeyMultiplexProducer =
        mProducerFactory.newBitmapMemoryCacheKeyMultiplexProducer(bitmapMemoryCacheProducer);

    ThreadHandoffProducer<CloseableReference<CloseableImage>> threadHandoffProducer =
        mProducerFactory.newBackgroundThreadHandoffProducer(
            bitmapKeyMultiplexProducer,
            mThreadHandoffProducerQueue);

    return mProducerFactory.newBitmapMemoryCacheGetProducer(threadHandoffProducer);
  }

ThreadHandoffProducer

官方是这样解释它的功能的:Uses ExecutorService to move further computation to different threa。可以这样理解它的功能,它可以把接下来的 inputProducer要做的事情切换到另一个线程。它的核心实现类是:ThreadHandoffProducerQueue

public class ThreadHandoffProducerQueue {
  //...
  private final Deque<Runnable> mRunnableList;
  private final Executor mExecutor;
}

从成员变量大致可以看出,它内部利用队列维护一个Runnable的List,Executor执行这个队列中的runnable。

看一下ThreadHandoffProducerproduceResults

 @Override
  public void produceResults(final Consumer<T> consumer, final ProducerContext context) {
    final ProducerListener producerListener = context.getListener();
    final String requestId = context.getId();
    final StatefulProducerRunnable<T> statefulRunnable = new StatefulProducerRunnable<T>(
        consumer,
        producerListener,
        PRODUCER_NAME,
        requestId) {
      @Override
      protected void onSuccess(T ignored) {
        producerListener.onProducerFinishWithSuccess(requestId, PRODUCER_NAME, null);
        mInputProducer.produceResults(consumer, context);
      }
    //....
    };
    context.addCallbacks(
        new BaseProducerContextCallbacks() {
          @Override
          public void onCancellationRequested() {
            statefulRunnable.cancel();
            mThreadHandoffProducerQueue.remove(statefulRunnable);
          }
        });
    mThreadHandoffProducerQueue.addToQueueOrExecute(statefulRunnable);
  }

onSuccess()方法是在runnable被添加到Queue中后,运行在Runnable的run方法中的,因此是运行中指定的新线程中。

那么ThreadHandoffProducer在这里切换到哪个线程呢?其实决定线程切换的是ThreadHandoffProducerQueue,因此,只需要知道ThreadHandoffProducerQueue的Executor是什么就好了,追踪源码:

 public ImagePipelineFactory(ImagePipelineConfig config) {
    mConfig = Preconditions.checkNotNull(config);
    mThreadHandoffProducerQueue = new ThreadHandoffProducerQueue(
        config.getExecutorSupplier().forLightweightBackgroundTasks());
  }

mLightWeightBackgroundExecutor =
    Executors.newFixedThreadPool(
        NUM_LIGHTWEIGHT_BACKGROUND_THREADS,
        new PriorityThreadFactory(
            Process.THREAD_PRIORITY_BACKGROUND, "FrescoLightWeightBackgroundExecutor", true));

ThreadHandoffProducer的作用是把接下来的任务切换到后台线程。且都在一个后台线程中执行。NUM_LIGHTWEIGHT_BACKGROUND_THREADS的值为1

BitmapMemoryCacheKeyMultiplexProducer

这个类实际上继承自MultiplexProducer, 主要功能实现都在父类中,因此先主要看一下父类,它的主要作用是 : Producer for combining multiple identical requests into a single request.
大致意思就是,这个类会把所有相同的图片请求组合成一个单一的请求,如果这个请求完成,那么所有的图片请求都获得这个请求结果并返回。如果所有相同的图片请求都被取消,那么这个请求才算被取消。即多路复用

首先对于所有同一个图片请求的request是使用 Multiplexer来维护的。

Multiplexer

对于相同请求,是这样组织的:

private final K mKey;
private final CopyOnWriteArraySet<Pair<Consumer<T>, ProducerContext>> mConsumerContextPairs;

即mKey为这些同一个请求的key,使用一个Set来维护每个请求的ConsumerProducerContext

 public boolean addNewConsumer(final Consumer<T> consumer,final ProducerContext producerContext) {
      final Pair<Consumer<T>, ProducerContext> consumerContextPair = Pair.create(consumer, producerContext);
      //...
      synchronized (Multiplexer.this) {
        if (getExistingMultiplexer(mKey) != this) {
          return false;
        }
        mConsumerContextPairs.add(consumerContextPair);
        prefetchCallbacks = updateIsPrefetch();
        priorityCallbacks = updatePriority();
        //...
      }

      BaseProducerContext.callOnIsPrefetchChanged(prefetchCallbacks);
      BaseProducerContext.callOnPriorityChanged(priorityCallbacks);
      BaseProducerContext.callOnIsIntermediateResultExpectedChanged(intermediateResultsCallbacks);

      synchronized (consumerContextPair) {
        // check if last result changed in the mean time. In such case we should not propagate it
        synchronized (Multiplexer.this) {
          if (lastIntermediateResult != mLastIntermediateResult) {
            lastIntermediateResult = null;
          } else if (lastIntermediateResult != null) {
            lastIntermediateResult = cloneOrNull(lastIntermediateResult);
          }
        }

        if (lastIntermediateResult != null) {
          if (lastProgress > 0) {
            consumer.onProgressUpdate(lastProgress);
          }
          consumer.onNewResult(lastIntermediateResult, lastStatus);
          closeSafely(lastIntermediateResult);
        }
      }

      addCallbacks(consumerContextPair, producerContext);
      return true;
    }
  1. 对于新的请求,会利用该请求的ConsumerProducerContext新建一个Pair添加到 Multiplexer中。
  2. 更新相关callback。只要有一个请求需要 Prefetch or Priority, 那么就为这个请求设置上相关回调,保证任意一个请求拿到返回结果,都可以让需要这个回调的请求得到回调。
  3. 如果在添加callback的过程中获得了 lastIntermediateResult, 则立即对当前添加的 Pair<Consumer,ProducerContext>回调consumer.onNewResult(lastIntermediateResult, lastStatus)
    private void startInputProducerIfHasAttachedConsumers() {
      BaseProducerContext multiplexProducerContext;
      ForwardingConsumer forwardingConsumer;
      synchronized (Multiplexer.this) {
        Preconditions.checkArgument(mMultiplexProducerContext == null);
        Preconditions.checkArgument(mForwardingConsumer == null);

        // Cleanup if all consumers have been cancelled before this method was called
        if (mConsumerContextPairs.isEmpty()) {
          removeMultiplexer(mKey, this);
          return;
        }

        ProducerContext producerContext = mConsumerContextPairs.iterator().next().second;
        mMultiplexProducerContext = new BaseProducerContext(
            producerContext.getImageRequest(),
            producerContext.getId(),
            producerContext.getListener(),
            producerContext.getCallerContext(),
            producerContext.getLowestPermittedRequestLevel(),
            computeIsPrefetch(),
            computeIsIntermediateResultExpected(),
            computePriority());

        mForwardingConsumer = new ForwardingConsumer();
        multiplexProducerContext = mMultiplexProducerContext;
        forwardingConsumer = mForwardingConsumer;
      }
      mInputProducer.produceResults(
          forwardingConsumer,
          multiplexProducerContext);
    }

这个方法很简单: 取出 mConsumerContextPairs中的第一个Pair构建mMultiplexProducerContext。构建ForwardingConsumer,然后将后续处理传递给下一个 inputProducer。

好,到这里我们看完了Multiplexer的核心实现,下面看一下 Multiplexer是如何使用它的 produceResults

  @Override
  public void produceResults(Consumer<T> consumer, ProducerContext context) {
    K key = getKey(context); 
    Multiplexer multiplexer;
    boolean createdNewMultiplexer;
    // We do want to limit scope of this lock to guard only accesses to mMultiplexers map.
    // However what we would like to do here is to atomically lookup mMultiplexers, add new
    // consumer to consumers set associated with the map's entry and call consumer's callback with
    // last intermediate result. We should not do all of those things under this lock.
    do {
      createdNewMultiplexer = false;
      synchronized (this) {
        multiplexer = getExistingMultiplexer(key);
        if (multiplexer == null) {
          multiplexer = createAndPutNewMultiplexer(key);
          createdNewMultiplexer = true;
        }
      }
      // addNewConsumer may call consumer's onNewResult method immediately. For this reason
      // we release "this" lock. If multiplexer is removed from mMultiplexers in the meantime,
      // which is not very probable, then addNewConsumer will fail and we will be able to retry.
    } while (!multiplexer.addNewConsumer(consumer, context));

    if (createdNewMultiplexer) {
      multiplexer.startInputProducerIfHasAttachedConsumers();
    }
  }
  1. 调用实现类的getKey方法,即BitmapMemoryCacheKeyMultiplexProducer, 该类是根据一个请求的一系列参数,生成一个key。
  2. 根据这个可以确定相应的Multiplexer, 并调用 multiplexer.addNewConsumer(consumer, context)
  3. 如果是新创建的Multiplexer,则调用multiplexer.startInputProducerIfHasAttachedConsumers();保证后续流程可以继续。

对于 MultiplexerMultiplexProducer的工作原理可以使用下图总结:

image
上一篇下一篇

猜你喜欢

热点阅读