ReactiveCocoa 框架 RACSignal+Operations.h 定义了 RACSignal 常规操作方法,接下来对一些常用的方法进行分析并解析其作用。
doNext
测试代码:
- (void)testDoNext {
RACSignal *sourceSignal = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
[subscriber sendNext:@"source signal"];
[subscriber sendCompleted];
return nil;
}];
[[sourceSignal doNext:^(id _Nullable x) {
NSLog(@"sourceSignal doNext will execute before sendNext");
}] subscribeNext:^(id _Nullable x) {
NSLog(@"value = %@", x);
}];
}
输出:
sourceSignal doNext will execute before sendNext
value = source signal
底层实现:
- (RACSignal *)doNext:(void (^)(id x))block {
NSCParameterAssert(block != NULL);
return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
return [self subscribeNext:^(id x) {
block(x);
[subscriber sendNext:x];
} error:^(NSError *error) {
[subscriber sendError:error];
} completed:^{
[subscriber sendCompleted];
}];
}] setNameWithFormat:@"[%@] -doNext:", self.name];
}
doNext 传入block 闭包,该闭包的参数就是原信号发送的值,当给订阅者发送 sendNext 之前会执行 block 闭包
类似的 doError,doCompleted 也是在给订阅者发送事件之前就执行相关 block
throttle:valuesPassingTest:
测试代码:
- (void)testThrottleValuesPassingTest {
RACSignal *sourceSignal = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
[subscriber sendNext:@0];
[subscriber sendNext:@1];
dispatch_after(dispatch_time(DISPATCH_TIME_NOW, (int64_t)(2 * NSEC_PER_SEC)), dispatch_get_main_queue(), ^{
[subscriber sendNext:@2];
[subscriber sendNext:@3];
[subscriber sendCompleted];
});
return nil;
}];
RACSignal *throttleSignal = [sourceSignal throttle:1 valuesPassingTest:^BOOL(id _Nullable next) {
return [next integerValue] <= 2;
}];
[throttleSignal subscribeNext:^(id _Nullable x) {
NSLog(@"value = %@", x);
}];
}
输出:
value = 1
value = 3
throttle:valuesPassingTest: 方法有2个参数:
- 时间间隔
interval
- 判断条件闭包
predicate
该方法大概作用是:从原信号发出第一个信号(@0)发出开始计时,在 interval
秒内如果第二个信号(@1) 符合 predicate 的判断条件,则该前一个信号会被忽略;如果2个信号间隔时间超过 interval
秒或者不满足 predicate 判断,则前一个信号会发给订阅者。
底层实现:
- (RACSignal *)throttle:(NSTimeInterval)interval valuesPassingTest:(BOOL (^)(id next))predicate {
NSCParameterAssert(interval >= 0);
NSCParameterAssert(predicate != nil);
return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
RACCompoundDisposable *compoundDisposable = [RACCompoundDisposable compoundDisposable];
// We may never use this scheduler, but we need to set it up ahead of
// time so that our scheduled blocks are run serially if we do.
RACScheduler *scheduler = [RACScheduler scheduler];
// Information about any currently-buffered `next` event.
__block id nextValue = nil;
__block BOOL hasNextValue = NO;
RACSerialDisposable *nextDisposable = [[RACSerialDisposable alloc] init];
void (^flushNext)(BOOL send) = ^(BOOL send) {
@synchronized (compoundDisposable) {
[nextDisposable.disposable dispose];
if (!hasNextValue) return;
if (send) [subscriber sendNext:nextValue];
nextValue = nil;
hasNextValue = NO;
}
};
RACDisposable *subscriptionDisposable = [self subscribeNext:^(id x) {
RACScheduler *delayScheduler = RACScheduler.currentScheduler ?: scheduler;
BOOL shouldThrottle = predicate(x);
@synchronized (compoundDisposable) {
flushNext(NO);
if (!shouldThrottle) {
[subscriber sendNext:x];
return;
}
nextValue = x;
hasNextValue = YES;
nextDisposable.disposable = [delayScheduler afterDelay:interval schedule:^{
flushNext(YES);
}];
}
} error:^(NSError *error) {
[compoundDisposable dispose];
[subscriber sendError:error];
} completed:^{
flushNext(YES);
[subscriber sendCompleted];
}];
[compoundDisposable addDisposable:subscriptionDisposable];
return compoundDisposable;
}] setNameWithFormat:@"[%@] -throttle: %f valuesPassingTest:", self.name, (double)interval];
}
throttle:valuesPassingTest: 内部主要通过判断 nextValue
和 hasNextValue
2个变量的状态来判断是否给订阅者发送信号。内部先订阅原信号,然后触发订阅者的 didSubscribe,结合测试代码,具体流程:
- 收到原信号发出的信号
@0
,传入到条件闭包 predicate,返回 YES
- RACCompoundDisposable 作为线程间互斥信号量,用 @synchronized 加锁保证
nextValue
和 hasNextValue
操作是原子性
- 执行 flushNext(NO) ,把 nextDisposable 进行 dispose,delayScheduler 之前存放的延迟任务如果未被执行会被取消;hasNextValue == NO,直接 return,没有给订阅者发送0
- 判断 !shouldThrottle,跳过 if 内部代码,给 nextValue 和 hasNextValue 赋值,nextValue=@0,hasNextValue = YES
- 把 flushNext(YES) 加入到延迟队列中,1秒后执行
- 原信号发送 @1,此时时间间隔不到 1秒,从步骤3开始重复上述步骤;flushNext(NO) 中 延迟任务被取消,
nextValue
和 hasNextValue
会赋值为对应 零值。shouldThrottle 符合,nextValue
和 hasNextValue
又被赋值到 @1 和 YES,保存新的 flushNext(YES) 到延迟任务队列中
- 1秒后还没有收到原信号发送的信号,执行步骤6保存的 flushNext(YES),把 @1 发给订阅者;2秒后收到原信号的信号 @2,从步骤3开始重复上述步骤,@2会被保存到
hasNextValue
中,等待下一次延迟任务可以触发的时候发给订阅者,反之被忽略
- 步骤7完之后马上收到新的信号@3,从步骤3开始重复上述步骤,步骤7保存的 flushNext(YES) 被取消,因为此时不符合 predicate 判断条件,@3发给订阅者
delay:
测试代码:
- (void)testDelay {
RACSignal *sourceSignal = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
[subscriber sendNext:@0];
[subscriber sendNext:@1];
[subscriber sendNext:@2];
[subscriber sendNext:@3];
[subscriber sendCompleted];
return nil;
}];
[[sourceSignal delay:1] subscribeNext:^(id _Nullable x) {
NSLog(@"value = %@", x);
}];
NSLog(@"%@", [NSDate date]);
}
输出:
2019-01-31 09:34:33.286371+0800 AppTest[38331:4774309] Thu Jan 31 09:34:33 2019
2019-01-31 09:34:34.380736+0800 AppTest[38331:4774309] value = 0
2019-01-31 09:34:34.380881+0800 AppTest[38331:4774309] value = 1
2019-01-31 09:34:34.380986+0800 AppTest[38331:4774309] value = 2
2019-01-31 09:34:34.381076+0800 AppTest[38331:4774309] value = 3
delay: 方法是将原信号的信号事件延迟发送给订阅者
底层实现:
- (RACSignal *)delay:(NSTimeInterval)interval {
return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
RACCompoundDisposable *disposable = [RACCompoundDisposable compoundDisposable];
// We may never use this scheduler, but we need to set it up ahead of
// time so that our scheduled blocks are run serially if we do.
RACScheduler *scheduler = [RACScheduler scheduler];
void (^schedule)(dispatch_block_t) = ^(dispatch_block_t block) {
RACScheduler *delayScheduler = RACScheduler.currentScheduler ?: scheduler;
RACDisposable *schedulerDisposable = [delayScheduler afterDelay:interval schedule:block];
[disposable addDisposable:schedulerDisposable];
};
RACDisposable *subscriptionDisposable = [self subscribeNext:^(id x) {
schedule(^{
[subscriber sendNext:x];
});
} error:^(NSError *error) {
[subscriber sendError:error];
} completed:^{
schedule(^{
[subscriber sendCompleted];
});
}];
[disposable addDisposable:subscriptionDisposable];
return disposable;
}] setNameWithFormat:@"[%@] -delay: %f", self.name, (double)interval];
}
delay: 内部首先先定义 schedule ,其参数是 dispatch_block_t,执行 schedule 的时候,会把参数 block 放到 RACScheduler 延迟 interval 秒触发。
- (RACDisposable *)afterDelay:(NSTimeInterval)delay schedule:(void (^)(void))block {
return [self after:[NSDate dateWithTimeIntervalSinceNow:delay] schedule:block];
}
- (RACDisposable *)after:(NSDate *)date schedule:(void (^)(void))block {
NSCParameterAssert(date != nil);
NSCParameterAssert(block != NULL);
RACDisposable *disposable = [[RACDisposable alloc] init];
dispatch_after([self.class wallTimeWithDate:date], self.queue, ^{
if (disposable.disposed) return;
[self performAsCurrentScheduler:block];
});
return disposable;
}
根据测试代码,最终会调用 RACQueueScheduler 中的 -after:schedule:
,在该方法里面,通过 dispatch_after 来触发入参 block,触发先前会先判断该任务是否被 disposed ,如果是则直接 return。
归纳来说,delay: 方法就是将原信号的 sendNext 和 sendCompleted 事件延迟 interval
秒发送给订阅者。
bufferWithTime:onScheduler:
测试代码:
- (void)testBufferWithTime {
RACSignal *sourceSignal = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
[subscriber sendNext:@0];
[subscriber sendNext:@1];
[subscriber sendNext:@2];
[subscriber sendNext:@3];
[subscriber sendCompleted];
return nil;
}];
RACSignal *bufferSignal = [sourceSignal bufferWithTime:1 onScheduler:[RACScheduler currentScheduler]];
[bufferSignal subscribeNext:^(id _Nullable x) {
NSLog(@"value = %@", x);
}];
}
输出:
value = <RACTuple: 0x600000003200> (
0,
1,
2,
3
)
底层实现:
- (RACSignal *)bufferWithTime:(NSTimeInterval)interval onScheduler:(RACScheduler *)scheduler {
NSCParameterAssert(scheduler != nil);
NSCParameterAssert(scheduler != RACScheduler.immediateScheduler);
return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
RACSerialDisposable *timerDisposable = [[RACSerialDisposable alloc] init];
NSMutableArray *values = [NSMutableArray array];
void (^flushValues)() = ^{
@synchronized (values) {
[timerDisposable.disposable dispose];
if (values.count == 0) return;
RACTuple *tuple = [RACTuple tupleWithObjectsFromArray:values];
[values removeAllObjects];
[subscriber sendNext:tuple];
}
};
RACDisposable *selfDisposable = [self subscribeNext:^(id x) {
@synchronized (values) {
if (values.count == 0) {
timerDisposable.disposable = [scheduler afterDelay:interval schedule:flushValues];
}
[values addObject:x ?: RACTupleNil.tupleNil];
}
} error:^(NSError *error) {
[subscriber sendError:error];
} completed:^{
flushValues();
[subscriber sendCompleted];
}];
return [RACDisposable disposableWithBlock:^{
[selfDisposable dispose];
[timerDisposable dispose];
}];
}] setNameWithFormat:@"[%@] -bufferWithTime: %f onScheduler: %@", self.name, (double)interval, scheduler];
}
同样 bufferWithTime:onScheduler: 内部还会创建新的信号,当新的信号被订阅的时候会创建可变数组 values 。然后定义闭包 flushValues,当收到原信号的 sendNext 和 sendCompleted 的时候会触发该闭包。
之后对原信号进行订阅,首先收到原信号的 sendNext 事件的时候,先判断 values 的元素个数,如果个数为 0,则把 flushValues 延迟 interval
秒触发。然后把信号值加入 values 数组中,如果信号值为空,则把 RACTupleNil.tupleNil 加入数组中。
因为测试代码中,@0、@1、@2、@3 四个信号是连续串行发出,所以之前被加入延迟队列中执行的 flushValues 还没有触发的时候,当原信号发送 sendCompleted 的时候,flushValues 会被触发,这时候 values 四个元素被一次性包装成 RACTuple 发送给订阅者。
如果把测试代码修改成:
- (void)testBufferWithTime2 {
RACSignal *sourceSignal = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
[subscriber sendNext:@0];
[subscriber sendNext:@1];
dispatch_after(dispatch_time(DISPATCH_TIME_NOW, (int64_t)(2 * NSEC_PER_SEC)), dispatch_get_main_queue(), ^{
[subscriber sendNext:@2];
[subscriber sendNext:@3];
dispatch_after(dispatch_time(DISPATCH_TIME_NOW, (int64_t)(2 * NSEC_PER_SEC)), dispatch_get_main_queue(), ^{
[subscriber sendCompleted];
});
});
return nil;
}];
RACSignal *bufferSignal = [sourceSignal bufferWithTime:1 onScheduler:[RACScheduler currentScheduler]];
[bufferSignal subscribeNext:^(id _Nullable x) {
NSLog(@"value = %@", x);
}];
}
输出:
value = <RACTuple: 0x604000204300> (
0,
1
)
value = <RACTuple: 0x600000019300> (
2,
3
)
订阅者此时会受到2个信号,都是 RACTuple 类型,回到之前的实现代码,
- 收到 @0,flushValues() 加入延迟队列1秒之后执行,@0 再加入到数组 values 中
- 步骤1之后马上收到 @1(1秒内),重复步骤1,这时候延迟队列中有2个任务
- 1秒后,第一个加入延迟队列的 flushValues() 执行,首先执行了
[timerDisposable.disposable dispose];
,这样会将延迟队列中下一个 flushValues() 任务被取消(步骤2加入的)。然后 values 元素包装成 RACTuple 发送给订阅者。
- 基于步骤3,再过一秒,也就是从一开始算,2秒过后。收到@2,然后重复步骤,逻辑相似
- 最后再过2秒,原信号发送 sendCompleted,执行 flushValues() ,values 元素个数为0,直接返回。
总结:bufferWithTime:onScheduler: 作用是将规定时间内将原信号所发送的全部信号包装成 RACTupe 发送给订阅者
timeout:onScheduler:
测试代码:
- (void)testTimeout {
RACSignal *sourceSignal = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
[subscriber sendNext:@0];
[subscriber sendNext:@1];
[subscriber sendNext:@2];
dispatch_after(dispatch_time(DISPATCH_TIME_NOW, (int64_t)(1 * NSEC_PER_SEC)), dispatch_get_main_queue(), ^{
[subscriber sendNext:@3];
[subscriber sendCompleted];
});
return nil;
}];
RACSignal *timeoutSig = [sourceSignal timeout:1 onScheduler:[RACScheduler currentScheduler]];
[timeoutSig subscribeNext:^(id _Nullable x) {
NSLog(@"value = %@", x);
}];
[timeoutSig subscribeError:^(NSError * _Nullable error) {
NSLog(@"error = %@", error);
}];
}
输出:
value = 0
value = 1
value = 2
error = Error Domain=RACSignalErrorDomain Code=1 "(null)"
底层实现:
- (RACSignal *)timeout:(NSTimeInterval)interval onScheduler:(RACScheduler *)scheduler {
NSCParameterAssert(scheduler != nil);
NSCParameterAssert(scheduler != RACScheduler.immediateScheduler);
return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
RACCompoundDisposable *disposable = [RACCompoundDisposable compoundDisposable];
RACDisposable *timeoutDisposable = [scheduler afterDelay:interval schedule:^{
[disposable dispose];
[subscriber sendError:[NSError errorWithDomain:RACSignalErrorDomain code:RACSignalErrorTimedOut userInfo:nil]];
}];
[disposable addDisposable:timeoutDisposable];
RACDisposable *subscriptionDisposable = [self subscribeNext:^(id x) {
[subscriber sendNext:x];
} error:^(NSError *error) {
[disposable dispose];
[subscriber sendError:error];
} completed:^{
[disposable dispose];
[subscriber sendCompleted];
}];
[disposable addDisposable:subscriptionDisposable];
return disposable;
}] setNameWithFormat:@"[%@] -timeout: %f onScheduler: %@", self.name, (double)interval, scheduler];
}
timeout:onScheduler: 判断从被订阅开始计时, interval
时间内如果原信号没有把所有信号发送完毕会给订阅者发送 error。
首先创建新的信号,新信号被订阅的时候在延迟队列里面加入任务, interval
后该任务会将 NSError 发送给订阅者。如果在 interval
内原型号发送了 sendCompleted/sendError,会执行 [disposable dispose];
,延迟队列中的任务会被取消。同样如果延迟任务被触发,原信号的订阅也被终止。
map:
测试代码:
/* 代码1 */
- (void)testMap {
RACSignal *sourceSignal = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
[subscriber sendNext:@1];
[subscriber sendNext:@2];
[subscriber sendNext:@3];
[subscriber sendCompleted];
return nil;
}];
[[sourceSignal map:^id _Nullable(id _Nullable value) {
// BLOCK 1
return @([value integerValue] + 1);
}] subscribeNext:^(id _Nullable x) {
NSLog(@"value = %@", x);
}];
}
输出:
value = 2
value = 3
value = 4
底层实现:
/* 代码2 */
- (__kindof RACStream *)map:(id (^)(id value))block {
NSCParameterAssert(block != nil);
Class class = self.class;
return [[self flattenMap:^(id value) {
// BLOCK 2
return [class return:block(value)];
}] setNameWithFormat:@"[%@] -map:", self.name];
}
map 的参数一个参数类型为id,返回值类型也为id的block,内部实现首先通过断言判断 block 是否为空,然后调用 flattenMap 方法
/* 代码3 */
- (__kindof RACStream *)flattenMap:(__kindof RACStream * (^)(id value))block {
Class class = self.class;
return [[self bind:^{
return ^(id value, BOOL *stop) {
// BLOCK 3
id stream = block(value) ?: [class empty];
NSCAssert([stream isKindOfClass:RACStream.class], @"Value returned from -flattenMap: is not a stream: %@", stream);
return stream;
};
}] setNameWithFormat:@"[%@] -flattenMap:", self.name];
}
flattenMap 是通过封装 bind 方法实现,bind的入参是 RACStreamBindBlock 类型闭包,flattenMap 的入参是一个入参类型为 id,返回值为 RACStream 的闭包
在 flattenMap 中,先判断 block(value) 返回的信号是否为nil,若是则返对应class 的empty 信号,也就是 RACEmptySignal 对象
+ (RACSignal *)empty {
#ifdef DEBUG
// Create multiple instances of this class in DEBUG so users can set custom
// names on each.
return [[[self alloc] init] setNameWithFormat:@"+empty"];
#else
static id singleton;
static dispatch_once_t pred;
dispatch_once(&pred, ^{
singleton = [[self alloc] init];
});
return singleton;
#endif
}
从实现代码上来看,RACEmptySignal 是以单例对象的形式返回
RACEmptySignal.m 文件里还重写了 -subscribe
:
- (RACDisposable *)subscribe:(id<RACSubscriber>)subscriber {
NSCParameterAssert(subscriber != nil);
return [RACScheduler.subscriptionScheduler schedule:^{
[subscriber sendCompleted];
}];
}
RACEmptySignal 信号被订阅之后会马上给订阅者发送 sendCompleted 事件
回到 flattenMap 的实现逻辑中,如果 block(value) 返回不是nil,它是如何返回 RACStream 对象的呢?
执行 block(value) 实际上就是触发 代码2 中的 BLOCK 2
在 BLOCK 2
中,又会触发另个 block(value),这时候会触发代码1中的 BLOCK 1
,这里 block(value) 会返回相关对象(测试代码中返回NSNumber),BLOCK 2
中将 NSNumber 包装成 RACReturnSignal 返回,此时在 BLOCK 3
stream 对象就是 RACReturnSignal。
最后通过 bind 函数的变换,订阅会收到变换过后的值。
![image-20190126201245265]()
mapReplace
测试代码:
- (void)testMapReplace {
RACSignal *sourceSignal = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
[subscriber sendNext:@1];
[subscriber sendNext:@2];
[subscriber sendNext:@3];
[subscriber sendCompleted];
return nil;
}];
[[sourceSignal mapReplace:@(10)] subscribeNext:^(id _Nullable x) {
NSLog(@"value = %@", x);
}];
}
输出:
value = 10
value = 10
value = 10
底层实现:
- (__kindof RACStream *)mapReplace:(id)object {
return [[self map:^(id _) {
return object;
}] setNameWithFormat:@"[%@] -mapReplace: %@", self.name, RACDescription(object)];
}
mapReplace 是通过封装 map 方法实现,把原信号每一个事件都变换成参数 object 传递给订阅者
reduceEach
测试代码:
- (void)testReduceEach {
RACSignal *signal1 = [RACSignal createSignal:
^RACDisposable *(id<RACSubscriber> subscriber)
{
[subscriber sendNext:RACTuplePack(@1,@2)];
[subscriber sendNext:RACTuplePack(@3,@4)];
[subscriber sendNext:RACTuplePack(@5,@6)];
[subscriber sendCompleted];
return [RACDisposable disposableWithBlock:^{
NSLog(@"signal1 dispose");
}];
}];
RACSignal *signal2 = [signal1 reduceEach:^id (NSNumber *num1 , NSNumber *num2){
return @([num1 intValue] + [num2 intValue]);
}];
[signal2 subscribeNext:^(id _Nullable x) {
NSLog(@"value = %@", x);
}];
}
输出:
value = 3
value = 7
value = 11
signal1 dispose
从输出结果看出来,reduceEach 将原信号 signal1 发送的 RACTuple 数据进行解包聚合发送给订阅者
底层实现:
/* 代码4 */
- (__kindof RACStream *)reduceEach:(RACReduceBlock)reduceBlock {
NSCParameterAssert(reduceBlock != nil);
__weak RACStream *stream __attribute__((unused)) = self;
return [[self map:^(RACTuple *t) {
// BLOCK 1
NSCAssert([t isKindOfClass:RACTuple.class], @"Value from stream %@ is not a tuple: %@", stream, t);
return [RACBlockTrampoline invokeBlock:reduceBlock withArguments:t];
}] setNameWithFormat:@"[%@] -reduceEach:", self.name];
}
reduceEach 也是通过封装 map 方法实现。reduceEach 方法的入参是 RACReduceBlock 类型的闭包
typedef id _Nonnull (^RACReduceBlock)();
首先通过断言判断 reduceBlock 闭包是否为nil,然后调用 map 方法,map 方法的入参也就是 BLOCK 1
,在 BLOCK 1
里会先判断原信号 signal 发送的数据 t 是否为 RACTuple 类型,然后返回 RACBlockTrampoline 类型对象
@interface RACBlockTrampoline ()
@property (nonatomic, readonly, copy) id block;
@end
RACBlockTrampoline 内部会保存一个 block 对象,然后根据传进来的参数,动态的构造一个 NSInvocation,通过执行 NSInvocation 返回需要的数据。
- (id)invokeWithArguments:(RACTuple *)arguments {
//
SEL selector = [self selectorForArgumentCount:arguments.count];
NSInvocation *invocation = [NSInvocation invocationWithMethodSignature:[self methodSignatureForSelector:selector]];
invocation.selector = selector;
invocation.target = self;
for (NSUInteger i = 0; i < arguments.count; i++) {
id arg = arguments[i];
NSInteger argIndex = (NSInteger)(i + 2);
[invocation setArgument:&arg atIndex:argIndex];
}
[invocation invoke];
__unsafe_unretained id returnVal;
[invocation getReturnValue:&returnVal];
return returnVal;
}
- (SEL)selectorForArgumentCount:(NSUInteger)count {
NSCParameterAssert(count > 0);
switch (count) {
case 0: return NULL;
case 1: return @selector(performWith:);
case 2: return @selector(performWith::);
case 3: return @selector(performWith:::);
case 4: return @selector(performWith::::);
case 5: return @selector(performWith:::::);
case 6: return @selector(performWith::::::);
case 7: return @selector(performWith:::::::);
case 8: return @selector(performWith::::::::);
case 9: return @selector(performWith:::::::::);
case 10: return @selector(performWith::::::::::);
case 11: return @selector(performWith:::::::::::);
case 12: return @selector(performWith::::::::::::);
case 13: return @selector(performWith:::::::::::::);
case 14: return @selector(performWith::::::::::::::);
case 15: return @selector(performWith:::::::::::::::);
}
NSCAssert(NO, @"The argument count is too damn high! Only blocks of up to 15 arguments are currently supported.");
return NULL;
}
- (id)performWith:(id)obj1 :(id)obj2 {
id (^block)(id, id) = self.block;
return block(obj1, obj2);
}
...
-invokeWithArguments` 中首先通过判断 RACTuple 元素数量选择对应的 selector,最大能支持 15 个元素。
确定好 NSInvocation 的 target 和 seletor,还需要设置参数
for (NSUInteger i = 0; i < arguments.count; i++) {
id arg = arguments[i];
NSInteger argIndex = (NSInteger)(i + 2);
[invocation setArgument:&arg atIndex:argIndex];
}
这里看到设置 Type Encodings 的时候是从偏移量为 2开始,这是因为偏移量0和 偏移量1的参数分别对应着隐藏参数self 和 _cmd。
构造好 invocation 之后,执行 [invocation invoke]
调用动态方法,实际上就是执行代码4中的入参 reduceBlock 闭包,最后通过 [invocation getReturnValue:&returnVal]
拿到闭包的返回值,也就是 RACBlockTrampoline 的最终返回值,最终成为 map 闭包里面的返回值。剩下的就是 map 函数流程。
or
测试代码
RACSignal *sourceSignal = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
[subscriber sendNext:RACTuplePack(@NO,@YES,@NO)];
[subscriber sendNext:RACTuplePack(@YES,@NO)];
[subscriber sendCompleted];
return nil;
}];
RACSignal *orSignal = [sourceSignal or];
[orSignal subscribeNext:^(id _Nullable x) {
NSLog(@"value = %@", x);
}];
输出
value = 1
value = 1
- (RACSignal *)or {
return [[self map:^(RACTuple *tuple) {
NSCAssert([tuple isKindOfClass:RACTuple.class], @"-or must only be used on a signal of RACTuples of NSNumbers. Instead, received: %@", tuple);
NSCAssert(tuple.count > 0, @"-or must only be used on a signal of RACTuples of NSNumbers, with at least 1 value in the tuple");
return @([tuple.rac_sequence any:^(NSNumber *number) {
/// anyBlock
NSCAssert([number isKindOfClass:NSNumber.class], @"-or must only be used on a signal of RACTuples of NSNumbers. Instead, tuple contains a non-NSNumber value: %@", tuple);
return number.boolValue;
}]);
}] setNameWithFormat:@"[%@] -or", self.name];
}
这里主要是将 RACTuple 转换成 RACTupleSequence,因为 RACTupleSequence 是继承 RACSequence,这里就会调用 RACSequence 的 - (BOOL)any:(BOOL (^)(id))block
方法
// RACSequence.m
- (BOOL)any:(BOOL (^)(id))block {
NSCParameterAssert(block != NULL);
/// block = BOOL ^(NSNumber *number) {return number.boolValue;}
return [self objectPassingTest:block] != nil;
}
// RACSequence.m
- (id)objectPassingTest:(BOOL (^)(id))block {
NSCParameterAssert(block != NULL);
/// block = BOOL ^(NSNumber *number) {return number.boolValue;}
return [self filter:block].head;
}
- (BOOL)any:(BOOL (^)(id))block
实现里面最终会执行 RACStream 中的 filter
方法
- (__kindof RACStream *)filter:(BOOL (^)(id value))block {
NSCParameterAssert(block != nil);
Class class = self.class;
return [[self flattenMap:^ id (id value) {
// flattenMapBlock
/// block = BOOL ^(NSNumber *number) {return number.boolValue;}
if (block(value)) {
return [class return:value];
} else {
return class.empty;
}
}] setNameWithFormat:@"[%@] -filter:", self.name];
}
在 flattenMapBlock 中会以 RACTupleSequence 发出的信号值传入到 block中,这些值信号值也就是一开始测试代码中 sourceSignal 发送给订阅者的 RACTuple 中的元素,如果value对应的BOOL值是YES,就转换成一个 RACTupleSequence 信号。如果对应的是NO,则转换成一个empty信号。
上面的 flattenMap
方法实际上最终调用到 RACSequence 的 -bind
方法
/*
block = ^{
return ^(id value, BOOL *stop) {
id stream = block(value) ?: [class empty];
NSCAssert([stream isKindOfClass:RACStream.class], @"Value returned from -flattenMap: is not a stream: %@", stream);
return stream;
};
bindBlock = ^(id value, BOOL *stop) {
id stream = block(value) ?: [class empty];
NSCAssert([stream isKindOfClass:RACStream.class], @"Value returned from -flattenMap: is not a stream: %@", stream);
return stream;
};
*/
- (RACSequence *)bind:(RACSequenceBindBlock (^)(void))block {
RACSequenceBindBlock bindBlock = block();
return [[self bind:bindBlock passingThroughValuesFromSequence:nil] setNameWithFormat:@"[%@] -bind:", self.name];
}
然后继续调用 - (RACSequence *)bind:(RACSequenceBindBlock)bindBlock passingThroughValuesFromSequence:(RACSequence *)passthroughSequence
这里参数 passthroughSequence 为 nil
- (RACSequence *)bind:(RACSequenceBindBlock)bindBlock passingThroughValuesFromSequence:(RACSequence *)passthroughSequence {
__block RACSequence *valuesSeq = self;
__block RACSequence *current = passthroughSequence;
__block BOOL stop = NO;
RACSequence *sequence = [RACDynamicSequence sequenceWithLazyDependency:^ id {
/// dependencyBlock
while (current.head == nil) {
if (stop) return nil;
// We've exhausted the current sequence, create a sequence from the
// next value.
id value = valuesSeq.head;
if (value == nil) {
// We've exhausted all the sequences.
stop = YES;
return nil;
}
current = (id)bindBlock(value, &stop);
if (current == nil) {
stop = YES;
return nil;
}
valuesSeq = valuesSeq.tail;
}
NSCAssert([current isKindOfClass:RACSequence.class], @"-bind: block returned an object that is not a sequence: %@", current);
return nil;
} headBlock:^(id _) {
return current.head;
} tailBlock:^ id (id _) {
if (stop) return nil;
return [valuesSeq bind:bindBlock passingThroughValuesFromSequence:current.tail];
}];
sequence.name = self.name;
return sequence;
}
上面代码的关键逻辑是调用 RACDynamicSequence 类的 sequenceWithLazyDependency
方法,dependencyBlock 会在之前提及到的函数 -objectPassingTest
中,执行 RACSequence 的 -head
方法中触发
在 方法,dependencyBlock 中,执行current = (id)bindBlock(value, &stop);
根据value的布尔值来产生新的信号,如果为 NO 则返回 RACEmptySequence 类型,此时 current.head 为 nil,进行下一次循环;如过 value 为YES,则返回 RACUnarySequence 类型,current.head 不为 nil,结束循环。
any:
测试代码:
- (void)testAny {
RACSignal *sourceSignal = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
[subscriber sendNext:@1];
[subscriber sendNext:@2];
[subscriber sendNext:@3];
[subscriber sendCompleted];
return nil;
}];
[[sourceSignal any:^BOOL(id _Nullable object) {
return [object integerValue] < 2;
}] subscribeNext:^(NSNumber * _Nullable x) {
NSLog(@"value = %@", x);
}];
}
输出:
value = 1
value = 0
value = 0
底层实现:
- (RACSignal *)any:(BOOL (^)(id object))predicateBlock {
NSCParameterAssert(predicateBlock != NULL);
return [[[self materialize] bind:^{
return ^(RACEvent *event, BOOL *stop) {
if (event.finished) {
*stop = YES;
return [RACSignal return:@NO];
}
if (predicateBlock(event.value)) {
*stop = YES;
return [RACSignal return:@YES];
}
return [RACSignal empty];
};
}] setNameWithFormat:@"[%@] -any:", self.name];
}
首先原信号会 通过 -materialize
方法转换成 RACEvent 对象
- (RACSignal *)materialize {
return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
return [self subscribeNext:^(id x) {
[subscriber sendNext:[RACEvent eventWithValue:x]];
} error:^(NSError *error) {
[subscriber sendNext:[RACEvent eventWithError:error]];
[subscriber sendCompleted];
} completed:^{
[subscriber sendNext:RACEvent.completedEvent];
[subscriber sendCompleted];
}];
}] setNameWithFormat:@"[%@] -materialize", self.name];
}
- (BOOL)isFinished {
return self.eventType == RACEventTypeCompleted || self.eventType == RACEventTypeError;
}
从上面代码发现,RACEvent 如果收到原信号的 sendCompleted / sendError,finished 属性会置为 YES
在 -any
方法中,先判断 RACEvent 的 finished 属性,如果为 YES,stop接下来的信号 则返回 [RACSignal return:@NO];反之,则会根据入参 predicateBlock 闭包,将 RACEvent 的 value 传入 predicateBlock,如果返回值为YES,stop接下来的信号,则返回 [RACSignal return:@YES];如果 predicateBlock 返回值为 NO,则返回 [RACSignal
empty]。
简单地总结来说,any 方法根据 predicateBlock 来对原信号的每一个信号进行判断,若遇到返回 YES 的条件,就给订阅者发送 YES 信号,然后发送 sendCompleted;若 predicateBlock 没有返回 YES 的条件,则最后给 订阅者 发送 NO 信号。
all
- (RACSignal *)all:(BOOL (^)(id object))predicateBlock {
NSCParameterAssert(predicateBlock != NULL);
return [[[self materialize] bind:^{
return ^(RACEvent *event, BOOL *stop) {
if (event.eventType == RACEventTypeCompleted) {
*stop = YES;
return [RACSignal return:@YES];
}
if (event.eventType == RACEventTypeError || !predicateBlock(event.value)) {
*stop = YES;
return [RACSignal return:@NO];
}
return [RACSignal empty];
};
}] setNameWithFormat:@"[%@] -all:", self.name];
}
all 方法可以理解为 any 方法的对立面,原信号如果发送 sendError 或者 predicateBlock 返回为 NO,就会结束信号的传递,并会给订阅者发值为 NO 的信号。如果整个订阅过程中都没有出现错误以及都满足 predicateBlock 为真的条件,最后会在 RACEventTypeCompleted 的时候发送 YES。
repeat
测试代码:
RACSignal *sourceSignal = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
[subscriber sendNext:@1];
[subscriber sendCompleted];
return nil;
}];
RACSignal *orSignal = [sourceSignal repeat];
[orSignal subscribeNext:^(id _Nullable x) {
NSLog(@"value = %@", x);
}];
输出:
value = 1
value = 1
value = 1
...
执行之后会不断地打印 value = 1
- (RACSignal *)repeat {
return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
return subscribeForever(self,
^(id x) {
[subscriber sendNext:x];
},
^(NSError *error, RACDisposable *disposable) {
[disposable dispose];
[subscriber sendError:error];
},
^(RACDisposable *disposable) {
// Resubscribe.
});
}] setNameWithFormat:@"[%@] -repeat", self.name];
}
执行 -repeat
方法之后,内部会创建新的 RASignal,当新的信号被订阅的时候会执行 subscribeForever
方法
static RACDisposable *subscribeForever (RACSignal *signal, void (^next)(id), void (^error)(NSError *, RACDisposable *), void (^completed)(RACDisposable *)) {
next = [next copy];
error = [error copy];
completed = [completed copy];
RACCompoundDisposable *compoundDisposable = [RACCompoundDisposable compoundDisposable];
RACSchedulerRecursiveBlock recursiveBlock = ^(void (^recurse)(void)) {
RACCompoundDisposable *selfDisposable = [RACCompoundDisposable compoundDisposable];
[compoundDisposable addDisposable:selfDisposable];
__weak RACDisposable *weakSelfDisposable = selfDisposable;
RACDisposable *subscriptionDisposable = [signal subscribeNext:next error:^(NSError *e) {
@autoreleasepool {
error(e, compoundDisposable);
[compoundDisposable removeDisposable:weakSelfDisposable];
}
recurse();
} completed:^{
@autoreleasepool {
completed(compoundDisposable);
[compoundDisposable removeDisposable:weakSelfDisposable];
}
recurse();
}];
[selfDisposable addDisposable:subscriptionDisposable];
};
// Subscribe once immediately, and then use recursive scheduling for any
// further resubscriptions.
recursiveBlock(^{
RACScheduler *recursiveScheduler = RACScheduler.currentScheduler ?: [RACScheduler scheduler];
RACDisposable *schedulingDisposable = [recursiveScheduler scheduleRecursiveBlock:recursiveBlock];
[compoundDisposable addDisposable:schedulingDisposable];
});
return compoundDisposable;
}
subscribeForever
方法有4个参数,分别是源信号 signal,next 闭包,error 闭包和 complete 闭包。函数内部先定义好递归的闭包:recursiveBlock,recursiveBlock 中 首先对 signal 进行订阅,如果源信号 signal 发送 sendError 或者 sendCompleted,就会执行对应的 error/complete 闭包,然后就执行 recursiveBlock 的参数闭包 recurse。
subscribeForever
最后是执行 recursiveBlock 并传入具体的 recurse。
recurse 中首先获取当前的递归调度器 recursiveScheduler,然后执行 -scheduleRecursiveBlock
方法
- (RACDisposable *)scheduleRecursiveBlock:(RACSchedulerRecursiveBlock)recursiveBlock {
RACCompoundDisposable *disposable = [RACCompoundDisposable compoundDisposable];
[self scheduleRecursiveBlock:[recursiveBlock copy] addingToDisposable:disposable];
return disposable;
}
- (void)scheduleRecursiveBlock:(RACSchedulerRecursiveBlock)recursiveBlock addingToDisposable:(RACCompoundDisposable *)disposable {
@autoreleasepool {
RACCompoundDisposable *selfDisposable = [RACCompoundDisposable compoundDisposable];
[disposable addDisposable:selfDisposable];
__weak RACDisposable *weakSelfDisposable = selfDisposable;
// 最终调用 schedule 方法
RACDisposable *schedulingDisposable = [self schedule:^{
/// scheduleBlock
@autoreleasepool {
// At this point, we've been invoked, so our disposable is now useless.
[disposable removeDisposable:weakSelfDisposable];
}
if (disposable.disposed) return;
void (^reallyReschedule)(void) = ^{
if (disposable.disposed) return;
[self scheduleRecursiveBlock:recursiveBlock addingToDisposable:disposable];
};
// Protects the variables below.
//
// This doesn't actually need to be __block qualified, but Clang
// complains otherwise. :C
__block NSLock *lock = [[NSLock alloc] init];
lock.name = [NSString stringWithFormat:@"%@ %s", self, sel_getName(_cmd)];
__block NSUInteger rescheduleCount = 0;
// Set to YES once synchronous execution has finished. Further
// rescheduling should occur immediately (rather than being
// flattened).
__block BOOL rescheduleImmediately = NO;
@autoreleasepool {
recursiveBlock(^{
[lock lock];
BOOL immediate = rescheduleImmediately;
if (!immediate) ++rescheduleCount;
[lock unlock];
if (immediate) reallyReschedule();
});
}
[lock lock];
NSUInteger synchronousCount = rescheduleCount;
rescheduleImmediately = YES;
[lock unlock];
for (NSUInteger i = 0; i < synchronousCount; i++) {
reallyReschedule();
}
}];
[selfDisposable addDisposable:schedulingDisposable];
}
}
/// RACTargetQueueScheduler.m
- (RACDisposable *)schedule:(void (^)(void))block {
NSCParameterAssert(block != NULL);
RACDisposable *disposable = [[RACDisposable alloc] init];
dispatch_async(self.queue, ^{
if (disposable.disposed) return;
[self performAsCurrentScheduler:block];
});
return disposable;
}
可以看到会调用到自己的 -schedule
方法,这里测试代码中是在主线程,获取到对应的是 RACTargetQueueScheduler 。这里的 -schedule
方法先判断原信号有没有disposed,若果没有,则把参数 block 放在对应的队列中触发。
可以看得到上面函数中 scheduleBlock 里不断递归执行 [self scheduleRecursiveBlock:recursiveBlock addingToDisposable:disposable]
,recursiveBlock 不断被触发,对应的 subscribeForever
中的 recurse() 循环执行:
scheduleRecursiveBlock->recursiveBlock->recurse()->reallyReschedule()->scheduleRecursiveBlock
也就是说源信号会循环被订阅触发其给订阅者发送 sendNext 事件,直到源信号发送 error 才结束。
retry:
测试代码:
- (void)testRetry {
RACSignal *sourceSignal = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
[subscriber sendNext:@1];
[subscriber sendNext:@2];
[subscriber sendError:[NSError errorWithDomain:@"domain" code:-1 userInfo:nil]];
[subscriber sendNext:@3];
[subscriber sendCompleted];
return nil;
}];
RACSignal *retrySignal = [sourceSignal retry:2];
[retrySignal subscribeNext:^(id _Nullable x) {
NSLog(@"value = %@", x);
}];
}
输出:
value = 1
value = 2
value = 1
value = 2
value = 1
value = 2
- (RACSignal *)retry:(NSInteger)retryCount {
return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
__block NSInteger currentRetryCount = 0;
return subscribeForever(self,
^(id x) {
[subscriber sendNext:x];
},
^(NSError *error, RACDisposable *disposable) {
// 当原始信号发送 sendError 时
if (retryCount == 0 || currentRetryCount < retryCount) {
// Resubscribe.
currentRetryCount++;
return;
}
[disposable dispose];
[subscriber sendError:error];
},
^(RACDisposable *disposable) {
// 当原信号发送 sendCompleted
[disposable dispose];
[subscriber sendCompleted];
});
}] setNameWithFormat:@"[%@] -retry: %lu", self.name, (unsigned long)retryCount];
}
-retry:
实现与 -repeat
实现类似,基于 subscribeForever
。-retry:
是内部维护一个 currentRetryCount 变量,当原始信号发送 sendError 时判断重试次数 currentRetryCount 是否小于 retryCount,若是则重试,如果重试依旧收到 sendError,超过 retryCount 之后就会停止重试。
如果原信号没有发生错误,那么原信号在发送结束,当原信号发送 sendCompleted,subscribeForever
也就接受了,所以 -retry:
操作对于没有任何error的信号 和 直接订阅原信号表现一样。
将测试代码改为:
- (void)testRetryNoError {
RACSignal *sourceSignal = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
[subscriber sendNext:@1];
[subscriber sendNext:@2];
[subscriber sendCompleted];
return nil;
}];
RACSignal *retrySignal = [sourceSignal retry:2];
[retrySignal subscribeNext:^(id _Nullable x) {
NSLog(@"value = %@", x);
}];
}
输出:
value = 1
value = 2
原信号发送 sendCompleted ,整个订阅流程就结束了。