如何周期性的对任务节流

2020-04-16  本文已影响0人  gruan

Limit action execution max count per period.

工作中, 有很多地方需要限制一段时间内, 对某个方法的调用.
比如某某API 限制你每分钟只能请求 600次, 怎么办呢?? 一次请求的耗时, 要看对方的响应速度, 服务器的网速, 等多方面的综合因素. 所以不好估算每秒,每分钟我能调用多少次, 只能一次次的去估算, 把并发数调低, 请求间隔调长. 调低, 调长, 会拉长执行间; 调高,调短 会被限制频率, 很是头疼...

疫情过后, 一地鸡毛, 别人复工, 我们轮流在家休假. 上天终于给我安排了时间面对这个问题了.


一地鸡毛

合理安排任务

是周期性的把一批任务压进内存, 还是视任务队列的执行情况, 在决定要压进多少新任务呢??
假设每个任务要执行2秒, 而每1秒可以执行100次, 那么完成这100个任务就需要2秒.
如果不考虑任务的执行情况, 在第二秒的时候, 任务队列里就会有200个任务在处理.
就好比高铁, 每一站下多少人, 就可以在卖几张票, 而不是不管下了多少人, 都按整列车的座位数重新售票.
前一秒的还没有执行完, 后一秒的又进来了, 这是要累死牛的节凑, 人都不干, 更何况机器, 如果真这样, 内存/CPU 迟早要完玩...


抢票

处理并发

秉承最短时间内,做最多的工作的原则, 我们可以认为: 一个周期内, 最大可执行的次数, 就是周期内的最大并发数.
如果想限制最大任务并行数, 可以用 SemaphoreSlimSemaphore .
但是如果想限制一段时间内, 最大任务执行次数, 用 Semaphore 就不好办了, 因为不能确定每个任务啥时间运行完.(欠考虑, 其实是和 BlockingCollection<T> 是一样的,只不过需要手动 release 而已。)
那要怎么限制并发呢??

BlockingCollection<T>

这个是什么不需要我解释, 我们用它来模拟那辆高铁的座位: 位置是有限的, 下去几个人就可以上几个人, 多了上不了,买票请排队. 但是这个对象不能用来存放 Task, 因为哪个 Task 先完成, 哪个 Task 后完成不是可以安排的, 只有当任务完成后, 才能释放出来一个空位, 就如同高铁上的座位只有等乘客下车才能释放出来, 而不是一上车,说我到哪哪下之后就可以释放出来. 所以这个 BlockingCollection<T> 只用来存占位符, 真正的任务队列我们另请高明.
本着先进先出的原则, 我们用 ConcurrentQueue<T> 来存放任务队列.

/// <summary>
/// 用于限速的阻止队列, 如果空不足,插入操作就会等待.
/// </summary>
private readonly BlockingCollection<int> block;

/// <summary>
/// 真正的任务队列
/// </summary>
private readonly ConcurrentQueue<Task> tsks = new ConcurrentQueue<Task>();

有票才能上车

上车的时候, 需要在阻止队列中插入一个占位符, 占位符插入成后, 才能把任务添加到任务队列中.

//占用一个空间, 如果空间占满, 会无限期等待,直至有空间释放出来
this.block.Add(0);

//Console.WriteLine($"{DateTime.Now}..................Add");

//占用一个空间后, 才能将任务插入队列
this.tsks.Enqueue(task);

上车之前, 还要对任务做一下扩展: 下车的时候, 把你占用的位置释放出来

_tsk.ContinueWith(tt =>
{
    //当任务执行完时, 才能阻止队列的一个空间出来,供下一个任务进入
    this.block.TryTake(out _);
    Interlocked.Increment(ref this._totalExecuted);
    //Console.WriteLine($"{DateTime.Now}..................Release");

}, TaskContinuationOptions.AttachedToParent);

帽子戏法

如果一个任务是 Task<Task>Task<Task<T>> , 你一定要对它 Unwrap, 否则一眨眼, 你的任务就执行完了, 下车回头一看, 才发现: 我艹, 我包呢?? 你的包(子任务)还在等待着开往春天的地铁呢.

这车不对劲
if (task is Task<Task> t)
{
    //如果 task 是 task 的嵌套, 需要先解包装 
    _tsk = t.Unwrap();
}

但是 task is Task<Task<>> (语法错误) 还是task is Task<object>(类型不匹配) 呢?? 怎么都不对, 这个时候才发现 C# 太TMD 的不地道了, 这么简单的车, 还需要我在兜一圈.

...
else if (task is IUnwrap tt)
{
    _tsk = tt.GetUnwrapped();
}

...
public interface IUnwrap
{
    Task GetUnwrapped();
}

public class WrapFuncTask<T> : Task<Task<T>>, IUnwrap
{
    ...

    public Task GetUnwrapped()
    {
        return this.Unwrap();
    }
}

差不多了, 开车

先加个开车提醒:

this.timer = new System.Timers.Timer(period.TotalMilliseconds)
{
    AutoReset = true
};
this.timer.Elapsed += Timer_Elapsed;
this.timer.Start();

...
private void Timer_Elapsed(object sender, ElapsedEventArgs e)
{
    this._currentCount = 0;
    this.PeriodNumber++;

    this.TryProcessQueue();
}

开车了....

/// <summary>
/// 
/// </summary>
private void ProcessQueue()
{
    //当 当前计数 小于周期内最大允许的任务数
    //且 任务队列中有任务可以取出来
    while ((this._currentCount < this.MaxCountPerPeriod)
        && tsks.TryDequeue(out Task tsk))
    {
        //Console.WriteLine($"{DateTime.Now}..................Dequeue");
        Interlocked.Increment(ref this._currentCount);
        //执行任务
        tsk.Start();
    }
}
开车啦

戳: 代码 Gist 地址

上一篇下一篇

猜你喜欢

热点阅读