第20章:线程、任务和同步
- #1. 概述
- #2. 异步委托
- #3. thread类
- #4. 线程池
- #5. 任务
- #6. Parallel类
- #7. 取消架构
- #8. 线程问题
- #9. 同步
- #10. Timer类
#1 概述
线程是程序中独立的指令流。使用C#编写任何程序时,都有一个入口点:Main()方法。程序从Main()方法的第一条语句开始执行,直到这个方法返回为止。
#2. 异步委托
创建线程的一个简单方式是定义一个委托,并异步调用它。委托是方法的类型安全的引用。Delegate类还支持异步地调用方法。在后台,Delegate类会创建一个执行任务的线程。
==委托使用线程池来完成异步任务。==
为了说明委托的异步特性,从一个需要一定的时间才能执行完毕的方法开始。TaskAWwhile()执行一定程度的耗时操作,其定义如下:
static int TaskAWhile(int data,int ms)
{
Console.WriteLine("TaskAWhile started");
Thread.Sleep(ms);
Console.WriteLine("TaskAWhile completed");
return ++data;
}
要从委托中调用这个方法,必须定义一个有相同参数和返回类型的委托,如下面的TaskAWhileDelegate()所示:
public delegate int TaskAWhileDelegate(int data, int ms);
2.1 投票
一种技术是投票,并检查委托是否完成了它的任务,所创建的Delegate类提供了BeginInvoke()方法,在该方法中,可以传递用委托类型定义的输入参数。BeginInvoke()方法总是有AsyncCallback和object类型的两个额外参数。现在重要的是BeginInvoke()方法的返回类型:IAsyncResult。通过IAsyncResult,可以获得该委托的相关信息,并通过IsCompleted属性验证该委托是否完成了任务。只要委托没有完成其任务,程序的主线程就继续执行while循环。
static void Main(string[] args)
{
//synchronous method call
//TaskAWhile(1,3000);
//asychronous by using a delegate
TaskAWhileDelegate dl = TaskAWhile;
IAsyncResult ar = dl.BeginInvoke(1, 3000, null, null);
while (!ar.IsCompleted)
{
//doing something else in the main thread.
Console.Write(".");
Thread.Sleep(50);
}
int result = dl.EndInvoke(ar);
Console.WriteLine("result: {0}", result);
}
==如果在委托结束之前不等委托完成其任务就结束主线程,委托线程就会停止。==
2.2 等待句柄
等待异步委托的结果的第2种方式是使用与IAsyncResult相关联的等待句柄。使用AsyncWaitHandle属性可以访问等待句柄。这个属性返回一个WaitHandle类型的对象,它可以等待委托线程完成其任务。WaitOne()将一个超时时间作为可选的第一个参数,在其中可以定义要等待的最长时间。如果发生超时,WaitOne()就返回false,while循环会继续执行。如果等待操作成功,就用一个中断退出while循环,用委托EndInvoke方法接受结果。
static void Main(string[] args)
{
//synchronous method call
//TaskAWhile(1,3000);
//asychronous by using a delegate
TaskAWhileDelegate dl = TaskAWhile;
IAsyncResult ar = dl.BeginInvoke(1, 3000, null, null);
while(true)
{
Console.Write(".");
if (ar.AsyncWaitHandle.WaitOne(50, false))
{
Console.WriteLine("Can get result now");
break;
}
}
int result = dl.EndInvoke(ar);
Console.WriteLine("result: {0}", result);
}
2.3 异步回调
等待委托结果的第3中方式是使用异步回调。在BeginInvoke()的第3个参数中,可以传递一个满足AsyncCallback委托的需求的方法。AsyncCallback委托定义了一个IAsyncResult类型参数,其返回类型是void。这里,把TaskAWhileCompleted()的地址赋予第3个参数,它满足AsyncCallback委托的需求。对于最后一个参数,可以传递任意对象,以便从回调方法中访问它。传递委托实例很有用,这样回调方法就可以使用它获得异步方法的结果。
现在,只要TaskAWhileDelegate委托完成其任务,就调用TaskAWhileCompleted()。不需要在主线程中等待结果。但是在委托线程的任务完成之前,不能停止主线程,除非主线程结束时停止的委托线程没有问题。
static void Main(string[] args)
{
TaskAWhileDelegate dl = TaskAWhile;
dl.BeginInvoke(1, 3000, TaskAWhileCompleted, dl);
for (int i = 0; i < 100; i++)
{
Console.Write(".");
Thread.Sleep(50);
}
}
TaskAWhileCompleted声明如下:
static void TaskAWhileCompleted(IAsyncResult ar)
{
if (ar == null)
{
throw new ArgumentException("ar");
}
TaskAWhileDelegate dl = ar.AsyncState as TaskAWhileDelegate;
Trace.Assert(dl != null, "Invalid object type.");
int result = dl.EndInvoke(ar);
Console.WriteLine("result: {0}", result);
}
==使用回调方法,必须注意这个方法从委托线程中调用,而不是从主线程中调用。==
除了定义一个单独的方法,并给它传递BeginInvoke(),lambda表达式也非常适合这种情况。参数ar是IAsyncResult类型。在实现代码中,不需要把一个值赋予BeginInvoke()方法的最后一个参数,因为lambda表达式可以直接访问该作用域外部的变量dl。但是,Lambda表达式的实现代码仍是从委托线程中调用,以这种方式定义方法时,这不是很明显。
static void Main(string[] args)
{
TaskAWhileDelegate dl = TaskAWhile;
dl.BeginInvoke(1, 3000, ar => {
int result = dl.EndInvoke(ar);
Console.WriteLine("result: {0}", result);
},
null);
for (int i = 0; i < 100; i++) {
Console.Write(".");
Thread.Sleep(50);
}
}
#3. Thread类
使用Thread类可以创建和控制线程。下面的代码是创建和启动一个新线程的简单例子。Thread类构造函数重载并接受ThreadStart和ParameterizedThreadStart类型的委托参数。ThreadStart委托定义了一个返回类型为void的无参数方法。在创建了Thread对象后,就可以调用Start()方法启动线程:
static void Main(string[] args)
{
var t1 = new Thread(ThreadMain);
t1.Start();
Console.WriteLine("This is a main thread.");
}
static void ThreadMain()
{
Console.WriteLine("Running a thread.");
}
3.1 给线程传递数据
给线程传递数据可以采用两种方式:
- 使用带ParameterizedThreadStart委托参数的Thread构造函数;
- 创建一个自定义类,把线程的方法定义为实例方法,这样可以初始化实例的数据,之后启动线程。
要给线程传递数据,需要某个存储数据的类或结构。这里定义了包含字符串Data结构,但可以传递任意对象。
public struct Data
{
public string Message;
}
如果使用了ParameterizedThreadStart委托,线程的入口点必须有一个object类型的参数,且返回类型为void,对象可以强制转换为任意数据类型,这里把消息写入控制台。
static void ThreadMainWithParameters(object o) {
Data d = (Data)o;
Console.WriteLine("Running a thread, received {0}",d.Message);
}
通过Thread类的构造函数,可以将新的入口点赋予ThreadMainWithParamters,传递变量d,以此调用Start()。
static void Main(string[] args)
{
var d = new Data {
Message = "Info"
};
var t2 = new Thread(ThreadMainWithParameters);
t2.Start(d);
}
3.2 后台线程
只要有一个前台线程在运行,应用程序的进程就在运行。如果多个前台线程在运行,而Main()方法结束了,应用程序的进程就仍然是激活的,直到所有前台线程完成其任务为止。
在默认情况下,用Thread类创建的线程是前台线程。线程池中的线程总是后台线程。
在用Thread类创建线程时,可以设置IsBackground属性,以确定该线程是前台线程还是后台线程。
static void Main(string[] args)
{
var t1 = new Thread(ThreadMain) {
Name = "MyNewThread",
IsBackground = false,
};
t1.Start();
Console.WriteLine("Main thread ending now.");
}
static void ThreadMain()
{
Console.WriteLine("Thread {0} started",Thread.CurrentThread.Name);
Thread.Sleep(3000);
Console.WriteLine("Thread {0} completed",Thread.CurrentThread.Name);
}
3.3 线程的优先级
线程由操作系统调度。给线程指定优先级,就可以影响调度顺序。
在改变优先级之前。必须理解线程调度器。操作系统根据优先级来调度线程。调度优先级最高的线程以在CPU上运行。线程如果在等待资源,它就会停止运行,并释放CPU。
在Thread类中,可以设置Priority属性,以影响线程的基本优先级。Priority属性需要ThreadPrority枚举定义的一个值。定义的级别有Highest、AboveNormal、BelowNormal和Lowest。
在给线程指定优先级的时候要小心,因为这可能降低其他线程的运行概率。根据需要,可以短暂的改变优先级。
3.4 控制线程
调用Thread对象的Start()可以创建线程。但是,在调用Start()后,新线程仍不是处于Running状态,而是处于Unstarted状态。只要操作系统的线程调度器选择了要运行的线程,线程就会改为Runnnig状态。读取Thread.ThreadState属性,就可以获得线程的当前状态。
使用Thread.sleep(),会使线程处于WaitSleepJoin状态,在经历Sleep()方法定义的时间段后,线程就会等待再次被唤醒。
要停止一个线程,可以调用Thread.abort()。调用这个方法时,会在接到终止命令的线程中抛出一个ThreadAbortException类型的异常。用一个处理程序捕获这个异常,线程可以在结束前完成一些清理工作。线程还可以在接受到调用Thread.ResetAbort()方法的结果ThreadAbortException异常后继续执行。如果线程没有重置终止,接受到终止请求的线程的状态就从AbortRequested改为Aborted。
如果需要等待线程的结束,就可以调用Thread.join()。Thread.join()会停止当前线程,并把它设置为WaitSleepJoin状态,直到加入的线程完成终止。
#4. 线程池
创建线程需要时间。如果有不同的小任务要完成,就可以事先创建许多线程,在应完成这些任务时发出请求。这个线程数最好在需要更多的线程数的时候增加,在需要释放资源的时候减少。
不需要自己创建这样一个列表。该列表由ThreadPool类托管。这个类在需要时增减池中的线程数,直到最大的线程数。池中的最大线程数是可配置的。在双核CPU中,默认设置为1023个工作线程和1000个I/O线程。
static void Main(string[] args)
{
int nWorkerThreads;
int nCompletePortThreads;
ThreadPool.GetMaxThreads(out nWorkerThreads, out nCompletePortThreads);
Console.WriteLine("Max worker threads: {0}" + " I/O completion threads: {1}", nWorkerThreads, nCompletePortThreads);
for (int i = 0; i < 5; i++)
{
ThreadPool.QueueUserWorkItem(JobForAThread);
}
Thread.Sleep(3000);
}
static void JobForAThread(object state)
{
for (int i = 0; i < 3; i++)
{
Console.WriteLine("loop {0},running inside pooled thread {1}", i, Thread.CurrentThread.ManagedThreadId);
Thread.Sleep(50);
}
}
线程池使用起来很简单,但它有一些限制:
- 线程池中所有线程都是后台线程。如果线程的所有前台线程都结束了,所有的后台线程就会停止。不能把入池的线程改为前台线程。
- 不能给入池的线程设置优先级或名称。
- 对于COM对象,入池的所有线程都是多线程单元线程。许多COM对象都需要单线程单元线程。
- 入池的线程只能用于时间较短的任务。如果线程要一直运行,就应使用Thread类创建一个线程。
#5. 任务
.NET 4包含新名称空间System.Threading.Tasks,它包含的类抽象了线程功能。在后台使用ThreadPool。任务表示要完成的某个单元的工作。这个单元的工作可以在单独的线程中运行,也可以以同步方式启动一个任务,这需要等待主调线程。使用任务不仅可以获得一个抽象层,还可以对底层线程进行很多控制。
5.1 启动任务
要启动任务,可以使用TaskFactory类或Task类的构造函数和Start()。Task类的构造函数在创建任务上提供的灵活性较大。
在启动任务时,会创建Task类的一个实例,利用Action或Action<object>委托(不带参数或带一个object参数),可以指定应运行的代码。这类似于Thread类。下面定义一个无参数的方法。在实现代码中,把任务ID写入控制台。
static void TaskMethod()
{
Console.WriteLine("running in a task.");
Console.WriteLine("Task id: {0}",Task.CurrentId);
}
在上面代码中,可以看到启动新任务的不同方式:
- 第一种方式使用实例化的TaskFactory类,在其中把TaskMethod()传递给StartNew(),就会立即启动任务。
- 第二种方式使用Task类的构造函数。实例化Task对象时,任务不会立即运行,而是指定Created状态。接着调用Task类的Start(),来启动任务。
使用Task类时,除了调用Start(),还可以调用RunSynchronously()。这样任务也会启动,但在调用者的当前线程中它正在运行,调用者需要等待任务结束。默认情况下,任务是异步运行的。
//using task factory
TaskFactory tf = new TaskFactory();
Task t1 = tf.StartNew(TaskMethod);
//using the task factory via a task
Task t2 = Task.Factory.StartNew(TaskMethod);
//using Task constructor
Task t3 = new Task(TaskMethod);
t3.Start();
使用Task类的构造函数和TaskFactory类的StartNew(),都可以传递TaskCreationOptions枚举中的值。设置LongRunning选项,可以通知任务调度器,该任务需要较长时间执行,这样调度器更可能使用新线程。如果该任务应关联到父任务上,而父任务取消了,则该任务也应取消,此时应设置AttachToParent选项。
5.2 连续的任务
通过任务,可以指定在任务完成后,应开始运行另一个特定任务,例如,一个使用前一个任务的结果的新任务,如果前一个任务失败了,这个任务应执行一些清理工作。
任务处理程序或者不带参数或者带一个对象参数,而连续处理程序有一个Task类型的参数,这里可以访问起始任务的相关信息:
static void DoOnFirst()
{
Console.WriteLine("doing some task {0}",Task.CurrentId);
Thread.Sleep(3000);
}
static void DoOnSecond(Task t)
{
Console.WriteLine("task {0} finished",t.Id);
Console.WriteLine("this task id {0}",Task.CurrentId);
Console.WriteLine("do some cleanup");
Thread.Sleep(3000);
}
连续任务通过任务上调用ContinueWith()来定义。也可以使用TaskFactory类来定义。
Task t1 = new Task(DoOnFirst);
Task t2 = t1.ContinueWith(DoOnSecond);
Task t3 = t1.ContinueWith(DoOnSecond);
Task t4 = t2.ContinueWith(DoOnSecond);
5.3 任务层次架构
利用任务连续性,可以在一个任务结束后启动另一个任务。任务也可以构成一个层次结构。一个任务启动一个新任务时,就启动了一个父/子层次结构。
下面的代码段在父任务内部新建一个任务。创建子任务的代码与创建父任务的代码相同。唯一的区别是这个任务从另一个任务内部创建。
static void ParentAndChild()
{
var parent = new Task(ParentTask);
parent.Start();
Thread.Sleep(2000);
Console.WriteLine(parent.Status);
Thread.Sleep(4000);
Console.WriteLine(parent.Status);
}
static void ParentTask()
{
Console.WriteLine("task id {0}",Task.CurrentId);
var child = new Task(ChildTask);
child.Start();
Thread.Sleep(1000);
Console.WriteLine("parent started child");
}
static void ChildTask()
{
Console.WriteLine("child");
Thread.Sleep(5000);
Console.WriteLine("child finished");
}
如果父任务在子任务之前结束,父任务的状态就显示为WaitingForChildrenToComplete。只要子任务也结束时,父任务的状态就变成RanToCompletion。当然,如果父任务用TaskCreationOptions枚举中的DetachedFromParent创建子任务时,这就无效。
5.4 任务的结果
任务结束时,它可以把一些有用的状态信息写到共享对象中。这个共享对象必须是线程安全的。另一个选项是使用返回某个结果的任务。使用Task类的泛型版本,就可以定义返回某个结果的任务返回类型。
为了返回某个结果任务调用的方法就可以声明为带任意返回类型。
static Tuple<int,int> TaskWithResult(object division)
{
Tuple<int,int> div = (Tuple<int,int>)division;
int result = div.Item1/div.Item2;
int reminder = div.Item1 % div.Item2;
Console.WriteLine("task creates a result...")
return Tuple.Create<int,int>(result,reminder);
}
定义一个调用TaskWithResult()的任务时,要使用泛型类Task<Result>。泛型参数定义了返回类型。通过构造函数,把这个方法传递给Func委托,第二个参数定义了输入值。因为这个任务在onject参数中需要两个输入值,所以还创建了一个元祖。接着启动该任务。
var t1 = new Task<Tuple<int,int>>(TaskWithResult,Tuple.Create<int,int>(8,3));
t1.Start();
Console.WriteLine(t1.Result);
t1.Wait();
Console.WriteLine("result from task: {0} {1}",t1.Result.Item1,t1.Result.Item2);
#6. Parallel类
在.Net4中,另一个新增的抽象线程是Parallel类。这个类定义了并行的for和foreach的静态方法。在为for和foreach定义的语言中,循环从一个线程中运行。Parallel类使用多个任务,因此使用多个线程来完成这个作业。
Parallel.For()和Parallel.ForEach()方法多次调用同一个方法,而Parallel.Invoke()允许同时调用不同的方法。
6.1 用Parallel.For()方法循环
Parallel.For()类似于C#的for循环语句,也是多次执行一个任务。使用Parallel.For(),可以并行运行迭代。迭代的顺序没有定义。
在For()中,前两个参数定义了循环的开头和结束。
ParallelLoopResult result = Parallel.For(0, 10, i =>
{
Console.WriteLine("{0}, task: {1}, thread: {2}", i,
Task.CurrentId, Thread.CurrentThread.ManagedThreadId);
Thread.Sleep(50);
});
Console.WriteLine(result.IsCompleted);
也可以提前中断Parallel.For()。For()的一个重载版本接受3个Action<int,ParallelLoopState>类型的参数。使用这些参数调用ParallelLoopState的Break()或Stop(),以影响循环的结果。
注意,迭代的顺序没有定义。
ParallelLoopResult result = Parallel.For(10, 40,
(int i, ParallelLoopState pls) => {
Console.WriteLine("i: {0} task {1}", i, Task.CurrentId);
Thread.Sleep(10);
if (i > 15)
{
pls.Break();
}
});
Console.WriteLine(result.IsCompleted);
Console.WriteLine("lowest break iteration: {0}",
result.LowestBreakIteration);
Parallel.For()方法可能使用几个线程来执行循环。如果需要对每个线程进行初始化,就可以使用Parallel.For<TLocal>()。
6.2 使用Parallel.ForEach()方法循环
Parallel.ForEach()方法遍历实现了IEnumerable的集合,其方式类似于foreach语句,但以异步方式遍历。这里也没有确定遍历顺序。
string[] data = { "zero", "one", "two", "three", "four", "five", "six" };
Parallel.ForEach<string>(data, s => {
Console.WriteLine(s);
});
6.3 通过Parallel.Invoke()方法调用多个方法
如果多个任务应并行运行,就可以使用Parallel.Invoke()。Parallel.Invoke()允许传递一个Action委托数组,在其中可以指定应运行的方法。
static void ParallelInvoke()
{
Parallel.Invoke(Foo,Bar);
}
static void Foo()
{
Console.WriteLine("Foo");
}
static void Bar()
{
Console.WriteLine("Bar");
}
#7. 取消架构
.Net4包含一个新的取消架构,允许以标准方式取消长时间运行的任务。
取消架构基于协作行为,它不是强制的。长时间运行的任务会检查它是否被取消,并返回控制权。
支持取消的方法接受一个CancellationToken参数。这个类定义了IsCancellationRequested属性,其中长时间运行的操作可以检查它是否应终止。长时间运行的操作检查取消的方式有:取消标记时,使用标记的WaitHandle属性,或者使用Register()。Register()接受Action和ICancelableOperation类型的参数。Action委托引用的方法在取消标记时调用。
7.1 Parallel.For()方法的取消
Parallel类提供了For()的重载版本,在重载版本中,可以传递ParallelOptions类型的参数。使用该参数类型可以传递一个CancellationToken参数。CacellationToken参数通过创建CancellationTokenSource来生成。由于CancellationTokenSource实现了ICancelableOperation接口,因此可以用CancellationToken注册,并允许使用Cancel()取消操作。
static void Main(string[] args)
{
var cts = new CancellationTokenSource();
cts.Token.Register(() =>
{
Console.WriteLine("****token canceled");
});
//Start a task that sends a cancel to the
//cts after 500 ms
new Task(() =>
{
Thread.Sleep(500);
cts.Cancel(false);
}).Start();
try
{
ParallelLoopResult result = Parallel.For(0, 100,
new ParallelOptions()
{
CancellationToken = cts.Token
},
x =>
{
Console.WriteLine("Loop {0} started", x);
int sum = 0;
for (int i = 0; i < 100; i++)
{
Thread.Sleep(2);
sum += i;
}
Console.WriteLine("Loop {0} finished", x);
});
}
catch (OperationCanceledException e)
{
Console.WriteLine(e.Message);
}
Console.ReadKey();
}
通过取消操作,所有其他的迭代操作都在启动之前就取消了。启动的迭代操作允许完成,因为取消操作总是以协作方式进行,以避免在取消迭代操作的中间泄漏资源。
7.2 任务的取消
同样的取消模式也可用于任务。首先,创建一个CancellationTokenSource。如果仅需要一个取消标记,就可以访问Task.Factory.CancellationToken,以使用默认的取消标记。
static void Main(string[] args)
{
var cts = new CancellationTokenSource();
cts.Token.Register(() =>
{
Console.WriteLine("***task canceled");
});
//Start a task that sends a cancel to the
//cts after 500ms
Task.Factory.StartNew(() =>
{
Thread.Sleep(500);
cts.Cancel();
});
var factory = new TaskFactory(cts.Token);
Task t1 = factory.StartNew(new Action<object>(f =>
{
Console.WriteLine("in task");
for (int i = 0; i < 20; i++)
{
Thread.Sleep(100);
CancellationToken ct = (f as TaskFactory).CancellationToken;
if (ct.IsCancellationRequested)
{
Console.WriteLine("canceling was requested canceling from within the task");
ct.ThrowIfCancellationRequested();
break;
}
Console.WriteLine("in loop");
}
Console.WriteLine("task finished without cancellation");
}), factory, cts.Token);
try
{
t1.Wait();
}
catch (Exception e)
{
Console.WriteLine("exception: {0},{1}", e.GetType().Name, e.Message);
if (e.InnerException != null)
{
Console.WriteLine("inner exception: {0},{1}",
e.InnerException.GetType().Name,
e.InnerException.Message);
}
}
Console.WriteLine("Status of the task: {0}", t1.Status);
}
运行应用程序,可以看到任务启动了,运行了几个循环,并获得了取消请求。之后取消任务,并抛出TaskCanceledException异常,它是从方法调用ThrowIfCancellationRequested()中启动的。调用者等待任务时,会捕获AggregateException异常,它包含内部异常TaskCanceledException。
#8. 线程问题
8.1 争用条件
如果两个或多个线程访问相同的对象,或者访问不同步的共享状态,就会出现争用条件。
为了说明争用条件,定义一个StateObject类,它包含一个int字段和一个ChangeState()方法。在ChangeState()方法的实现代码中,验证状态变量是否包含5。如果它包含,就递增其值。下一条语句是Trace.Assert,它立刻验证state是否包含6。
class StateObject
{
private int state = 5;
public void ChangeState(int loop)
{
if (state == 5)
{
state++;
Trace.Assert(state == 6, "Race condition occurred after " + loop + "loops");
}
state = 5;
}
}
下面通过给任务定义一个方法来验证这一点。SampleTask类的RaceCondition方法将一个StateObject类作为其参数。在一个无限while循环中,调用ChangeState()方法。变量i仅用于显示断言消息中的循环次数。
public class SampleTask
{
public void RaceCondition(object o)
{
Trace.Assert(o is StateObject, "o must be of type StateObject");
StateObject state = o as StateObject;
int i = 0;
while (true)
{
state.ChangeState(i++);
}
}
}
在程序Main()方法中,新建一个StateObject对象,它由所有任务共享。在Thread类的构造函数中,给RaceCondition的地址传递一个SampleObject类型的对象,以创建Task对象。接着传递State对象,使用Start()方法启动这个任务。
static void Main(string[] args)
{
var state = new StateObject();
for (int i = 0; i < 20; i++)
{
new Task(new SampleTask().RaceCondition, state).Start();
}
Thread.Sleep(3000);
}
启动程序,就会出现争用条件。要避免该问题,可以锁定共享的对象。
8.2 死锁
过多的锁定也会有麻烦。在死锁中,至少有两个线程被挂起,并等待对方解除锁定。由于两个线程都在等待对方,就出现了死锁,线程将无限等待下去。
public class SampleThread
{
private StateObject s1;
private StateObject s2;
public SampleThread(StateObject s1, StateObject s2)
{
this.s1 = s1;
this.s2 = s2;
}
public void DeadLock1()
{
int i = 0;
while (true)
{
lock (s1)
{
lock (s2)
{
s1.ChangeState(i);
s2.ChangeState(i++);
Console.WriteLine("still running, {0}", i);
}
}
}
}
public void DeadLock2()
{
int i = 0;
while (true)
{
lock (s2)
{
lock (s1)
{
s1.ChangeState(i);
s2.ChangeState(i++);
Console.WriteLine("still running, {0}", i);
}
}
}
}
}
#9. 同步
要避免同步问题,最好不要在线程之间共享数据。如果需要共享数据,就必须使用同步技术,确保一次只有一个线程访问和改变共享状态。注意,同步问题与争用条件和死锁有关。
9.1 lock语句和线程安全
C#为多个线程的同步提供了自己的关键字:lock语句。lock语句是设置锁定和解除锁定的一种简单方式。
在添加lock语句之前,先进入另一个争用条件。SharedState类说明了如何使用线程之间的共享状态,并保存一个整数值。
public class SharedState
{
public int State { get; set; }
}
Job包含DoTheJob()方法,该方法是新任务的入口点。通过其实现代码,将SharedState变量的State递增50000次。sharedState变量在这个类的构造函数中初始化:
public class Job
{
SharedState sharedState;
public Job(SharedState sharedState)
{
this.sharedState = sharedState;
}
public void DoTheJob()
{
for (int i = 0; i < 50000; i++)
{
sharedState.State++;
}
}
}
在Main()方法中,创建一个SharedState对象,并把它传递给20个Task对象的构造函数。在启动所有的任务后,Main()方法进入另一个循环,使得20个任务全部处于等待状态,直到所有的任务都执行完毕为止。任务执行完毕后,把共享状态的合计值写入控制台。因为执行了50000此循环,有20个任务,所以写入控制台的值应是1000000。但是,事实常常并非如此。
class SynchronizationTest
{
static void Main(string[] args)
{
int numTasks = 20;
var state = new SharedState();
var tasks = new Task[numTasks];
for (int i = 0; i < numTasks; i++)
{
tasks[i] = new Task(new Job(state).DoTheJob);
tasks[i].Start();
}
for (int i = 0; i < numTasks; i++)
{
tasks[i].Wait();
}
Console.WriteLine("summarized {0}", state.State);
}
}
多次运行应用程序的结果如下所示:
summarized 304111
summarized 287270
每次运行结果都不同,但没有一个结果是正确的。调试版本和发布版本的区别很大。根据使用的CPU类型,结果也不一样。
必须在这个程序中添加同步功能,这可以用lock关键字实现。
用lock语句定义的对象表示,要等待指定对象的锁定。只能传递引用类型。锁定值类型只是锁定了一个副本,这没有什么意义。如果对值类型使用了lock语句,c#编译器就会提供一个错误。进行了锁定后——只有锁定一个线程,就可以运行lock语句块。在lock语句块的最后,对象的锁定被解除,另一个等待锁定的线程就可以获得该锁定块了。
lock(obj) {
//synchronized region
}
要锁定静态成员,可以把锁放在object类型上:
lock(typeof(StaticClass)) {
}
使用lock关键字可以将类的实例成员设置为线程安全的。这样,一次只有一个线程能访问相同实例的DoThis()和DoThat()方法。
public class {
public void DoThis() {
lock(this) {
//Only one thread at a time can access the DoThis and DoThat methods
}
}
public void DoThat() {
lock(this) {
}
}
}
但是,因为实例的对象也可以用于外部的同步访问,我们不能在类自身中控制这种访问,所以应采用SyncRoot模式。通过SyncRoot模式,创建一个私有对象syncRoot,将这个对象用于lock语句。
public class Demo {
private object syncRoot = new object();
public void DoThis() {
lock(syncRoot) {
//Only one thread at a time can access the DoThis and DoThat methods
}
}
public void DoThat() {
lock(syncRoot) {
}
}
}
首先修改异步的SharedState类,以使用SyncRoot模式。如果试图用SyncRoot模式锁定对属性的访问,使SharedState类变成线程安全的,就仍会出现前面描述的争用条件。
public class SharedState
{
private int state;
private readonly object syncRoot = new object();
public int State
{
get{ lock (syncRoot) { return state; }}
set{ lock (syncRoot) { state = value; }}
}
}
调用方法DoTheTask()方法的线程访问SharedState类的get存取器,以获得state的当前值,接着get存取器给state设置新值。在调用对象的get和set存取器期间,对象没有锁定,另一个线程可以获得临时值。
public void DoTheJob()
{
for (int i = 0; i < 50000; i++)
{
lock(sharedState)
{
sharedState.State++;
}
}
}
==在一个地方使用lock语句并不意味着,访问对象的其他线程都正在等待。必须对每个访问共享状态的线程显式地使用同步功能。==
9.2 Interlocked类
Interlocked类用于使变量的简单语句原子化。i++不是线程安全的,它的操作包括从内存中获取一个值,给该值递增1,再将它存储回内存。这些操作都可能会被线程调度器打断。Interlocked类提供了以线程安全的方式递增、递减、交换和读取值的方法。
与其他同步技术相比,使用Interlocked类会快得多。但是,它只能用于简单的同步问题。
例如,这里不使用lock语句锁定对someState变量的访问,把它设置为一个新值,以防止它是空的,而可以使用Interlocked类,它比较快:
lock(this) {
if(someState == null) {
someState = newState;
}
}
这个功能相同但比较快的版本使用了Interlocked.CompareExchange()方法:
Interlocked.CompareExchange<SomeState>(ref someState,newState,null);
并且不在lock语句中执行递增操作:
public int State {
get {
lock(this) {
return ++state;
}
}
}
而使用较快的Interlocked.Increment()方法:
public int State {
get {
return Interlocked.Increment(ref state);
}
}
9.3 Monitor类
c#的lock语句由编译器解析为使用Monitor类。下面的lock语句:
lock(obj) {
//synchronized region for obj
}
被解析为调用Enter()方法,该方法会一直等待,直到线程被对象锁定为止。一次只有一个线程能被对象锁定。只要解除锁定,线程就可以进入同步阶段。Monitor类的Exit()方法解除了锁定。编译器把Exit()方法放在try块的finally处理程序中,所以如果抛出了异常,就也会解除该锁定。
Monitor.Enter(obj);
try {
//synchronized region for obj
}finally {
Monitor.Exit(obj);
}
与c#的lock语句相比,Monitor类的主要优点是:可以添加一个等待被锁定的超时值。这样就不会无限期地等待被锁定,而可以使用TryEnter(),其中传递一个超时值,指定等待被锁定的最长时间。
bool lockTaken = false;
Monitor.TryEnter(obj,500,ref lockTaken);
if(lockTaken) {
try{
//acquired the lock
//synchronized region for obj
}finally {
Monitor.Exit(obj);
}
}else {
//didn't get the lock,do somthing else.
}
9.4 ReaderWriterLockSlim类
为了使锁定机制允许锁定多个读取器(而不是一个写入器)访问某个资源,可以使用ReaderWriterLockSlim类。这个类提供了一个锁定功能,如果没有写入器锁定资源,就运行多个读取器访问资源,但只能有一个写入器锁定该资源。
ReaderWriterLockSlim类的属性可获得读取阻塞或不阻塞的锁定,如EnterReadLock()和TryEnterReadLock()方法。还可以使用EnterWriterLock()和TryEnterWriterLock()方法获得写入锁定。如果任务先读取资源,之后写入资源,它就可以使用EnterUpgradableReadLock()或TryEnterUpgradableReadLock()方法获得可升级的读取锁定。有了这个锁定,就可以获得写入锁定,而无需释放读取锁定。
class MonitorTest
{
private static List<int> items = new List<int>() { 0, 1, 2, 3, 4, 5 };
private static ReaderWriterLockSlim rwl = new ReaderWriterLockSlim(LockRecursionPolicy.SupportsRecursion);
//获得读取锁定,读取集合的值
static void ReaderMethod(object reader)
{
try
{
rwl.EnterReadLock();
for (int i = 0; i < items.Count; i++)
{
Console.WriteLine("reader {0},loop: {1},item: {2}", reader, i, items[i]);
Thread.Sleep(40);
}
}
finally
{
rwl.ExitReadLock();
}
}
//获得写入锁定,改变集合的值
static void WriterMethod(object writer)
{
try
{
while (!rwl.TryEnterWriteLock(50))
{
Console.WriteLine("Writer {0} waiting for the write lock", writer);
Console.WriteLine("current reader count: {0}", rwl.CurrentReadCount);
}
Console.WriteLine("Writer {0} acquired the lock", writer);
for (int i = 0; i < items.Count; i++)
{
items[i]++;
Thread.Sleep(50);
}
Console.WriteLine("Writer {0} finished", writer);
}
finally
{
rwl.ExitWriteLock();
}
}
static void Main(string[] args)
{
var taskFactory = new TaskFactory(TaskCreationOptions.LongRunning,TaskContinuationOptions.None);
var task = new Task[6];
task[0] = taskFactory.StartNew(WriterMethod, 1);
task[1] = taskFactory.StartNew(ReaderMethod, 1);
task[2] = taskFactory.StartNew(ReaderMethod, 2);
task[3] = taskFactory.StartNew(WriterMethod, 2);
task[4] = taskFactory.StartNew(ReaderMethod, 3);
task[5] = taskFactory.StartNew(ReaderMethod, 4);
for (int i = 0; i < 6; i++)
{
task[i].Wait();
}
}
}
#10. Timer类
.Net Framework提供了几个Timer类,用于在某个时间间隔后调用某个方法。
使用System.Threading.Timer类,可以把要调用的方法作为构造函数的第一个参数传递。这个方法必须满足TimeCallback委托的要求,该委托定义了一个void返回类型和一个object参数。通过第二个参数,可以传递任意对象,用回调方法中的object参数接受对应的对象。
private static void ThreadingTimer()
{
var t1 = new Timer(TimeAction, null, TimeSpan.FromSeconds(2), TimeSpan.FromSeconds(3));
Thread.Sleep(15000);
t1.Dispose();
}
private static void TimeAction(object o)
{
Console.WriteLine("System.Threading.Timer {0:T}",DateTime.Now);
}