03-计算限制的异步操作(上)

2018-07-04  本文已影响0人  _秦同学_

计算限制的异步操作(上)

一、协作式取消

.Net 对于视图取消操作的代码提供了标准的取消模式,称为协作式取消。

协作式取消:要取消的操作必须显示支持取消;

标准的协作式取消模式中两个FCL类型:

1.1 CancellationTokenSource

该类结构如下:

public sealed class CancellationTokenSource : IDisposable {
    public CancellationTokenSource();
    public void Dispose();  // 释放资源,如WaitHandle

    public Boolean IsCancellationRequset { get; }
    public CancellationToken Token { get; }

    public void Cancel();
    public void Cancel(Boolean throwOnFirstException);
}

其Token属性包含对一个或多个CancellationToken实例,并将它传递给操作。
对于使用 Token 的 Register 的多个回调,当使用 Cancel(Boolean throwOnFirstException) 方法时,对于throwOnFirstException参数:

1.2 CancellationToken

该类型是一个结构体类型,为值类型,其常用成员如下:

public struct CancellationToken {
    public static CancellationToken None {get;}

    public Boolean IsCancellationRequested { get; }  // 由通过非Task调用的操作调用
    public void ThrowIfCancelltionRequest();        // 由通过Task调用的操作调用

    public Boolean CanCanceled { get; }

    // CancellationTokenSource取消时,WaitHandle会收到信号
    public WaitHandle WaitHandle { get; }   
    public CancellationTokenRegistration Register(Action<Object> callBack, 
                                                  Object state, 
                                                  Boolean useSynchronizationContext);
}

该类型的实例包含一个对CancellationTokenSource对象引用的私有字段。在代码中可定时调用 IsCancellationRequested 属性判断当前操作是否需要提前退出。

通过 CancellationToken.None 这个静态属性,可返回一个不和任何 CancellationTokenSource 对象相关联的Token(该Token的对CancellationTokenSource 引用的私有字段为null)。其 CanBeCancelled 始终为false,而通过CancellationTokenSource返回的Token,该属性始终为true。

可调用 CancellationToken 的Register方法注册一个或多个将在关联的额CancellationTokenSource被取消时要调用的方法。

/// <param name="callBack">CancellationTokenSource取消时回调委托</param>
/// <param name="state">通过委托传给回调方法的状态值</param>
/// <param name="useSyncContext">是否使用调用线程的 SynchronizationContext 来调用委托</param>
public CancellationTokenRegistration Register(Action<Object> callBack, 
                                              Object state,  Boolean useSyncContext);

对于参数useSynchronizationContext:

对于返回值 CancellationTokenRegisteation 对象,存在一个Dispose方法,可以清楚Token上所登记的所有回调方法。当 CancellationTokenSource 被 Cancel 时,任何回调方法都不会被调用。

1.3 CancellationTokenSource 的链接操作

// 创建两个CancellationTokenSource
var cts1 = new CancellationTokenSource();
var cts2 = new CancellationTokenSource();

cts1.Token.Register(()=> Console.WriteLine("cts1 canceled."));
cts2.Token.Regsiter(()=> Console.WriteLine("cts2 canceled."));

// 创建一个新的CancellationTokenSource,它在cts1或cts2被取消时取消
var linkedCts = CancellationTokenSource.CreateLinkedTokenSource(cts1.Token, cts2.Token);
linkedCts.Token.Register(()=> Console.WriteLine("linkedCts canceled."));

// 取消其中一个CancellationTokenSource对象
cts2.Cancel();

// 显示哪个CancellationTokenSource对象被取消了
Console.WriteLine("cts1 canceled={0}, cts2 canceled={1}, linkedCts={2}", 
cts1.IsCancellationRequested,cts2.IsCancellationRequested,linkedCts.IsCancellationRequested);

以上代码输出为:

linkedCts canceled
cts2 canceled
cts1 canceled=false, cts2.canceled=true, linkedCts=true

1.4 定时取消

可以通过以下方法构建一个定时自动取消的 CancellationTokenSource 对象,或调用 CancelAfter 方法在指定时间后自动取消:

public CancellationTokenSource(Int32 millisecondDelay);
public CancellationTokenSource(TimeSpan delay);
public void CancelAfter(Int32 millisecondDelay);
public void CancelAfter(TimeSpan delay);

二、任务

可使用两种方式创建一个Task:

  1. 通过构造方法,通过Action或Action<Object>来确定想要执行的操作;
  2. 通过静态Run方法,通过Action或Func>TResult<来确定想要执行的操作;

无论是构造器还是Run方法,都可以选择一个CancellationToken,它可以让Task能够在调度前取消。
在构造Task时,可以选择向构造器传递一些 TaskCreationOptions 标志来控制 Task 的执行方式,其定义如下:

[Flags, Serializable]
public enum TaskCreationOptions {
    None                = 0x0000,   // Default

    PreferFairness      = 0x0001,   // 【提议】TaskScheduler,希望任务尽快执行
    LongRunning         = 0x0002,   // 【提议】TaskScheduler,尽可能创建线程池线程
    
    AttachedToPrent     = 0x0004,   // 【设置】附加到它的父Task
    DenyChildAttach     = 0x0008,   // 【设置】拒绝任何子任务附加
    HideScheduler       = 0x0010    // 【设置】使用默认TaskScheduler,而不是父任务的Scheduler
}

TaskCreationOptions控制的是任务调度器TaskScheduler对Task的操作行为,对于TaskScheduler相关的设置,TaskScheduler可能会也可能不会采纳,而后三项只和Task自身相关,总是有效的。

2.1 等待任务完成并获取结果

对于通过构造器创建完毕的Task对象,可调用Start()方法来开启任务。
可以通过调用Wait方法或Result属性来堵塞调用线程等待任务执行完毕。

Result属性内部会调用 Wait;

如果Task还没有开始执行,系统可能(取决于TaskScheduler)使用调用Wait的线程来执行Task。在这种情况下,调用Wait的线程不会堵塞;它会执行Task并立即返回,好处在于,没有线程会被堵塞,所以减少了对资源的占用(因为不需要创建一个线程来替代被堵塞的线程),并提升了性能(因为不需要花时间创建线程,也没有上下文切换)。不好的地方在于,假如线程在调用Wait前已经获得了一个线程同步锁,而Task视图获取同一个锁,就会造成死锁的线程!

以上摘自《via clr c#》中的原话,此处并不理解。

Task既可以使用实例方法 Wait 来等待一个单个任务,也可以使用静态方法等待一个Task数组:

以上两个方法若通过一个CancellationToken取消,都会抛出一个OperationCancelledException;

2.2 任务中的异常处理

默认情况下,Task代码中抛出的异常会被“吞噬”并存储到一个AggregateException对象中,线程池线程可以返回到线程池中;而当Task调用Wait系方法或Result属性来等待任务执行过程中,若任务代码发生了异常,则Wait系方法或Result属性将抛出一个System.AggregateException对象。

Task“吞噬”掉的异常可以向TaskScheduler的静态UnobervedTaskException事件等级一个回调方法,当这个Task会GC回收时,CLR的终结器线程就会引发这个事件:

TaskScheduler.UnobservedTaskException += TaskScheduler_UnobservedTaskException;
private void TaskScheduler_UnobservedTaskException(object sender, 
                                                   UnobservedTaskExceptionEventArgs e)
{
      UnhandledExceptionOccurred(e.Exception);
      e.SetObserved();
}

AggregateExceotion中包含一个InnerExceptions属性,该属性返回一个ReadOnlyCollection<Exception>集合对象(若父任务包含多个子任务,多个子任务中都抛出了异常,则集合中可能会包含多个对象)。
AggregateException提供的常用成员有:

调用Handle后,如果至少存在一个未处理异常,就抛出一个新的AggregateException,其中只包含未处理的异常。

2.3 取消任务

在Task创建的时候传入一个CancellationToken,将Task和Token进行关联。若Task未启动时被取消,那么Task永远不会完成,其Status属性为 Cenceled,同时,其IsCompleted属性为true,标识该任务已经被完成。此时若再调用Start()方法来开启任务,则会抛出一个 InvalidOperationException 异常,无法开启一个已完成的方法。

CancellationTokenSource cts = new CancellationTokenSource();
Task task = new Task(() => { Sum(10000, cts.Token); }, cts.Token);
cts.Cancel();
// 这里会抛出InvalidOperationException
task.Start();

在Task启动之后想要取消任务,则必须显示支持取消,将CancellationToken作为参数传递给回调方法(或使用Lambda表达式)。

Task对象虽然关联了CancellationToken但没有办法访问它。

完整的取消Demo如下:

static void Main(string[] args)
{
    CancellationTokenSource cts = new CancellationTokenSource();
    Task<int> task = new Task<int>(() => Sum(10000000, cts.Token), cts.Token);
    task.Start();

    cts.Cancel();
    try {
        // 若显示取消时,任务还未完成,Result会抛出一个AggregateException
        Console.WriteLine("The sum is:" + task.Result);
    }
    catch (AggregateException ex) {
        // 将所有 OperationCanceledException 都视为已处理
        // 其它任何异常在Handle中都会抛出一个AggregateException,其中只包含未处理的异常
        ex.Handle(e => e is OperationCanceledException);
        Console.WriteLine("Sum was canceled.");
    }
    Console.ReadLine();
}

public static int Sum(int n, CancellationToken token)
{
    int sum = 0;
    for (; n > 0; n--)
    {
        if (token.IsCancellationRequested) break;
        checked { sum += n; };
        // checked 溢出时抛出异常关键字
    }
    return sum;
}

2.4 任务完成时启动新任务

在任务未完成时调用Wait方法,极有可能造成线程池创建新线程。

ContinueWith用来注册当前Task执行完毕的后续任务。Task对象可多次调用ContinueWith,这样Task完成后,所有ContinueWith任务都会进入线程池队列中,使用线程池线程来完成后续任务。

TaskContinuationOptions定义如下:

[Flags, Serializable]
public enum TaskContinuationOptions {
    None                        = 0x0000,   // 默认
    PreferFairness              = 0x0001,   // 提议TaskScheduler尽快执行任务
    LongRunning                 = 0x0002,   // 提议TaskScheduler尽可能创建线程池线程
    AttachedToPrent             = 0x0004,   // 将当前Task和它的父Task关联
    DenyChildAttach             = 0x0008,   // 禁止关联子任务,否则抛出InvalidOperationExcetion
    HideScheduler               = 0x0010,   // 强迫子任务使用默调度器,而不是父任务的调度器

    LazyCancellation            = 0x0020,   // 除非前置任务完成,否则禁止延续任务完成
    ExecuteSynchronously        = 0x80000,  // 由执行前置任务的线程来完成当前延续任务
    // 指明在什么情况下允许运行ContinueWith任务
    NotOnRanToCompletion        = 0x10000,  
    NotOnFaulted                = 0x20000,
    NotOnCanceled               = 0x40000,
    OnlyOnCanceled              = NotOnRanToCompletion | NotOnFaulted,
    OnlyOnFaulted               = NotOnRanToCompletion | NotOnCanceled,
    OnlyOnRanToCompletion       = NotOnFaulted | NotOnCanceled
}

由于ContinueWith同时创建了一个新的Task,故TaskContinueationOptions也提供了TaskCreationOptions所有的选项,来设置新建的Task对象。枚举的其它选项值声明了ContinueWith任务执行的先决条件。
其中Task“完成”的各种状态如下:

Task<Int32> t = Task.Run(()=> Sum(10000));
t.ContinueWith(task => Console.WriteLine("The sum is: " + task.Result), 
               TaskContinuationOptions.OnlyOnRanToCompletion);
t.ContinueWith(task => Console.WriteLine("Sum threw: " + task.Exception.InnerException), 
               TaskContinuationOptions.OnlyOnFaulted);
t.ContinueWith(task => Console.WriteLine("Sum was canceled."), 
               TaskContinuationOptions.OnlyOnCanceled);

2.5 父子任务

任务支持父/子关系,在父任务的回调方法中可以创建新任务,并使用TaskCreationOptions将新任务指定为子任务。
只有所有子任务完成时,父任务才算结束。

Task<int[]> parentTasks = new Task<int[]>(() => {
    int[] results = new int[3];
    // Task<int[]> 父任务内部生成了3个子任务
    Action action0 = new Action(() => results[0] = Sum(CancellationToken.None, 10000));
    Action action1 = new Action(() => results[1] = Sum(CancellationToken.None, 20000));
    Action action2 = new Action(() => results[2] = Sum(CancellationToken.None, 300000));
    new Task(action0, TaskCreationOptions.AttachedToParent).Start();
    new Task(action1, TaskCreationOptions.AttachedToParent).Start();
    new Task(action2,TaskCreationOptions.AttachedToParent).Start();

    return results;
});
// 所有子任务完成,父任务才算完成
parentTasks.ContinueWith(parent => {
    try {
        // 任何一个子任务抛出异常,父任务都不算完成,查询Result属性报错
        Array.ForEach(parent.Result, Console.WriteLine);
    }
    catch (AggregateException ex) {
        Console.WriteLine(ex.GetBaseException().Message);
    }
    Console.WriteLine("Parent completed status is " + parent.Status);
});
// 启动父任务
parentTasks.Start();

输出结果为

算术运算导致溢出。
Parent completed status is Faulted

2.6 任务内部揭秘

每个Task对象都有构成任务状态的一组字段(9部分):

使用Task需要为这些状态分配内存,所以尽量使用ThreadPool.QueueUserWorkItem来节省资源;

Task和Task<T>对象都实现了IDisposable接口,默认Dispose方法都是关闭ManualResetEventSlim对象。不建议显示调用Dispose,应该让GC自己完成。

public enum TaskStatus {
    //Task对象被创建
    Created = 0,
    //该任务正在等待 .NET Framework 基础结构在内部将其激活并进行计划。
    WaitingForActivation = 1,
    
    WaitingToRun = 2,       //Task已经启动,但尚未开始执行。
    Running = 3,            //该任务正在运行,但尚未完成。

    //该任务已完成执行,正在隐式等待附加的子任务完成。
    WaitingForChildrenToComplete = 4,

    RanToCompletion = 5,    //已成功完成执行的任务。
    Canceled = 6,           //任务被取消。
    Faulted = 7             //由于未处理异常的原因而完成的任务。
}

Task状态说明:

Task出错时,其Exception属性返回一个AggregateException对象,其InnerExceptions集合包含了所有未处理的异常。

2.7 任务工厂

TaskFactory用来创建一组共享相同配置的Task对象。要向TaskFactory传递希望任务具有的 CancellationToken、TaskScheduler、TaskCreationOptions和TaskCpntinuationOptions等设置。

private static void Demo()
{
    Task parent = new Task(() => {
        CancellationTokenSource cts = new CancellationTokenSource();
        // 所有通过TaskFactory启动的任务都是子任务,且使用父任务的线程同步执行
        TaskFactory<int> tf = null;
        tf = new TaskFactory<int>(cts.Token, TaskCreationOptions.AttachedToParent, 
                                             TaskContinuationOptions.ExecuteSynchronously, 
                                             TaskScheduler.Default);

        // 该任务工厂创建并启动了3个子任务
        Task<int>[] childTasks = new Task<int>[] {
            tf.StartNew(()=> Sum(cts.Token, 10000)),
            tf.StartNew(()=> Sum(cts.Token, 20000)),
            tf.StartNew(()=> Sum(cts.Token, Int32.MaxValue)) // 将抛出OverFlowException
        };

        // 任何子任务抛出异常,就取消其余子任务
        for (int t = 0; t < childTasks.Length; t++) {
        TaskContinuationOptions continueOnFailed = TaskContinuationOptions.OnlyOnFaulted;
            childTasks[t].ContinueWith(task => cts.Cancel(), continueOnFailed);
        }

        // 所有子任务完成后,从成功完成的任务中找到返回的最大值,再将该值传给另一个任务来显示最大结果
        Func<Task<int>[], int> continueFunc = tasks => {
            var completedTasks = tasks.Where(t => !t.IsFaulted && !t.IsCanceled);
            return completedTasks.Max(item => item.Result);
        };
        // 当所有子任务都完成后创建一个Task,由于它是TaskFactory创建的,仍视为TaskFactory子任务
        Task<int> completedTask = tf.ContinueWhenAll(childTasks, continueFunc, 
                                                     CancellationToken.None);
        completedTask.ContinueWith(t => Console.WriteLine("The maximum is " + t.Result),
                                    TaskContinuationOptions.ExecuteSynchronously);
    });

    // 子任务完成后,也将显示任何未处理的异常
    parent.ContinueWith(p => {
        // 将所有文本放到一个StringBuilder中,就只用调用Console.WriteLine一次,
        // 因为这个任务可能和上面的任务并行执行,而我不希望任务的输出变得不连续
        string msg = "The following exception(s) occurred:" + Environment.NewLine;
        StringBuilder sb = new StringBuilder(msg);

        foreach (var e in p.Exception.Flatten().InnerExceptions) {
            sb.AppendLine(" " + e.GetType().ToString());
        }
        Console.WriteLine(sb.ToString());
    }, TaskContinuationOptions.OnlyOnFaulted);

    // 启动父任务,使它能够启动子任务
    parent.Start();
}

使用TaskFactory创建的所有任务都具有相同的配置,故ft.ContinueWhenAll返回的仍然是父任务的一个子任务,会用默认的TaskScheduler同步执行。通过向其传递CancellationToken.None来覆盖TaskFactory的CancellationToken,使其不能取消。

TaskFactory或TaskFactory<TResult>的静态ContinueWhenAll和ContinueWhenAny方法,会等待所有已创建的子任务完成后新建一个延续任务,该延续任务无论每个子任务的完成状态(Completion、Fault or Cancel)是怎样的都会执行。所以 TaskContinuationOption的以下标志是非法的:NotOnRanToCompletion, NotOnFaulted, NotOnCanceled,以及它们的组合标志。

2.8 任务调度器

TaskScheduler对象负责执行被调度的任务,同时向VS调试器公开任务信息。
FCL提供了两个派生自TaskScheduler的类型:

注意,同步上下文调度器实际上使任务代码放到GUI线程的队列中,并没有开启新的线程,它为Task更新UI提供了一种方式。
同时,线程池线程代码不能尝试更新UI组件,否则会抛出 InvalidOperationException。
如果有特殊任务调度需求,也可以自定义TaskScheduler派生类来完成需求。

// SynchronizationContextTaskScheduler实际安排任务在主线程执行
private readonly TaskScheduler m_syncContextTaskScheduler;
public MainWindow() {
    InitializeComponent();

    m_syncContextTaskScheduler = TaskScheduler.FromCurrentSynchronizationContext();

    txt.Text = "Synchronization Context Task Scheduler Demo";
    Visibility = Visibility.Visible;

    UpdateTime();
}

private CancellationTokenSource m_cts = null;
protected override void OnMouseLeftButtonDown(MouseButtonEventArgs e)
{
    if (m_cts != null) { // 一个操作正在进行,取消它
        m_cts.Cancel();
        m_cts = null;
    }
    else {   // 操作没有开始,启动它
    
        txt.Text = "Operation running";
        m_cts = new CancellationTokenSource();

        // 这个任务使用默认TaskScheduler,在一个线程池线程上执行
        Task<Int32> task = Task.Run(() => Sum(m_cts.Token, 20000), m_cts.Token);

        // 这些任务使用同步上下文任务调度器,在 GUI 线程上执行
        task.ContinueWith(t => txt.Text = "Result: " + t.Result, CancellationToken.None, 
                            TaskContinuationOptions.OnlyOnRanToCompletion, 
                            /*m_syncContextTaskScheduler*/TaskScheduler.Default);

        task.ContinueWith(t => txt.Text = "Operation Canceled.", CancellationToken.None, 
                            TaskContinuationOptions.OnlyOnCanceled, 
                            /*m_syncContextTaskScheduler*/TaskScheduler.Default);

        task.ContinueWith(t => txt.Text = "Operation Faulted.", CancellationToken.None, 
                            TaskContinuationOptions.OnlyOnFaulted, 
                            m_syncContextTaskScheduler);
    }
    base.OnMouseLeftButtonDown(e);
}

public static int Sum(CancellationToken token, int n) {
    int sum = 0;
    for (; n > 0; n--) {
        if (token.IsCancellationRequested) break;
        checked { sum += n; };
        Thread.Sleep(5);
    }
    return sum;
}
上一篇 下一篇

猜你喜欢

热点阅读