bifangback

django异步视图在bifangback项目中的应用

2021-01-17  本文已影响0人  万州客

只能算是捣鼓出来一个可执行流程。更多的优化在后面~

一,前端参数

{
    "app_name": "go-demo",
    "env_name":"dev",
    "release_name": "20210117132941344089GA",
    "deploy_type": "deploy",
    "op_type": "deploy",
    "target_list": ["192.168.1.211", "192.168.1.212"]
}

二,django的路由

from django.urls import path
from . import deploy_views
from django.views.decorators.csrf import csrf_exempt

app_name = "deploy"

urlpatterns = [
    # csrf_exempt用在这里,deploy异步视图才会生效,不然,得注释settings里的CsrfViewMiddleware
    path('deploy/', csrf_exempt(deploy_views.deploy), name='deploy'),
]

三,主要异步处理函数视图

import time
import asyncio
from concurrent.futures import ThreadPoolExecutor
from asgiref.sync import async_to_sync
from cmdb.models import App
from utils.saltstack import salt_cmd
from cmdb.models import Env
from cmdb.models import Release
from .serializers import DeploySerializer
from utils.ret_code import *


@async_to_sync
# django异步视图
async def deploy(request):
    if request.method == 'POST':

        req_data = json.loads(request.body.decode('utf-8'))
        # 序列化前端数据,并判断是否有效
        serializer = DeploySerializer(data=req_data)
        if serializer.is_valid():
            ser_data = serializer.validated_data
            app_name = ser_data['app_name']
            release_name = ser_data['release_name']
            env_name = ser_data['env_name']
            # op_type用于定义是部署应用,还是服务器启停,deploy_type用于定义操作指令
            op_type = ser_data['op_type']
            deploy_type = ser_data['deploy_type']
            target_list = ser_data['target_list']

            if deploy_type == 'deploy' and op_type == 'deploy':
                action_list = ['fetch', 'stop', 'stop_status', 'deploy', 'start', 'start_status', 'health_check']
            elif deploy_type == 'rollback' and op_type == 'deploy':
                action_list = ['stop', 'stop_status', 'rollback', 'start', 'start_status', 'health_check']
            elif deploy_type == 'stop' and op_type == 'maintenance':
                action_list = ['stop', 'stop_status']
            elif deploy_type == 'start' and op_type == 'maintenance':
                action_list = ['start', 'start_status']
            elif deploy_type == 'restart' and op_type == 'maintenance':
                action_list = ['stop', 'stop_status', 'start', 'start_status']
            else:
                pass

            loop = asyncio.get_event_loop()
            loop.create_task(thread_async(action_list, env_name, app_name, release_name, target_list))

            return_dict = build_ret_data(OP_SUCCESS, 'success')
            return render_json(return_dict)
        else:
            return_dict = build_ret_data(THROW_EXP, '序列化条件不满足')
            return render_json(return_dict)


# 异步任务
async def thread_async(action_list, env_name, app_name, release_name, target_list):

    try:
        await asyncio.sleep(1)
        for action in action_list:
            print('action: ', action)
            # 多线程版本,应用为IO密集型,适合threading模式
            executor = ThreadPoolExecutor()
            for data in executor.map(cmd_run, [env_name], [app_name], [release_name], [target_list], [action]):
                if not data:
                    print('data_false: ', data)
                    return_dict = build_ret_data(THROW_EXP, action)
                    return render_json(return_dict)
                print('data_true: ', data)
        print("finish: ", action_list, env_name, app_name, release_name, target_list)
    except asyncio.CancelledError:
        print('Cancel the future.')
    except Exception as e:
        print(e)


# cmd_run函数是在每一个线程当中运行的
def cmd_run(env_name, app_name, release_name, target_list, action):
    print(env_name, app_name, release_name, target_list, action, "@@@@@@@@@")
    env = Env.objects.get(name=env_name)
    app = App.objects.get(name=app_name)
    release = Release.objects.get(name=release_name)

    salt_url = env.salt.salt_url
    salt_user = env.salt.salt_user
    salt_pwd = env.salt.salt_pwd
    eauth = env.salt.eauth

    deploy_script_url = release.deploy_script_url
    zip_package_name = app.zip_package_name
    zip_package_url = release.zip_package_url
    service_port = app.service_port

    ret = salt_cmd(salt_url, salt_user, salt_pwd, eauth,
                   target_list, deploy_script_url,
                   app_name, release, env, action,
                   zip_package_name, zip_package_url,
                   service_port)
    time.sleep(1)
    for server in ret:
        for ip, detail in server.items():
            print('ip: ', ip)
            print('retcode: ', detail['retcode'])
            print('stdout: ', detail['stdout'])
            print('stderr: ', detail['stderr'])
            print('pid: ', detail['pid'])
    return True

这里达到的效果是,当前端传递到deploy之后,deploy能作好自己本分的数据处理之后,很快返回给前端。
但同时,deploy能调用thread_async这个异步任务,开始处理之前要用异步队列才能干的事。
这样一事,我就可以通过deploy这个API提交任务,其不会阻塞,很快返回,然后,使用deploy_status这样的API获取长时间任务执行的进度,而构架又不用扩张。

上一篇 下一篇

猜你喜欢

热点阅读