RAC高阶操作

2019-04-09  本文已影响0人  boy丿log

doNext、doError、doCompleted

- (RACSignal *)doNext:(void (^)(id x))block {
    NSCParameterAssert(block != NULL);

    return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
        return [self subscribeNext:^(id x) {
            block(x);
            [subscriber sendNext:x];
        } error:^(NSError *error) {
            [subscriber sendError:error];
        } completed:^{
            [subscriber sendCompleted];
        }];
    }] setNameWithFormat:@"[%@] -doNext:", self.name];
}

在发送信号前做点什么😈.

throttle:和throttle:valuesPassingTest:

- (RACSignal<ValueType> *)throttle:(NSTimeInterval)interval valuesPassingTest:(BOOL (^)(id _Nullable next))predicate 
RACCompoundDisposable *compoundDisposable = [RACCompoundDisposable compoundDisposable];

        // We may never use this scheduler, but we need to set it up ahead of
        // time so that our scheduled blocks are run serially if we do.
        RACScheduler *scheduler = [RACScheduler scheduler];

        // Information about any currently-buffered `next` event.
        __block id nextValue = nil;
        __block BOOL hasNextValue = NO;
        RACSerialDisposable *nextDisposable = [[RACSerialDisposable alloc] init];

首先,创建一个nextValue对象用来存上次的值,接下来用来保存上次发送的值和一个状态

void (^flushNext)(BOOL send) = ^(BOOL send) {
            @synchronized (compoundDisposable) {
                [nextDisposable.disposable dispose];

                if (!hasNextValue) return;
                if (send) [subscriber sendNext:nextValue];

                nextValue = nil;
                hasNextValue = NO;
            }
        };

这个block用来根据一个状态来确定是否执行一个发送消息的任务。并将nextVaule和hasNextValue置空。

RACScheduler *delayScheduler = RACScheduler.currentScheduler ?: scheduler;
            BOOL shouldThrottle = predicate(x);

            @synchronized (compoundDisposable) {
                flushNext(NO);
                if (!shouldThrottle) {
                    [subscriber sendNext:x];
                    return;
                }

                nextValue = x;
                hasNextValue = YES;
                nextDisposable.disposable = [delayScheduler afterDelay:interval schedule:^{
                    flushNext(YES);
                }];
            }

开辟一个线程,如果接收到的值满足跳过,则进入延时,如果立马收到值,则取消上次的延时,进入这次判断,如果在延时时间到达之前没有接受新的消息,则发送这个消息。

总结一下,就是当接收到一个要跳过的请求时,需要一个时间段,在这个时间段有新的消息发送过来,就取消上个流程,如果不跳过,取消上个流程,立马转发,如果时间段内没有新的消息传进来就用新的消息

- (RACSignal *)delay:(NSTimeInterval)interval

所有的信号都是延时

return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
        RACCompoundDisposable *disposable = [RACCompoundDisposable compoundDisposable];

        // We may never use this scheduler, but we need to set it up ahead of
        // time so that our scheduled blocks are run serially if we do.
        RACScheduler *scheduler = [RACScheduler scheduler];

        void (^schedule)(dispatch_block_t) = ^(dispatch_block_t block) {
            RACScheduler *delayScheduler = RACScheduler.currentScheduler ?: scheduler;
            RACDisposable *schedulerDisposable = [delayScheduler afterDelay:interval schedule:block];
            [disposable addDisposable:schedulerDisposable];
        };

        RACDisposable *subscriptionDisposable = [self subscribeNext:^(id x) {
            schedule(^{
                [subscriber sendNext:x];
            });
        } error:^(NSError *error) {
            [subscriber sendError:error];
        } completed:^{
            schedule(^{
                [subscriber sendCompleted];
            });
        }];

        [disposable addDisposable:subscriptionDisposable];
        return disposable;
    }] setNameWithFormat:@"[%@] -delay: %f", self.name, (double)interval];

repeat, 当发送完成的时候重新订阅,虽然失败信号也调用了这个递归,但是因为发送信号被销毁了,所以就不会再接收到消息了

初始化一个信号,并调用subscribeForever函数传入被订阅信号,及回调,注意error方法,将取消订阅


- (RACSignal *)repeat {
    return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
        return subscribeForever(self,
            ^(id x) {
                [subscriber sendNext:x];
            },
            ^(NSError *error, RACDisposable *disposable) {
                [disposable dispose];
                [subscriber sendError:error];
            },
            ^(RACDisposable *disposable) {
                // Resubscribe.
            });
    }] setNameWithFormat:@"[%@] -repeat", self.name];
}

下面看下subscribeForever函数:

static RACDisposable *subscribeForever (RACSignal *signal, void (^next)(id), void (^error)(NSError *, RACDisposable *), void (^completed)(RACDisposable *)) {
    next = [next copy];
    error = [error copy];
    completed = [completed copy];

    RACCompoundDisposable *compoundDisposable = [RACCompoundDisposable compoundDisposable];
//定义一个block,bock有一个参数也是一个block
    RACSchedulerRecursiveBlock recursiveBlock = ^(void (^recurse)(void)) {
        RACCompoundDisposable *selfDisposable = [RACCompoundDisposable compoundDisposable];
        [compoundDisposable addDisposable:selfDisposable];

        __weak RACDisposable *weakSelfDisposable = selfDisposable;

        RACDisposable *subscriptionDisposable = [signal subscribeNext:next error:^(NSError *e) {
            @autoreleasepool {
                error(e, compoundDisposable);
                [compoundDisposable removeDisposable:weakSelfDisposable];
            }
          //回调参数中的block
            recurse();
        } completed:^{
            @autoreleasepool {
                completed(compoundDisposable);
                [compoundDisposable removeDisposable:weakSelfDisposable];
            }
          //回调参数中的block
            recurse();
        }];

        [selfDisposable addDisposable:subscriptionDisposable];
    };

    // Subscribe once immediately, and then use recursive scheduling for any
    // further resubscriptions.
    //调用block,并设置参数block
    recursiveBlock(^{
        RACScheduler *recursiveScheduler = RACScheduler.currentScheduler ?: [RACScheduler scheduler];
        //参数block中调用设置block
        RACDisposable *schedulingDisposable = [recursiveScheduler scheduleRecursiveBlock:recursiveBlock];
        [compoundDisposable addDisposable:schedulingDisposable];
    });

    return compoundDisposable;
}

从这个函数中我们可以看到,是一个递归调用,在发送信号的发送失败或完成请求的时候递归调用,但是在上面的函数中我们知道在发送失败的时候对原信号进行了销毁操作

- (RACSignal *)catch:(RACSignal * (^)(NSError *error))catchBlock;

捕获异常.

- (RACSignal *)catch:(RACSignal * (^)(NSError *error))catchBlock {
    NSCParameterAssert(catchBlock != NULL);

    return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
        RACSerialDisposable *catchDisposable = [[RACSerialDisposable alloc] init];

        //添加一个订阅
        RACDisposable *subscriptionDisposable = [self subscribeNext:^(id x) {
            
            [subscriber sendNext:x];
        } error:^(NSError *error) {
        //在收到异常的时候,传入一个catch信号用来转换
            RACSignal *signal = catchBlock(error);
            NSCAssert(signal != nil, @"Expected non-nil signal from catch block on %@", self);
            //调用冷信号的方法
            catchDisposable.disposable = [signal subscribe:subscriber];
        } completed:^{
            [subscriber sendCompleted];
        }];

        return [RACDisposable disposableWithBlock:^{
            [catchDisposable dispose];
            [subscriptionDisposable dispose];
        }];
    }] setNameWithFormat:@"[%@] -catch:", self.name];
}

方法在原信号流sendError的时候,将对应的NSError通过catchBlock转换成了一个新的信号流并让原订阅者继续订阅。

catchTo:

上一方法的简化

+try: / - try:/-tryMap:

/**
 根据一个eroor创建一个signal对象
 */
+ (void)tryBlock;

/**
 用到了faltten操作,如果返回的值或者errorPtr满足YES,则继续,否则过滤
 */
- (void)try:(BOOL (^)(id _Nullable value, NSError **errorPtr))tryBlock ;

/**
 同上,但是可以转换值
 */
- (void)tryMap:(id (^)(id _Nullable value, NSError **errorPtr))mapBlock ;

+ (RACSignal *)try:(id (^)(NSError **errorPtr))tryBlock {
    NSCParameterAssert(tryBlock != NULL);

    return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
        NSError *error;
        id value = tryBlock(&error);
        RACSignal *signal = (value == nil ? [RACSignal error:error] : [RACSignal return:value]);
        return [signal subscribe:subscriber];
    }] setNameWithFormat:@"+try:"];
}

- (RACSignal *)try:(BOOL (^)(id value, NSError **errorPtr))tryBlock {
    NSCParameterAssert(tryBlock != NULL);

    return [[self flattenMap:^(id value) {
        NSError *error = nil;
        BOOL passed = tryBlock(value, &error);
        return (passed ? [RACSignal return:value] : [RACSignal error:error]);
    }] setNameWithFormat:@"[%@] -try:", self.name];
}

- (RACSignal *)tryMap:(id (^)(id value, NSError **errorPtr))mapBlock {
    NSCParameterAssert(mapBlock != NULL);

    return [[self flattenMap:^(id value) {
        NSError *error = nil;
        id mappedValue = mapBlock(value, &error);
        return (mappedValue == nil ? [RACSignal error:error] : [RACSignal return:mappedValue]);
    }] setNameWithFormat:@"[%@] -tryMap:", self.name];
}

defer,延迟操作,需要先订阅才能执行完block,因为这个block是在订阅的时候执行的

+ (RACSignal *)defer:(RACSignal<id> * (^)(void))block {
    NSCParameterAssert(block != NULL);

    return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
        return [block() subscribe:subscriber];
    }] setNameWithFormat:@"+defer:"];
}

initially,

**
 在添加订阅者之前先执行一个block
 */
 - (RACSignal *)initially:(void (^)(void))block {
    NSCParameterAssert(block != NULL);

    return [[RACSignal defer:^{
        block();
        return self;
    }] setNameWithFormat:@"[%@] -initially:", self.name];
}

finally,在结束的时候执行一个block

- (RACSignal *)finally:(void (^)(void))block {
    NSCParameterAssert(block != NULL);

    return [[[self
        doError:^(NSError *error) {
            block();
        }]
        doCompleted:^{
            block();
        }]
        setNameWithFormat:@"[%@] -finally:", self.name];
}

bufferWithTime: onScheduler:,统计某个分时段的数据发送给订阅者

先来分析下逻辑:

RACSerialDisposable *timerDisposable = [[RACSerialDisposable alloc] init];
        NSMutableArray *values = [NSMutableArray array];

        void (^flushValues)() = ^{
            @synchronized (values) {
                [timerDisposable.disposable dispose];

                if (values.count == 0) return;

                RACTuple *tuple = [RACTuple tupleWithObjectsFromArray:values];
                [values removeAllObjects];
                [subscriber sendNext:tuple];
            }
        };
  1. 创建一个数组。
  2. 创建一个block,在这个block中如果values的数量是0,直接返回,否则根据数组创建一个RACTuple,然后移除所有数组,将这个元组对象发送给订阅者
RACDisposable *selfDisposable = [self subscribeNext:^(id x) {
            @synchronized (values) {
                if (values.count == 0) {
                    timerDisposable.disposable = [scheduler afterDelay:interval schedule:flushValues];
                }

                [values addObject:x ?: RACTupleNil.tupleNil];
            }
        } error:^(NSError *error) {
            [subscriber sendError:error];
        } completed:^{
            flushValues();
            [subscriber sendCompleted];
        }];

        return [RACDisposable disposableWithBlock:^{
            [selfDisposable dispose];
            [timerDisposable dispose];
        }];

订阅当前信号:
1.接收到错误的时候,直接回调错误,在完成时调用上面的block。
2.接受到信号的时候,判断数组是不是个数为0,如果是0,则time秒后执行block,然后将现在的数据存到数组里面。

由此可见,这个方法是用来分时段发送信号给订阅者的。

takeLast,保存最近发送的N次消息,在完成后发送给订阅者

    return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
        NSMutableArray *valuesTaken = [NSMutableArray arrayWithCapacity:count];
        return [self subscribeNext:^(id x) {
            [valuesTaken addObject:x ? : RACTupleNil.tupleNil];

            while (valuesTaken.count > count) {
                [valuesTaken removeObjectAtIndex:0];
            }
        } error:^(NSError *error) {
            [subscriber sendError:error];
        } completed:^{
            for (id value in valuesTaken) {
                [subscriber sendNext:value == RACTupleNil.tupleNil ? nil : value];
            }

            [subscriber sendCompleted];
        }];
    }] setNameWithFormat:@"[%@] -takeLast: %lu", self.name, (unsigned long)count];

保存最近发送的N次消息,在完成后发送给订阅者

combineLatestWith,用当前最后一个信号来合并后面的信号发出的值

        //定义变量
        __block id lastSelfValue = nil;
        __block BOOL selfCompleted = NO;

        __block id lastOtherValue = nil;
        __block BOOL otherCompleted = NO;
        //定义输出方法,传递的是一个元组
        void (^sendNext)(void) = ^{
            @synchronized (disposable) {
                if (lastSelfValue == nil || lastOtherValue == nil) return;
                [subscriber sendNext:RACTuplePack(lastSelfValue, lastOtherValue)];
            }
        };
        
        //订阅自己设置值
        RACDisposable *selfDisposable = [self subscribeNext:^(id x) {
            @synchronized (disposable) {
                lastSelfValue = x ?: RACTupleNil.tupleNil;
                sendNext();
            }
        } error:^(NSError *error) {
            [subscriber sendError:error];
        } completed:^{
            @synchronized (disposable) {
                selfCompleted = YES;
                if (otherCompleted) [subscriber sendCompleted];
            }
        }];


        [disposable addDisposable:selfDisposable];

RACDisposable *otherDisposable = [signal subscribeNext:^(id x) {
            @synchronized (disposable) {
                lastOtherValue = x ?: RACTupleNil.tupleNil;
                sendNext();
            }
        } error:^(NSError *error) {
            [subscriber sendError:error];
        } completed:^{
            @synchronized (disposable) {
                otherCompleted = YES;
                if (selfCompleted) [subscriber sendCompleted];
            }
        }];

        [disposable addDisposable:otherDisposable];

combineLatest:reduce

+ (RACSignal *)combineLatest:(id<NSFastEnumeration>)signals reduce:(RACGenericReduceBlock)reduceBlock {
    NSCParameterAssert(reduceBlock != nil);

    RACSignal *result = [self combineLatest:signals];

    // Although we assert this condition above, older versions of this method
    // supported this argument being nil. Avoid crashing Release builds of
    // apps that depended on that.
    if (reduceBlock != nil) result = [result reduceEach:reduceBlock];

    return [result setNameWithFormat:@"+combineLatest: %@ reduce:", signals];
}

可以使用其中的几个

merge,合并信号,一个信号一个信号发送

NSMutableArray *copiedSignals = [[NSMutableArray alloc] init];
    for (RACSignal *signal in signals) {
        [copiedSignals addObject:signal];
    }

    return [[[RACSignal
        createSignal:^ RACDisposable * (id<RACSubscriber> subscriber) {
            for (RACSignal *signal in copiedSignals) {
                [subscriber sendNext:signal];
            }

            [subscriber sendCompleted];
            return nil;
        }]
        flatten]
        setNameWithFormat:@"+merge: %@", copiedSignals];
RACMerge.png

- (RACSignal *)flatten:(NSUInteger)maxConcurrent

这个要求必须是同步的。

__block BOOL selfCompleted = NO;

        // Subscribes to the given signal.
        __block void (^subscribeToSignal)(RACSignal *);

        // Weak reference to the above, to avoid a leak.
        __weak __block void (^recur)(RACSignal *);

        // Sends completed to the subscriber if all signals are finished.
        //
        // This should only be used while synchronized on `subscriber`.

//当完成而且有效的销毁任务数量为0时,发送完成信号
        void (^completeIfAllowed)(void) = ^{
            if (selfCompleted && activeDisposables.count == 0) {
                [subscriber sendCompleted];
            }
        };

上面的方法定义了一些基础的参数。

recur = subscribeToSignal = ^(RACSignal *signal) {
            RACSerialDisposable *serialDisposable = [[RACSerialDisposable alloc] init];

            @synchronized (subscriber) {
                [compoundDisposable addDisposable:serialDisposable];
                [activeDisposables addObject:serialDisposable];
            }

            serialDisposable.disposable = [signal subscribeNext:^(id x) {
                [subscriber sendNext:x];
            } error:^(NSError *error) {
                [subscriber sendError:error];
            } completed:^{
                __strong void (^subscribeToSignal)(RACSignal *) = recur;
                RACSignal *nextSignal;

                @synchronized (subscriber) {
                    [compoundDisposable removeDisposable:serialDisposable];
                    [activeDisposables removeObjectIdenticalTo:serialDisposable];

                    if (queuedSignals.count == 0) {
                        completeIfAllowed();
                        return;
                    }

                    nextSignal = queuedSignals[0];
                    [queuedSignals removeObjectAtIndex:0];
                }

                subscribeToSignal(nextSignal);
            }];
        };

这段代码定义了一个闭包,需要传入一个参数signal,然后在创建的信号在里面订阅这个传入的信号,在完成的时候,进行销毁任务。取出数组的第一个信号,进行重复调用。

[compoundDisposable addDisposable:[self subscribeNext:^(RACSignal *signal) {
            if (signal == nil) return;

            NSCAssert([signal isKindOfClass:RACSignal.class], @"Expected a RACSignal, got %@", signal);

            @synchronized (subscriber) {
                if (maxConcurrent > 0 && activeDisposables.count >= maxConcurrent) {
                    [queuedSignals addObject:signal];

                    // If we need to wait, skip subscribing to this
                    // signal.
                    return;
                }
            }

            subscribeToSignal(signal);
        } error:^(NSError *error) {
            [subscriber sendError:error];
        } completed:^{
            @synchronized (subscriber) {
                selfCompleted = YES;
                completeIfAllowed();
            }
        }]];

        [compoundDisposable addDisposable:[RACDisposable disposableWithBlock:^{
            // A strong reference is held to `subscribeToSignal` until we're
            // done, preventing it from deallocating early.
            subscribeToSignal = nil;
        }]];

        return compoundDisposable;

订阅当前信号并销毁。

这个信号的作用是只接受固定数量的信号。原信号发送的必须是当前的信号。

ignoreValues,只接受错误信号和完成信号

then,等原信号执行完后,执行block返回的信号

- (RACSignal *)then:(RACSignal * (^)(void))block {
    NSCParameterAssert(block != nil);

    return [[[self
        ignoreValues]
        concat:[RACSignal defer:block]]
        setNameWithFormat:@"[%@] -then:", self.name];
}

aggregateWithStart, 计算总和,用到了scanWithStart,必须调用sendcompleted

- (RACSignal *)aggregateWithStart:(id)start reduceWithIndex:(id (^)(id, id, NSUInteger))reduceBlock {
    return [[[[self
        scanWithStart:start reduceWithIndex:reduceBlock]
        startWith:start]
        takeLast:1]
        setNameWithFormat:@"[%@] -aggregateWithStart: %@ reduceWithIndex:", self.name, RACDescription(start)];
}

setKeyPath:(NSString *)keyPath onObject:(NSObject *)object nilValue:(id)nilValue

RACDisposable *subscriptionDisposable = [self subscribeNext:^(id x) {
        // Possibly spec, possibly compiler bug, but this __bridge cast does not
        // result in a retain here, effectively an invisible __unsafe_unretained
        // qualifier. Using objc_precise_lifetime gives the __strong reference
        // desired. The explicit use of __strong is strictly defensive.
        __strong NSObject *object __attribute__((objc_precise_lifetime)) = (__bridge __strong id)objectPtr;
        [object setValue:x ?: nilValue forKeyPath:keyPath];
    } error:^(NSError *error) {
        __strong NSObject *object __attribute__((objc_precise_lifetime)) = (__bridge __strong id)objectPtr;

        NSCAssert(NO, @"Received error from %@ in binding for key path \"%@\" on %@: %@", self, keyPath, object, error);

        // Log the error if we're running with assertions disabled.
        NSLog(@"Received error from %@ in binding for key path \"%@\" on %@: %@", self, keyPath, object, error);

        [disposable dispose];
    } completed:^{
        [disposable dispose];
    }];

订阅当前信号,订阅者接收信号的时候把keyPath通过KVC设置一下

takeUntil:(RACSignal *)signalTrigger,当signaltrigger发送一个信号的时候取消订阅

- (RACSignal *)takeUntil:(RACSignal *)signalTrigger {
    return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
        RACCompoundDisposable *disposable = [RACCompoundDisposable compoundDisposable];
        void (^triggerCompletion)(void) = ^{
            [disposable dispose];
            [subscriber sendCompleted];
        };

        RACDisposable *triggerDisposable = [signalTrigger subscribeNext:^(id _) {
            triggerCompletion();
        } completed:^{
            triggerCompletion();
        }];

        [disposable addDisposable:triggerDisposable];

        if (!disposable.disposed) {
            RACDisposable *selfDisposable = [self subscribeNext:^(id x) {
                [subscriber sendNext:x];
            } error:^(NSError *error) {
                [subscriber sendError:error];
            } completed:^{
                [disposable dispose];
                [subscriber sendCompleted];
            }];

            [disposable addDisposable:selfDisposable];
        }

        return disposable;
    }] setNameWithFormat:@"[%@] -takeUntil: %@", self.name, signalTrigger];
}

takeUntilReplacement,替换信号

- (RACSignal *)takeUntilReplacement:(RACSignal *)replacement {
    return [RACSignal createSignal:^(id<RACSubscriber> subscriber) {
        RACSerialDisposable *selfDisposable = [[RACSerialDisposable alloc] init];

        RACDisposable *replacementDisposable = [replacement subscribeNext:^(id x) {
            [selfDisposable dispose];
            [subscriber sendNext:x];
        } error:^(NSError *error) {
            [selfDisposable dispose];
            [subscriber sendError:error];
        } completed:^{
            [selfDisposable dispose];
            [subscriber sendCompleted];
        }];

        if (!selfDisposable.disposed) {
            selfDisposable.disposable = [[self
                concat:[RACSignal never]]
                subscribe:subscriber];
        }

        return [RACDisposable disposableWithBlock:^{
            [selfDisposable dispose];
            [replacementDisposable dispose];
        }];
    }];
}

原信号concat:了一个[RACSignal never]信号,这样原信号就一直不会disposed,会一直等待replacement信号的到来。
控制selfDisposable是否被dispose,控制权来自于入参的replacement信号,一旦replacement信号sendNext,那么原信号就会取消订阅,接下来的事情就会交给replacement信号了。
变换后的新信号sendNext,sendError,sendCompleted全部都由replacement信号来发送,最终新信号完成的时刻也是replacement信号完成的时刻。

switchToLatest

- (RACSignal *)switchToLatest {
    return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
    //转换为热信号。
        RACMulticastConnection *connection = [self publish];
//返回值变换
        RACDisposable *subscriptionDisposable = [[connection.signal
            flattenMap:^(RACSignal *x) {
                NSCAssert(x == nil || [x isKindOfClass:RACSignal.class], @"-switchToLatest requires that the source signal (%@) send signals. Instead we got: %@", self, x);

                // -concat:[RACSignal never] prevents completion of the receiver from
                // prematurely terminating the inner signal.
                //contat是为了防止信号销毁,takeUntil是为了接受新信号的时候取消就信号的订阅
                return [x takeUntil:[connection.signal concat:[RACSignal never]]];
            }]
            //订阅最新的信号
            subscribe:subscriber];

        RACDisposable *connectionDisposable = [connection connect];
        return [RACDisposable disposableWithBlock:^{
            [subscriptionDisposable dispose];
            [connectionDisposable dispose];
        }];
    }] setNameWithFormat:@"[%@] -switchToLatest", self.name];
}

总结下来,就是返回的是最新收到的信号。这个信号发送的是一个信号

switch:(RACSignal *)signal cases:(NSDictionary *)cases default:(RACSignal *)defaultSignal


    for (id key in cases) {
        id value __attribute__((unused)) = cases[key];
        NSCAssert([value isKindOfClass:RACSignal.class], @"Expected all cases to be RACSignals, %@ isn't", value);
    }

可以发现字典里面是一个RACSignal信号。


[signal
        map:^(id key) {
            if (key == nil) key = RACTupleNil.tupleNil;

            RACSignal *signal = copy[key] ?: defaultSignal;
            if (signal == nil) {
                NSString *description = [NSString stringWithFormat:NSLocalizedString(@"No matching signal found for value %@", @""), key];
                return [RACSignal error:[NSError errorWithDomain:RACSignalErrorDomain code:RACSignalErrorNoMatchingCase userInfo:@{ NSLocalizedDescriptionKey: description }]];
            }

            return signal;
        }]

中转信号是根据订阅到的信号到字典中去获取的,如果获取不到则用默认的信号。

return [[[signal
        map:^(id key) {
            if (key == nil) key = RACTupleNil.tupleNil;

            RACSignal *signal = copy[key] ?: defaultSignal;
            if (signal == nil) {
                NSString *description = [NSString stringWithFormat:NSLocalizedString(@"No matching signal found for value %@", @""), key];
                return [RACSignal error:[NSError errorWithDomain:RACSignalErrorDomain code:RACSignalErrorNoMatchingCase userInfo:@{ NSLocalizedDescriptionKey: description }]];
            }

            return signal;
        }]
        switchToLatest]

使用最新收到的信号。

+ (RACSignal *)if:(RACSignal *)boolSignal then:(RACSignal *)trueSignal else:(RACSignal *)falseSignal

+ (RACSignal *)if:(RACSignal *)boolSignal then:(RACSignal *)trueSignal else:(RACSignal *)falseSignal {
    NSCParameterAssert(boolSignal != nil);
    NSCParameterAssert(trueSignal != nil);
    NSCParameterAssert(falseSignal != nil);

    return [[[boolSignal
        map:^(NSNumber *value) {
            NSCAssert([value isKindOfClass:NSNumber.class], @"Expected %@ to send BOOLs, not %@", boolSignal, value);

            return (value.boolValue ? trueSignal : falseSignal);
        }]
        switchToLatest]
        setNameWithFormat:@"+if: %@ then: %@ else: %@", boolSignal, trueSignal, falseSignal];
}

如果收到一个信号是YES则用true信号,反之用false信号,且取订阅最新的信号。

firstOrDefault,获取订阅的第一个消息

- (id)firstOrDefault:(id)defaultValue success:(BOOL *)success error:(NSError **)error {
    NSCondition *condition = [[NSCondition alloc] init];
    condition.name = [NSString stringWithFormat:@"[%@] -firstOrDefault: %@ success:error:", self.name, defaultValue];

    __block id value = defaultValue;
    __block BOOL done = NO;

    // Ensures that we don't pass values across thread boundaries by reference.
    __block NSError *localError;
    __block BOOL localSuccess;

    [[self take:1] subscribeNext:^(id x) {
        [condition lock];

        value = x;
        localSuccess = YES;

        done = YES;
        [condition broadcast];
        [condition unlock];
    } error:^(NSError *e) {
        [condition lock];

        if (!done) {
            localSuccess = NO;
            localError = e;

            done = YES;
            [condition broadcast];
        }

        [condition unlock];
    } completed:^{
        [condition lock];

        localSuccess = YES;

        done = YES;
        [condition broadcast];
        [condition unlock];
    }];

    [condition lock];
    while (!done) {
        [condition wait];
    }

    if (success != NULL) *success = localSuccess;
    if (error != NULL) *error = localError;

    [condition unlock];
    return value;
}

waitUntilCompleted,忽略所有值直到成功

- (BOOL)waitUntilCompleted:(NSError **)error {
    BOOL success = NO;

    [[[self
        ignoreValues]
        setNameWithFormat:@"[%@] -waitUntilCompleted:", self.name]
        firstOrDefault:nil success:&success error:error];

    return success;
}

collect,将所有发过的消息集中到一个数组中

- (RACSignal *)collect {
    return [[self aggregateWithStartFactory:^{
        return [[NSMutableArray alloc] init];
    } reduce:^(NSMutableArray *collectedValues, id x) {
        [collectedValues addObject:(x ?: NSNull.null)];
        return collectedValues;
    }] setNameWithFormat:@"[%@] -collect", self.name];
}

toArray,返回collect的数组

- (NSArray *)toArray {
    return [[[self collect] first] copy];
}

sequence,订阅信号,然后通过递归创建

- (RACSequence *)sequence {
    return [[RACSignalSequence sequenceWithSignal:self] setNameWithFormat:@"[%@] -sequence", self.name];
}

timeout:(NSTimeInterval)interval onScheduler:(RACScheduler *)scheduler,过多少秒之后发送一个失败的信号

- (RACSignal *)timeout:(NSTimeInterval)interval onScheduler:(RACScheduler *)scheduler {
    NSCParameterAssert(scheduler != nil);
    NSCParameterAssert(scheduler != RACScheduler.immediateScheduler);

    return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
        RACCompoundDisposable *disposable = [RACCompoundDisposable compoundDisposable];

        RACDisposable *timeoutDisposable = [scheduler afterDelay:interval schedule:^{
            [disposable dispose];
            [subscriber sendError:[NSError errorWithDomain:RACSignalErrorDomain code:RACSignalErrorTimedOut userInfo:nil]];
        }];

        [disposable addDisposable:timeoutDisposable];

        RACDisposable *subscriptionDisposable = [self subscribeNext:^(id x) {
            [subscriber sendNext:x];
        } error:^(NSError *error) {
            [disposable dispose];
            [subscriber sendError:error];
        } completed:^{
            [disposable dispose];
            [subscriber sendCompleted];
        }];

        [disposable addDisposable:subscriptionDisposable];
        return disposable;
    }] setNameWithFormat:@"[%@] -timeout: %f onScheduler: %@", self.name, (double)interval, scheduler];
}

deliverOn,接收信号在某个线程上

- (RACSignal *)deliverOn:(RACScheduler *)scheduler {
    return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
        return [self subscribeNext:^(id x) {
            [scheduler schedule:^{
                [subscriber sendNext:x];
            }];
        } error:^(NSError *error) {
            [scheduler schedule:^{
                [subscriber sendError:error];
            }];
        } completed:^{
            [scheduler schedule:^{
                [subscriber sendCompleted];
            }];
        }];
    }] setNameWithFormat:@"[%@] -deliverOn: %@", self.name, scheduler];
}

subscribeOn,在某个线程上发送信号

- (RACSignal *)subscribeOn:(RACScheduler *)scheduler {
    return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
        RACCompoundDisposable *disposable = [RACCompoundDisposable compoundDisposable];

        RACDisposable *schedulingDisposable = [scheduler schedule:^{
            RACDisposable *subscriptionDisposable = [self subscribe:subscriber];

            [disposable addDisposable:subscriptionDisposable];
        }];

        [disposable addDisposable:schedulingDisposable];
        return disposable;
    }] setNameWithFormat:@"[%@] -subscribeOn: %@", self.name, scheduler];
}

deliverOnMainThread,在主线程上接受信号

- (RACSignal *)deliverOnMainThread {
    return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
        __block volatile int32_t queueLength = 0;
        
        void (^performOnMainThread)(dispatch_block_t) = ^(dispatch_block_t block) {
            int32_t queued = OSAtomicIncrement32(&queueLength);
            if (NSThread.isMainThread && queued == 1) {
                block();
                OSAtomicDecrement32(&queueLength);
            } else {
                dispatch_async(dispatch_get_main_queue(), ^{
                    block();
                    OSAtomicDecrement32(&queueLength);
                });
            }
        };

        return [self subscribeNext:^(id x) {
            performOnMainThread(^{
                [subscriber sendNext:x];
            });
        } error:^(NSError *error) {
            performOnMainThread(^{
                [subscriber sendError:error];
            });
        } completed:^{
            performOnMainThread(^{
                [subscriber sendCompleted];
            });
        }];
    }] setNameWithFormat:@"[%@] -deliverOnMainThread", self.name];
}

- (RACSignal *)groupBy:(id<NSCopying> (^)(id object))keyBlock transform:(id (^)(id object))transformBlock

- (RACSignal *)groupBy:(id<NSCopying> (^)(id object))keyBlock transform:(id (^)(id object))transformBlock {
    NSCParameterAssert(keyBlock != NULL);

    return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
        NSMutableDictionary *groups = [NSMutableDictionary dictionary];
        NSMutableArray *orderedGroups = [NSMutableArray array];

        return [self subscribeNext:^(id x) {
            id<NSCopying> key = keyBlock(x);
            RACGroupedSignal *groupSubject = nil;
            @synchronized(groups) {
                groupSubject = groups[key];
                if (groupSubject == nil) {
                    groupSubject = [RACGroupedSignal signalWithKey:key];
                    groups[key] = groupSubject;
                    [orderedGroups addObject:groupSubject];
                    [subscriber sendNext:groupSubject];
                }
            }

            [groupSubject sendNext:transformBlock != NULL ? transformBlock(x) : x];
        } error:^(NSError *error) {
            [subscriber sendError:error];

            [orderedGroups makeObjectsPerformSelector:@selector(sendError:) withObject:error];
        } completed:^{
            [subscriber sendCompleted];

            [orderedGroups makeObjectsPerformSelector:@selector(sendCompleted)];
        }];
    }] setNameWithFormat:@"[%@] -groupBy:transform:", self.name];
}

分组

- (RACSignal *)any:(BOOL (^)(id object))predicateBlock

materialize, 将当前信号转发为RACEvent

- (RACSignal *)materialize {
    return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
        return [self subscribeNext:^(id x) {
            [subscriber sendNext:[RACEvent eventWithValue:x]];
        } error:^(NSError *error) {
            [subscriber sendNext:[RACEvent eventWithError:error]];
            [subscriber sendCompleted];
        } completed:^{
            [subscriber sendNext:RACEvent.completedEvent];
            [subscriber sendCompleted];
        }];
    }] setNameWithFormat:@"[%@] -materialize", self.name];
}

dematerialize,将订阅的event信号转为普通信号

- (RACSignal *)dematerialize {
    return [[self bind:^{
        return ^(RACEvent *event, BOOL *stop) {
            switch (event.eventType) {
                case RACEventTypeCompleted:
                    *stop = YES;
                    return [RACSignal empty];

                case RACEventTypeError:
                    *stop = YES;
                    return [RACSignal error:event.error];

                case RACEventTypeNext:
                    return [RACSignal return:event.value];
            }
        };
    }] setNameWithFormat:@"[%@] -dematerialize", self.name];
}

any:

判断是否包含一个可以使block返回为1的值

- (RACSignal *)any:(BOOL (^)(id object))predicateBlock {
    NSCParameterAssert(predicateBlock != NULL);

    return [[[self materialize] bind:^{
        return ^(RACEvent *event, BOOL *stop) {
            if (event.finished) {
                *stop = YES;
                return [RACSignal return:@NO];
            }

            if (predicateBlock(event.value)) {
                *stop = YES;
                return [RACSignal return:@YES];
            }

            return [RACSignal empty];
        };
    }] setNameWithFormat:@"[%@] -any:", self.name];
}

any

含有任意值

All,信号sendComplete则为YES

- (RACSignal *)all:(BOOL (^)(id object))predicateBlock {
    NSCParameterAssert(predicateBlock != NULL);

    return [[[self materialize] bind:^{
        return ^(RACEvent *event, BOOL *stop) {
            if (event.eventType == RACEventTypeCompleted) {
                *stop = YES;
                return [RACSignal return:@YES];
            }

            if (event.eventType == RACEventTypeError || !predicateBlock(event.value)) {
                *stop = YES;
                return [RACSignal return:@NO];
            }

            return [RACSignal empty];
        };
    }] setNameWithFormat:@"[%@] -all:", self.name];
}

- (RACSignal *)retry:(NSInteger)retryCount

如果发送失败,则重新请求N次后还是失败,则返回失败

- (RACSignal *)retry:(NSInteger)retryCount {
    return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
        __block NSInteger currentRetryCount = 0;
        return subscribeForever(self,
            ^(id x) {
                [subscriber sendNext:x];
            },
            ^(NSError *error, RACDisposable *disposable) {
                if (retryCount == 0 || currentRetryCount < retryCount) {
                    // Resubscribe.
                    currentRetryCount++;
                    return;
                }

                [disposable dispose];
                [subscriber sendError:error];
            },
            ^(RACDisposable *disposable) {
                [disposable dispose];
                [subscriber sendCompleted];
            });
    }] setNameWithFormat:@"[%@] -retry: %lu", self.name, (unsigned long)retryCount];

- (RACSignal *)sample:(RACSignal *)sampler,将前面信号发送的消息取最新的,在传入sampler的信号发送消息时发送。

- (RACSignal *)sample:(RACSignal *)sampler {
    NSCParameterAssert(sampler != nil);

    return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
        NSLock *lock = [[NSLock alloc] init];
        __block id lastValue;
        __block BOOL hasValue = NO;

        RACSerialDisposable *samplerDisposable = [[RACSerialDisposable alloc] init];
        RACDisposable *sourceDisposable = [self subscribeNext:^(id x) {
            [lock lock];
            hasValue = YES;
            lastValue = x;
            [lock unlock];
        } error:^(NSError *error) {
            [samplerDisposable dispose];
            [subscriber sendError:error];
        } completed:^{
            [samplerDisposable dispose];
            [subscriber sendCompleted];
        }];

        samplerDisposable.disposable = [sampler subscribeNext:^(id _) {
            BOOL shouldSend = NO;
            id value;
            [lock lock];
            shouldSend = hasValue;
            value = lastValue;
            [lock unlock];

            if (shouldSend) {
                [subscriber sendNext:value];
            }
        } error:^(NSError *error) {
            [sourceDisposable dispose];
            [subscriber sendError:error];
        } completed:^{
            [sourceDisposable dispose];
            [subscriber sendCompleted];
        }];

        return [RACDisposable disposableWithBlock:^{
            [samplerDisposable dispose];
            [sourceDisposable dispose];
        }];
    }] setNameWithFormat:@"[%@] -sample: %@", self.name, sampler];
}

not,发送的必须是一个NSNumbe类型的信号,返回的是bool值

- (RACSignal *)not {
    
    return [[self map:^(NSNumber *value) {
        NSCAssert([value isKindOfClass:NSNumber.class], @"-not must only be used on a signal of NSNumbers. Instead, got: %@", value);

        return @(!value.boolValue);
    }] setNameWithFormat:@"[%@] -not", self.name];
}

and,发送的必须是RACTuple对象,如果所有元素都大于0,则返回1否则返回0

- (RACSignal *)and {
    return [[self map:^(RACTuple *tuple) {
        NSCAssert([tuple isKindOfClass:RACTuple.class], @"-and must only be used on a signal of RACTuples of NSNumbers. Instead, received: %@", tuple);
        NSCAssert(tuple.count > 0, @"-and must only be used on a signal of RACTuples of NSNumbers, with at least 1 value in the tuple");

        return @([tuple.rac_sequence all:^(NSNumber *number) {
            NSCAssert([number isKindOfClass:NSNumber.class], @"-and must only be used on a signal of RACTuples of NSNumbers. Instead, tuple contains a non-NSNumber value: %@", tuple);

            return number.boolValue;
        }]);
    }] setNameWithFormat:@"[%@] -and", self.name];
}

or,发送的必须是RACTuple对象,如果有一个元素大于0,则返回1否则返回0

- (RACSignal *)or {
    return [[self map:^(RACTuple *tuple) {
        NSCAssert([tuple isKindOfClass:RACTuple.class], @"-or must only be used on a signal of RACTuples of NSNumbers. Instead, received: %@", tuple);
        NSCAssert(tuple.count > 0, @"-or must only be used on a signal of RACTuples of NSNumbers, with at least 1 value in the tuple");

        return @([tuple.rac_sequence any:^(NSNumber *number) {
            NSCAssert([number isKindOfClass:NSNumber.class], @"-or must only be used on a signal of RACTuples of NSNumbers. Instead, tuple contains a non-NSNumber value: %@", tuple);

            return number.boolValue;
        }]);
    }] setNameWithFormat:@"[%@] -or", self.name];
}

reduceApply,传入一个元组,第一个元素是一个block,block可以获取的参数是元组的个数-1,block返回一个id对象

- (RACSignal *)reduceApply {
    return [[self map:^(RACTuple *tuple) {
        NSCAssert([tuple isKindOfClass:RACTuple.class], @"-reduceApply must only be used on a signal of RACTuples. Instead, received: %@", tuple);
        NSCAssert(tuple.count > 1, @"-reduceApply must only be used on a signal of RACTuples, with at least a block in tuple[0] and its first argument in tuple[1]");

        // We can't use -array, because we need to preserve RACTupleNil
        NSMutableArray *tupleArray = [NSMutableArray arrayWithCapacity:tuple.count];
        for (id val in tuple) {
            [tupleArray addObject:val];
        }
        RACTuple *arguments = [RACTuple tupleWithObjectsFromArray:[tupleArray subarrayWithRange:NSMakeRange(1, tupleArray.count - 1)]];

        return [RACBlockTrampoline invokeBlock:tuple[0] withArguments:arguments];
    }] setNameWithFormat:@"[%@] -reduceApply", self.name];
}
上一篇下一篇

猜你喜欢

热点阅读