.NET看场电影ndk

64行C# 代码实现异步队列

2018-06-03  本文已影响353人  冰麟轻武

不喜欢看废话的同学直接跳到 看代码

一、

有时候我们会有这样的需求:
一个或多个线程(Senders)向一个队列(FIFO)中写入数据,
另外一个或多个线程(Receivers)从这个队列中取数据,并对数据进行处理或加工
这就是异步队列


图片来自网络

PS:发送者(sender)/接收者(receiver)有时也被叫做生产者(producer)/消费者(consumer )

二、

最近在项目中有使用(本地)异步队列的需求,在网上搜了一圈,都不是很满意:(所以说.NET生态还有待加强)
一种是通过事件机制来触发,这种方式写出的代码比较“分散”,不易阅读,理解和维护,这种当然不能接受啦,
另一种是通过“阻塞”模式来优化代码的可读性,代价就是浪费性能,
拜托,现在都什么年代了,怎么可能阻塞线程呢?当然是使用 C# 5.0 引入的 async/await啦。
因为搜不到,所以只能自己动手了

三、

我们的目标当然是写出这样的代码:

var x = await queue.Dequeue(cancellationToken);

并且内部的实现必须是非阻塞式的,
基于这个目标我们需要知道一个知识点信号量

四、

信号量简单来说就是对一个资源打上一个数字的标记,
这个数字(正数)表示了这个资源可以同时被多少个对象访问,(负数)还有多少个对象需要访问他
打个比方:一支笔,他同时只能被一个人使用,所以我可以初始给他打上一个信号量1
当第一个小朋友来借笔时,首先观察信号量1(大于0),则表示可以将笔(资源)借(分配)给小朋友(对象),并将信号量-1,此时信号量为0
第二个小朋友来借笔时,信号量为0,表示需要等待,并将信号量-1,此时信号量为-1(表示有1个对象正在等待资源释放)
如果这时,第一个小朋友,将笔(资源)归还(释放),则将信号量+1,并将笔借给第二个小朋友,此时信号量为0(表示无等待)
如果在第一个小朋友还没有将笔归还之前,第二个小朋友表示不愿意再等了,则信号量也-1

例子2:
一个小游泳池,可以同时允许10个人一起下水,则初始信号量为10
第一个人来,信号量-1,得到9,大于等于0,表示可以进去玩
第二人人来,信号量-1,得到8,大于等于0,表示可以进去玩
......
第十个人来,信号量-1,得到0,大于等于0,表示可以进去玩
第十一个人来,信号量-1,得到-1,小于0,表示需要等待
第十二个人来,信号量-1,得到-2,小于0,表示需要等待
第十三个人来,信号量-1,得到-3,小于0,表示需要等待
第一个人走了,信号量+1,将第十个人放进去,信号量等于-2,有2个人在等待
第十二个人走了,信号量+1,信号量等于-1,有1个人在等待

与信号量的处理相关的还有一个PV操作,了解一下

五、

在C#中有专门用于解决信号量问题的类:SemaphoreSemaphoreSlim

Semaphore:限制可同时访问某一资源或资源池的线程数。
SemaphoreSlim:对可同时访问资源或资源池的线程数加以限制的 System.Threading.Semaphore 的轻量替代。

这里我选择更轻量的SemaphoreSlim来实现,他的用法也非常简单

var s = new SemaphoreSlim(1);         // 计数器初始值1
await s.WaitAsync(cancellationToken); // 计数器-1,如果计数不足则等待(这个类的设计是计数到0就不会再减少了)
s.Release();                          // 计数器+1

下面就开始实现一个异步队列

六、

先定义一个异步队列的接口

// 异步队列接口
public interface IAsyncQueue<T>: IDisposable
{
    // 清空队列。
    Task Clear(CancellationToken token);
    // 移除并返回位于队列开始处的对象。
    Task<T> Dequeue(CancellationToken token);
    // 将对象添加到队列的结尾处。
    Task Enqueue(T item, CancellationToken token);
}

定义接口的好处是为了方便写扩展方法和以后对实现的修改

七、

定义信号量
从接口中可以看出,入和出2个操作都是异步的,所以需要定义2个信号量

private readonly SemaphoreSlim _in = new SemaphoreSlim(1);
private readonly SemaphoreSlim _out = new SemaphoreSlim(0);

入操作的信号量初始值是1,表示允许1个并发执行
出操作的信号量初始值是0,因为出操作的信号量是根据队列中的元素个数来决定的,初始队列元素个数为0

定义一个内部队列,用于实现队列的基本操作

private readonly Queue<T> _queue = new Queue<T>();

实现类定义:

// 异步消息队列实现
sealed class AsyncQueue<T> : IAsyncQueue<T>
{
    // 内部队列实例
    private readonly Queue<T> _queue = new Queue<T>();
    // 入操作信号量
    private readonly SemaphoreSlim _in = new SemaphoreSlim(1);
    // 出操作信号量
    private readonly SemaphoreSlim _out = new SemaphoreSlim(0);

    public Task Clear(CancellationToken token) => throw new NotImplementedException();
    public Task<T> Dequeue(CancellationToken token) => throw new NotImplementedException();
    public Task Enqueue(T item, CancellationToken token) => throw new NotImplementedException();
    public void Dispose() => throw new NotImplementedException();
}

八、

入(Enqueue)操作

public async Task Enqueue(T item, CancellationToken token)
{
    await _in.WaitAsync(token); // 入操作信号量-1,并发时等待,只允许一个线程操作
    try
    {
        _queue.Enqueue(item);   // 将对象放入队列
        _out.Release();         // “出”操作信号量+1
    }
    finally
    {
        _in.Release();          // 如果Wait操作完成,则必须将信号量施放
    }
}

出(Dequeue)操作

public async Task<T> Dequeue(CancellationToken token)
{
    await _out.WaitAsync(token);  // 同上,出操作比较简单就不赘述了
    return _queue.Dequeue();
}

清空(Clear)操作

public async Task Clear(CancellationToken token)
{
    await _in.WaitAsync(token);  // 先占中入操作的资源,防止操作中插入新的对象
    try
    {
        // 循环调用出操作的Wait,将信号量减为0
        // WaitAsync(100)表示每次操作等待100毫秒,为了防止另一个线程将`_out`的最后一个资源抢先领取后,清空操作无限期等待
        while (await _out.WaitAsync(100) || _out.CurrentCount > 0) 
        {
        }
        _queue.Clear();
    }
    finally
    {
        _in.Release();
    }
}

九、
完整代码:

using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

namespace blqw
{
    sealed class AsyncQueue<T> : IAsyncQueue<T>
    {
        private readonly ConcurrentQueue<T> _queue = new ConcurrentQueue<T>();
        private readonly SemaphoreSlim _in = new SemaphoreSlim(1);
        private readonly SemaphoreSlim _out = new SemaphoreSlim(0);

        public async Task Clear(CancellationToken token)
        {
            await _in.WaitAsync(token);
            try
            {
                while (await _out.WaitAsync(100) || _out.CurrentCount > 0)
                {
                    _queue.TryDequeue(out _);
                }
            }
            finally
            {
                _in.Release();
            }
        }

        public async Task<T> Dequeue(CancellationToken token)
        {
            await _out.WaitAsync(token);
            return _queue.TryDequeue(out var val) ? val : throw new System.InvalidOperationException();
        }

        public async Task Enqueue(T item, CancellationToken token)
        {
            await _in.WaitAsync(token);
            try
            {
                _queue.Enqueue(item);
                _out.Release();
            }
            finally
            {
                _in.Release();
            }
        }

        void DisposeSemaphoreSlim(SemaphoreSlim ss)
        {
            try
            {
                ss.Dispose();
            }
            catch { }
        }

        public void Dispose()
        {
            DisposeSemaphoreSlim(_in);
            DisposeSemaphoreSlim(_out);
        }
    }
}

64行

十、

工厂类

/// <summary>
/// 异步队列
/// </summary>
public static class AsyncQueue
{
    public static IAsyncQueue<T> Create<T>() => new AsyncQueue<T>();
}

不直接公开 AsyncQueue<T> 是考虑到以后方便替换实现类

拓展类

public static class AsyncQueueExtensions
{
    public static Task Clear<T>(this IAsyncQueue<T> aq) => aq.Clear(CancellationToken.None);

    public static Task Clear<T>(this IAsyncQueue<T> aq, int millisecondsTimeout)
    {
        var source = new CancellationTokenSource(millisecondsTimeout);
        return aq.Clear(source.Token).ContinueWith(t => source.Dispose());
    }

    public static Task Clear<T>(this IAsyncQueue<T> aq, TimeSpan timeout)
    {
        var source = new CancellationTokenSource(timeout);
        return aq.Clear(source.Token).ContinueWith(t => source.Dispose());
    }

    public static Task<T> Dequeue<T>(this IAsyncQueue<T> aq) => aq.Dequeue(CancellationToken.None);

    public static async Task<T> Dequeue<T>(this IAsyncQueue<T> aq, int millisecondsTimeout)
    {
        using (var source = new CancellationTokenSource(millisecondsTimeout))
        {
            return await aq.Dequeue(source.Token);
        }
    }

    public static async Task<T> Dequeue<T>(this IAsyncQueue<T> aq, TimeSpan timeout)
    {
        using (var source = new CancellationTokenSource(timeout))
        {
            return await aq.Dequeue(source.Token);
        }
    }

    public static Task Enqueue<T>(this IAsyncQueue<T> aq, T item) => aq.Enqueue(item, CancellationToken.None);

    public static Task Enqueue<T>(this IAsyncQueue<T> aq, T item, int millisecondsTimeout)
    {
        var source = new CancellationTokenSource(millisecondsTimeout);
        return aq.Enqueue(item, source.Token).ContinueWith(t => source.Dispose());
    }

    public static Task Enqueue<T>(this IAsyncQueue<T> aq, T item, TimeSpan timeout)
    {
        var source = new CancellationTokenSource(timeout);
        return aq.Enqueue(item, source.Token).ContinueWith(t => source.Dispose());
    }


    public static async Task EnqueueRange<T>(this IAsyncQueue<T> aq, IEnumerable<T> items)
    {
        if (items != null)
        {
            foreach (var item in items)
            {
                await aq.Enqueue(item, CancellationToken.None);
            }
        }
    }

    public static async Task EnqueueRange<T>(this IAsyncQueue<T> aq, IEnumerable<T> items, int millisecondsTimeout)
    {
        if (items != null)
        {
            using (var source = new CancellationTokenSource(millisecondsTimeout))
            {
                foreach (var item in items)
                {
                    await aq.Enqueue(item, CancellationToken.None);
                }
            }
        }
    }

    public static async Task EnqueueRange<T>(this IAsyncQueue<T> aq, IEnumerable<T> items, TimeSpan timeout)
    {
        if (items != null)
        {
            using (var source = new CancellationTokenSource(timeout))
            {
                foreach (var item in items)
                {
                    await aq.Enqueue(item, CancellationToken.None);
                }
            }
        }
    }
}

十一、
现在来测试一下
为了方便观察测试结果,先写一个将结果改为彩色的类,并且是异步的,不影响测试代码

static class ColorConsole
{
    public static void WriteLine(string value, ConsoleColor? backgroundColor = null, ConsoleColor? foregroundColor = null)
    {
        Task.Run(() =>
        {
            lock (typeof(Console))
            {
                Console.ResetColor();
                if (backgroundColor != null)
                {
                    Console.BackgroundColor = backgroundColor.Value;
                }
                if (foregroundColor != null)
                {
                    Console.ForegroundColor = foregroundColor.Value;
                }
                Console.WriteLine(value);
            }
        });
    }
}

发送者:

class Sender
{
    private readonly int _index;
    private readonly IAsyncQueue<string> _queue;
    private readonly ConsoleColor _background;

    public Sender(int index, IAsyncQueue<string> queue, ConsoleColor background)
    {
        _index = index;
        _queue = queue ?? throw new ArgumentNullException(nameof(queue));
        _background = background;
    }

    public async Task Send(string message)
    {
        ColorConsole.WriteLine($"{_index}号发送者写入{message}", backgroundColor: _background);
        await Task.Delay(100 + Math.Abs(new object().GetHashCode() % 300)); // 加入延迟模拟实际场景
        await _queue.Enqueue(message);  // 关键代码
    }
}

接收者

class Receiver
{
    private readonly int _index;
    private readonly IAsyncQueue<string> _queue;
    private readonly ConsoleColor _foreground;

    public Receiver(int index, IAsyncQueue<string> queue, ConsoleColor foreground)
    {
        _index = index;
        _queue = queue ?? throw new ArgumentNullException(nameof(queue));
        _foreground = foreground;
    }

    public async Task Receive(CancellationToken token)
    {
        try
        {
            while (true)
            {
                var str = await _queue.Dequeue(token); // 关键代码
                ColorConsole.WriteLine($"{_index}号接收者获取到:{str}", foregroundColor: _foreground);
                await Task.Delay(100 + Math.Abs(new object().GetHashCode() % 300)); // 加入延迟,模拟实际场景
            }
        }
        catch (OperationCanceledException)
        {
            ColorConsole.WriteLine($"{_index}号接收者关闭", foregroundColor: _foreground);
        }
    }
}

测试类

static void Main(string[] args)
{
    var queue = AsyncQueue.Create<string>(); // 初始化异步队列
    var source = new CancellationTokenSource(); // 初始化取消标志
    var token = source.Token; 
    var senders = Enumerable.Range(0, 3).Select(index => new Sender(index, queue, (ConsoleColor)(index+13))).ToArray(); // 初始化3个发送者
    var receivers = Enumerable.Range(0, 10).Select(index => new Receiver(index, queue, (ConsoleColor)(index + 5))).ToArray(); // 初始化10个接收者

    Parallel.ForEach(receivers, async x => await x.Receive(token)); // 并行启动10个接收者

    Thread.Sleep(1000); // 延迟1秒 等待接收者全部启动完成
    var message = 0;
    // 并行启动3个发送者,每个发送者发送10次,发送内容为从1开始自增的整型数字,也就是1~30
    Parallel.ForEach(senders, async x =>
    {
        for (var i = 0; i < 10; i++)
        {
            await x.Send(Interlocked.Increment(ref message).ToString());
        }
    });

    Console.ReadLine();
    source.Cancel(); // 停止所有接收者
    Console.ReadLine();
}

十二、

由于整个过程都是异步的,所以打印结果并不会是顺序的

运行效果

十三、

github
nuget

十四、

如果可以帮到你,别忘了帮我点一下喜欢,让更多的人看到


上一篇下一篇

猜你喜欢

热点阅读