征服Swift傲视苍穹iOS《Swift》VIP专题博客积累

【Swift 脑洞系列】并行异步运算以及100行的`Promis

2016-08-29  本文已影响1751人  aaaron7

承接上一篇,轻松无痛实现异步操作串行。 如果没看过上一篇,阅读本篇可能会有点懵逼。

在上一篇文章中,我主要描述了如何实现异步串行运算符,+>。并演示了如何基于他来做一些诸如参数的传递和错误的处理等操作。

这篇文章中,我们会基于之前的发现,来实现异步并行运算符 <>。 以及基于 +><> 来做一些有趣的应用。

本文的主要内容:

第一部分 能够折叠异步并行操作的运算符

什么是折叠

首先,我们需要定义什么是异步并行? 就是我们同时执行多个异步操作,当所有操作都执行完毕后,执行异步(Complete)回调。比如我们已经有了用户的 ID,需要同时请求用户的头像和基本资料。在两个请求都拿到数据时,刷新界面。

在上一篇文章中,我们在提出运算符 +> 之前,提出了一个连接的概念。指的是把两个异步操作连接起来,一个执行完就执行另一个。通过连接,把两个异步操作合并为一个。

但现在异步并行,显然不能用连接,因为多个请求是一起发生的,没有先后顺序。在本文中,用折叠来表示把多个异步请求以并行的方式合并为一个的过程。

基本分析

首先,回忆一下我们异步串行运算符的签名:


typealias AsyncFunc = (info : AnyObject,complete:(AnyObject?,NSError?)->Void) -> Void

+> : (AsyncFunc,AsyncFunc) -> AsyncFunc

我们通过实现把两个异步操作折叠为一个,来实现串行折叠任意多个异步操作。

并行的思路也是一样的,我们只要实现并行折叠两个异步操作,我们就能折叠任意多个异步操作。

我们首先写出函数的签名:

func <>(left : AsyncFunc, right : AsyncFunc) -> AsyncFunc

为什么我们选择的串行异步运算符 +> 是非对称的,而并行异步运算符 <> 却是对称的呢?这还是由串行异步和并行异步两个运算的性质决定的,串行异步不满足交换律,因为串行就代表了运算本身有先后。而并行却没这个限制。a <> b == b <> a ,但 a +> b != b +> a

按照惯例,我们先根据函数的签名(返回一个函数),撸个基本的架子:

func <>(left : AsyncFunc , right : AsyncFunc) -> AsyncFunc{
    return { info, complete in
        
    }
}

架子搭好以后,我们来思考一下如何实现函数体, 有以下几个方面

这里的函数体,是指我们 return 后面的函数的函数体,而不是 <> 的函数体,如果一味思考后者,很容易懵逼。函数式编程的一个关键技巧就是通过类型来拆分抽象层次,局部具体,总体抽象。

实现异步折叠运算符

基于以上的分析,我们大概可以给出如下的实现:


func <>(left : AsyncFunc , right : AsyncFunc) -> AsyncFunc{
    return { info, complete in
        
        var leftComplete = false
        var rightComplete = false
        
        var leftResult:AnyObject? = nil
        var rightResult:AnyObject? = nil
        
        let checkComplete = {
            if leftComplete && rightComplete{
                let finalResult:[AnyObject] = [leftResult!, rightResult!]
                complete(finalResult, nil)
            }
        }
        
        left(info: info){result,error in
            guard error == nil else{
                complete(nil, error)
                return
            }
            
            leftComplete = true
            leftResult = result;
            checkComplete()
        }
        
        right(info: info){result,error in
            guard error == nil else{
                complete(nil, error)
                return
            }
            
            rightComplete = true
            rightResult = result;
            checkComplete()
        }
    }
}

上面的代码逻辑其实很简单,我们通过一个 checkComplete 函数来检查两个任务是否都已经完成,如果完成则合并两个异步函数返回的结果,并调用最外层的 complete 闭包。 两个异步函数则直接调用,在 complete 闭包中检查是否出错,没有则保存相应的结果,和置对应的标志位。

测试一下

        let delay = dispatch_time(DISPATCH_TIME_NOW, Int64(NSEC_PER_SEC))
        
        let test1:AsyncFunc = { _,complete in
            print("test1")
            
            dispatch_after(delay, dispatch_get_main_queue(), {
                complete(0,nil);
            })
        }
        
        let test2:AsyncFunc = { _,complete in
            print("test2")
            
            dispatch_after(delay, dispatch_get_main_queue(), {
                complete(0,nil);
            })
        }
        
        let test = test1 <> test2;
        
        test(info: 0){ _,_ in print("all finished")};

上述代码中,我们创建了两个异步操作:test1test2。 然后通过我们的并行折叠运算符 <> 折叠为一个: test。之后直接运行 test。

结果输出:

test1
test2
all finished

我们运行折叠后的函数,test1test2 都得到了调用,并且在都完成之后,调用了最外层的 complete 闭包:打印出了 all finished。看上去很完美。

精益求精

但是真的完美了吗?

在上述测试代码中,我们把 main_queue 换成 global_queue之后,我们会发现最外层的 complete 闭包被执行了两次,最终打印了两次 all finished, 这明显不是我们想要的结果。

上面的代码其实会有一个经典的多线程问题,如果 leftrightcomplete 闭包是并发调用的话,就有可能在执行完 leftComplete = true 的时候执行被切走,执行 rightcomplete 闭包,执行完 right 之后继续 left 这边的执行。这个时序就会导致最终被执行两次。

解决也很简单,我们只要加一个变量来当做互斥锁即可,最终的并行折叠运算符修改如下:

func <>(left : AsyncFunc , right : AsyncFunc) -> AsyncFunc{
    return { info, complete in
        
        var leftComplete = false
        var rightComplete = false
        var finishedComplete = false
        
        var leftResult:AnyObject? = nil
        var rightResult:AnyObject? = nil
        
        let checkComplete = {
            if leftComplete && rightComplete{
                objc_sync_enter(finishedComplete)
                if !finishedComplete{
                    let finalResult:[AnyObject] = [leftResult!, rightResult!]
                    complete(finalResult, nil)
                    finishedComplete = true
                }
                objc_sync_exit(finishedComplete)
            }
        }
        
        left(info: info){result,error in
            guard error == nil else{
                complete(nil, error)
                return
            }

            leftComplete = true
            leftResult = result;
            checkComplete()
        }
        
        right(info: info){result,error in
            guard error == nil else{
                complete(nil, error)
                return
            }

            rightComplete = true
            rightResult = result;
            checkComplete()
        }
    }
}

至此,我们拥有了一个优雅的并行折叠运算符:<>, 和 +> 一样。可以帮助我们简化代码,抽象逻辑。 当然,闲的蛋疼要对其玩一玩map/filter/reduce之类也是支持的,和上篇介绍的思路一样。在此不再赘述。

第二部分,100行实现类 PromiseKit 的接口

镜头切换到一些实际应用的场景,很多时候我们倾向于通过 closure 来组织逻辑,这样可以把本身就耦合的逻辑写在一个地方,也更容易维护。我们的并行折叠和串行连接运算符都是基于函数的,能不能应用在 closure based scenario 呢? let try it.

考虑接口易用性,我们 API 的设计可以直接参(shan)考(zhai) PromiseKit.

PromiseKit GitHub主页的 Readme 给了这样的一个例子:

firstly {
    when(NSURLSession.GET(url).asImage(), CLLocationManager.promise())
}.then { image, location -> Void in
    self.imageView.image = image
    self.label.text = "\(location)"
}.always {
    UIApplication.sharedApplication().networkActivityIndicatorVisible = false
}.error { error in
    UIAlertView(/*…*/).show()
}

我们来分析一下他都做了什么:

仅从 API 字面分析,本文不涉及 PromiseKit 内部真正的实现机制

基于以上的分析,我们一步步来实现这几个组件:

firstly

firstly用于接收第一个任务,任务书写是同步的方式,但必须异步运行。

func firstly(body : Void->Void)->Promise{
    let starter: AsyncFunc = { _,complete in
dispatch_async(dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT,0)) {
            body();
            complete(0,nil);
        }
    }

    return Promise(starter: starter)
}

我们的 firstly 实现只做了两件事, 把第一个任务包成异步的,并用这个任务创建了一个 Promise 对象并返回。

因为所有任务最终都是由 Promise 对象来维护的,所以 firstly 只需要把第一个任务直接给他即可。

Promise 类基础

根据之前的分析,我们先把显而易见的架子撸出来:

class Promise {
    var chain : AsyncFunc
    var alwaysClosure : (Void->Void)?
    var errorClosure : (NSError?->Void)?
    
    init(starter : AsyncFunc){
        chain = starter
    }
    
    func then(body : AnyObject throws->Void )->Promise{
        //TO BE IMP
        return self
    }
    
    func always(closure : Void->Void)->Promise{
        alwaysClosure = closure
        return self
    }
    
    func error(closure : NSError?->Void)->Promise{
        errorClosure = closure
        fire()
        return self
    }
    
    func fire(){
        chain(info: 0) { (info, error) in
            if let always = self.alwaysClosure{
                always()
            }
            
            if error == nil{
                print("all task finished")
            }else{
                if let errorC = self.errorClosure{
                    errorC(error)
                }
            }
        }
    }
}

上述代码实现了除 then 函数之外的所有部件。我们把初始任务存在成员 chain 上面,然后分别用成员保存 error closurealways closure, 然后在注册完 error closure 之后调用 fire 来触发 chain 的执行,在 chain 执行完毕后分别执行 always 和是否出错来执行 error.

then, always, error 都返回 self, 实现链式调用。

至此,我们已经实现了能执行一个任务,并且实现 alwayserror 机制的 Promise 对象。

无限的、链式 then 块。

如之前所说,我们把 firstly 传进来的初始任务保存在 chain 这个成员中。那之后的 then 传入的其实就是后续的任务,比如有三个链式的 then,就代表我们需要串行的执行四个任务:初始任务,三个 then块的任务。

所以,我们的 then 函数可以这样来实现:

        let async: AsyncFunc = { info, complete in
dispatch_async(dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT,0)) {
                var error : NSError?
                do{
                    try body(info)
                }catch let err as NSError{
                    error = err
                }
                complete(0,error)
            }
        }
        chain = chain +> async
        return self
    }

显而易见, then 做的事情和 firstly并无太多区别,首先把传进来的同步任务打包成异步,第二步是把新的任务通过异步串行运算符 +> 合并到成员 chain 中。这样,chain 保存的就不仅仅是初始任务,而是像一个累加器一样,有多少 then, chain就是最终合并的任务。这样,我们不管 then 多少次,每个 then 块中的任务都会被合并到 chain 里。最终我们只需要执行 chain, 即可触发所有任务的链式执行(因为合并用的是 +>)。

注意在 then 块中执行 body 的时候用了 do-catch 结构,目的就是在 then 块接受的任务可以通过 throw 抛出错误,然后在这里捕获,实现错误的感知(如果捕获到错误,则最终会调用 errorClosure

实现 when 函数

我们温习一下上文对 when 函数的分析:

when 函数接受两个同步的任务,同时触发两个任务并阻塞当前的执行,直到两个任务都完成。(异步并行的场景),这里虽然 when 会阻塞执行,但 when 本身是运行在主线程中的,也不会阻塞主线程。

根据 when 函数的定位,只要简单实现成独立的函数即可,不需要实现为 Promise类的成员。

func when(fstBody : (Void->Void), sndBody : (Void->Void)){
    let async1 : AsyncFunc = { _ , complete in
        dispatch_async(dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT,0)) {
            fstBody();
            complete(0,nil);
        }
    }
    
    let async2 : AsyncFunc = { _ , complete in
        dispatch_async(dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT,0)) {
            sndBody();
            complete(0,nil);
        }
    }
    
    let async = async1 <> async2
    
    var finished = false
    
    async(info: 0) { (_, _) in
        finished = true
    }
    
    while finished == false {
        
    }
}

上述代码中,when 首先把传入的两个同步任务打包成一部,并通过异步并行运算符 <> 合并,然后直接执行合并后的结果。合并后的结果回调时(也就是两个任务都完成时),置 finishedtrue。 末尾用一个 whilefinishedfalse 时阻塞函数的执行。

至此,我们完成了一个最简单的 Promise 的封装, firstlyPromise 主类when 三个组件,加起来一共100行

老规矩,来测试一下

        firstly { () in
            when({ () in
                print(“begin fst job")
                sleep(1)
                print("fst job in when finished")
                }, sndBody: { () in
                    print(“begin snd job")
                    sleep(5)
                    print("snd job in when finished")
            })
        }.then { (info) in
            print("second job")
        }.then { (info) in
            print("third job")
        }.always { () in
            print("always block")
        }.error { (error) in
            print("error occurred")
        }

执行流程:同时执行 when 的两个任务,都完成之后按顺序执行 then, 最后执行 always。因为过程中没有error,所以 error 块没有被调用。

begin fst job
begin snd job
(间隔1秒)fst job in when finished
(间隔4秒)
snd job in when finished
second job
third job
always block

现在来简单修改一下代码,在 second job 里抛出一个 error:

        firstly { () in
            when({ () in
                print(“begin fst job")
                sleep(1)
                print("fst job in when finished")
                }, sndBody: { () in
                    print(“begin snd job")
                    sleep(5)
                    print("snd job in when finished")
            })
        }.then { (info) in
            print("second job")
            throw NSError(domain: "error", code: 0, userInfo: [:])
        }.then { (info) in
            print("third job")
        }.always { () in
            print("always block")
        }.error { (error) in
            print("error occurred")
        }

最终输出:

begin fst job
begin snd job
(间隔1秒)fst job in when finished
(间隔4秒)
snd job in when finished
second job
always block
error occurred

对比之前的结果,因为抛出了错误,所以 error 块得以执行,并且thrid job 没有执行,因为出错中断了 then 链的执行。

总结

本文所有代码: https://github.com/aaaron7/functional_async_demo

上一篇下一篇

猜你喜欢

热点阅读