Swift | 自定义顺序执行的异步Operation

2022-03-17  本文已影响0人  清無

需求:实现一系列异步(如网络请求)的操作,这些操作间彼此依赖(顺序执行),全部操作完成后进行通知(回调)。

最终实现

lazy var ops: [GCD.Task.Operation] = (1...10).map
    { i in
        let op = GCD.Task.Operation(taskId: .init(i), timeout: 3)
        op.handler =  { [weak self, weak op] in
            guard let strongSelf = self, let strongOp = op else { return }
            strongSelf.request(task: i, time: Double(i), completion: {
                strongSelf.label.text = "Finished: \(strongOp.taskId) ... ✅✅✅✅✅✅✅✅✅✅✅"
                GCD.asyncAfter(seconds: 1, handler: strongOp.completionBlock ?? {})
            })
        }
        return op
    }
    
    override func touchesBegan(_ touches: Set<UITouch>, with event: UIEvent?) {
        super.touchesBegan(touches, with: event)
        
        GCD.Task.async(
            operations: ops,
            queuePolicy: .serial,
            timeoutPolicy: .cancelOthers
        ) { [weak self] result in
            guard let self = self else {
                return
            }
            print("Tasks completed with result: \(result)")
            self.view.backgroundColor = result.isAllSuccess ? .systemGreen : .systemRed
        }
        ops[5].cancel()
        ops[6].cancel()
        ops[7].cancel()
    }
    
    func request(task: Int, time: TimeInterval, completion: (()->Void)?) {
        DispatchQueue.global().async { [weak self] in
            GCD.asyncInMainQueue {
                self?.label.text = "Started: \(task) ... ➡️➡️➡️➡️➡️➡️"
            }
            GCD.asyncAfter(seconds: time, in: .main, handler: completion ?? {})
        }
    }
    
    override func viewWillDisappear(_ animated: Bool) {
        super.viewWillDisappear(animated)
        
        GCD.Task.cancel(ops)
    }
extension GCD {
    public enum Task {
        public static func cancel(_ operations: [Operation]) {
            operations.forEach{ op in
                if !op.isCancelled, !op.isFinished {
                    op.cancel()
                }
            }
        }
        
        private static func executeResult(of operations: [Operation]) -> ExecuteResult {
            var result = ExecuteResult()
            for op in operations {
                if op.isCancelled {
                    result.cancelled.append(op.taskId)
                }
                else if op.isTimedout {
                    result.timedout.append(op.taskId)
                }
                else {
                    result.success.append(op.taskId)
                }
            }
            result.isAllSuccess = result.success.count == operations.count
            return result
        }
        
        public static func async(
            operations: [Operation],
            queuePolicy: QueuePolicy = .concurrent(maxCount: -1),
            timeoutPolicy: TimeoutPolicy = .waiting,
            notify notifyQueue: DispatchQueue = .main,
            completion: @escaping BlockT<ExecuteResult>
        )
        {
            GCD.asyncInGlobalQueue
            {
                var count = -1
                switch queuePolicy {
                case .serial:
                    count = 1
                case .concurrent(let maxCount):
                    count = maxCount
                }
                
                var isFinished = false
                let opQueue = GCD.Queue.operation(maxConcurrentCount: count)
                let newOperations = operations.filter{
                    !$0.isFinished && !opQueue.operations.contains($0)
                }
                for op in newOperations {
                    op.timedoutHandler = {
                        switch timeoutPolicy {
                        case .cancelOthers:
                            guard !isFinished else { break }
                            cancel(newOperations)
                        default: break
                        }
                        return timeoutPolicy
                    }
                    opQueue.addOperation(op)
                }
                opQueue.waitUntilAllOperationsAreFinished()
                
                notifyQueue.async{
                    isFinished = true
                    completion(self.executeResult(of: operations))
                }
            }
        }
    }
    
}

extension GCD.Task {
    public enum QueuePolicy {
        case serial
        case concurrent(maxCount: Int)
    }
    public enum TimeoutPolicy {
        case waiting
        case continueNext
        case cancelOthers
    }
    public struct ExecuteResult {
        public fileprivate(set) var success: [String] = []
        public fileprivate(set) var cancelled: [String] = []
        public fileprivate(set) var timedout: [String] = []
        public fileprivate(set) var isAllSuccess = false
    }
}

extension GCD.Task {
    public class Operation: Foundation.Operation {
        public var taskId: String
        public var timeouts: TimeInterval
        public var handler: BlockVoid?
        public private(set) var isTimedout = false
        internal var timedoutHandler: BlockR<TimeoutPolicy>?
        
        fileprivate lazy var completion: BlockVoid = {
            return { [weak self] in
                guard let strongSelf = self else { return }
                strongSelf.semaphore.signal()
            }
        }()
        public override var completionBlock: (() -> Void)? {
            set{}
            get {
                return completion
            }
        }
        private lazy var semaphore = DispatchSemaphore(value: 0)
        
        public enum Status {
            case ready, timedout, finished, cancelled
        }
        public var status: Status {
            switch (isTimedout, isFinished, isCancelled) {
            case (true, _, _): return .timedout
            case (_, true, _): return .finished
            case (_, _, true): return .cancelled
            default: return .ready
            }
        }
        
        public override var description: String {
            return "[Task: \(taskId)] [Status: \(status)]"
        }
        
        public init(
            taskId: String,
            timeout seconds: TimeInterval = 0
        ) {
            self.taskId = taskId
            self.timeouts = seconds
        }
        
        public override func main()
        {
            guard !isCancelled, let handler = handler else {
                return
            }
            GCD.Queue.global(qos: .userInitiated).queue.async(execute: handler)
            
            if timeouts > 0
            {
                switch (semaphore.wait(timeout: .now() + timeouts)) {
                case .timedOut:
                    isTimedout = true
                    guard let policy = timedoutHandler?() else {
                        return
                    }
                    switch policy {
                    case .continueNext: semaphore.signal()
                    default: break
                    }
                case .success:
                    break
                }
            }
            else {
                semaphore.wait()
            }
        }
    }
}

方法1

通过直接重写main方法,通过GCDsemaphore实现异步线程的同步阻塞,达到“串行”需求。

实现:

class SerialOperation: Operation {
    var task: Int
    init(task: Int) {
        self.task = task
    }
    override func main() {
        print("[\(task)] main Start")
/// 注意这里`value`为0时,若外部调用`wait()`则会阻塞当前operation所在的queue
/// 直到异步操作完成,释放信号量`semaphore.signal()`,才会执行下一个operation
        let semaphore = DispatchSemaphore(value: 0)
        DispatchQueue.global().async { [weak self] in
            guard let self = self else { return }

            print("[\(self.task)] Serial begin...\(Thread.current)")
            request(task: self.task, time: 3) {
                semaphore.signal()
                print("[\(self.task)] Serial end...\(Thread.current)")
            }
/*
            print("[\(self.task)] Serial begin...\(Thread.current)")
            Thread.sleep(forTimeInterval: 3)
            print("[\(self.task)] Serial end...\(Thread.current)")
            semaphore.signal() */
        }
        semaphore.wait()
        print("[\(task)] main End")
    }
}

调用:

let queue = OperationQueue()
// count为1类似于`异步串行`队列,达到operation`依赖`的效果
        queue.maxConcurrentOperationCount = 1
        for i in 1...3 {
            let serial = SerialOperation(task: i)
            serial.completionBlock = {
                print("Serial operation finished! \(i)")
            }
            queue.addOperation(serial)
        }

输出:

[1] main Start
[1] Serial begin...<NSThread: 0x600000936d40>{number = 6, name = (null)}
[1] Serial end...<NSThread: 0x600000936d40>{number = 6, name = (null)}
[1] main End
Serial operation finished! 1
[2] main Start
[2] Serial begin...<NSThread: 0x600000925a00>{number = 7, name = (null)}
[2] Serial end...<NSThread: 0x600000925a00>{number = 7, name = (null)}
[2] main End
[3] main Start
Serial operation finished! 2
[3] Serial begin...<NSThread: 0x600000936d40>{number = 6, name = (null)}
[3] Serial end...<NSThread: 0x600000936d40>{number = 6, name = (null)}
[3] main End
Serial operation finished! 3

方法2

重写start方法,并手动维护isFinished、isCacelled、isExecuting等状态,此种方式较为复杂。

实现:

/// 公用网络请求方法
func request(task: Int, time: TimeInterval, completion: (()->Void)?) {
    DispatchQueue.global().async {
        print("Requesting task: \(task) ... started")
        Thread.sleep(forTimeInterval: time)
        print("Requesting task: \(task) ... finished")
        DispatchQueue.main.async(execute: completion ?? {})
    }
}

class RequestOperation: Operation {
    var task: Int
    var time: TimeInterval
    
    init (task: Int, time: TimeInterval, completion: @escaping ()->Void) {
        self.task = task
        self.time = time
        
        super.init()
        self.completionBlock = completion
    }
    
    private var _finished: Bool = false {
        willSet{
            willChangeValue(forKey: "isFinished")
        }
        didSet{
            didChangeValue(forKey: "isFinished")
        }
    }
    private var _executing: Bool = false {
        willSet{
            willChangeValue(forKey: "isExecuting")
        }
        didSet{
            didChangeValue(forKey: "isExecuting")
        }
    }
    private var _cancelled: Bool = false {
        willSet{
            willChangeValue(forKey: "isCancelled")
        }
        didSet{
            didChangeValue(forKey: "isCancelled")
        }
    }
    
    override var isFinished: Bool {
        return _finished
    }
    override var isExecuting: Bool {
        return _executing
    }
    override var isCancelled: Bool {
        return _cancelled
    }
    override var isAsynchronous: Bool {
        return true
    }
    
    private func done() {
        super.cancel()
        
        _cancelled = true
        _executing = false
        _finished = true
        
        completionBlock?()
    }
    
    override func cancel() {
        objc_sync_enter(self)
        done()
        objc_sync_exit(self)
    }
    
    override func start() {
        guard !isCancelled else {
            done()
            return
        }
        _executing = true
        request(task: task, time: time) { [weak self] in
            self?.done()
        }
    }
}

调用:

let queue = OperationQueue()
        var previousOperation: Operation?
        for i in 1...3 {
            let op = RequestOperation(task: i, time: Double(3-i), completion: {
                print("Operation \(i) is completed!")
                if i == 3 {
                    print("All tasks are done !!!")
                }
            })
            if let previous = previousOperation {
                op.addDependency(previous)
            }
            queue.addOperation(op)
            previousOperation = op
        }

输出:

Requesting task: 1 ... started
Requesting task: 1 ... finished
Operation 1 is completed!
Requesting task: 2 ... started
Requesting task: 2 ... finished
Operation 2 is completed!
Requesting task: 3 ... started
Requesting task: 3 ... finished
Operation 3 is completed!
All tasks are done !!!
上一篇下一篇

猜你喜欢

热点阅读