Task+ConcurrentQueue+Parallel多线程

2019-12-24  本文已影响0人  码嘟嘟

using System;

using System.Collections.Generic;

using System.Linq;

using System.Text;

using System.Collections.Concurrent;

using System.Threading;

using System.Threading.Tasks;

using Common.Tool.Net;

using Common.Tool;

namespace ConsoleApplication1

{

    class Program

    {

        static string path = "D://Logs//" + DateTime.Now.ToString("yyyy-MM-dd") + "//";

        static ConcurrentQueue<int> queue = new ConcurrentQueue<int>();

        static void Main(string[] args)

        {

            TaskHelper.GetTaskInitialBasics<int>(path, queue, IntoQueue, ScanQueue);

        }

        private static void IntoQueue(CancellationToken ct)

        {

            Console.WriteLine("入队线程启动");

            List<int> list = new List<int>();

            for (int i = 1; i <= 10000; i++)

            {

                list.Add(i);

            }

            //使用并行迭代提升速度         

            Parallel.ForEach(list, new ParallelOptions { MaxDegreeOfParallelism = 500 }, l =>

            {

                //入队操作

                queue.Enqueue(l);

            });

            Console.WriteLine("入队线程结束");

            Utils.WriteLog(path, "===========入队线程结束!===========", "Result", false, true);

        }

        private static void ScanQueue(CancellationToken ct)

        {

            ct.ThrowIfCancellationRequested();

            Console.WriteLine("出队线程启动");

            while (true)

            {

                if (!queue.IsEmpty)

                {

                    //从队列中取出 

                    int a = 0;

                    if (queue.TryDequeue(out a))

                    {

                        Console.WriteLine("正在录入数据");

                        Utils.WriteLog(path, a.ToString(), "demo", false, true);

                    }

                }

                else

                {

                    Thread.Sleep(1500);

                }

                Console.WriteLine("队列剩余总数:" + queue.Count);

                ct.ThrowIfCancellationRequested();

            }

        }

    }

}

using System;

using System.Collections.Generic;

using System.Linq;

using System.Text;

using System.Threading;

using System.Threading.Tasks;

using System.Collections.Concurrent;

namespace Common.Tool.Net

{

    public class TaskHelper

    {

        public delegate void IntoQueueMethod(CancellationToken token);

        public delegate void ScanQueueMethod(CancellationToken token);

        /// <summary>

        /// task初始化基础设置

        /// </summary>

        /// <typeparam name="T"></typeparam>

        /// <param name="logPath"></param>

        /// <param name="queue"></param>

        /// <param name="MakeIntoQueueMethod"></param>

        /// <param name="MakeScanQueueMethod"></param>

        public static void GetTaskInitialBasics<T>(string logPath, ConcurrentQueue<T> queue, IntoQueueMethod MakeIntoQueueMethod, ScanQueueMethod MakeScanQueueMethod)

        {

            CancellationTokenSource token = new CancellationTokenSource();

            var task = Task.Factory.StartNew(() => MakeIntoQueueMethod(token.Token));

            var task1 = Task.Factory.StartNew(() => MakeScanQueueMethod(token.Token));

            Thread.Sleep(1000);

            if (task.IsFaulted)

            {

                /*  循环输出异常    */

                foreach (Exception ex in task.Exception.InnerExceptions)

                {

                    Console.WriteLine(ex.Message);

                    Utils.WriteLog(logPath, ex.Message, "IntoQueueError", false, true);

                }

            }

            if (task1.IsFaulted)

            {

                /*  循环输出异常    */

                foreach (Exception ex in task1.Exception.InnerExceptions)

                {

                    Console.WriteLine(ex.Message);

                    Utils.WriteLog(logPath, ex.Message, "ScanQueueError", false, true);

                }

            }

            Task.WaitAll(task); //等待入队任务结束

            Thread.Sleep(5000);

            while (true)

            {

                if (queue.Count == 0 && queue.IsEmpty) //队列为空

                {

                    token.Cancel(); //发送取消任务指令

                    Console.WriteLine("出队线程结束");

                    Utils.WriteLog(logPath, "===========出队线程结束!===========", "Result", false, true);

                    Console.WriteLine("===========全部录入完成!===========");

                    Utils.WriteLog(logPath, "===========全部录入完成!===========", "Result", false, true);

                    break;

                }

            }

            Thread.Sleep(2000);

            Console.WriteLine("若要结束任务请输入T");

            if (Console.ReadLine() == "T")

            {

                task.Dispose();

                task1.Dispose();

            }

        }

    }

}

上一篇下一篇

猜你喜欢

热点阅读