// Task + ConcurrentQueue + Parallel multithreading example
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
using Common.Tool.Net;
using Common.Tool;
namespace ConsoleApplication1
{
class Program
{
static string path = "D://Logs//" + DateTime.Now.ToString("yyyy-MM-dd") + "//";
static ConcurrentQueue<int> queue = new ConcurrentQueue<int>();
/// <summary>
/// Program entry point: hands the log path, the shared queue and the two worker
/// callbacks to <c>TaskHelper.GetTaskInitialBasics</c>.
/// </summary>
/// <param name="args">Command-line arguments (not used).</param>
static void Main(string[] args)
{
    // Bind the worker methods to the helper's delegate types up front.
    TaskHelper.IntoQueueMethod producer = IntoQueue;
    TaskHelper.ScanQueueMethod consumer = ScanQueue;

    TaskHelper.GetTaskInitialBasics<int>(path, queue, producer, consumer);
}
/// <summary>
/// Producer: enqueues the integers 1 through 10000 into the shared queue in parallel,
/// then reports completion on the console and through <c>Utils.WriteLog</c>.
/// </summary>
/// <param name="ct">Token that stops the parallel enqueue when cancellation is requested.</param>
/// <exception cref="OperationCanceledException">
/// Thrown by the parallel loop if <paramref name="ct"/> is cancelled before all items are enqueued.
/// </exception>
private static void IntoQueue(CancellationToken ct)
{
    Console.WriteLine("入队线程启动");

    // Hand the token to the parallel loop so the producer can actually be cancelled;
    // previously the parameter was ignored.
    ParallelOptions options = new ParallelOptions
    {
        CancellationToken = ct,
        MaxDegreeOfParallelism = 500
    };

    // Iterate the range directly instead of materialising a temporary list first.
    // The upper bound of Parallel.For is exclusive, hence 10001.
    Parallel.For(1, 10001, options, i =>
    {
        queue.Enqueue(i);
    });

    Console.WriteLine("入队线程结束");
    Utils.WriteLog(path, "===========入队线程结束!===========", "Result", false, true);
}
/// <summary>
/// Consumer: repeatedly takes values off the shared queue and passes each one to
/// <c>Utils.WriteLog</c> under the name "demo", until cancellation is requested.
/// </summary>
/// <param name="ct">Token used to stop the loop.</param>
/// <exception cref="OperationCanceledException">
/// Thrown once <paramref name="ct"/> is cancelled; this is the only way the loop exits.
/// </exception>
private static void ScanQueue(CancellationToken ct)
{
    ct.ThrowIfCancellationRequested();
    Console.WriteLine("出队线程启动");
    while (true)
    {
        // TryDequeue already reports an empty queue, so no separate IsEmpty check is needed.
        int value;
        if (queue.TryDequeue(out value))
        {
            Console.WriteLine("正在录入数据");
            Utils.WriteLog(path, value.ToString(), "demo", false, true);
        }
        else
        {
            // Idle for up to 1.5 s, but wake immediately when cancellation is signalled
            // instead of sleeping through it.
            ct.WaitHandle.WaitOne(1500);
        }
        Console.WriteLine("队列剩余总数:" + queue.Count);
        ct.ThrowIfCancellationRequested();
    }
}
}
}
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Collections.Concurrent;
namespace Common.Tool.Net
{
/// <summary>
/// Runs a producer callback and a consumer callback on separate tasks around a shared
/// <see cref="ConcurrentQueue{T}"/> and coordinates their shutdown.
/// </summary>
public class TaskHelper
{
    /// <summary>Callback executed on the producer task; receives the shared cancellation token.</summary>
    public delegate void IntoQueueMethod(CancellationToken token);

    /// <summary>Callback executed on the consumer task; receives the shared cancellation token.</summary>
    public delegate void ScanQueueMethod(CancellationToken token);

    /// <summary>Pause between two "is the queue drained yet?" checks, in milliseconds.</summary>
    private const int DrainPollIntervalMs = 200;

    /// <summary>Maximum time to wait for the consumer to stop after cancellation, in milliseconds.</summary>
    private const int ConsumerShutdownTimeoutMs = 10000;

    /// <summary>
    /// Starts the producer and the consumer, waits for the producer to finish and for the
    /// queue to drain, then cancels the consumer and waits for it to stop.
    /// Finally prompts on the console; entering "T" disposes the finished tasks.
    /// </summary>
    /// <typeparam name="T">Element type of the queue.</typeparam>
    /// <param name="logPath">Path forwarded to <c>Utils.WriteLog</c> for every log entry.</param>
    /// <param name="queue">Queue shared by the producer and the consumer.</param>
    /// <param name="MakeIntoQueueMethod">Producer callback.</param>
    /// <param name="MakeScanQueueMethod">Consumer callback; it is expected to stop when the token is cancelled.</param>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="queue"/> or one of the callbacks is <c>null</c>.
    /// </exception>
    /// <exception cref="AggregateException">
    /// The producer callback failed; its errors are logged before the exception propagates.
    /// </exception>
    public static void GetTaskInitialBasics<T>(string logPath, ConcurrentQueue<T> queue, IntoQueueMethod MakeIntoQueueMethod, ScanQueueMethod MakeScanQueueMethod)
    {
        if (queue == null)
        {
            throw new ArgumentNullException("queue");
        }
        if (MakeIntoQueueMethod == null)
        {
            throw new ArgumentNullException("MakeIntoQueueMethod");
        }
        if (MakeScanQueueMethod == null)
        {
            throw new ArgumentNullException("MakeScanQueueMethod");
        }

        using (CancellationTokenSource token = new CancellationTokenSource())
        {
            Task task = Task.Factory.StartNew(() => MakeIntoQueueMethod(token.Token));
            Task task1 = Task.Factory.StartNew(() => MakeScanQueueMethod(token.Token));
            bool consumerStopped = false;

            try
            {
                try
                {
                    // Wait for the producer itself rather than sleeping and polling IsFaulted,
                    // which missed any failure that happened after the sleep.
                    task.Wait();
                }
                catch (AggregateException ex)
                {
                    LogTaskErrors(logPath, ex, "IntoQueueError", false);
                    throw;
                }

                // Poll until the queue is drained. Also stop if the consumer has already
                // ended, since nothing would ever empty the queue in that case.
                while (!queue.IsEmpty && !task1.IsCompleted)
                {
                    Thread.Sleep(DrainPollIntervalMs);
                }
            }
            finally
            {
                // Always stop the consumer, including when the producer failed.
                token.Cancel();
                consumerStopped = WaitForConsumerExit(task1, logPath);
            }

            if (consumerStopped)
            {
                Console.WriteLine("出队线程结束");
                Utils.WriteLog(logPath, "===========出队线程结束!===========", "Result", false, true);
            }

            // Only report success when every queued item was actually taken.
            if (queue.IsEmpty)
            {
                Console.WriteLine("===========全部录入完成!===========");
                Utils.WriteLog(logPath, "===========全部录入完成!===========", "Result", false, true);
            }

            Console.WriteLine("若要结束任务请输入T");
            if (Console.ReadLine() == "T")
            {
                // The producer is complete here because task.Wait() returned normally.
                task.Dispose();

                // Task.Dispose throws InvalidOperationException on a task that is still running.
                if (task1.IsCompleted)
                {
                    task1.Dispose();
                }
            }
        }
    }

    /// <summary>
    /// Waits a bounded time for the consumer task to end and logs any real failure under "ScanQueueError".
    /// </summary>
    /// <returns><c>true</c> if the task ended within the timeout; otherwise <c>false</c>.</returns>
    private static bool WaitForConsumerExit(Task consumer, string logPath)
    {
        try
        {
            return consumer.Wait(ConsumerShutdownTimeoutMs);
        }
        catch (AggregateException ex)
        {
            // Cancellation is the expected way for the consumer to stop, so it is not logged.
            LogTaskErrors(logPath, ex, "ScanQueueError", true);
            return true;
        }
    }

    /// <summary>
    /// Writes the message of every inner exception to the console and to <c>Utils.WriteLog</c>.
    /// </summary>
    /// <param name="logPath">Path forwarded to <c>Utils.WriteLog</c>.</param>
    /// <param name="error">Aggregate exception raised by a task.</param>
    /// <param name="logName">Log name forwarded to <c>Utils.WriteLog</c>.</param>
    /// <param name="ignoreCancellation">When <c>true</c>, cancellation exceptions are skipped.</param>
    private static void LogTaskErrors(string logPath, AggregateException error, string logName, bool ignoreCancellation)
    {
        // Flatten so that nested aggregates (for example from Parallel loops) yield the real messages.
        foreach (Exception ex in error.Flatten().InnerExceptions)
        {
            if (ignoreCancellation && ex is OperationCanceledException)
            {
                continue;
            }
            Console.WriteLine(ex.Message);
            Utils.WriteLog(logPath, ex.Message, logName, false, true);
        }
    }
}
}