C++多线程

第20章:线程、任务和同步

2018-12-07  本文已影响5人  MrDecoder

#1 概述

线程是程序中独立的指令流。使用C#编写任何程序时,都有一个入口点:Main()方法。程序从Main()方法的第一条语句开始执行,直到这个方法返回为止。


#2. 异步委托

创建线程的一个简单方式是定义一个委托,并异步调用它。委托是方法的类型安全的引用。Delegate类还支持异步地调用方法。在后台,Delegate类会创建一个执行任务的线程。

==委托使用线程池来完成异步任务。==

为了说明委托的异步特性,从一个需要一定的时间才能执行完毕的方法开始。TaskAWwhile()执行一定程度的耗时操作,其定义如下:

static int TaskAWhile(int data,int ms)
{
    Console.WriteLine("TaskAWhile started");
    Thread.Sleep(ms);
    Console.WriteLine("TaskAWhile completed");
    return ++data;
} 

要从委托中调用这个方法,必须定义一个有相同参数和返回类型的委托,如下面的TaskAWhileDelegate()所示:

public delegate int TaskAWhileDelegate(int data, int ms);

2.1 投票

一种技术是投票,并检查委托是否完成了它的任务,所创建的Delegate类提供了BeginInvoke()方法,在该方法中,可以传递用委托类型定义的输入参数。BeginInvoke()方法总是有AsyncCallback和object类型的两个额外参数。现在重要的是BeginInvoke()方法的返回类型:IAsyncResult。通过IAsyncResult,可以获得该委托的相关信息,并通过IsCompleted属性验证该委托是否完成了任务。只要委托没有完成其任务,程序的主线程就继续执行while循环。

static void Main(string[] args)
{
    //synchronous method call 
    //TaskAWhile(1,3000);

    //asychronous by using a delegate
    TaskAWhileDelegate dl = TaskAWhile;

    IAsyncResult ar = dl.BeginInvoke(1, 3000, null, null);
    while (!ar.IsCompleted)
    {
        //doing something else in the main thread.
        Console.Write(".");
        Thread.Sleep(50);
    }
    int result = dl.EndInvoke(ar);
    Console.WriteLine("result: {0}", result);
}

==如果在委托结束之前不等委托完成其任务就结束主线程,委托线程就会停止。==

2.2 等待句柄

等待异步委托的结果的第2种方式是使用与IAsyncResult相关联的等待句柄。使用AsyncWaitHandle属性可以访问等待句柄。这个属性返回一个WaitHandle类型的对象,它可以等待委托线程完成其任务。WaitOne()将一个超时时间作为可选的第一个参数,在其中可以定义要等待的最长时间。如果发生超时,WaitOne()就返回false,while循环会继续执行。如果等待操作成功,就用一个中断退出while循环,用委托EndInvoke方法接受结果。

static void Main(string[] args)
{
    //synchronous method call 
    //TaskAWhile(1,3000);

    //asychronous by using a delegate
    TaskAWhileDelegate dl = TaskAWhile;

    IAsyncResult ar = dl.BeginInvoke(1, 3000, null, null);

    while(true)
    {
        Console.Write(".");
        if (ar.AsyncWaitHandle.WaitOne(50, false))
        {
            Console.WriteLine("Can get result now");
            break;
        }
    }
    int result = dl.EndInvoke(ar);
    Console.WriteLine("result: {0}", result);
}

2.3 异步回调

等待委托结果的第3中方式是使用异步回调。在BeginInvoke()的第3个参数中,可以传递一个满足AsyncCallback委托的需求的方法。AsyncCallback委托定义了一个IAsyncResult类型参数,其返回类型是void。这里,把TaskAWhileCompleted()的地址赋予第3个参数,它满足AsyncCallback委托的需求。对于最后一个参数,可以传递任意对象,以便从回调方法中访问它。传递委托实例很有用,这样回调方法就可以使用它获得异步方法的结果。

现在,只要TaskAWhileDelegate委托完成其任务,就调用TaskAWhileCompleted()。不需要在主线程中等待结果。但是在委托线程的任务完成之前,不能停止主线程,除非主线程结束时停止的委托线程没有问题。

static void Main(string[] args)
{
    TaskAWhileDelegate dl = TaskAWhile;
    dl.BeginInvoke(1, 3000, TaskAWhileCompleted, dl);
    for (int i = 0; i < 100; i++)
    {
        Console.Write(".");
        Thread.Sleep(50);
    }
}

TaskAWhileCompleted声明如下:

static void TaskAWhileCompleted(IAsyncResult ar)
{
    if (ar == null)
    {
        throw new ArgumentException("ar");        
    }
    TaskAWhileDelegate dl = ar.AsyncState as TaskAWhileDelegate;
    Trace.Assert(dl != null, "Invalid object type.");

    int result = dl.EndInvoke(ar);
    Console.WriteLine("result: {0}", result);
}

==使用回调方法,必须注意这个方法从委托线程中调用,而不是从主线程中调用。==

除了定义一个单独的方法,并给它传递BeginInvoke(),lambda表达式也非常适合这种情况。参数ar是IAsyncResult类型。在实现代码中,不需要把一个值赋予BeginInvoke()方法的最后一个参数,因为lambda表达式可以直接访问该作用域外部的变量dl。但是,Lambda表达式的实现代码仍是从委托线程中调用,以这种方式定义方法时,这不是很明显。

static void Main(string[] args)
{
    TaskAWhileDelegate dl = TaskAWhile;
    dl.BeginInvoke(1, 3000, ar => {
        int result = dl.EndInvoke(ar);
        Console.WriteLine("result: {0}", result);
    }, 
    null);
    for (int i = 0; i < 100; i++) {
        Console.Write(".");
        Thread.Sleep(50);
    }
}

#3. Thread类

使用Thread类可以创建和控制线程。下面的代码是创建和启动一个新线程的简单例子。Thread类构造函数重载并接受ThreadStart和ParameterizedThreadStart类型的委托参数。ThreadStart委托定义了一个返回类型为void的无参数方法。在创建了Thread对象后,就可以调用Start()方法启动线程:

static void Main(string[] args)
{
    var t1 = new Thread(ThreadMain);
    t1.Start();
    Console.WriteLine("This is a main thread.");
}

static void ThreadMain()
{
    Console.WriteLine("Running a thread.");
}

3.1 给线程传递数据

给线程传递数据可以采用两种方式:

  1. 使用带ParameterizedThreadStart委托参数的Thread构造函数;
  2. 创建一个自定义类,把线程的方法定义为实例方法,这样可以初始化实例的数据,之后启动线程。

要给线程传递数据,需要某个存储数据的类或结构。这里定义了包含字符串Data结构,但可以传递任意对象。

public struct Data 
{
    public string Message;
}

如果使用了ParameterizedThreadStart委托,线程的入口点必须有一个object类型的参数,且返回类型为void,对象可以强制转换为任意数据类型,这里把消息写入控制台。

static void ThreadMainWithParameters(object o) {
    Data d = (Data)o;
    Console.WriteLine("Running a thread, received {0}",d.Message);
}

通过Thread类的构造函数,可以将新的入口点赋予ThreadMainWithParamters,传递变量d,以此调用Start()。

static void Main(string[] args)
{
    var d = new Data {
         Message = "Info"
    };
    var t2 = new Thread(ThreadMainWithParameters);
    t2.Start(d);
}

3.2 后台线程

只要有一个前台线程在运行,应用程序的进程就在运行。如果多个前台线程在运行,而Main()方法结束了,应用程序的进程就仍然是激活的,直到所有前台线程完成其任务为止。

在默认情况下,用Thread类创建的线程是前台线程。线程池中的线程总是后台线程

在用Thread类创建线程时,可以设置IsBackground属性,以确定该线程是前台线程还是后台线程。

static void Main(string[] args)
{
    var t1 = new Thread(ThreadMain) {
        Name = "MyNewThread",
        IsBackground = false,
    };
    t1.Start();
    Console.WriteLine("Main thread ending now.");
}

static void ThreadMain()
{
    Console.WriteLine("Thread {0} started",Thread.CurrentThread.Name);
    Thread.Sleep(3000);
    Console.WriteLine("Thread {0} completed",Thread.CurrentThread.Name);
}

3.3 线程的优先级

线程由操作系统调度。给线程指定优先级,就可以影响调度顺序。

在改变优先级之前。必须理解线程调度器。操作系统根据优先级来调度线程。调度优先级最高的线程以在CPU上运行。线程如果在等待资源,它就会停止运行,并释放CPU。

在Thread类中,可以设置Priority属性,以影响线程的基本优先级。Priority属性需要ThreadPrority枚举定义的一个值。定义的级别有HighestAboveNormalBelowNormalLowest

在给线程指定优先级的时候要小心,因为这可能降低其他线程的运行概率。根据需要,可以短暂的改变优先级。

3.4 控制线程

调用Thread对象的Start()可以创建线程。但是,在调用Start()后,新线程仍不是处于Running状态,而是处于Unstarted状态。只要操作系统的线程调度器选择了要运行的线程,线程就会改为Runnnig状态。读取Thread.ThreadState属性,就可以获得线程的当前状态。

使用Thread.sleep(),会使线程处于WaitSleepJoin状态,在经历Sleep()方法定义的时间段后,线程就会等待再次被唤醒。

要停止一个线程,可以调用Thread.abort()。调用这个方法时,会在接到终止命令的线程中抛出一个ThreadAbortException类型的异常。用一个处理程序捕获这个异常,线程可以在结束前完成一些清理工作。线程还可以在接受到调用Thread.ResetAbort()方法的结果ThreadAbortException异常后继续执行。如果线程没有重置终止,接受到终止请求的线程的状态就从AbortRequested改为Aborted。

如果需要等待线程的结束,就可以调用Thread.join()。Thread.join()会停止当前线程,并把它设置为WaitSleepJoin状态,直到加入的线程完成终止。


#4. 线程池

创建线程需要时间。如果有不同的小任务要完成,就可以事先创建许多线程,在应完成这些任务时发出请求。这个线程数最好在需要更多的线程数的时候增加,在需要释放资源的时候减少。

不需要自己创建这样一个列表。该列表由ThreadPool类托管。这个类在需要时增减池中的线程数,直到最大的线程数。池中的最大线程数是可配置的。在双核CPU中,默认设置为1023个工作线程和1000个I/O线程。

static void Main(string[] args)
{
    int nWorkerThreads;
    int nCompletePortThreads;
    ThreadPool.GetMaxThreads(out nWorkerThreads, out nCompletePortThreads);
    Console.WriteLine("Max worker threads: {0}" + " I/O completion threads: {1}", nWorkerThreads, nCompletePortThreads);
    for (int i = 0; i < 5; i++)
    {
        ThreadPool.QueueUserWorkItem(JobForAThread);
    }
    Thread.Sleep(3000);
}
        
static void JobForAThread(object state)
{
    for (int i = 0; i < 3; i++)
    {
        Console.WriteLine("loop {0},running inside pooled thread {1}", i, Thread.CurrentThread.ManagedThreadId);
        Thread.Sleep(50);
    }
}

线程池使用起来很简单,但它有一些限制:


#5. 任务

.NET 4包含新名称空间System.Threading.Tasks,它包含的类抽象了线程功能。在后台使用ThreadPool。任务表示要完成的某个单元的工作。这个单元的工作可以在单独的线程中运行,也可以以同步方式启动一个任务,这需要等待主调线程。使用任务不仅可以获得一个抽象层,还可以对底层线程进行很多控制。

5.1 启动任务

要启动任务,可以使用TaskFactory类或Task类的构造函数和Start()。Task类的构造函数在创建任务上提供的灵活性较大。

在启动任务时,会创建Task类的一个实例,利用Action或Action<object>委托(不带参数或带一个object参数),可以指定应运行的代码。这类似于Thread类。下面定义一个无参数的方法。在实现代码中,把任务ID写入控制台。

static void TaskMethod()
{
    Console.WriteLine("running in a task.");
    Console.WriteLine("Task id: {0}",Task.CurrentId);
}

在上面代码中,可以看到启动新任务的不同方式:

  1. 第一种方式使用实例化的TaskFactory类,在其中把TaskMethod()传递给StartNew(),就会立即启动任务。
  2. 第二种方式使用Task类的构造函数。实例化Task对象时,任务不会立即运行,而是指定Created状态。接着调用Task类的Start(),来启动任务。

使用Task类时,除了调用Start(),还可以调用RunSynchronously()。这样任务也会启动,但在调用者的当前线程中它正在运行,调用者需要等待任务结束。默认情况下,任务是异步运行的。

    //using task factory
    TaskFactory tf = new TaskFactory();
    Task t1 = tf.StartNew(TaskMethod);

    //using the task factory via a task
    Task t2 = Task.Factory.StartNew(TaskMethod);

    //using Task constructor
    Task t3 = new Task(TaskMethod);
    t3.Start();

使用Task类的构造函数和TaskFactory类的StartNew(),都可以传递TaskCreationOptions枚举中的值。设置LongRunning选项,可以通知任务调度器,该任务需要较长时间执行,这样调度器更可能使用新线程。如果该任务应关联到父任务上,而父任务取消了,则该任务也应取消,此时应设置AttachToParent选项。

5.2 连续的任务

通过任务,可以指定在任务完成后,应开始运行另一个特定任务,例如,一个使用前一个任务的结果的新任务,如果前一个任务失败了,这个任务应执行一些清理工作。

任务处理程序或者不带参数或者带一个对象参数,而连续处理程序有一个Task类型的参数,这里可以访问起始任务的相关信息:

static void DoOnFirst()
{
    Console.WriteLine("doing some task {0}",Task.CurrentId);
    Thread.Sleep(3000);
}

static void DoOnSecond(Task t)
{
    Console.WriteLine("task {0} finished",t.Id);
    Console.WriteLine("this task id {0}",Task.CurrentId);
    Console.WriteLine("do some cleanup");
    Thread.Sleep(3000);
} 

连续任务通过任务上调用ContinueWith()来定义。也可以使用TaskFactory类来定义。

    Task t1 = new Task(DoOnFirst);
    Task t2 = t1.ContinueWith(DoOnSecond);
    Task t3 = t1.ContinueWith(DoOnSecond);
    Task t4 = t2.ContinueWith(DoOnSecond);

5.3 任务层次架构

利用任务连续性,可以在一个任务结束后启动另一个任务。任务也可以构成一个层次结构。一个任务启动一个新任务时,就启动了一个父/子层次结构。

下面的代码段在父任务内部新建一个任务。创建子任务的代码与创建父任务的代码相同。唯一的区别是这个任务从另一个任务内部创建。

static void ParentAndChild() 
{
    var parent = new Task(ParentTask);
    parent.Start();
    Thread.Sleep(2000);
    Console.WriteLine(parent.Status);
    Thread.Sleep(4000);
    Console.WriteLine(parent.Status);
}

static void ParentTask() 
{
    Console.WriteLine("task id {0}",Task.CurrentId);
    var child = new Task(ChildTask);
    child.Start();
    Thread.Sleep(1000);
    Console.WriteLine("parent started child");
}

static void ChildTask() 
{
    Console.WriteLine("child");
    Thread.Sleep(5000);
    Console.WriteLine("child finished");
}

如果父任务在子任务之前结束,父任务的状态就显示为WaitingForChildrenToComplete。只要子任务也结束时,父任务的状态就变成RanToCompletion。当然,如果父任务用TaskCreationOptions枚举中的DetachedFromParent创建子任务时,这就无效。

5.4 任务的结果

任务结束时,它可以把一些有用的状态信息写到共享对象中。这个共享对象必须是线程安全的。另一个选项是使用返回某个结果的任务。使用Task类的泛型版本,就可以定义返回某个结果的任务返回类型。

为了返回某个结果任务调用的方法就可以声明为带任意返回类型。

static Tuple<int,int> TaskWithResult(object division) 
{
    Tuple<int,int> div = (Tuple<int,int>)division;
    int result = div.Item1/div.Item2;
    int reminder = div.Item1 % div.Item2;
    Console.WriteLine("task creates a result...")
    
    return Tuple.Create<int,int>(result,reminder);
}

定义一个调用TaskWithResult()的任务时,要使用泛型类Task<Result>。泛型参数定义了返回类型。通过构造函数,把这个方法传递给Func委托,第二个参数定义了输入值。因为这个任务在onject参数中需要两个输入值,所以还创建了一个元祖。接着启动该任务。

var t1 = new Task<Tuple<int,int>>(TaskWithResult,Tuple.Create<int,int>(8,3));
t1.Start();
Console.WriteLine(t1.Result);
t1.Wait();
Console.WriteLine("result from task: {0} {1}",t1.Result.Item1,t1.Result.Item2);

#6. Parallel类

在.Net4中,另一个新增的抽象线程是Parallel类。这个类定义了并行的for和foreach的静态方法。在为for和foreach定义的语言中,循环从一个线程中运行。Parallel类使用多个任务,因此使用多个线程来完成这个作业。

Parallel.For()和Parallel.ForEach()方法多次调用同一个方法,而Parallel.Invoke()允许同时调用不同的方法。

6.1 用Parallel.For()方法循环

Parallel.For()类似于C#的for循环语句,也是多次执行一个任务。使用Parallel.For(),可以并行运行迭代。迭代的顺序没有定义。

在For()中,前两个参数定义了循环的开头和结束。

ParallelLoopResult result = Parallel.For(0, 10, i =>
{
    Console.WriteLine("{0}, task: {1}, thread: {2}", i, 
        Task.CurrentId, Thread.CurrentThread.ManagedThreadId);
    Thread.Sleep(50);
});
Console.WriteLine(result.IsCompleted);

也可以提前中断Parallel.For()。For()的一个重载版本接受3个Action<int,ParallelLoopState>类型的参数。使用这些参数调用ParallelLoopState的Break()或Stop(),以影响循环的结果。

注意,迭代的顺序没有定义。

ParallelLoopResult result = Parallel.For(10, 40, 
    (int i, ParallelLoopState pls) => {
    Console.WriteLine("i: {0} task {1}", i, Task.CurrentId);
    Thread.Sleep(10);
    if (i > 15)
    {
        pls.Break();
    }
});
Console.WriteLine(result.IsCompleted);
Console.WriteLine("lowest break iteration: {0}",
    result.LowestBreakIteration);

Parallel.For()方法可能使用几个线程来执行循环。如果需要对每个线程进行初始化,就可以使用Parallel.For<TLocal>()。

6.2 使用Parallel.ForEach()方法循环

Parallel.ForEach()方法遍历实现了IEnumerable的集合,其方式类似于foreach语句,但以异步方式遍历。这里也没有确定遍历顺序。

string[] data = { "zero", "one", "two", "three", "four", "five", "six" };
Parallel.ForEach<string>(data, s => {
    Console.WriteLine(s);
});

6.3 通过Parallel.Invoke()方法调用多个方法

如果多个任务应并行运行,就可以使用Parallel.Invoke()。Parallel.Invoke()允许传递一个Action委托数组,在其中可以指定应运行的方法。

static void ParallelInvoke() 
{
    Parallel.Invoke(Foo,Bar);
}

static void Foo() 
{
    Console.WriteLine("Foo");
}

static void Bar() 
{
    Console.WriteLine("Bar");
}

#7. 取消架构

.Net4包含一个新的取消架构,允许以标准方式取消长时间运行的任务。

取消架构基于协作行为,它不是强制的。长时间运行的任务会检查它是否被取消,并返回控制权。

支持取消的方法接受一个CancellationToken参数。这个类定义了IsCancellationRequested属性,其中长时间运行的操作可以检查它是否应终止。长时间运行的操作检查取消的方式有:取消标记时,使用标记的WaitHandle属性,或者使用Register()。Register()接受Action和ICancelableOperation类型的参数。Action委托引用的方法在取消标记时调用。

7.1 Parallel.For()方法的取消

Parallel类提供了For()的重载版本,在重载版本中,可以传递ParallelOptions类型的参数。使用该参数类型可以传递一个CancellationToken参数。CacellationToken参数通过创建CancellationTokenSource来生成。由于CancellationTokenSource实现了ICancelableOperation接口,因此可以用CancellationToken注册,并允许使用Cancel()取消操作。

static void Main(string[] args)
{
    var cts = new CancellationTokenSource();
    cts.Token.Register(() =>
    {
        Console.WriteLine("****token canceled");
    });

    //Start a task that sends a cancel to the 
    //cts after 500 ms
    new Task(() =>
    {
        Thread.Sleep(500);
        cts.Cancel(false);
    }).Start();

    try
    {
        ParallelLoopResult result = Parallel.For(0, 100,
            new ParallelOptions()
            {
                CancellationToken = cts.Token
            },
            x =>
            {
                Console.WriteLine("Loop {0} started", x);
                int sum = 0;
                for (int i = 0; i < 100; i++)
                {
                    Thread.Sleep(2);
                    sum += i;
                }
                Console.WriteLine("Loop {0} finished", x);
            });
    }
    catch (OperationCanceledException e)
    {
        Console.WriteLine(e.Message);
    }
    Console.ReadKey();
}

通过取消操作,所有其他的迭代操作都在启动之前就取消了。启动的迭代操作允许完成,因为取消操作总是以协作方式进行,以避免在取消迭代操作的中间泄漏资源。

7.2 任务的取消

同样的取消模式也可用于任务。首先,创建一个CancellationTokenSource。如果仅需要一个取消标记,就可以访问Task.Factory.CancellationToken,以使用默认的取消标记。

static void Main(string[] args)
{
    var cts = new CancellationTokenSource();
    cts.Token.Register(() =>
    {
        Console.WriteLine("***task canceled");
    });

    //Start a task that sends a cancel to the 
    //cts after 500ms
    Task.Factory.StartNew(() =>
    {
        Thread.Sleep(500);
        cts.Cancel();
    });

    var factory = new TaskFactory(cts.Token);
    Task t1 = factory.StartNew(new Action<object>(f =>
    {
        Console.WriteLine("in task");
        for (int i = 0; i < 20; i++)
        {
            Thread.Sleep(100);
            CancellationToken ct = (f as TaskFactory).CancellationToken;
            if (ct.IsCancellationRequested)
            {
                Console.WriteLine("canceling was requested canceling from within the task");
                ct.ThrowIfCancellationRequested();
                break;
            }
            Console.WriteLine("in loop");
        }
        Console.WriteLine("task finished without cancellation");
    }), factory, cts.Token);

    try
    {
        t1.Wait();
    }
    catch (Exception e)
    {
        Console.WriteLine("exception: {0},{1}", e.GetType().Name, e.Message);
        if (e.InnerException != null)
        {
            Console.WriteLine("inner exception: {0},{1}",
                e.InnerException.GetType().Name,
                e.InnerException.Message);
        }
    }
    Console.WriteLine("Status of the task: {0}", t1.Status);
}

运行应用程序,可以看到任务启动了,运行了几个循环,并获得了取消请求。之后取消任务,并抛出TaskCanceledException异常,它是从方法调用ThrowIfCancellationRequested()中启动的。调用者等待任务时,会捕获AggregateException异常,它包含内部异常TaskCanceledException。


#8. 线程问题

8.1 争用条件

如果两个或多个线程访问相同的对象,或者访问不同步的共享状态,就会出现争用条件。

为了说明争用条件,定义一个StateObject类,它包含一个int字段和一个ChangeState()方法。在ChangeState()方法的实现代码中,验证状态变量是否包含5。如果它包含,就递增其值。下一条语句是Trace.Assert,它立刻验证state是否包含6。

class StateObject
{
    private int state = 5;
    public void ChangeState(int loop)
    {
        if (state == 5)
        {
            state++;
            Trace.Assert(state == 6, "Race condition occurred after " + loop + "loops");
        }
        state = 5;
    }
}

下面通过给任务定义一个方法来验证这一点。SampleTask类的RaceCondition方法将一个StateObject类作为其参数。在一个无限while循环中,调用ChangeState()方法。变量i仅用于显示断言消息中的循环次数。

public class SampleTask
{
    public void RaceCondition(object o)
    {
        Trace.Assert(o is StateObject, "o must be of type StateObject");
        StateObject state = o as StateObject;
        int i = 0;
        while (true)
        {
            state.ChangeState(i++);
        }
    }
}

在程序Main()方法中,新建一个StateObject对象,它由所有任务共享。在Thread类的构造函数中,给RaceCondition的地址传递一个SampleObject类型的对象,以创建Task对象。接着传递State对象,使用Start()方法启动这个任务。

static void Main(string[] args)
{
    var state = new StateObject();
    for (int i = 0; i < 20; i++)
    {
        new Task(new SampleTask().RaceCondition, state).Start();
    }
    Thread.Sleep(3000);
}

启动程序,就会出现争用条件。要避免该问题,可以锁定共享的对象。

8.2 死锁

过多的锁定也会有麻烦。在死锁中,至少有两个线程被挂起,并等待对方解除锁定。由于两个线程都在等待对方,就出现了死锁,线程将无限等待下去。

 public class SampleThread
{
    private StateObject s1;
    private StateObject s2;

    public SampleThread(StateObject s1, StateObject s2)
    {
        this.s1 = s1;
        this.s2 = s2;
    }

    public void DeadLock1()
    {
        int i = 0;
        while (true)
        {
            lock (s1)
            {
                lock (s2)
                {
                    s1.ChangeState(i);
                    s2.ChangeState(i++);
                    Console.WriteLine("still running, {0}", i);
                }
            }
        }
    }

    public void DeadLock2()
    {
        int i = 0;
        while (true)
        {
            lock (s2)
            {
                lock (s1)
                {
                    s1.ChangeState(i);
                    s2.ChangeState(i++);
                    Console.WriteLine("still running, {0}", i);
                }
            }
        }
    }
}

#9. 同步

要避免同步问题,最好不要在线程之间共享数据。如果需要共享数据,就必须使用同步技术,确保一次只有一个线程访问和改变共享状态。注意,同步问题与争用条件和死锁有关。

9.1 lock语句和线程安全

C#为多个线程的同步提供了自己的关键字:lock语句。lock语句是设置锁定和解除锁定的一种简单方式。

在添加lock语句之前,先进入另一个争用条件。SharedState类说明了如何使用线程之间的共享状态,并保存一个整数值。

public class SharedState
{
    public int State { get; set; }
}

Job包含DoTheJob()方法,该方法是新任务的入口点。通过其实现代码,将SharedState变量的State递增50000次。sharedState变量在这个类的构造函数中初始化:

public class Job
{
    SharedState sharedState;
    public Job(SharedState sharedState)
    {
        this.sharedState = sharedState;
    }

    public void DoTheJob()
    {
        for (int i = 0; i < 50000; i++)
        {
            sharedState.State++;
        }
    }
}

在Main()方法中,创建一个SharedState对象,并把它传递给20个Task对象的构造函数。在启动所有的任务后,Main()方法进入另一个循环,使得20个任务全部处于等待状态,直到所有的任务都执行完毕为止。任务执行完毕后,把共享状态的合计值写入控制台。因为执行了50000此循环,有20个任务,所以写入控制台的值应是1000000。但是,事实常常并非如此。

class SynchronizationTest
{
    static void Main(string[] args)
    {
        int numTasks = 20;
        var state = new SharedState();
        var tasks = new Task[numTasks];

        for (int i = 0; i < numTasks; i++)
        {
            tasks[i] = new Task(new Job(state).DoTheJob);
            tasks[i].Start();
        }

        for (int i = 0; i < numTasks; i++)
        {
            tasks[i].Wait();
        }
        Console.WriteLine("summarized {0}", state.State);
    }
}

多次运行应用程序的结果如下所示:

summarized 304111
summarized 287270

每次运行结果都不同,但没有一个结果是正确的。调试版本和发布版本的区别很大。根据使用的CPU类型,结果也不一样。

必须在这个程序中添加同步功能,这可以用lock关键字实现。

用lock语句定义的对象表示,要等待指定对象的锁定。只能传递引用类型。锁定值类型只是锁定了一个副本,这没有什么意义。如果对值类型使用了lock语句,c#编译器就会提供一个错误。进行了锁定后——只有锁定一个线程,就可以运行lock语句块。在lock语句块的最后,对象的锁定被解除,另一个等待锁定的线程就可以获得该锁定块了。

lock(obj) {
    //synchronized region
}

要锁定静态成员,可以把锁放在object类型上:

lock(typeof(StaticClass)) {
    
}

使用lock关键字可以将类的实例成员设置为线程安全的。这样,一次只有一个线程能访问相同实例的DoThis()和DoThat()方法。

public class {
    public void DoThis() {
        lock(this) {
            //Only one thread at a time can access the DoThis and DoThat methods
        }
    }
    public void DoThat() {
        lock(this) {
            
        }
    }
}

但是,因为实例的对象也可以用于外部的同步访问,我们不能在类自身中控制这种访问,所以应采用SyncRoot模式。通过SyncRoot模式,创建一个私有对象syncRoot,将这个对象用于lock语句。

public class Demo {
    private object syncRoot = new object();
    
    public void DoThis() {
        lock(syncRoot) {
            //Only one thread at a time can access the DoThis and DoThat methods
        }
    }
    public void DoThat() {
        lock(syncRoot) {
            
        }
    }
}

首先修改异步的SharedState类,以使用SyncRoot模式。如果试图用SyncRoot模式锁定对属性的访问,使SharedState类变成线程安全的,就仍会出现前面描述的争用条件。

public class SharedState
{
    private int state;
    private readonly object syncRoot = new object();
    public int State
    {
        get{ lock (syncRoot) { return state;  }}

        set{ lock (syncRoot) { state = value; }}
    }
}

调用方法DoTheTask()方法的线程访问SharedState类的get存取器,以获得state的当前值,接着get存取器给state设置新值。在调用对象的get和set存取器期间,对象没有锁定,另一个线程可以获得临时值。

public void DoTheJob()
{
    for (int i = 0; i < 50000; i++)
    {
        lock(sharedState)
        {
            sharedState.State++;
        }
    }
}

==在一个地方使用lock语句并不意味着,访问对象的其他线程都正在等待。必须对每个访问共享状态的线程显式地使用同步功能。==

9.2 Interlocked类

Interlocked类用于使变量的简单语句原子化。i++不是线程安全的,它的操作包括从内存中获取一个值,给该值递增1,再将它存储回内存。这些操作都可能会被线程调度器打断。Interlocked类提供了以线程安全的方式递增、递减、交换和读取值的方法。

与其他同步技术相比,使用Interlocked类会快得多。但是,它只能用于简单的同步问题。

例如,这里不使用lock语句锁定对someState变量的访问,把它设置为一个新值,以防止它是空的,而可以使用Interlocked类,它比较快:

lock(this) {
    if(someState == null) {
        someState = newState;
    }
}

这个功能相同但比较快的版本使用了Interlocked.CompareExchange()方法:

Interlocked.CompareExchange<SomeState>(ref someState,newState,null);

并且不在lock语句中执行递增操作:

public int State {
    get {
        lock(this) {
            return ++state;
        }
    }
}

而使用较快的Interlocked.Increment()方法:

public int State {
    get {
        return Interlocked.Increment(ref state);
    }
}

9.3 Monitor类

c#的lock语句由编译器解析为使用Monitor类。下面的lock语句:

lock(obj) {
    //synchronized region for obj
}

被解析为调用Enter()方法,该方法会一直等待,直到线程被对象锁定为止。一次只有一个线程能被对象锁定。只要解除锁定,线程就可以进入同步阶段。Monitor类的Exit()方法解除了锁定。编译器把Exit()方法放在try块的finally处理程序中,所以如果抛出了异常,就也会解除该锁定。

Monitor.Enter(obj);
try {
    //synchronized region for obj   
}finally {
    Monitor.Exit(obj);
}

与c#的lock语句相比,Monitor类的主要优点是:可以添加一个等待被锁定的超时值。这样就不会无限期地等待被锁定,而可以使用TryEnter(),其中传递一个超时值,指定等待被锁定的最长时间。

bool lockTaken = false;
Monitor.TryEnter(obj,500,ref lockTaken);
if(lockTaken) {
    try{
        //acquired the lock
        //synchronized region for obj
    }finally {
        Monitor.Exit(obj);
    }
}else {
    //didn't get the lock,do somthing else.
}

9.4 ReaderWriterLockSlim类

为了使锁定机制允许锁定多个读取器(而不是一个写入器)访问某个资源,可以使用ReaderWriterLockSlim类。这个类提供了一个锁定功能,如果没有写入器锁定资源,就运行多个读取器访问资源,但只能有一个写入器锁定该资源。

ReaderWriterLockSlim类的属性可获得读取阻塞或不阻塞的锁定,如EnterReadLock()和TryEnterReadLock()方法。还可以使用EnterWriterLock()和TryEnterWriterLock()方法获得写入锁定。如果任务先读取资源,之后写入资源,它就可以使用EnterUpgradableReadLock()或TryEnterUpgradableReadLock()方法获得可升级的读取锁定。有了这个锁定,就可以获得写入锁定,而无需释放读取锁定。

class MonitorTest
{
    private static List<int> items = new List<int>() { 0, 1, 2, 3, 4, 5 };
    private static ReaderWriterLockSlim rwl = new ReaderWriterLockSlim(LockRecursionPolicy.SupportsRecursion);
    
    //获得读取锁定,读取集合的值
    static void ReaderMethod(object reader)
    {
        try
        {
            rwl.EnterReadLock();

            for (int i = 0; i < items.Count; i++)
            {
                Console.WriteLine("reader {0},loop: {1},item: {2}", reader, i, items[i]);
                Thread.Sleep(40);
            }
        }
        finally
        {
            rwl.ExitReadLock();
        }
    }
    
    //获得写入锁定,改变集合的值
    static void WriterMethod(object writer)
    {
        try
        {
            while (!rwl.TryEnterWriteLock(50))
            {
                Console.WriteLine("Writer {0} waiting for the write lock", writer);
                Console.WriteLine("current reader count: {0}", rwl.CurrentReadCount);
            }
            Console.WriteLine("Writer {0} acquired the lock", writer);
            for (int i = 0; i < items.Count; i++)
            {
                items[i]++;
                Thread.Sleep(50);
            }
            Console.WriteLine("Writer {0} finished", writer);
        }
        finally
        {
            rwl.ExitWriteLock();
        }
    }

    static void Main(string[] args)
    {
        var taskFactory = new TaskFactory(TaskCreationOptions.LongRunning,TaskContinuationOptions.None);
        var task = new Task[6];
        task[0] = taskFactory.StartNew(WriterMethod, 1);
        task[1] = taskFactory.StartNew(ReaderMethod, 1);
        task[2] = taskFactory.StartNew(ReaderMethod, 2);
        task[3] = taskFactory.StartNew(WriterMethod, 2);
        task[4] = taskFactory.StartNew(ReaderMethod, 3);
        task[5] = taskFactory.StartNew(ReaderMethod, 4);

        for (int i = 0; i < 6; i++)
        {
            task[i].Wait();
        }
    }
}

#10. Timer类

.Net Framework提供了几个Timer类,用于在某个时间间隔后调用某个方法。

使用System.Threading.Timer类,可以把要调用的方法作为构造函数的第一个参数传递。这个方法必须满足TimeCallback委托的要求,该委托定义了一个void返回类型和一个object参数。通过第二个参数,可以传递任意对象,用回调方法中的object参数接受对应的对象。

private static void ThreadingTimer()
{
    var t1 = new Timer(TimeAction, null, TimeSpan.FromSeconds(2), TimeSpan.FromSeconds(3));

    Thread.Sleep(15000);

    t1.Dispose();
}

private static void TimeAction(object o)
{
    Console.WriteLine("System.Threading.Timer {0:T}",DateTime.Now);
}
上一篇下一篇

猜你喜欢

热点阅读