Article preview
This article continues from the previous one and analyzes the fourth group of commonly used RACSignal methods.
switchToLatest
Test code:
- (void)testSwitchToLast {
RACSignal *signal0 = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
[subscriber sendNext:@0];
[subscriber sendNext:@1];
dispatch_after(dispatch_time(DISPATCH_TIME_NOW, (int64_t)(1 * NSEC_PER_SEC)), dispatch_get_main_queue(), ^{
[subscriber sendNext:@2];
[subscriber sendCompleted];
});
return nil;
}];
RACSignal *signal1 = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
[subscriber sendNext:@3];
dispatch_after(dispatch_time(DISPATCH_TIME_NOW, (int64_t)(2 * NSEC_PER_SEC)), dispatch_get_main_queue(), ^{
[subscriber sendNext:@4];
[subscriber sendNext:@5];
[subscriber sendCompleted];
});
return nil;
}];
RACSignal *signal2 = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
[subscriber sendNext:@6];
[subscriber sendCompleted];
return nil;
}];
RACSignal *sourceSignal = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
[subscriber sendNext:signal0];
[subscriber sendNext:signal1];
[subscriber sendNext:signal2];
[subscriber sendCompleted];
return nil;
}];
[[sourceSignal switchToLatest] subscribeNext:^(id _Nullable x) {
NSLog(@"value = %@", x);
}];
}
Output:
value = 0
value = 1
value = 3
value = 6
Underlying implementation:
- (RACSignal *)switchToLatest {
return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
RACMulticastConnection *connection = [self publish];
RACDisposable *subscriptionDisposable = [[connection.signal
flattenMap:^(RACSignal *x) {
NSCAssert(x == nil || [x isKindOfClass:RACSignal.class], @"-switchToLatest requires that the source signal (%@) send signals. Instead we got: %@", self, x);
// -concat:[RACSignal never] prevents completion of the receiver from
// prematurely terminating the inner signal.
return [x takeUntil:[connection.signal concat:[RACSignal never]]];
}]
subscribe:subscriber];
RACDisposable *connectionDisposable = [connection connect];
return [RACDisposable disposableWithBlock:^{
[subscriptionDisposable dispose];
[connectionDisposable dispose];
}];
}] setNameWithFormat:@"[%@] -switchToLatest", self.name];
}
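- Inside -switchToLatest, the source signal calls -publish to turn itself into a hot signal. The returned connection holds that hot signal in its signal property.
- -flattenMap: is applied to the hot signal, and an assertion checks that every value the hot signal sends is itself a RACSignal. In other words, -switchToLatest operates on higher-order signals (signals of signals).
- Inside the flattenMap block, each inner signal x is wrapped with takeUntil:, using the hot signal concatenated with [RACSignal never] as the trigger. x is therefore forwarded only until the hot signal sends its next inner signal. Because of concat:[RACSignal never], the trigger never completes, so completion of the source signal does not prematurely terminate the last inner signal.
- This explains the output of the test: signal0 delivers 0 and 1 synchronously and is cut off as soon as signal1 arrives, so 2 is never delivered; signal1 delivers 3 and is cut off when signal2 arrives, so 4 and 5 are dropped; signal2 then delivers 6.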
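The switching behaviour can be made more visible with subjects, whose events are sent by hand. The following is a minimal sketch, not code from the library or from the tests above. It assumes the ReactiveObjC umbrella header and that the function is called on the main thread; demoSwitchToLatest and the subject names are hypothetical.

```objc
#import <Foundation/Foundation.h>
#import <ReactiveObjC/ReactiveObjC.h>

// Hypothetical helper for illustration; assumed to run on the main thread.
static void demoSwitchToLatest(void) {
    RACSubject *outer = [RACSubject subject];   // sends the inner signals
    RACSubject *first = [RACSubject subject];
    RACSubject *second = [RACSubject subject];

    [[outer switchToLatest] subscribeNext:^(id x) {
        NSLog(@"value = %@", x);
    }];

    [outer sendNext:first];
    [first sendNext:@"first-1"];    // forwarded: `first` is the latest inner signal
    [outer sendNext:second];        // from here on `first` is no longer observed
    [first sendNext:@"first-2"];    // ignored
    [second sendNext:@"second-1"];  // forwarded
    [outer sendCompleted];          // completing the outer signal does not cut off `second`
    [second sendNext:@"second-2"];  // still forwarded
}
```

Only the values of the most recent inner signal reach the subscriber, and the last inner signal keeps running after the outer signal completes, which is the purpose of the concat:[RACSignal never] trigger.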
switch: cases: default:
Test code:
- (void)testswitchCasesDefault {
RACSignal *signal0 = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
[subscriber sendNext:@"00"];
return nil;
}];
RACSignal *signal1 = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
[subscriber sendNext:@"11"];
return nil;
}];
RACSignal *signal2 = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
[subscriber sendNext:@"22"];
[subscriber sendCompleted];
return nil;
}];
RACSignal *defaultSignal = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
[subscriber sendNext:@1024];
[subscriber sendCompleted];
return nil;
}];
RACSignal *sourceSignal = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
[subscriber sendNext:@"0"];
[subscriber sendNext:@"1"];
[subscriber sendNext:@"2"];
[subscriber sendCompleted];
return nil;
}];
NSDictionary *dict = @{@"0" : signal0,
@"1" : signal1,
@"2" : signal2
};
[[RACSignal switch:sourceSignal cases:dict default:defaultSignal] subscribeNext:^(id _Nullable x) {
NSLog(@"value = %@", x);
}];
}
Output:
value = 00
value = 11
value = 22
Underlying implementation:
+ (RACSignal *)switch:(RACSignal *)signal cases:(NSDictionary *)cases default:(RACSignal *)defaultSignal {
NSCParameterAssert(signal != nil);
NSCParameterAssert(cases != nil);
for (id key in cases) {
id value __attribute__((unused)) = cases[key];
NSCAssert([value isKindOfClass:RACSignal.class], @"Expected all cases to be RACSignals, %@ isn't", value);
}
NSDictionary *copy = [cases copy];
return [[[signal
map:^(id key) {
if (key == nil) key = RACTupleNil.tupleNil;
RACSignal *signal = copy[key] ?: defaultSignal;
if (signal == nil) {
NSString *description = [NSString stringWithFormat:NSLocalizedString(@"No matching signal found for value %@", @""), key];
return [RACSignal error:[NSError errorWithDomain:RACSignalErrorDomain code:RACSignalErrorNoMatchingCase userInfo:@{ NSLocalizedDescriptionKey: description }]];
}
return signal;
}]
switchToLatest]
setNameWithFormat:@"+switch: %@ cases: %@ default: %@", signal, cases, defaultSignal];
}
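- +switch: cases: default: first asserts that the signal and cases parameters are non-nil, then iterates over the cases dictionary and asserts that every value is a RACSignal.
- It then applies map: to the source signal: each value sent is used as a key to look up the corresponding RACSignal in cases. If the lookup returns nil, defaultSignal is used instead; if defaultSignal is also nil, an error signal is returned. The source signal is thereby converted into a higher-order signal, and switchToLatest then forwards the values of the most recent inner signal to the subscriber.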
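The test above never reaches the default branch. The sketch below shows a key without a matching case falling back to defaultSignal. It assumes the ReactiveObjC umbrella header and a call on the main thread; demoSwitchDefault, the keys, and the strings are made up for illustration.

```objc
#import <Foundation/Foundation.h>
#import <ReactiveObjC/ReactiveObjC.h>

// Hypothetical helper for illustration; assumed to run on the main thread.
static void demoSwitchDefault(void) {
    RACSubject *keys = [RACSubject subject];
    NSDictionary *cases = @{ @"a" : [RACSignal return:@"case a"] };
    RACSignal *fallback = [RACSignal return:@"default"];

    [[RACSignal switch:keys cases:cases default:fallback] subscribeNext:^(id x) {
        NSLog(@"value = %@", x);
    } error:^(NSError *error) {
        NSLog(@"error = %@", error);
    }];

    [keys sendNext:@"a"];        // matches a case, so "case a" is forwarded
    [keys sendNext:@"missing"];  // no match, so defaultSignal is used instead
}
```

If nil were passed as the default, the second key would instead produce an error with code RACSignalErrorNoMatchingCase, as the implementation above shows.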
if: then: else:
Test code:
- (void)testIfThenElse {
RACSignal *signalTrue = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
[subscriber sendNext:@"true"];
return nil;
}];
RACSignal *signalFalse = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
[subscriber sendNext:@"false"];
return nil;
}];
RACSignal *sourceSignal = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
[subscriber sendNext:@(NO)];
[subscriber sendNext:@(YES)];
[subscriber sendCompleted];
return nil;
}];
[[RACSignal if:sourceSignal then:signalTrue else:signalFalse] subscribeNext:^(id _Nullable x) {
NSLog(@"value = %@", x);
}];
}
Output:
value = false
value = true
Underlying implementation:
+ (RACSignal *)if:(RACSignal *)boolSignal then:(RACSignal *)trueSignal else:(RACSignal *)falseSignal {
NSCParameterAssert(boolSignal != nil);
NSCParameterAssert(trueSignal != nil);
NSCParameterAssert(falseSignal != nil);
return [[[boolSignal
map:^(NSNumber *value) {
NSCAssert([value isKindOfClass:NSNumber.class], @"Expected %@ to send BOOLs, not %@", boolSignal, value);
return (value.boolValue ? trueSignal : falseSignal);
}]
switchToLatest]
setNameWithFormat:@"+if: %@ then: %@ else: %@", boolSignal, trueSignal, falseSignal];
}
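+if: then: else: works on the same principle as +switch: cases: default:. map: converts each boolean sent by the source signal into the corresponding RACSignal (YES -> trueSignal, NO -> falseSignal), which turns the source signal into a higher-order signal. switchToLatest is then applied, and the rest of the logic is the same as in +switch: cases: default:.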
catch:
Test code:
- (void)testCatch {
RACSignal *sourceSignal = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
NSError *error = [NSError errorWithDomain:@"" code:-1 userInfo:nil];
[subscriber sendError:error];
return nil;
}];
RACSignal *catchSignal = [sourceSignal catch:^RACSignal * _Nonnull(NSError * _Nonnull error) {
NSLog(@"execute catch block, error = %@", error);
return [RACSignal return:@"error text"];
}];
[catchSignal subscribeNext:^(id _Nullable x) {
NSLog(@"value = %@", x);
}];
}
Output:
execute catch block, error = Error Domain= Code=-1 "(null)"
value = error text
Underlying implementation:
- (RACSignal *)catch:(RACSignal * (^)(NSError *error))catchBlock {
NSCParameterAssert(catchBlock != NULL);
return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
RACSerialDisposable *catchDisposable = [[RACSerialDisposable alloc] init];
RACDisposable *subscriptionDisposable = [self subscribeNext:^(id x) {
[subscriber sendNext:x];
} error:^(NSError *error) {
RACSignal *signal = catchBlock(error);
NSCAssert(signal != nil, @"Expected non-nil signal from catch block on %@", self);
catchDisposable.disposable = [signal subscribe:subscriber];
} completed:^{
[subscriber sendCompleted];
}];
return [RACDisposable disposableWithBlock:^{
[catchDisposable dispose];
[subscriptionDisposable dispose];
}];
}] setNameWithFormat:@"[%@] -catch:", self.name];
}
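When the source signal sends an error, the error is passed to catchBlock, which returns a RACSignal. An assertion checks that the returned signal is not nil; that signal is then subscribed to, and its events are forwarded to the final subscriber.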
try:
Test code:
- (void)testTry {
RACSignal *sourceSignal = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
[subscriber sendNext:@0];
[subscriber sendNext:@1];
[subscriber sendNext:@2];
[subscriber sendNext:@3];
[subscriber sendCompleted];
return [RACDisposable disposableWithBlock:^{
NSLog(@"source signal dispose");
}];
}];
RACSignal *trySignal = [sourceSignal try:^BOOL(id _Nullable value, NSError * _Nullable __autoreleasing * _Nullable errorPtr) {
NSInteger i = [value integerValue];
if (i > 2) {
*errorPtr = [NSError errorWithDomain:@"" code:-1 userInfo:nil];
return NO;
}
return YES;
}];
[trySignal subscribeNext:^(id _Nullable x) {
NSLog(@"value = %@", x);
}];
[trySignal subscribeError:^(NSError * _Nullable error) {
NSLog(@"error = %@", error);
}];
}
Output:
value = 0
value = 1
value = 2
source signal dispose
error = Error Domain= Code=-1 "(null)"
source signal dispose
Underlying implementation:
- (RACSignal *)try:(BOOL (^)(id value, NSError **errorPtr))tryBlock {
NSCParameterAssert(tryBlock != NULL);
return [[self flattenMap:^(id value) {
NSError *error = nil;
BOOL passed = tryBlock(value, &error);
return (passed ? [RACSignal return:value] : [RACSignal error:error]);
}] setNameWithFormat:@"[%@] -try:", self.name];
}
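- For each value sent by the source signal, tryBlock returns passed and fills in error (the address of error is passed in, and tryBlock assigns to it internally).
- If passed is YES, the value sent by the source signal is wrapped in a RACSignal via [RACSignal return:]; if passed is NO, an error signal is returned via [RACSignal error:]. With these two wrappers every value is mapped to a signal, so the source effectively becomes a higher-order signal, which flattenMap: flattens back into a single signal that terminates on the first error.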
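In the test above, "source signal dispose" is printed twice because trySignal is subscribed to twice, and each subscription runs the source signal again. A single subscription that handles both values and the error could look like the following sketch. It assumes the ReactiveObjC umbrella header and a call on the main thread; demoTry and the error domain string are hypothetical.

```objc
#import <Foundation/Foundation.h>
#import <ReactiveObjC/ReactiveObjC.h>

// Hypothetical helper for illustration; assumed to run on the main thread.
static void demoTry(void) {
    RACSubject *numbers = [RACSubject subject];

    [[numbers try:^BOOL(id _Nullable value, NSError * _Nullable __autoreleasing * _Nullable errorPtr) {
        if ([value integerValue] > 2) {
            // Guard against a NULL out-parameter before writing to it.
            if (errorPtr != NULL) {
                *errorPtr = [NSError errorWithDomain:@"DemoErrorDomain" code:-1 userInfo:nil];
            }
            return NO;
        }
        return YES;
    }] subscribeNext:^(id x) {
        NSLog(@"value = %@", x);
    } error:^(NSError *error) {
        NSLog(@"error = %@", error);
    }];

    [numbers sendNext:@0];  // passes the check and is forwarded
    [numbers sendNext:@1];  // passes the check and is forwarded
    [numbers sendNext:@3];  // fails the check; the subscriber receives the error
}
```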
firstOrDefault: success: error:
Test code:
- (void)testFirstOrDefaultSuccessError {
RACSignal *sourceSignal = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
dispatch_after(dispatch_time(DISPATCH_TIME_NOW, (int64_t)(2 * NSEC_PER_SEC)), dispatch_get_global_queue(0, 0), ^{
[subscriber sendNext:@0];
[subscriber sendNext:@1];
[subscriber sendCompleted];
});
return nil;
}];
BOOL success;
NSError *error;
id value = [sourceSignal firstOrDefault:@10 success:&success error:&error];
NSLog(@"value = %@|thread=%@", value, [NSThread currentThread]);
}
Output:
value = 0|thread=<NSThread: 0x608000075600>{number = 1, name = main}
Underlying implementation:
- (id)firstOrDefault:(id)defaultValue success:(BOOL *)success error:(NSError **)error {
NSCondition *condition = [[NSCondition alloc] init];
condition.name = [NSString stringWithFormat:@"[%@] -firstOrDefault: %@ success:error:", self.name, defaultValue];
__block id value = defaultValue;
__block BOOL done = NO;
// Ensures that we don't pass values across thread boundaries by reference.
__block NSError *localError;
__block BOOL localSuccess;
[[self take:1] subscribeNext:^(id x) {
[condition lock];
value = x;
localSuccess = YES;
done = YES;
[condition broadcast];
[condition unlock];
} error:^(NSError *e) {
[condition lock];
if (!done) {
localSuccess = NO;
localError = e;
done = YES;
[condition broadcast];
}
[condition unlock];
} completed:^{
[condition lock];
localSuccess = YES;
done = YES;
[condition broadcast];
[condition unlock];
}];
[condition lock];
while (!done) {
[condition wait];
}
if (success != NULL) *success = localSuccess;
if (error != NULL) *error = localError;
[condition unlock];
return value;
}
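- Define the state variables:
  - value: records the value sent by the source signal (initially defaultValue)
  - done: whether the source signal has sent sendCompleted / sendError / sendNext
  - localError: records the NSError sent by the source signal
  - localSuccess: whether the source signal has sent sendCompleted / sendNext
  - condition: the lock used for synchronization
- Subscribe to the source signal, using take: to receive only the first value:
  - sendNext: locks condition, stores the received value in value, and calls broadcast on condition to wake all waiting threads
  - sendCompleted: similar to sendNext, except that value is left unchanged, so defaultValue is returned if the signal completes without sending a value
  - sendError: if no earlier event has been handled (done is still NO), saves the error and calls broadcast on condition to wake all waiting threads
- Immediately after subscribing, lock condition and wait until the source signal sends an event, blocking the current thread in the meantime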
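The usage of -firstOrDefault: success: error: is somewhat similar to await in Python and resembles the idea of a coroutine, with the difference that here the calling thread is actually blocked while waiting.
Note that a deadlock can easily occur if the thread that calls this method is the same thread on which the source signal will later send sendNext / sendError / sendCompleted. After subscribing, the method locks condition and blocks in [condition wait] (step 3). -wait releases the lock while waiting, but the thread itself stays blocked. An event scheduled asynchronously on that same thread (for example with dispatch_after on the main queue while the method is called on the main thread) can therefore never run, done never becomes YES, and the wait never returns.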
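One way to keep the main thread free is to make the blocking call on a background queue and hop back to the main queue with the result. This is only a sketch under stated assumptions: the ReactiveObjC umbrella header is available, demoFirstOrDefaultOffMainThread is a hypothetical helper, and the signal passed in does not deliver its events on the background thread that is doing the waiting.

```objc
#import <Foundation/Foundation.h>
#import <ReactiveObjC/ReactiveObjC.h>

// Hypothetical helper for illustration.
static void demoFirstOrDefaultOffMainThread(RACSignal *signal) {
    dispatch_async(dispatch_get_global_queue(QOS_CLASS_DEFAULT, 0), ^{
        BOOL success = NO;
        NSError *error = nil;
        // Blocks only this background thread; the main thread keeps running.
        id value = [signal firstOrDefault:@10 success:&success error:&error];

        dispatch_async(dispatch_get_main_queue(), ^{
            NSLog(@"value = %@, success = %@, error = %@", value, @(success), error);
        });
    });
}
```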
waitUntilCompleted:
Test code:
- (void)testWaitUntilCompleted {
RACSignal *sourceSignal = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
dispatch_after(dispatch_time(DISPATCH_TIME_NOW, (int64_t)(2 * NSEC_PER_SEC)), dispatch_get_global_queue(0, 0), ^{
[subscriber sendNext:@0];
[subscriber sendNext:@1];
[subscriber sendCompleted];
});
return nil;
}];
NSError *error;
BOOL success = [sourceSignal waitUntilCompleted:&error];
NSLog(@"success = %@", @(success));
}
Output:
success = 1
Underlying implementation:
- (BOOL)waitUntilCompleted:(NSError **)error {
BOOL success = NO;
[[[self
ignoreValues]
setNameWithFormat:@"[%@] -waitUntilCompleted:", self.name]
firstOrDefault:nil success:&success error:error];
return success;
}
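- First, ignoreValues is applied to the source signal, which filters out all sendNext events.
- Then firstOrDefault: success: error: is called. The method returns YES if the source signal sends sendCompleted and NO if it sends sendError.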
defer:
Test code:
- (void)testDefer {
RACSignal *sourceSignal = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
[subscriber sendNext:@0];
[subscriber sendCompleted];
return nil;
}];
[[RACSignal defer:^RACSignal * _Nonnull{
NSLog(@"execute defer block");
return sourceSignal;
}] subscribeNext:^(id _Nullable x) {
NSLog(@"value = %@", x);
}];
}
Output:
execute defer block
value = 0
Underlying implementation:
+ (RACSignal *)defer:(RACSignal<id> * (^)(void))block {
NSCParameterAssert(block != NULL);
return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
return [block() subscribe:subscriber];
}] setNameWithFormat:@"+defer:"];
}
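+defer: first creates and returns a new signal. When that signal is subscribed to, block is executed and the signal it returns is subscribed to. The main purpose of +defer: is to postpone work until subscription time, so that custom operations can run right before the actual subscription takes place.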
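Because the block runs inside the didSubscribe closure of the new signal, it is executed once per subscription rather than once when the signal is created. A minimal sketch of this, assuming the ReactiveObjC umbrella header and a call on the main thread (demoDefer and subscriptionCount are hypothetical names):

```objc
#import <Foundation/Foundation.h>
#import <ReactiveObjC/ReactiveObjC.h>

// Hypothetical helper for illustration; assumed to run on the main thread.
static void demoDefer(void) {
    __block NSUInteger subscriptionCount = 0;

    RACSignal *deferred = [RACSignal defer:^RACSignal * _Nonnull{
        // Runs at subscription time, once for every subscriber.
        subscriptionCount += 1;
        return [RACSignal return:@(subscriptionCount)];
    }];

    // Nothing has run yet: creating the deferred signal does not call the block.
    NSLog(@"count before subscribing = %lu", (unsigned long)subscriptionCount);

    [deferred subscribeNext:^(id x) {
        NSLog(@"first subscriber got %@", x);   // the block has run once
    }];
    [deferred subscribeNext:^(id x) {
        NSLog(@"second subscriber got %@", x);  // the block has run again
    }];
}
```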
initially:
Test code:
- (void)testInitially {
RACSignal *sourceSignal = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
[subscriber sendNext:@0];
[subscriber sendCompleted];
return nil;
}];
RACSignal *initialSignal = [sourceSignal initially:^{
NSLog(@"execute initial block");
}];
[initialSignal subscribeNext:^(id _Nullable x) {
NSLog(@"value = %@", x);
}];
}
Output:
execute initial block
value = 0
Underlying implementation:
- (RACSignal *)initially:(void (^)(void))block {
NSCParameterAssert(block != NULL);
return [[RACSignal defer:^{
block();
return self;
}] setNameWithFormat:@"[%@] -initially:", self.name];
}
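-initially: is implemented as a wrapper around defer: inside the defer block it first executes block and then returns self, so the operations in block run before the source signal is subscribed to.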
deliverOn:
Test code:
- (void)testDeliverOn {
RACSignal *sourceSignal = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
dispatch_async(dispatch_get_global_queue(0, 0), ^{
[subscriber sendNext:@0];
[subscriber sendCompleted];
});
return nil;
}];
[[sourceSignal deliverOn:[RACScheduler mainThreadScheduler]] subscribeNext:^(id _Nullable x) {
NSLog(@"value = %@|thread = %@", x, [NSThread currentThread]);
}];
[sourceSignal subscribeNext:^(id _Nullable x) {
NSLog(@"value = %@|thread = %@", x, [NSThread currentThread]);
}];
}
Output:
value = 0|thread = <NSThread: 0x600000a77240>{number = 3, name = (null)}
value = 0|thread = <NSThread: 0x60c0000718c0>{number = 1, name = main}
Underlying implementation:
- (RACSignal *)deliverOn:(RACScheduler *)scheduler {
return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
return [self subscribeNext:^(id x) {
[scheduler schedule:^{
[subscriber sendNext:x];
}];
} error:^(NSError *error) {
[scheduler schedule:^{
[subscriber sendError:error];
}];
} completed:^{
[scheduler schedule:^{
[subscriber sendCompleted];
}];
}];
}] setNameWithFormat:@"[%@] -deliverOn: %@", self.name, scheduler];
}
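When the source signal sends sendNext / sendError / sendCompleted, -deliverOn: forwards the event to the subscriber on the thread determined by the given scheduler. In other words, the scheduler passed in specifies the thread on which events are received.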
subscribeOn:
- (RACSignal *)subscribeOn:(RACScheduler *)scheduler {
return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
RACCompoundDisposable *disposable = [RACCompoundDisposable compoundDisposable];
RACDisposable *schedulingDisposable = [scheduler schedule:^{
RACDisposable *subscriptionDisposable = [self subscribe:subscriber];
[disposable addDisposable:subscriptionDisposable];
}];
[disposable addDisposable:schedulingDisposable];
return disposable;
}] setNameWithFormat:@"[%@] -subscribeOn: %@", self.name, scheduler];
}
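-subscribeOn: is similar to -deliverOn:. The main difference is that -subscribeOn: specifies the thread on which the subscription itself takes place, that is, the thread on which the didSubscribe block runs. The thread on which sendNext / sendError / sendCompleted are sent is not determined by it.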
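This section has no test code, so here is a small sketch that combines the two operators: the subscription work runs on a background scheduler and the events are delivered on the main thread. It assumes the ReactiveObjC umbrella header; demoSubscribeOn is a hypothetical helper, and [RACScheduler scheduler] is used as the background scheduler.

```objc
#import <Foundation/Foundation.h>
#import <ReactiveObjC/ReactiveObjC.h>

// Hypothetical helper for illustration.
static void demoSubscribeOn(void) {
    RACSignal *source = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
        // With -subscribeOn: below, this block should run off the main thread.
        NSLog(@"didSubscribe on main thread: %@", NSThread.isMainThread ? @"YES" : @"NO");
        [subscriber sendNext:@0];
        [subscriber sendCompleted];
        return nil;
    }];

    [[[source
        subscribeOn:[RACScheduler scheduler]]               // where didSubscribe runs
        deliverOn:[RACScheduler mainThreadScheduler]]       // where events are received
        subscribeNext:^(id x) {
            NSLog(@"value = %@, received on main thread: %@", x, NSThread.isMainThread ? @"YES" : @"NO");
        }];
}
```

Without the -deliverOn: step, the value in this sketch would be received on whichever thread the didSubscribe block sends it from.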
deliverOnMainThread
- (RACSignal *)deliverOnMainThread {
return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
__block volatile int32_t queueLength = 0;
void (^performOnMainThread)(dispatch_block_t) = ^(dispatch_block_t block) {
int32_t queued = OSAtomicIncrement32(&queueLength);
if (NSThread.isMainThread && queued == 1) {
block();
OSAtomicDecrement32(&queueLength);
} else {
dispatch_async(dispatch_get_main_queue(), ^{
block();
OSAtomicDecrement32(&queueLength);
});
}
};
return [self subscribeNext:^(id x) {
performOnMainThread(^{
[subscriber sendNext:x];
});
} error:^(NSError *error) {
performOnMainThread(^{
[subscriber sendError:error];
});
} completed:^{
performOnMainThread(^{
[subscriber sendCompleted];
});
}];
}] setNameWithFormat:@"[%@] -deliverOnMainThread", self.name];
}
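-deliverOnMainThread serves a similar purpose to -deliverOn:, except that it always delivers sendNext / sendError / sendCompleted on the main thread.
OSAtomicIncrement32 increments a value atomically, which makes the counter thread-safe. If the current thread is the main thread and OSAtomicIncrement32 returns 1, there are no pending sendNext / sendError / sendCompleted events waiting for the main thread, so block is run directly. Otherwise the block is dispatched asynchronously to the main queue.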
………………………………