今天看啥  ›  专栏  ›  人类买水精华

RACSignal常用方法深入分析(1)

人类买水精华  · 掘金  ·  · 2019-02-01 14:13

ReactiveCocoa 框架 RACSignal+Operations.h 定义了 RACSignal 常规操作方法,接下来对一些常用的方法进行分析并解析其作用。

doNext

测试代码:

- (void)testDoNext {
    RACSignal *sourceSignal = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
        [subscriber sendNext:@"source signal"];
        
        [subscriber sendCompleted];
        return nil;
    }];
    
    [[sourceSignal doNext:^(id  _Nullable x) {
        NSLog(@"sourceSignal doNext will execute before sendNext");
    }] subscribeNext:^(id  _Nullable x) {
        NSLog(@"value = %@", x);
    }];
}

输出:

sourceSignal doNext will execute before sendNext
value = source signal

底层实现:

- (RACSignal *)doNext:(void (^)(id x))block {
	NSCParameterAssert(block != NULL);

	return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
		return [self subscribeNext:^(id x) {
			block(x);
			[subscriber sendNext:x];
		} error:^(NSError *error) {
			[subscriber sendError:error];
		} completed:^{
			[subscriber sendCompleted];
		}];
	}] setNameWithFormat:@"[%@] -doNext:", self.name];
}

doNext 传入block 闭包,该闭包的参数就是原信号发送的值,当给订阅者发送 sendNext 之前会执行 block 闭包

类似的 doError,doCompleted 也是在给订阅者发送事件之前就执行相关 block

throttle:valuesPassingTest:

测试代码:

- (void)testThrottleValuesPassingTest {
    RACSignal *sourceSignal = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
        [subscriber sendNext:@0];
        [subscriber sendNext:@1];
        dispatch_after(dispatch_time(DISPATCH_TIME_NOW, (int64_t)(2 * NSEC_PER_SEC)), dispatch_get_main_queue(), ^{
            
            [subscriber sendNext:@2];
            [subscriber sendNext:@3];
            [subscriber sendCompleted];
        });
        return nil;
    }];
    
    RACSignal *throttleSignal = [sourceSignal throttle:1 valuesPassingTest:^BOOL(id  _Nullable next) {
        return [next integerValue] <= 2;
    }];
    
    [throttleSignal subscribeNext:^(id  _Nullable x) {
        NSLog(@"value = %@", x);
    }];
}

输出:

value = 1
value = 3

throttle:valuesPassingTest: 方法有2个参数:

  • 时间间隔 interval
  • 判断条件闭包 predicate

该方法大概作用是:从原信号发出第一个信号(@0)发出开始计时,在 interval 秒内如果第二个信号(@1) 符合 predicate 的判断条件,则该前一个信号会被忽略;如果2个信号间隔时间超过 interval 秒或者不满足 predicate 判断,则前一个信号会发给订阅者。

底层实现:

- (RACSignal *)throttle:(NSTimeInterval)interval valuesPassingTest:(BOOL (^)(id next))predicate {
	NSCParameterAssert(interval >= 0);
	NSCParameterAssert(predicate != nil);

	return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
		RACCompoundDisposable *compoundDisposable = [RACCompoundDisposable compoundDisposable];

		// We may never use this scheduler, but we need to set it up ahead of
		// time so that our scheduled blocks are run serially if we do.
		RACScheduler *scheduler = [RACScheduler scheduler];

		// Information about any currently-buffered `next` event.
		__block id nextValue = nil;
		__block BOOL hasNextValue = NO;
		RACSerialDisposable *nextDisposable = [[RACSerialDisposable alloc] init];

		void (^flushNext)(BOOL send) = ^(BOOL send) {
			@synchronized (compoundDisposable) {
				[nextDisposable.disposable dispose];

				if (!hasNextValue) return;
				if (send) [subscriber sendNext:nextValue];

				nextValue = nil;
				hasNextValue = NO;
			}
		};

		RACDisposable *subscriptionDisposable = [self subscribeNext:^(id x) {
			RACScheduler *delayScheduler = RACScheduler.currentScheduler ?: scheduler;
			BOOL shouldThrottle = predicate(x);

			@synchronized (compoundDisposable) {
				flushNext(NO);
				if (!shouldThrottle) {
					[subscriber sendNext:x];
					return;
				}

				nextValue = x;
				hasNextValue = YES;
				nextDisposable.disposable = [delayScheduler afterDelay:interval schedule:^{
					flushNext(YES);
				}];
			}
		} error:^(NSError *error) {
			[compoundDisposable dispose];
			[subscriber sendError:error];
		} completed:^{
			flushNext(YES);
			[subscriber sendCompleted];
		}];

		[compoundDisposable addDisposable:subscriptionDisposable];
		return compoundDisposable;
	}] setNameWithFormat:@"[%@] -throttle: %f valuesPassingTest:", self.name, (double)interval];
}

throttle:valuesPassingTest: 内部主要通过判断 nextValuehasNextValue 2个变量的状态来判断是否给订阅者发送信号。内部先订阅原信号,然后触发订阅者的 didSubscribe,结合测试代码,具体流程:

  1. 收到原信号发出的信号 @0,传入到条件闭包 predicate,返回 YES
  2. RACCompoundDisposable 作为线程间互斥信号量,用 @synchronized 加锁保证 nextValuehasNextValue 操作是原子性
  3. 执行 flushNext(NO) ,把 nextDisposable 进行 dispose,delayScheduler 之前存放的延迟任务如果未被执行会被取消;hasNextValue == NO,直接 return,没有给订阅者发送0
  4. 判断 !shouldThrottle,跳过 if 内部代码,给 nextValue 和 hasNextValue 赋值,nextValue=@0,hasNextValue = YES
  5. 把 flushNext(YES) 加入到延迟队列中,1秒后执行
  6. 原信号发送 @1,此时时间间隔不到 1秒,从步骤3开始重复上述步骤;flushNext(NO) 中 延迟任务被取消, nextValuehasNextValue 会赋值为对应 零值。shouldThrottle 符合,nextValuehasNextValue 又被赋值到 @1 和 YES,保存新的 flushNext(YES) 到延迟任务队列中
  7. 1秒后还没有收到原信号发送的信号,执行步骤6保存的 flushNext(YES),把 @1 发给订阅者;2秒后收到原信号的信号 @2,从步骤3开始重复上述步骤,@2会被保存到 hasNextValue 中,等待下一次延迟任务可以触发的时候发给订阅者,反之被忽略
  8. 步骤7完之后马上收到新的信号@3,从步骤3开始重复上述步骤,步骤7保存的 flushNext(YES) 被取消,因为此时不符合 predicate 判断条件,@3发给订阅者

delay:

测试代码:

- (void)testDelay {
    RACSignal *sourceSignal = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
        [subscriber sendNext:@0];
        [subscriber sendNext:@1];
        [subscriber sendNext:@2];
        [subscriber sendNext:@3];
        [subscriber sendCompleted];
        return nil;
    }];
    
    [[sourceSignal delay:1] subscribeNext:^(id  _Nullable x) {
        NSLog(@"value = %@", x);
    }];
    
    NSLog(@"%@", [NSDate date]);
}

输出:

2019-01-31 09:34:33.286371+0800 AppTest[38331:4774309] Thu Jan 31 09:34:33 2019
2019-01-31 09:34:34.380736+0800 AppTest[38331:4774309] value = 0
2019-01-31 09:34:34.380881+0800 AppTest[38331:4774309] value = 1
2019-01-31 09:34:34.380986+0800 AppTest[38331:4774309] value = 2
2019-01-31 09:34:34.381076+0800 AppTest[38331:4774309] value = 3

delay: 方法是将原信号的信号事件延迟发送给订阅者

底层实现:

- (RACSignal *)delay:(NSTimeInterval)interval {
	return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
		RACCompoundDisposable *disposable = [RACCompoundDisposable compoundDisposable];

		// We may never use this scheduler, but we need to set it up ahead of
		// time so that our scheduled blocks are run serially if we do.
		RACScheduler *scheduler = [RACScheduler scheduler];

		void (^schedule)(dispatch_block_t) = ^(dispatch_block_t block) {
			RACScheduler *delayScheduler = RACScheduler.currentScheduler ?: scheduler;
			RACDisposable *schedulerDisposable = [delayScheduler afterDelay:interval schedule:block];
			[disposable addDisposable:schedulerDisposable];
		};

		RACDisposable *subscriptionDisposable = [self subscribeNext:^(id x) {
			schedule(^{
				[subscriber sendNext:x];
			});
		} error:^(NSError *error) {
			[subscriber sendError:error];
		} completed:^{
			schedule(^{
				[subscriber sendCompleted];
			});
		}];

		[disposable addDisposable:subscriptionDisposable];
		return disposable;
	}] setNameWithFormat:@"[%@] -delay: %f", self.name, (double)interval];
}

delay: 内部首先先定义 schedule ,其参数是 dispatch_block_t,执行 schedule 的时候,会把参数 block 放到 RACScheduler 延迟 interval 秒触发。

- (RACDisposable *)afterDelay:(NSTimeInterval)delay schedule:(void (^)(void))block {
	return [self after:[NSDate dateWithTimeIntervalSinceNow:delay] schedule:block];
}

- (RACDisposable *)after:(NSDate *)date schedule:(void (^)(void))block {
	NSCParameterAssert(date != nil);
	NSCParameterAssert(block != NULL);

	RACDisposable *disposable = [[RACDisposable alloc] init];

	dispatch_after([self.class wallTimeWithDate:date], self.queue, ^{
		if (disposable.disposed) return;
		[self performAsCurrentScheduler:block];
	});

	return disposable;
}

根据测试代码,最终会调用 RACQueueScheduler 中的 -after:schedule:,在该方法里面,通过 dispatch_after 来触发入参 block,触发先前会先判断该任务是否被 disposed ,如果是则直接 return。

归纳来说,delay: 方法就是将原信号的 sendNext 和 sendCompleted 事件延迟 interval 秒发送给订阅者。

bufferWithTime:onScheduler:

测试代码:

- (void)testBufferWithTime {
    RACSignal *sourceSignal = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
        [subscriber sendNext:@0];
        [subscriber sendNext:@1];
        [subscriber sendNext:@2];
        [subscriber sendNext:@3];
        [subscriber sendCompleted];
        return nil;
    }];
    
    RACSignal *bufferSignal = [sourceSignal bufferWithTime:1 onScheduler:[RACScheduler currentScheduler]];
    [bufferSignal subscribeNext:^(id  _Nullable x) {
        NSLog(@"value = %@", x);
    }];
}

输出:

value = <RACTuple: 0x600000003200> (
    0,
    1,
    2,
    3
)

底层实现:

- (RACSignal *)bufferWithTime:(NSTimeInterval)interval onScheduler:(RACScheduler *)scheduler {
	NSCParameterAssert(scheduler != nil);
	NSCParameterAssert(scheduler != RACScheduler.immediateScheduler);

	return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
		RACSerialDisposable *timerDisposable = [[RACSerialDisposable alloc] init];
		NSMutableArray *values = [NSMutableArray array];

		void (^flushValues)() = ^{
			@synchronized (values) {
				[timerDisposable.disposable dispose];

				if (values.count == 0) return;

				RACTuple *tuple = [RACTuple tupleWithObjectsFromArray:values];
				[values removeAllObjects];
				[subscriber sendNext:tuple];
			}
		};

		RACDisposable *selfDisposable = [self subscribeNext:^(id x) {
			@synchronized (values) {
				if (values.count == 0) {
					timerDisposable.disposable = [scheduler afterDelay:interval schedule:flushValues];
				}

				[values addObject:x ?: RACTupleNil.tupleNil];
			}
		} error:^(NSError *error) {
			[subscriber sendError:error];
		} completed:^{
			flushValues();
			[subscriber sendCompleted];
		}];

		return [RACDisposable disposableWithBlock:^{
			[selfDisposable dispose];
			[timerDisposable dispose];
		}];
	}] setNameWithFormat:@"[%@] -bufferWithTime: %f onScheduler: %@", self.name, (double)interval, scheduler];
}

同样 bufferWithTime:onScheduler: 内部还会创建新的信号,当新的信号被订阅的时候会创建可变数组 values 。然后定义闭包 flushValues,当收到原信号的 sendNext 和 sendCompleted 的时候会触发该闭包。

之后对原信号进行订阅,首先收到原信号的 sendNext 事件的时候,先判断 values 的元素个数,如果个数为 0,则把 flushValues 延迟 interval 秒触发。然后把信号值加入 values 数组中,如果信号值为空,则把 RACTupleNil.tupleNil 加入数组中。

因为测试代码中,@0、@1、@2、@3 四个信号是连续串行发出,所以之前被加入延迟队列中执行的 flushValues 还没有触发的时候,当原信号发送 sendCompleted 的时候,flushValues 会被触发,这时候 values 四个元素被一次性包装成 RACTuple 发送给订阅者。

如果把测试代码修改成:

- (void)testBufferWithTime2 {
    RACSignal *sourceSignal = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
        [subscriber sendNext:@0];
        [subscriber sendNext:@1];
        dispatch_after(dispatch_time(DISPATCH_TIME_NOW, (int64_t)(2 * NSEC_PER_SEC)), dispatch_get_main_queue(), ^{
            [subscriber sendNext:@2];
            [subscriber sendNext:@3];
            dispatch_after(dispatch_time(DISPATCH_TIME_NOW, (int64_t)(2 * NSEC_PER_SEC)), dispatch_get_main_queue(), ^{
                [subscriber sendCompleted];
            });
        });
        return nil;
    }];
    
    RACSignal *bufferSignal = [sourceSignal bufferWithTime:1 onScheduler:[RACScheduler currentScheduler]];
    [bufferSignal subscribeNext:^(id  _Nullable x) {
        NSLog(@"value = %@", x);
    }];
}

输出:

value = <RACTuple: 0x604000204300> (
    0,
    1
)
value = <RACTuple: 0x600000019300> (
    2,
    3
)

订阅者此时会受到2个信号,都是 RACTuple 类型,回到之前的实现代码,

  1. 收到 @0,flushValues() 加入延迟队列1秒之后执行,@0 再加入到数组 values 中
  2. 步骤1之后马上收到 @1(1秒内),重复步骤1,这时候延迟队列中有2个任务
  3. 1秒后,第一个加入延迟队列的 flushValues() 执行,首先执行了 [timerDisposable.disposable dispose]; ,这样会将延迟队列中下一个 flushValues() 任务被取消(步骤2加入的)。然后 values 元素包装成 RACTuple 发送给订阅者。
  4. 基于步骤3,再过一秒,也就是从一开始算,2秒过后。收到@2,然后重复步骤,逻辑相似
  5. 最后再过2秒,原信号发送 sendCompleted,执行 flushValues() ,values 元素个数为0,直接返回。

总结:bufferWithTime:onScheduler: 作用是将规定时间内将原信号所发送的全部信号包装成 RACTupe 发送给订阅者

timeout:onScheduler:

测试代码:

- (void)testTimeout {
    RACSignal *sourceSignal = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
        [subscriber sendNext:@0];
        [subscriber sendNext:@1];
        [subscriber sendNext:@2];
        dispatch_after(dispatch_time(DISPATCH_TIME_NOW, (int64_t)(1 * NSEC_PER_SEC)), dispatch_get_main_queue(), ^{
            [subscriber sendNext:@3];
            [subscriber sendCompleted];
        });
        return nil;
    }];
    
    RACSignal *timeoutSig = [sourceSignal timeout:1 onScheduler:[RACScheduler currentScheduler]];
    [timeoutSig subscribeNext:^(id  _Nullable x) {
        NSLog(@"value = %@", x);
    }];
    
    [timeoutSig subscribeError:^(NSError * _Nullable error) {
        NSLog(@"error = %@", error);
    }];
}

输出:

value = 0
value = 1
value = 2
error = Error Domain=RACSignalErrorDomain Code=1 "(null)"

底层实现:

- (RACSignal *)timeout:(NSTimeInterval)interval onScheduler:(RACScheduler *)scheduler {
	NSCParameterAssert(scheduler != nil);
	NSCParameterAssert(scheduler != RACScheduler.immediateScheduler);

	return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
		RACCompoundDisposable *disposable = [RACCompoundDisposable compoundDisposable];

		RACDisposable *timeoutDisposable = [scheduler afterDelay:interval schedule:^{
			[disposable dispose];
			[subscriber sendError:[NSError errorWithDomain:RACSignalErrorDomain code:RACSignalErrorTimedOut userInfo:nil]];
		}];

		[disposable addDisposable:timeoutDisposable];

		RACDisposable *subscriptionDisposable = [self subscribeNext:^(id x) {
			[subscriber sendNext:x];
		} error:^(NSError *error) {
			[disposable dispose];
			[subscriber sendError:error];
		} completed:^{
			[disposable dispose];
			[subscriber sendCompleted];
		}];

		[disposable addDisposable:subscriptionDisposable];
		return disposable;
	}] setNameWithFormat:@"[%@] -timeout: %f onScheduler: %@", self.name, (double)interval, scheduler];
}

timeout:onScheduler: 判断从被订阅开始计时, interval 时间内如果原信号没有把所有信号发送完毕会给订阅者发送 error。

首先创建新的信号,新信号被订阅的时候在延迟队列里面加入任务, interval 后该任务会将 NSError 发送给订阅者。如果在 interval 内原型号发送了 sendCompleted/sendError,会执行 [disposable dispose];,延迟队列中的任务会被取消。同样如果延迟任务被触发,原信号的订阅也被终止。

map:

测试代码:

/* 代码1 */

- (void)testMap {
    RACSignal *sourceSignal = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
        [subscriber sendNext:@1];
        [subscriber sendNext:@2];
        [subscriber sendNext:@3];
        
        [subscriber sendCompleted];
        return nil;
    }];
    
    [[sourceSignal map:^id _Nullable(id  _Nullable value) {
        // BLOCK 1
        return @([value integerValue] + 1);
    }] subscribeNext:^(id  _Nullable x) {
        NSLog(@"value = %@", x);
    }];
}

输出:

value = 2
value = 3
value = 4

底层实现:

/* 代码2 */

- (__kindof RACStream *)map:(id (^)(id value))block {
	NSCParameterAssert(block != nil);

	Class class = self.class;
	
	return [[self flattenMap:^(id value) {
        // BLOCK 2
		return [class return:block(value)];
	}] setNameWithFormat:@"[%@] -map:", self.name];
}

map 的参数一个参数类型为id,返回值类型也为id的block,内部实现首先通过断言判断 block 是否为空,然后调用 flattenMap 方法

/* 代码3 */

- (__kindof RACStream *)flattenMap:(__kindof RACStream * (^)(id value))block {
	Class class = self.class;

	return [[self bind:^{
		return ^(id value, BOOL *stop) {
            // BLOCK 3
            
			id stream = block(value) ?: [class empty];
			NSCAssert([stream isKindOfClass:RACStream.class], @"Value returned from -flattenMap: is not a stream: %@", stream);

			return stream;
		};
	}] setNameWithFormat:@"[%@] -flattenMap:", self.name];
}

flattenMap 是通过封装 bind 方法实现,bind的入参是 RACStreamBindBlock 类型闭包,flattenMap 的入参是一个入参类型为 id,返回值为 RACStream 的闭包

在 flattenMap 中,先判断 block(value) 返回的信号是否为nil,若是则返对应class 的empty 信号,也就是 RACEmptySignal 对象

+ (RACSignal *)empty {
#ifdef DEBUG
	// Create multiple instances of this class in DEBUG so users can set custom
	// names on each.
	return [[[self alloc] init] setNameWithFormat:@"+empty"];
#else
	static id singleton;
	static dispatch_once_t pred;

	dispatch_once(&pred, ^{
		singleton = [[self alloc] init];
	});

	return singleton;
#endif
}

从实现代码上来看,RACEmptySignal 是以单例对象的形式返回

RACEmptySignal.m 文件里还重写了 -subscribe:

- (RACDisposable *)subscribe:(id<RACSubscriber>)subscriber {
	NSCParameterAssert(subscriber != nil);

	return [RACScheduler.subscriptionScheduler schedule:^{
		[subscriber sendCompleted];
	}];
}

RACEmptySignal 信号被订阅之后会马上给订阅者发送 sendCompleted 事件

回到 flattenMap 的实现逻辑中,如果 block(value) 返回不是nil,它是如何返回 RACStream 对象的呢?

执行 block(value) 实际上就是触发 代码2 中的 BLOCK 2

BLOCK 2 中,又会触发另个 block(value),这时候会触发代码1中的 BLOCK 1,这里 block(value) 会返回相关对象(测试代码中返回NSNumber),BLOCK 2 中将 NSNumber 包装成 RACReturnSignal 返回,此时在 BLOCK 3 stream 对象就是 RACReturnSignal。

最后通过 bind 函数的变换,订阅会收到变换过后的值。

image-20190126201245265

mapReplace

测试代码:

- (void)testMapReplace {
    RACSignal *sourceSignal = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
        [subscriber sendNext:@1];
        [subscriber sendNext:@2];
        [subscriber sendNext:@3];
        
        [subscriber sendCompleted];
        return nil;
    }];
    
    [[sourceSignal mapReplace:@(10)] subscribeNext:^(id  _Nullable x) {
         NSLog(@"value = %@", x);
    }];
}

输出:

value = 10
value = 10
value = 10

底层实现:

- (__kindof RACStream *)mapReplace:(id)object {
	return [[self map:^(id _) {
		return object;
	}] setNameWithFormat:@"[%@] -mapReplace: %@", self.name, RACDescription(object)];
}

mapReplace 是通过封装 map 方法实现,把原信号每一个事件都变换成参数 object 传递给订阅者

reduceEach

测试代码:

- (void)testReduceEach {
    RACSignal *signal1 = [RACSignal createSignal:
                          ^RACDisposable *(id<RACSubscriber> subscriber)
                          {
                              [subscriber sendNext:RACTuplePack(@1,@2)];
                              [subscriber sendNext:RACTuplePack(@3,@4)];
                              [subscriber sendNext:RACTuplePack(@5,@6)];
                              [subscriber sendCompleted];
                              return [RACDisposable disposableWithBlock:^{
                                  NSLog(@"signal1 dispose");
                              }];
                          }];
    
    RACSignal *signal2 = [signal1 reduceEach:^id (NSNumber *num1 , NSNumber *num2){
        return @([num1 intValue] + [num2 intValue]);
    }];
    
    [signal2 subscribeNext:^(id  _Nullable x) {
        NSLog(@"value = %@", x);
    }];
}

输出:

value = 3
value = 7
value = 11
signal1 dispose

从输出结果看出来,reduceEach 将原信号 signal1 发送的 RACTuple 数据进行解包聚合发送给订阅者

底层实现:

/* 代码4 */

- (__kindof RACStream *)reduceEach:(RACReduceBlock)reduceBlock {
	NSCParameterAssert(reduceBlock != nil);

	__weak RACStream *stream __attribute__((unused)) = self;
	return [[self map:^(RACTuple *t) {
        // BLOCK 1
		NSCAssert([t isKindOfClass:RACTuple.class], @"Value from stream %@ is not a tuple: %@", stream, t);
		return [RACBlockTrampoline invokeBlock:reduceBlock withArguments:t];
	}] setNameWithFormat:@"[%@] -reduceEach:", self.name];
}

reduceEach 也是通过封装 map 方法实现。reduceEach 方法的入参是 RACReduceBlock 类型的闭包

typedef id _Nonnull (^RACReduceBlock)();

首先通过断言判断 reduceBlock 闭包是否为nil,然后调用 map 方法,map 方法的入参也就是 BLOCK 1,在 BLOCK 1 里会先判断原信号 signal 发送的数据 t 是否为 RACTuple 类型,然后返回 RACBlockTrampoline 类型对象

@interface RACBlockTrampoline ()
@property (nonatomic, readonly, copy) id block;
@end

RACBlockTrampoline 内部会保存一个 block 对象,然后根据传进来的参数,动态的构造一个 NSInvocation,通过执行 NSInvocation 返回需要的数据。

- (id)invokeWithArguments:(RACTuple *)arguments {
    // 
	SEL selector = [self selectorForArgumentCount:arguments.count];
	NSInvocation *invocation = [NSInvocation invocationWithMethodSignature:[self methodSignatureForSelector:selector]];
	invocation.selector = selector;
	invocation.target = self;

	for (NSUInteger i = 0; i < arguments.count; i++) {
		id arg = arguments[i];
		NSInteger argIndex = (NSInteger)(i + 2);
		[invocation setArgument:&arg atIndex:argIndex];
	}

	[invocation invoke];
	
	__unsafe_unretained id returnVal;
	[invocation getReturnValue:&returnVal];
	return returnVal;
}

- (SEL)selectorForArgumentCount:(NSUInteger)count {
	NSCParameterAssert(count > 0);

	switch (count) {
		case 0: return NULL;
		case 1: return @selector(performWith:);
		case 2: return @selector(performWith::);
		case 3: return @selector(performWith:::);
		case 4: return @selector(performWith::::);
		case 5: return @selector(performWith:::::);
		case 6: return @selector(performWith::::::);
		case 7: return @selector(performWith:::::::);
		case 8: return @selector(performWith::::::::);
		case 9: return @selector(performWith:::::::::);
		case 10: return @selector(performWith::::::::::);
		case 11: return @selector(performWith:::::::::::);
		case 12: return @selector(performWith::::::::::::);
		case 13: return @selector(performWith:::::::::::::);
		case 14: return @selector(performWith::::::::::::::);
		case 15: return @selector(performWith:::::::::::::::);
	}

	NSCAssert(NO, @"The argument count is too damn high! Only blocks of up to 15 arguments are currently supported.");
	return NULL;
}
- (id)performWith:(id)obj1 :(id)obj2 {
	id (^block)(id, id) = self.block;
	return block(obj1, obj2);
}

...

-invokeWithArguments` 中首先通过判断 RACTuple 元素数量选择对应的 selector,最大能支持 15 个元素。

确定好 NSInvocation 的 target 和 seletor,还需要设置参数

for (NSUInteger i = 0; i < arguments.count; i++) {
    id arg = arguments[i];
    NSInteger argIndex = (NSInteger)(i + 2);
    [invocation setArgument:&arg atIndex:argIndex];
}

这里看到设置 Type Encodings 的时候是从偏移量为 2开始,这是因为偏移量0和 偏移量1的参数分别对应着隐藏参数self 和 _cmd。

构造好 invocation 之后,执行 [invocation invoke] 调用动态方法,实际上就是执行代码4中的入参 reduceBlock 闭包,最后通过 [invocation getReturnValue:&returnVal] 拿到闭包的返回值,也就是 RACBlockTrampoline 的最终返回值,最终成为 map 闭包里面的返回值。剩下的就是 map 函数流程。

or

测试代码

RACSignal *sourceSignal = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
    [subscriber sendNext:RACTuplePack(@NO,@YES,@NO)];
    [subscriber sendNext:RACTuplePack(@YES,@NO)];

    [subscriber sendCompleted];
    return nil;
}];

RACSignal *orSignal = [sourceSignal or];
[orSignal subscribeNext:^(id  _Nullable x) {
    NSLog(@"value = %@", x);
}];

输出

value = 1
value = 1
- (RACSignal *)or {
	return [[self map:^(RACTuple *tuple) {
		NSCAssert([tuple isKindOfClass:RACTuple.class], @"-or must only be used on a signal of RACTuples of NSNumbers. Instead, received: %@", tuple);
		NSCAssert(tuple.count > 0, @"-or must only be used on a signal of RACTuples of NSNumbers, with at least 1 value in the tuple");

		return @([tuple.rac_sequence any:^(NSNumber *number) {
            /// anyBlock
			NSCAssert([number isKindOfClass:NSNumber.class], @"-or must only be used on a signal of RACTuples of NSNumbers. Instead, tuple contains a non-NSNumber value: %@", tuple);

			return number.boolValue;
		}]);
	}] setNameWithFormat:@"[%@] -or", self.name];
}

这里主要是将 RACTuple 转换成 RACTupleSequence,因为 RACTupleSequence 是继承 RACSequence,这里就会调用 RACSequence 的 - (BOOL)any:(BOOL (^)(id))block 方法

//  RACSequence.m

- (BOOL)any:(BOOL (^)(id))block {
    NSCParameterAssert(block != NULL);

    /// block = BOOL ^(NSNumber *number) {return number.boolValue;}
    return [self objectPassingTest:block] != nil;
}
//  RACSequence.m

- (id)objectPassingTest:(BOOL (^)(id))block {
    NSCParameterAssert(block != NULL);

    /// block = BOOL ^(NSNumber *number) {return number.boolValue;}
    return [self filter:block].head;
}

- (BOOL)any:(BOOL (^)(id))block 实现里面最终会执行 RACStream 中的 filter 方法

- (__kindof RACStream *)filter:(BOOL (^)(id value))block {
    NSCParameterAssert(block != nil);

    Class class = self.class;
    
    return [[self flattenMap:^ id (id value) {
        // flattenMapBlock

        /// block = BOOL ^(NSNumber *number) {return number.boolValue;}
        if (block(value)) { 
            return [class return:value];
        } else {
            return class.empty;
        }
    }] setNameWithFormat:@"[%@] -filter:", self.name];
}

在 flattenMapBlock 中会以 RACTupleSequence 发出的信号值传入到 block中,这些值信号值也就是一开始测试代码中 sourceSignal 发送给订阅者的 RACTuple 中的元素,如果value对应的BOOL值是YES,就转换成一个 RACTupleSequence 信号。如果对应的是NO,则转换成一个empty信号。

上面的 flattenMap 方法实际上最终调用到 RACSequence 的 -bind 方法

/*
    block = ^{
        return ^(id value, BOOL *stop) {
            id stream = block(value) ?: [class empty];
            NSCAssert([stream isKindOfClass:RACStream.class], @"Value returned from -flattenMap: is not a stream: %@", stream);

            return stream;
        };

    bindBlock = ^(id value, BOOL *stop) {
            id stream = block(value) ?: [class empty];
            NSCAssert([stream isKindOfClass:RACStream.class], @"Value returned from -flattenMap: is not a stream: %@", stream);

            return stream;
        };
*/

- (RACSequence *)bind:(RACSequenceBindBlock (^)(void))block {
    RACSequenceBindBlock bindBlock = block();
    return [[self bind:bindBlock passingThroughValuesFromSequence:nil] setNameWithFormat:@"[%@] -bind:", self.name];
}

然后继续调用 - (RACSequence *)bind:(RACSequenceBindBlock)bindBlock passingThroughValuesFromSequence:(RACSequence *)passthroughSequence 这里参数 passthroughSequence 为 nil

- (RACSequence *)bind:(RACSequenceBindBlock)bindBlock passingThroughValuesFromSequence:(RACSequence *)passthroughSequence {

    __block RACSequence *valuesSeq = self;
    __block RACSequence *current = passthroughSequence;
    __block BOOL stop = NO;

    RACSequence *sequence = [RACDynamicSequence sequenceWithLazyDependency:^ id {
        /// dependencyBlock

        while (current.head == nil) {
            if (stop) return nil;

            // We've exhausted the current sequence, create a sequence from the
            // next value.
            id value = valuesSeq.head;

            if (value == nil) {
                // We've exhausted all the sequences.
                stop = YES;
                return nil;
            }

            current = (id)bindBlock(value, &stop);
            if (current == nil) {
                stop = YES;
                return nil;
            }

            valuesSeq = valuesSeq.tail;
        }

        NSCAssert([current isKindOfClass:RACSequence.class], @"-bind: block returned an object that is not a sequence: %@", current);
        return nil;
    } headBlock:^(id _) {
        return current.head;
    } tailBlock:^ id (id _) {
        if (stop) return nil;

        return [valuesSeq bind:bindBlock passingThroughValuesFromSequence:current.tail];
    }];

    sequence.name = self.name;
    return sequence;
}

上面代码的关键逻辑是调用 RACDynamicSequence 类的 sequenceWithLazyDependency 方法,dependencyBlock 会在之前提及到的函数 -objectPassingTest 中,执行 RACSequence 的 -head 方法中触发

在 方法,dependencyBlock 中,执行current = (id)bindBlock(value, &stop); 根据value的布尔值来产生新的信号,如果为 NO 则返回 RACEmptySequence 类型,此时 current.head 为 nil,进行下一次循环;如过 value 为YES,则返回 RACUnarySequence 类型,current.head 不为 nil,结束循环。

any:

测试代码:

- (void)testAny {
    RACSignal *sourceSignal = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
        [subscriber sendNext:@1];
        [subscriber sendNext:@2];
        [subscriber sendNext:@3];
        
        [subscriber sendCompleted];
        return nil;
    }];
    
    [[sourceSignal any:^BOOL(id  _Nullable object) {
        return [object integerValue] < 2;
    }] subscribeNext:^(NSNumber * _Nullable x) {
        NSLog(@"value = %@", x);
    }];
}

输出:

value = 1
value = 0
value = 0

底层实现:

- (RACSignal *)any:(BOOL (^)(id object))predicateBlock {
	NSCParameterAssert(predicateBlock != NULL);

	return [[[self materialize] bind:^{
		return ^(RACEvent *event, BOOL *stop) {
			if (event.finished) {
				*stop = YES;
				return [RACSignal return:@NO];
			}

			if (predicateBlock(event.value)) {
				*stop = YES;
				return [RACSignal return:@YES];
			}

			return [RACSignal empty];
		};
	}] setNameWithFormat:@"[%@] -any:", self.name];
}

首先原信号会 通过 -materialize 方法转换成 RACEvent 对象

- (RACSignal *)materialize {
	return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
		return [self subscribeNext:^(id x) {
			[subscriber sendNext:[RACEvent eventWithValue:x]];
		} error:^(NSError *error) {
			[subscriber sendNext:[RACEvent eventWithError:error]];
			[subscriber sendCompleted];
		} completed:^{
			[subscriber sendNext:RACEvent.completedEvent];
			[subscriber sendCompleted];
		}];
	}] setNameWithFormat:@"[%@] -materialize", self.name];
}
- (BOOL)isFinished {
	return self.eventType == RACEventTypeCompleted || self.eventType == RACEventTypeError;
}

从上面代码发现,RACEvent 如果收到原信号的 sendCompleted / sendError,finished 属性会置为 YES

-any 方法中,先判断 RACEvent 的 finished 属性,如果为 YES,stop接下来的信号 则返回 [RACSignal return:@NO];反之,则会根据入参 predicateBlock 闭包,将 RACEvent 的 value 传入 predicateBlock,如果返回值为YES,stop接下来的信号,则返回 [RACSignal return:@YES];如果 predicateBlock 返回值为 NO,则返回 [RACSignal empty]。

简单地总结来说,any 方法根据 predicateBlock 来对原信号的每一个信号进行判断,若遇到返回 YES 的条件,就给订阅者发送 YES 信号,然后发送 sendCompleted;若 predicateBlock 没有返回 YES 的条件,则最后给 订阅者 发送 NO 信号。

all

- (RACSignal *)all:(BOOL (^)(id object))predicateBlock {
    NSCParameterAssert(predicateBlock != NULL);
    
    return [[[self materialize] bind:^{
        return ^(RACEvent *event, BOOL *stop) {
            if (event.eventType == RACEventTypeCompleted) {
                *stop = YES;
                return [RACSignal return:@YES];
            }
            
            if (event.eventType == RACEventTypeError || !predicateBlock(event.value)) {
                *stop = YES;
                return [RACSignal return:@NO];
            }
            
            return [RACSignal empty];
        };
    }] setNameWithFormat:@"[%@] -all:", self.name];
}

all 方法可以理解为 any 方法的对立面,原信号如果发送 sendError 或者 predicateBlock 返回为 NO,就会结束信号的传递,并会给订阅者发值为 NO 的信号。如果整个订阅过程中都没有出现错误以及都满足 predicateBlock 为真的条件,最后会在 RACEventTypeCompleted 的时候发送 YES。

repeat

测试代码:

RACSignal *sourceSignal = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
    [subscriber sendNext:@1];

    [subscriber sendCompleted];
    return nil;
}];

RACSignal *orSignal = [sourceSignal repeat];
[orSignal subscribeNext:^(id  _Nullable x) {
    NSLog(@"value = %@", x);
}];

输出:

value = 1
value = 1
value = 1
...

执行之后会不断地打印 value = 1

- (RACSignal *)repeat {
    return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
        return subscribeForever(self,
                                ^(id x) {
                                    [subscriber sendNext:x];
                                },
                                ^(NSError *error, RACDisposable *disposable) {
                                    [disposable dispose];
                                    [subscriber sendError:error];
                                },
                                ^(RACDisposable *disposable) {
                                    // Resubscribe.
                                });
    }] setNameWithFormat:@"[%@] -repeat", self.name];
}

执行 -repeat 方法之后,内部会创建新的 RASignal,当新的信号被订阅的时候会执行 subscribeForever 方法

static RACDisposable *subscribeForever (RACSignal *signal, void (^next)(id), void (^error)(NSError *, RACDisposable *), void (^completed)(RACDisposable *)) {
	next = [next copy];
	error = [error copy];
	completed = [completed copy];

	RACCompoundDisposable *compoundDisposable = [RACCompoundDisposable compoundDisposable];

	RACSchedulerRecursiveBlock recursiveBlock = ^(void (^recurse)(void)) {
		RACCompoundDisposable *selfDisposable = [RACCompoundDisposable compoundDisposable];
		[compoundDisposable addDisposable:selfDisposable];

		__weak RACDisposable *weakSelfDisposable = selfDisposable;

		RACDisposable *subscriptionDisposable = [signal subscribeNext:next error:^(NSError *e) {
			@autoreleasepool {
				error(e, compoundDisposable);
				[compoundDisposable removeDisposable:weakSelfDisposable];
			}

			recurse();
		} completed:^{
			@autoreleasepool {
				completed(compoundDisposable);
				[compoundDisposable removeDisposable:weakSelfDisposable];
			}

			recurse();
		}];

		[selfDisposable addDisposable:subscriptionDisposable];
	};

	// Subscribe once immediately, and then use recursive scheduling for any
	// further resubscriptions.
	recursiveBlock(^{
		RACScheduler *recursiveScheduler = RACScheduler.currentScheduler ?: [RACScheduler scheduler];

		RACDisposable *schedulingDisposable = [recursiveScheduler scheduleRecursiveBlock:recursiveBlock];
		[compoundDisposable addDisposable:schedulingDisposable];
	});

	return compoundDisposable;
}

subscribeForever 方法有4个参数,分别是源信号 signal,next 闭包,error 闭包和 complete 闭包。函数内部先定义好递归的闭包:recursiveBlock,recursiveBlock 中 首先对 signal 进行订阅,如果源信号 signal 发送 sendError 或者 sendCompleted,就会执行对应的 error/complete 闭包,然后就执行 recursiveBlock 的参数闭包 recurse。

subscribeForever 最后是执行 recursiveBlock 并传入具体的 recurse。

recurse 中首先获取当前的递归调度器 recursiveScheduler,然后执行 -scheduleRecursiveBlock 方法

- (RACDisposable *)scheduleRecursiveBlock:(RACSchedulerRecursiveBlock)recursiveBlock {
	RACCompoundDisposable *disposable = [RACCompoundDisposable compoundDisposable];

	[self scheduleRecursiveBlock:[recursiveBlock copy] addingToDisposable:disposable];
	return disposable;
}

- (void)scheduleRecursiveBlock:(RACSchedulerRecursiveBlock)recursiveBlock addingToDisposable:(RACCompoundDisposable *)disposable {
	@autoreleasepool {
		RACCompoundDisposable *selfDisposable = [RACCompoundDisposable compoundDisposable];
		[disposable addDisposable:selfDisposable];

		__weak RACDisposable *weakSelfDisposable = selfDisposable;

        // 最终调用 schedule 方法
		RACDisposable *schedulingDisposable = [self schedule:^{
            /// scheduleBlock
            
			@autoreleasepool {
				// At this point, we've been invoked, so our disposable is now useless.
				[disposable removeDisposable:weakSelfDisposable];
			}

			if (disposable.disposed) return;

			void (^reallyReschedule)(void) = ^{
				if (disposable.disposed) return;
				[self scheduleRecursiveBlock:recursiveBlock addingToDisposable:disposable];
			};

			// Protects the variables below.
			//
			// This doesn't actually need to be __block qualified, but Clang
			// complains otherwise. :C
			__block NSLock *lock = [[NSLock alloc] init];
			lock.name = [NSString stringWithFormat:@"%@ %s", self, sel_getName(_cmd)];

			__block NSUInteger rescheduleCount = 0;

			// Set to YES once synchronous execution has finished. Further
			// rescheduling should occur immediately (rather than being
			// flattened).
			__block BOOL rescheduleImmediately = NO;

			@autoreleasepool {
				recursiveBlock(^{
					[lock lock];
					BOOL immediate = rescheduleImmediately;
					if (!immediate) ++rescheduleCount;
					[lock unlock];

					if (immediate) reallyReschedule();
				});
			}

			[lock lock];
			NSUInteger synchronousCount = rescheduleCount;
			rescheduleImmediately = YES;
			[lock unlock];

			for (NSUInteger i = 0; i < synchronousCount; i++) {
				reallyReschedule();
			}
		}];

		[selfDisposable addDisposable:schedulingDisposable];
	}
}

/// RACTargetQueueScheduler.m
- (RACDisposable *)schedule:(void (^)(void))block {
	NSCParameterAssert(block != NULL);

	RACDisposable *disposable = [[RACDisposable alloc] init];

	dispatch_async(self.queue, ^{
		if (disposable.disposed) return;
		[self performAsCurrentScheduler:block];
	});

	return disposable;
}

可以看到会调用到自己的 -schedule 方法,这里测试代码中是在主线程,获取到对应的是 RACTargetQueueScheduler 。这里的 -schedule 方法先判断原信号有没有disposed,若果没有,则把参数 block 放在对应的队列中触发。

可以看得到上面函数中 scheduleBlock 里不断递归执行 [self scheduleRecursiveBlock:recursiveBlock addingToDisposable:disposable],recursiveBlock 不断被触发,对应的 subscribeForever 中的 recurse() 循环执行:

scheduleRecursiveBlock->recursiveBlock->recurse()->reallyReschedule()->scheduleRecursiveBlock

也就是说源信号会循环被订阅触发其给订阅者发送 sendNext 事件,直到源信号发送 error 才结束。

retry:

测试代码:

- (void)testRetry {
    RACSignal *sourceSignal = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
        [subscriber sendNext:@1];
        [subscriber sendNext:@2];
        
        [subscriber sendError:[NSError errorWithDomain:@"domain" code:-1 userInfo:nil]];
        
        [subscriber sendNext:@3];
        
        [subscriber sendCompleted];
        return nil;
    }];
    
    RACSignal *retrySignal = [sourceSignal retry:2];
    [retrySignal subscribeNext:^(id  _Nullable x) {
        NSLog(@"value = %@", x);
    }];
}

输出:

value = 1
value = 2
value = 1
value = 2
value = 1
value = 2
- (RACSignal *)retry:(NSInteger)retryCount {
	return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
		__block NSInteger currentRetryCount = 0;
		return subscribeForever(self,
			^(id x) {
				[subscriber sendNext:x];
			},
			^(NSError *error, RACDisposable *disposable) {
                // 当原始信号发送 sendError 时
				if (retryCount == 0 || currentRetryCount < retryCount) {
					// Resubscribe.
					currentRetryCount++;
					return;
				}

				[disposable dispose];
				[subscriber sendError:error];
			},
			^(RACDisposable *disposable) {
                // 当原信号发送 sendCompleted
				[disposable dispose];
				[subscriber sendCompleted];
			});
	}] setNameWithFormat:@"[%@] -retry: %lu", self.name, (unsigned long)retryCount];
}

-retry: 实现与 -repeat 实现类似,基于 subscribeForever-retry: 是内部维护一个 currentRetryCount 变量,当原始信号发送 sendError 时判断重试次数 currentRetryCount 是否小于 retryCount,若是则重试,如果重试依旧收到 sendError,超过 retryCount 之后就会停止重试。

如果原信号没有发生错误,那么原信号在发送结束,当原信号发送 sendCompleted,subscribeForever 也就接受了,所以 -retry: 操作对于没有任何error的信号 和 直接订阅原信号表现一样。

将测试代码改为:

- (void)testRetryNoError {
    RACSignal *sourceSignal = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
        [subscriber sendNext:@1];
        [subscriber sendNext:@2];        
        [subscriber sendCompleted];
        return nil;
    }];
    
    RACSignal *retrySignal = [sourceSignal retry:2];
    [retrySignal subscribeNext:^(id  _Nullable x) {
        NSLog(@"value = %@", x);
    }];
}

输出:

value = 1
value = 2

原信号发送 sendCompleted ,整个订阅流程就结束了。




原文地址:访问原文地址
快照地址: 访问文章快照