.NET多线程(十一)Socket
一、 Socket服务端
单个TCP/IP端口上能够被多少个进程侦听?
可以为多个,这时候是端口复用
WCF需要打开宿主程序ServiceHost
Socket需要开始监听端口TcpListener
(如果服务器界面不进行其他的操作,可以不用建立新线程)
建立新线程,启动监听,这个监听器TcpListener.AcceptSocket()会得到一个Socket,其实这里TcpListener里在初始化实例时会绑定服务器IP,PORT到Socket,
(用定时器,去判断监听是否有挂起的)
在监听处理函数中,通过建立新对象,对象拥有属性,然后在对象中至少有传入Socket参数的构造函数,以及Run方法,其中Run方法是来建立新线程,线程入口方法就是处理请求的方法,
处理请求的方法就是调用Receive和Send方法了,
这里需要设置处理超时的时间,在这个时间内,如果Socket.Available为0就让Thread睡眠,直到超时。
如果没有超时,且有了数据就让Socket.Receive接收到指定byte[512]数组,根据自己定义的字节数组的client和server端的收发格式,通常这里的格式是第一个字节表示对象类型,第二个字节表示传送对象的字节数目,根据字节数目,循环写入Socket.Receive接收到指定byte[512]数组,再写入到MemoryStream,通过Serializer.Deserialize反序列化MemoryStream,这里构造xmlserializer需要传入对象类型Type!
服务端是socket监听连接,listen方法
客户端是socket请求连接,connect方法
第1节 TCP
1,
我们首先知道TCP是面向连接的,它的意思是说两个远程主机(或者叫进程,因为实际上远程通信是进程之间的通信,而进程则是运行中的程序),
就是它是全双工的。意思是说如果两个主机上的进程(比如进程A、进程B),一旦建立好连接,那么数据就既可以由A流向B,也可以由B流向A
它还是点对点的,意思是说一个TCP连接总是两者之间的,在发送中,通过一个连接将数据发给多个接收方是不可能的。TCP还有一个特性,就是称为可靠的数据传输,意思是连接建立后,数据的发送一定能够到达,并且是有序的,就是说发的时候你发了ABC,那么收的一方收到的也一定是ABC,而不会是BCA或者别的什么。
2,
概念总结:
现在,我们大致可以得出这样几个结论:
• 如果不使用do/while循环,服务端只有一个listener.AcceptTcpClient()方法和一个TcpClient.GetStream().Read()方法,则服务端只能处理到同一客户端的一条请求。
• 如果使用一个do/while循环,并将listener.AcceptTcpClient()方法和TcpClient.GetStream().Read()方法都放在这个循环以内,那么服务端将可以处理多个客户端的一条请求。
• 如果使用一个do/while循环,并将listener.AcceptTcpClient()方法放在循环之外,将TcpClient.GetStream().Read()方法放在循环以内,那么服务端可以处理一个客户端的多条请求。
• 如果使用两个do/while循环,对它们进行分别嵌套,那么结果是什么呢?结果并不是可以处理多个客户端的多条请求。因为里层的do/while循环总是在为一个客户端服务,因为它会中断在TcpClient.GetStream().Read()方法的位置,而无法执行完毕。即使可以通过某种方式让里层循环退出,比如客户端往服务端发去“exit”字符串时,服务端也只能挨个对客户端提供服务。如果服务端想执行多个客户端的多个请求,那么服务端就需要采用多线程。主线程,也就是执行外层do/while循环的线程,在收到一个TcpClient之后,必须将里层的do/while循环交给新线程去执行,然后主线程快速地重新回到listener.AcceptTcpClient()的位置,以响应其它的客户端。
3,
对于客户端发送请求的数据,可以分1次或多次,同样在服务端接收数据,也可能会是1次或多次,此时,解决方法是自己规定发送的协议格式,在服务端解析格式,比如发送的时候包含我这次发送了多少个字节等信息,在服务端有几种情况就是,读取表示字节总数的,然后可能实际情况将好相等,或大于,或小于,
这里还有种情况就是,服务端读这个字节总数本身就是读错或没完整或丢失等,
4,
一个解决方案,多个客户端多个请求:
我们可以使用BeginRead()、BeginWrite()等异步方法,同时让这BeginRead()方法和它的回调方法形成一个类似于while的无限循环:首先在第一层循环中,接收到一个客户端后,调用BeginRead(),然后为该方法提供一个读取完成后的回调方法,然后在回调方法中对收到的字符进行处理,随后在回调方法中接着调用BeginRead()方法,并传入回调方法本身。
5,
传送文件
• 类似于FTP协议,服务端开辟两个端口,并持续对这两个端口侦听:一个用于接收字符串,类似于FTP的控制端口,它接收各种命令(接收或发送文件);一个用于传输数据,也就是发送和接收文件。
• 服务端只开辟一个端口,用于接收字符串,我们称之为控制端口。当接到请求之后,根据请求内容在客户端开辟一个端口专用于文件传输,并在传输结束后关闭端口。
我们先看一下发送文件的情况,如果我们想将文件client01.jpg由客户端发往客户端,那么流程是什么:
- 客户端开辟数据端口用于侦听,并获取端口号,假设为8005。
- 假设客户端输入了S1,则发送下面的控制字符串到服务端:[file=Client01.jpg, mode=send, port=8005]。
- 服务端收到以后,根据客户端ip和端口号与该客户端建立连接。
- 客户端侦听到服务端的连接,开始发送文件。
- 传送完毕后客户端、服务端分别关闭连接。
6,
协议:
在本文及下一篇文章中,我们采用一种新的方式来编写协议:XML。对于上面的语句,我们可以写成这样的XML:
<protocol><file name="client01.jpg" mode="send" port="8005" /></protocol>
第2节 多线程方式
处理不了过多的连接,浪费资源
可以看出,一旦socket被传递进来,一个新线程就会被创建以接收该socket。这个线程只是接收socket的数据并对其进行处理。
这种线程化的方法如果作为只有很少的客户端的服务器端性能是非常好的,并且也容易编写代码。但是,当连接数量变大时就不行了。主要的缺点就是太多的线程数目被创建和销毁,而这会消耗太多的系统资源。
第3节 Select方式
没连接限制,但是性能大大降低
有很多种方法可以避免为每个连接单独创建一个线程。第一个是使用线程池——一种只需要对以上线程化的服务器端做最少更改的方法。这种方法的好处是控制了同时出现的线程的数目,在连接存在时间较短的情况下是一种切实可行的方法。但是,如果每个连接都需要保持很长的时间的话,该方法就不那么适合了。一个更好的解决办法是使用静态的Socket.Select方法。使用该方法需要传递三个要监视的socket列表,一个为可读性,一个为可写性,另外一个为处理错误情况。该方法也接收一个微秒的参数,指示该方法反应之前等待的时间。
每次调用Select方法,都需要创建需要监控的socket的列表。Select返回后,这个列表被修改得只包含需要服务的socket。尽管看起来不错,实际上是非常没有效率的。设想如果同时有100个等待处理的I/O操作,第100个socket会一直在等待,直到前面99个socket的服务或者列表完成。socket服务同时也会阻止代码重新进入Select,这会造成更过潜在的空闲线程。在重新调用Select方法之前必须保证处理了所有socket的I/O;如果没有做到这些会导致同一个I/O的多余一次的提醒。
这种方法还有别的性能上的缺陷。如果客户端的连接超过一千,性能上就会有显著的降低,这是因为核心必须查找每一个socket确保其数据的可用性。相比每个连接一个线程的方法,尽管你可以用这种方法处理明显多的连接,但是这个量也不是很大。而且你需要管理三个不同的列表,并且对每一个反复声明以处理每一个请求。从线程使用的观点来看这很有效,但反应灵敏性却下降了很多。
多线程,异步IO服务器,异步处理
有问题,需要修改@@@
staticvoid Main(string[] args)
{
Socket s = newSocket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
s.BeginAccept(newAsyncCallback(AcceptCallback), s);
}
staticvoidAcceptCallback(IAsyncResult result)
{
Socket s = result.AsyncStateasSocket;
SocketrecvS = s.EndAccept(result);
recvS.BeginReceive(buffer, 0, buffer.length, SocketFlags.None,
newAsyncCallback(ReceiveCallback), connection);
s.BeginAccept(newAsyncCallback(AcceptCallback), s);
}
staticvoidReceiveCallback(IAsyncResult result)
{
intrecvLen = connection.Socket.EndReceive(result);
// handle business...
connection.Socket.BeginReceive();
}
第4节 异步方式
能够适应多个连接,并不会降低性能
前面两种方法创建的服务器端中,线程化的服务器端运行较快,但是接收不了太多的连接;而基于Select方法的服务器端没有连接数的限制,但是付出了性能降低的代价。异步服务器端消除了连接数目的限制,并且没有性能方面的降低。实际上,这种服务器端比线程模型的性能还要优越,因为代码里面并没有不断的创建和销毁线程。
在异步服务器端里,首先调用BeginAccept方法。在接受连接操作完成后,下一步是异步读取的排队。这允许服务器端读取socket,而不必查找或者创建线程。异步读取从对BeginRead方法的调用开始,对用EndRead后返回结果。
异步 I/O
异步I/O降低了创建和管理线程的需求,这直接会使代码相对简化,并且也是一个更高效的I/O模型。异步I/O使用回调函数处理传进的数据和连接,这就意味着将不需要建立socket列表,也不需要创建新的线程处理等待的I/O。
每一个基于.NET的应用程序都有一个线程池与其相连。当一个异步I/O函数有数据需要处理时,就会使用.NET线程池里的一个线程执行该回调函数。该回调函数执行完成以后,该线程就会被回收到线程池中。这种方法与线程池中的一个线程处理一个特定的请求是不同的;在那种情况下,线程池中的线程只被用来处理一个单独的I/O 操作。
.NET使用一个相当简单的模型处理异步操作。一切所需要做的就是调用相关的Begin 方法(BeginAccept, BeginSend, BeginReceive等等),利用合适的回调委托,在回调函数里面调用相应的End 方法(EndAccept, EndSend, EndReceive等等)以得到相应的结果。所有的异步Begin 方法都允许使用者传递一个内容状态对象,它可以是任意需要的对象。异步操作结束后,这个对象就是传递到回调函数里的IAsyncResult一部分。
异步服务端,重点
classAsyncServer
{
privateSocket_socket;
privateIPAddressip;
privatereadonlyintport = 7777;
privateList<Connection>_conns = newList<Connection>();
internalAsyncServer()
{
this._socket = newSocket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
ip = Dns.GetHostAddresses(Dns.GetHostName())[0];
this._socket.Bind(newIPEndPoint(ip, port));
}
internalvoidStart()
{
this._socket.Listen(10);
Console.WriteLine("server:" + ip + ":" + port + " is running...");
for (inti = 0; i< 10; i++)
{
this._socket.BeginAccept(newAsyncCallback(AcceptCallback), _socket);
}
}
privatevoidAcceptCallback(IAsyncResultresult)
{
Connectionconn = newConnection();
try
{
conn.Socket = this._socket.EndAccept(result);
conn.Buffer = newbyte[255];
lock (_conns)
{
this._conns.Add(conn);
}
conn.Socket.BeginReceive(conn.Buffer, 0, conn.Buffer.Length, SocketFlags.None, newAsyncCallback(ReceiveCallback), conn);
this._socket.BeginAccept(newAsyncCallback(AcceptCallback), _socket);
}
catch (Exceptionex)
{
conn.Close();
Console.WriteLine(ex.Message);
}
}
privatevoidReceiveCallback(IAsyncResultresult)
{
Connectionconn = (Connection)result.AsyncState;
try
{
intrecvLen = conn.Socket.EndReceive(result);
if (recvLen> 0)
{
IPEndPointremote = (IPEndPoint)conn.Socket.RemoteEndPoint;
stringstr = Encoding.Default.GetString(conn.Buffer, 0, recvLen);
Console.WriteLine("client:" + remote.Address.ToString() + ":" + remote.Port + " says " + str);
conn.Socket.Send(conn.Buffer, recvLen, SocketFlags.None);
conn.Socket.BeginReceive(conn.Buffer, 0, conn.Buffer.Length, SocketFlags.None, newAsyncCallback(ReceiveCallback), conn);
}
else
{
conn.Close();
}
}
catch (Exceptionex)
{
conn.Close();
Console.WriteLine(ex.Message);
}
}
}
1.1 线程独享
[ThreadStatic]
指示静态字段的值对于每个线程都是唯一的
线程中的定时器:
定时执行委托方法,注意没有tick,没有enable,从多长时间以后开始执行委托方法,多久执行一次。
使用CancellationTokenSource取消线程内逻辑,注册取消时的回调方法,使用关联的取消标记源。
staticvoid Main(string[] args)
{
CancellationTokenSource cts1 = newCancellationTokenSource();
CancellationTokenSource cts2 = newCancellationTokenSource();
/**
* 关联取消标记源,cts1 或者 cts2 取消,那么linkedCts就会取消
* 但是 cts1 取消,cts2 并不会关联取消哦
* */
CancellationTokenSourcelinkedCts = CancellationTokenSource.CreateLinkedTokenSource(cts1.Token, cts2.Token);
cts1.Token.Register(() =>Console.WriteLine("cts1 Canceled")); // 取消时回调
cts2.Token.Register(() =>Console.WriteLine("cts2 Canceled"));
linkedCts.Token.Register(() =>Console.WriteLine("linkedCts Canceled"));
ThreadPool.QueueUserWorkItem(o => Count(linkedCts.Token, 5));
cts1.Cancel(); // 执行 cts1 取消
Console.ReadLine();
}
staticvoid Count(CancellationToken token, Int32countTo)
{
for (Int32 count = 0; count <countTo; count++)
{
if (token.IsCancellationRequested)
{
Console.WriteLine("Count is cancelled");
return;
}
Console.WriteLine(count);
Thread.Sleep(2 * 1000);
}
Console.WriteLine("Count is done");
}
.net线程系列(五)线程的状态
/////
ParameterizedThreadStart
TimerCallback
WaitCallback
这3个委托的签名是相同的,原型如下:
publicdelegatevoidParameterizedThreadStart(objectobj);
publicdelegatevoidTimerCallback(object state);
publicdelegatevoidWaitCallback(object state);
// 相应使用代码
staticvoid Main(string[] args)
{
System.Threading.Thread thread = newThread(PrintNumbers); // ParameterizedThreadStart
System.Threading.Timer timer = newTimer(PrintNumbers); // TimerCallback
System.Threading.ThreadPool.QueueUserWorkItem(PrintNumbers); // WaitCallback
}
staticvoidPrintNumbers(objectobj) { }
这些都以异步的方式执行,由windows安排在一个时间点,到底执行谁,如果在多个CPU的电脑上,windows可以安排在同一时间点,在CPU1执行线程1,在CPU2执行线程2
每个线程,都有自己的执行上下文,当在线程1,启动线程2时,线程1的上下文会流动到线程2,在线程2中,如果启动线程3,则会把线程1,线程2的上下文都流动到线程3
这样做是为了安全和实际的需要,但是会影响性能。
我们可以取消这个流动。下面的代码,作为演示(请注意,不要这样传参数)
staticvoid Main(string[] args)
{
System.Runtime.Remoting.Messaging.CallContext.LogicalSetData("Name", "张学友");
ThreadPool.QueueUserWorkItem(state =>
Console.WriteLine("Name1={0}", System.Runtime.Remoting.Messaging.CallContext.LogicalGetData("Name"))
);
ExecutionContext.SuppressFlow(); // 取消流动
ThreadPool.QueueUserWorkItem(state =>
Console.WriteLine("Name2={0}", System.Runtime.Remoting.Messaging.CallContext.LogicalGetData("Name"))
);
ExecutionContext.RestoreFlow(); // 恢复流动
Console.ReadLine();
}
static readonly object obj1 = new object();
static readonly object obj2 = new object();
static void Main(string[] args)
{
Thread thread1 = new Thread(SayHello1);
Thread thread2 = new Thread(SayHello2);
thread1.Start();
thread2.Start();
}
static void SayHello1()
{
lock (obj1)
{
Console.WriteLine("thread1 锁住 obj1,等待获取 obj2 锁");
Thread.Sleep(10);
lock (obj2)
{
Console.WriteLine("hello1");
}
}
}
static void SayHello2()
{
lock (obj2)
{
Console.WriteLine("thread2 锁住 obj2,等待获取 obj1 锁");
lock (obj1)
{
Console.WriteLine("hello2");
}
}
}
**使用Monitor.TryEnter(obj1, TimeSpan.FromSeconds(5))**
**Monitor.Exit(obj1);**
Converting the APM pattern to tasks
Converting the EAP pattern to tasks
Task.WhenAny
Task.WhenAll
线程池,最大线程数,可用线程数:???
工作线程,和IO线程,最大500和1000,最小都是2,
通过获取最大的,和可用的来计算在用的。
Asynchronous Programming Model (APM)
which was historically the first asynchronous programming pattern in .NET
CountdownEvent
ManualResetEvent
ThreadPool.RegisterWaitForSingleObject
1.2 创建IO线程
使用同步,使用异步方式,不采用IO线程,这会有2个问题。
如果采用同步Download,则都在一个线程,那用户需要等待
如果采用异步,不用IO线程,则需要2个线程,任务的线程,默认来源于线程池,这个线程,如果处于等待状态,那其他请求,就无法用这个线程,这是浪费,滥用。
static void Main(string[] args)
{
Task<string> task = DownloadAsync("http://www.e-syl.com");
while (!task.IsCompleted)
{
Thread.Sleep(500);
Console.Write(".");
}
Console.WriteLine("\n部分下载内容\n" + task.Result.Substring(0, 100));
}
static Task<string> DownloadAsync(string url)
{
return Task.Factory.StartNew(() => Download(url));
}
static string Download(string url)
{
WebRequest webRequest = WebRequest.Create(url);
WebResponse webResponse = webRequest.GetResponse();
using (StreamReader streamReader = new StreamReader(webResponse.GetResponseStream()))
{
return streamReader.ReadToEnd();
}
}
使用异步,IO线程
volatile
first, to turn off thread-sensitive JIT compiler optimizations
second, to prevent reordering of write operations
async 目的就是说明 await 在方法中
async 的返回参数,要么是 Task<T> 和 Task,返回 void 的情况,尽量避免
一种同步构造,是让等待的线程,进入阻塞状态,使用最少的CPU,需要上下文切换,这种就是内核模式同步构造,适合等待时间较长的操作。
另一种同步构造,就是等待,这种就需要使用CPU,没有上下文切换,适合非常短时的等待,性能较好,这也称为用户模式同步构造。
第三种,就是混合模式同步构造了,首先使用用户模式同步构造,如果操作时间长了,就使用内核模式同步构造。