celery使用group或者chord如何实时更新状态进度?

2019-05-02  本文已影响0人  雨夜剪魂

在celery中单任务可以使用self.update_state方法来更新进度的,如下:

self.update_state(task_id = task_id, state = state, meta = meta)

最重要的一个属性就是taskid了,这里可以不写,不写的话默认就是self.request.id自动生成的是当前的id

那么如果是group,chord这样的批量任务产生的多个任务,就有多个任务id,这样就没办法更新了,也没有办法将task id 传到前端来更新进度条了

在搜索这样的解决方案后找到了一个方法.

class progress_chord(chord):

    def __call__(self, body = None, **kwargs):

        _chord = self.type

        body = (body or self.kwargs['body']).clone()

        kwargs = dict(self.kwargs, body=body, **kwargs)

        if _chord.app.conf.CELERY_ALWAYS_EAGER:

            return self.apply((), kwargs)

        # 设置chord自定义类的跟踪id

        callback_id = body.options.setdefault('task_id', uuid())

        r = _chord(**kwargs)

        return _chord.AsyncResult(callback_id), r

这里重新继承了chord,并在body中的options字典中,将task_id 放入了, 这样当我们使用这个类作为默认的celery.chord的功能时候就可以获取到这个task id 了

header = [task.s(url = item['href'], page = item['page'], total =self.total, filename =self.filename)for itemin items]

callback = templink.s(1)

task = progress_chord(group(header))(callback) # callback 是一个回调的celery task任务

在task类中,使用self.request.chord['options']['task_id']来得到id

并使用

self.update_state(task_id = task_id, state = state, meta = meta)

来更新

那么思考下,group的操作可能与chord类似

上一篇 下一篇

猜你喜欢

热点阅读