NSOperation源码浅析

2022-12-14  本文已影响0人  FingerStyle

NSOperation是苹果官方提供的多线程解决方案,相比GCD,最大的特点是支持任务间的依赖和优先级的调整,这里我们结合GNU Step的源码来看下别人是怎么实现的。
源码下载地址:https://github.com/gnustep/libs-base

1.如何实现operationA 对B的依赖?

首先把依赖的任务B插入A的dependencies 数组
判断当前任务和要依赖的任务的状态,如果A或B已结束,或者A被取消或在执行中 则跳过。

- (void) addDependency: (NSOperation *)op
{
  if (NO == [op isKindOfClass: [NSOperation class]])
    {
      [NSException raise: NSInvalidArgumentException
          format: @"[%@-%@] dependency is not an NSOperation",
    NSStringFromClass([self class]), NSStringFromSelector(_cmd)];
    }
  if (op == self)
    {
      [NSException raise: NSInvalidArgumentException
          format: @"[%@-%@] attempt to add dependency on self",
    NSStringFromClass([self class]), NSStringFromSelector(_cmd)];
    }
  [internal->lock lock];
  if (internal->dependencies == nil)
    {
      internal->dependencies = [[NSMutableArray alloc] initWithCapacity: 5];
    }
  NS_DURING
    {
      if (NSNotFound == [internal->dependencies indexOfObjectIdenticalTo: op])
    {
      [self willChangeValueForKey: @"dependencies"];
          [internal->dependencies addObject: op];
      /* We only need to watch for changes if it's possible for them to
       * happen and make a difference.
       */
      if (NO == [op isFinished]
        && NO == [self isCancelled]
        && NO == [self isExecuting]
        && NO == [self isFinished])
        {
          /* Can change readiness if we are neither cancelled nor
           * executing nor finished.  So we need to observe for the
           * finish of the dependency.
           */
          [op addObserver: self
           forKeyPath: @"isFinished"
              options: NSKeyValueObservingOptionNew
              context: isFinishedCtxt];
          if (internal->ready == YES)
        {
          /* The new dependency stops us being ready ...
           * change state.
           */
          [self willChangeValueForKey: @"isReady"];
          internal->ready = NO;
          [self didChangeValueForKey: @"isReady"];
        }
        }
      [self didChangeValueForKey: @"dependencies"];
    }
    }
  NS_HANDLER
    {
      [internal->lock unlock];
      NSLog(@"Problem adding dependency: %@", localException);
      return;
    }
  NS_ENDHANDLER
  [internal->lock unlock];
}

然后通过KVO添加对operation B 的isReady状态监听,当B的isReady发生变化时,修改A的isReady状态为YES。

- (void) observeValueForKeyPath: (NSString *)keyPath
               ofObject: (id)object
                         change: (NSDictionary *)change
                        context: (void *)context
{
  [internal->lock lock];

  /* We only observe isFinished changes, and we can remove self as an
   * observer once we know the operation has finished since it can never
   * become unfinished.
   */
  [object removeObserver: self forKeyPath: @"isFinished"];

  if (object == self)
    {
      /* We have finished and need to unlock the condition lock so that
       * any waiting thread can continue.
       */
      [internal->cond lock];
      [internal->cond unlockWithCondition: 1];
      [internal->lock unlock];
      return;
    }

  if (NO == internal->ready)
    {
      NSEnumerator  *en;
      NSOperation   *op;

      /* Some dependency has finished (or been removed) ...
       * so we need to check to see if we are now ready unless we know we are.
       * This is protected by locks so that an update due to an observed
       * change in one thread won't interrupt anything in another thread.
       */
      en = [internal->dependencies objectEnumerator];
      while ((op = [en nextObject]) != nil)
        {
          if (NO == [op isFinished])
        break;
        }
      if (op == nil)
    {
          [self willChangeValueForKey: @"isReady"];
      internal->ready = YES;
          [self didChangeValueForKey: @"isReady"];
    }
    }
  [internal->lock unlock];
}

同时operationQueue在add Operation的时候监听了operation的isReady状态,所以当前面A的isReady发生变化时,operationQueue也会得到通知

- (void) addOperation: (NSOperation *)op
{
  if (op == nil || NO == [op isKindOfClass: [NSOperation class]])
    {
      [NSException raise: NSInvalidArgumentException
          format: @"[%@-%@] object is not an NSOperation",
    NSStringFromClass([self class]), NSStringFromSelector(_cmd)];
    }
  [internal->lock lock];
  if (NSNotFound == [internal->operations indexOfObjectIdenticalTo: op]
    && NO == [op isFinished])
    {
      [op addObserver: self
       forKeyPath: @"isReady"
          options: NSKeyValueObservingOptionNew
          context: isReadyCtxt];
      [self willChangeValueForKey: @"operations"];
      [self willChangeValueForKey: @"operationCount"];
      [internal->operations addObject: op];
      [self didChangeValueForKey: @"operationCount"];
      [self didChangeValueForKey: @"operations"];
      if (YES == [op isReady])
    {
      [self observeValueForKeyPath: @"isReady"
                  ofObject: op
                change: nil
                   context: isReadyCtxt];
    }
    }
  [internal->lock unlock];
}

在operationQueue的KVO回调方法里面,会移除isReady的监听,添加一个对operationA 优先级变化的监听,然后找到一个适合operationA位置并插入到waiting数组里面,然后执行_execute方法。

//NSOperationQueue (Private)
- (void) observeValueForKeyPath: (NSString *)keyPath
               ofObject: (id)object
                         change: (NSDictionary *)change
                        context: (void *)context
{
  /* We observe three properties in sequence ...
   * isReady (while we wait for an operation to be ready)
   * queuePriority (when priority of a ready operation may change)
   * isFinished (to see if an executing operation is over).
   */
  if (context == isFinishedCtxt)
    {
      [internal->lock lock];
      internal->executing--;
      [object removeObserver: self forKeyPath: @"isFinished"];
      [internal->lock unlock];
      [self willChangeValueForKey: @"operations"];
      [self willChangeValueForKey: @"operationCount"];
      [internal->lock lock];
      [internal->operations removeObjectIdenticalTo: object];
      [internal->lock unlock];
      [self didChangeValueForKey: @"operationCount"];
      [self didChangeValueForKey: @"operations"];
    }
  else if (context == queuePriorityCtxt || context == isReadyCtxt)
    {
      NSInteger pos;

      [internal->lock lock];
      if (context == queuePriorityCtxt)
        {
          [internal->waiting removeObjectIdenticalTo: object];
        }
      if (context == isReadyCtxt)
        {
          [object removeObserver: self forKeyPath: @"isReady"];
          [object addObserver: self
                   forKeyPath: @"queuePriority"
                      options: NSKeyValueObservingOptionNew
                      context: queuePriorityCtxt];
        }
      pos = [internal->waiting insertionPosition: object
                                   usingFunction: sortFunc
                                         context: 0];
      [internal->waiting insertObject: object atIndex: pos];
      [internal->lock unlock];
    }
  [self _execute];
}

2.放进operationQueue的任务当isReady状态改变后是立刻执行吗?
不一定,参考_execute方法:
首先进入一个while循环,判断operationQuque是否为suspend状态,是否超过当前最大并发数,以及waiting数组的数量是否<=0,如果都不是,则拿出Waiting 数组里面的第一个任务,同时移除对该任务优先级的监听,添加对该任务isFinished的监听,增加executing的计数。 然后,看要执行的任务是否支持并发,如果支持,则调用该任务的start方法开始执行,如果不支持,则先把当前任务加到starting数组内,然后看当前正在使用的线程数是否大于线程池内的线程数量(默认是8),如果不超过,则开启一个新线程,并执行_thread方法。

//NSOperationQueue (Private)
/* Check for operations which can be executed and start them.
 */
- (void) _execute
{
  NSInteger max;

  [internal->lock lock];

  max = [self maxConcurrentOperationCount];
  if (NSOperationQueueDefaultMaxConcurrentOperationCount == max)
    {
      max = maxConcurrent;
    }

  NS_DURING
  while (NO == [self isSuspended]
    && max > internal->executing
    && [internal->waiting count] > 0)
    {
      NSOperation   *op;

      /* Take the first operation from the queue and start it executing.
       * We set ourselves up as an observer for the operating finishing
       * and we keep track of the count of operations we have started,
       * but the actual startup is left to the NSOperation -start method.
       */
      op = [internal->waiting objectAtIndex: 0];
      [internal->waiting removeObjectAtIndex: 0];
      [op removeObserver: self forKeyPath: @"queuePriority"];
      [op addObserver: self
       forKeyPath: @"isFinished"
          options: NSKeyValueObservingOptionNew
          context: isFinishedCtxt];
      internal->executing++;
      if (YES == [op isConcurrent])
    {
          [op start];
    }
      else
    {
      NSUInteger    pending;

      [internal->cond lock];
      pending = [internal->starting count];
      [internal->starting addObject: op];

      /* Create a new thread if all existing threads are busy and
       * we haven't reached the pool limit.
       */
      if (0 == internal->threadCount
        || (pending > 0 && internal->threadCount < POOL))
        {
          internal->threadCount++;
          NS_DURING
        {
          [NSThread detachNewThreadSelector: @selector(_thread)
                       toTarget: self
                     withObject: nil];
        }
          NS_HANDLER
        {
          NSLog(@"Failed to create thread for %@: %@",
            self, localException);
        }
          NS_ENDHANDLER
        }
      /* Tell the thread pool that there is an operation to start.
       */
      [internal->cond unlockWithCondition: 1];
    }
    }
  NS_HANDLER
    {
      [internal->lock unlock];
      [localException raise];
    }
  NS_ENDHANDLER
  [internal->lock unlock];
}

_thread方法:
首先会创建一个自动释放池(因为开启了一个新线程),然后进入一个无限循环,在循环体里面会遍历starting数组,每次循环取出第一个任务,设置好他的线程优先级然后执行,执行完后调用该任务的_finish 方法修改执行状态。最后再销毁自动释放池。这个循环的退出条件是循环开始执行后5秒内没有要执行的任务(starting数组为空),内部是通过一个条件锁来控制这个时间的。

//NSOperationQueue (Private)
- (void) _thread
{
  CREATE_AUTORELEASE_POOL(arp);

  [[[NSThread currentThread] threadDictionary] setObject: self
                                                  forKey: threadKey];
  for (;;)
    {
      NSOperation   *op;
      NSDate        *when;
      BOOL      found;
      /* We use a pool for each operation in case releasing the operation
       * causes it to be deallocated, and the deallocation of the operation
       * autoreleases something which needs to be cleaned up.
       */
      RECREATE_AUTORELEASE_POOL(arp);

      when = [[NSDate alloc] initWithTimeIntervalSinceNow: 5.0];
      found = [internal->cond lockWhenCondition: 1 beforeDate: when];
      RELEASE(when);
      if (NO == found)
    {
      break;    // Idle for 5 seconds ... exit thread.
    }

      if ([internal->starting count] > 0)
    {
          op = RETAIN([internal->starting objectAtIndex: 0]);
      [internal->starting removeObjectAtIndex: 0];
    }
      else
    {
      op = nil;
    }

      if ([internal->starting count] > 0)
    {
      // Signal any other idle threads,
          [internal->cond unlockWithCondition: 1];
    }
      else
    {
      // There are no more operations starting.
          [internal->cond unlockWithCondition: 0];
    }

      if (nil != op)
    {
          NS_DURING
        {
          ENTER_POOL
              [NSThread setThreadPriority: [op threadPriority]];
              [op start];
          LEAVE_POOL
        }
          NS_HANDLER
        {
          NSLog(@"Problem running operation %@ ... %@",
        op, localException);
        }
          NS_ENDHANDLER
      [op _finish];
          RELEASE(op);
    }
    }

  [[[NSThread currentThread] threadDictionary] removeObjectForKey: threadKey];
  [internal->lock lock];
  internal->threadCount--;
  [internal->lock unlock];
  DESTROY(arp);
  [NSThread exit];
}

由此可见,如果任务支持并发,并且operationQueue没有suspend的话,会马上执行该任务。否则会看是否有可用的线程,如果有会在新开启的线程立刻执行,如果没有,则暂存在starting数组中等待执行,当任意一个线程执行完当前任务就会接着执行这个排队的任务。

查阅源码的时候我们会发现到处都有 internal 这个变量,这个变量实际就是self, 只是为了方便内存管理,封装成了一个类,我们直接当成self理解就可以。

上一篇下一篇

猜你喜欢

热点阅读