python ansible celery 实现任务异步执行(ArsyncResult问题)

网友投稿 1347 2022-10-15

本站部分文章、图片属于网络上可搜索到的公开信息,均用于学习和交流用途,不能代表睿象云的观点、立场或意见。我们接受网民的监督,如发现任何违法内容或侵犯了您的权益,请第一时间联系小编邮箱jiasou666@gmail.com 处理。

python ansible celery 实现任务异步执行(ArsyncResult问题)

项目背景

在公司内部运维自动化平台开发中,使用 ansible + celery 来实现添加主机之后,异步的更新主机的额外信息,比如CPU型号,序列号等等。celery delay执行对应的task任务,在接口 JsonResponse 返回时遇到 报错

Object of type AsyncResult is not JSON serializable

部分代码如

def update_host_extra_info(request, host_name):    .... ...    res = tasks.update_host_info.delay(host_name)    return JsonResponse({"task_result": res})

问题分析

从上面的代码和报错信息,我们可以推断出 AsyncResult 是 celery 的执行结果的返回类型,查看celery相关代码我们发现 AsyncResult 位于 celery.result 中

Demo实操

>>> from celery.result import AsyncResult>>> from demo_ansible import celery_app>>>>>> res = AsyncResult(id="053a9dad-4362-4e6f-8ab6-c4ef2a61e52c", app=celery_app)>>> res.res.TimeoutError(         res.maybe_reraise(res.app                   res.maybe_throw(res.args                  res.nameres.as_list(              res.on_ready(res.as_tuple(             res.parentres.backend               res.queueres.build_graph(          res.ready(res.children              res.resultres.collect(              res.retriesres.date_done             res.revoke(res.failed(               res.stateres.forget(               res.statusres.get(                  res.successful(res.get_leaf(             res.supports_native_joinres.graph                 res.task_idres.id                    res.then(res.ignored               res.throw(res.info                  res.tracebackres.iterdeps(             res.wait(res.kwargs                res.worker>>> res.task_id'053a9dad-4362-4e6f-8ab6-c4ef2a61e52c'>>> res.status'SUCCESS'>>> res.successful()True>>> type(res.info)>>> res.info{'host_name': 'global-nginx-01-vpc', 'host_ip': '172.16.xx.xx', ... ...}]}>>>

从上面我们看到 res.info 的类型是 dict, dict 是可以在 JsonResponse中直接使用的。

所以最开始的代码部分应该修改为:

def update_host_extra_info(request, host_name):    .... ...    res = tasks.update_host_info.delay(host_name)    return JsonResponse({"task_result": res.info})

如果此时去验证的话发现返回的结果是空,为什么呢?

因为任务是异步的,返回的时候任务还没有执行成功,这个时候获取任务结果当然是空的。可以通过在 程序中 debug 输出 res.status(PENDING) 或者 res.successful() (False) 来验证。

问题解决

两种方式,

1、既然是异步执行,那么当前接口只负责调用执行task, 另外新增接口获取celery task执行结果

def update_host_extra_info(request, host_name):    print(host_name)    res = update_host_info.delay(host_name)    return JsonResponse({"task_id": res.task_id, "msg": "update successfully!"})def check_task(request, task_id):    # 单独接口获取task执行结果    task = AsyncResult(id=task_id, app=celery_app)    return JsonResponse({'task_id': task_id, 'result': task.info})

2、如果一定要在一个接口中执行之后等待然后获取结果,可以修改代码为如下:

def update_host_extra_info(request, host_name):    print(host_name)    res = update_host_info.delay(host_name)    print(res, res.status, res.successful())    # 这里执行等待,成功的时候再返回    if res.wait():        return JsonResponse({"task_id": res.task_id, "result": res.info})

上面两种方式,一般是建议第二种,不用等待,实际更新操作异步执行即可。因为不用立马看到执行的结果

上一篇:第37期:适当的使用 MySQL 原生表分区
下一篇:CentOS 7.5 部署蓝鲸运维平台
相关文章

 发表评论

暂时没有评论,来抢沙发吧~