人工智能AI

Web UI界面启动一个Python进程执行长时间后台任务

2025-07-05  本文已影响0人  梅西爱骑车

首先,我需要分解问题。需求分为两个部分:后台的Python脚本和前端Web界面。后台脚本需要能够接收参数,执行某些操作,而Web界面需要让用户输入参数,并调用这个脚本。

对于第一部分,启动进程执行操作。Python中有多种方式可以启动子进程,比如使用subprocess模块或者os.system。但subprocess更推荐,因为它更灵活,可以处理输入输出,并且更安全。需要编写一个函数,接收参数,然后启动子进程执行命令。例如,假设用户的操作是运行一个长时间的任务,比如处理数据或者训练模型,我需要让这个任务在后台运行,而不阻塞Web服务器。

以下是分步实现方案,包括启动子进程的Python代码和基于Flask的Web界面:

一、核心Python功能实现(后台任务)

task_runner.py

# task_runner.py
import subprocess
import logging
from typing import List

# 配置日志只需执行一次(移动到模块顶层)
logging.basicConfig(
    filename='task.log',
    level=logging.INFO,
    encoding='utf-8',  # 明确指定日志文件编码
    format='%(asctime)s - %(levelname)s - %(message)s'
)

def run_task(params: dict) -> int:
    """
    启动子进程执行任务(解决编码问题)
    """
    command = [
        "python",
        "worker.py",
        "--input", params["input_file"],
        "--output", params["output_dir"]
    ]

    try:
        process = subprocess.Popen(
            command,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            text=True,  # 自动解码输出
            encoding='utf-8',  # 强制使用UTF-8解码
            errors='replace'  # 替换无法解码的字符
        )

        # 实时捕获输出
        with open('task.log', 'a', encoding='utf-8') as log_file:
            for line in process.stdout:
                cleaned_line = line.strip()
                log_file.write(f"{cleaned_line}\n")
                logging.info(cleaned_line)

        return process.pid
    except Exception as e:
        logging.error(f"任务启动失败: {str(e)}")
        return -1

二、Flask Web界面实现

app.py

# app.py
from flask import Flask, request, render_template
from task_runner import run_task
import threading

app = Flask(__name__)


@app.route('/', methods=['GET', 'POST'])
def index():
    if request.method == 'POST':
        # 获取表单参数
        params = {
            "input_file": request.form.get("input_file"),
            "output_dir": request.form.get("output_dir")
        }

        # 使用线程异步执行任务(避免阻塞Web请求)
        thread = threading.Thread(target=run_task, args=(params,))
        thread.start()

        return "任务已启动!查看日志:<a href='/logs'>task.log</a>"

    return render_template('index.html')


@app.route('/logs')
def show_logs():
    try:
        # 关键修复:指定UTF-8编码打开文件
        with open('task.log', 'r', encoding='utf-8', errors='replace') as f:
            logs = f.read()
        return f"<pre>{logs}</pre>"
    except FileNotFoundError:
        return "日志文件不存在"
    except UnicodeDecodeError as e:
        # 二次容错处理
        return f"日志解码错误:{str(e)}"


if __name__ == '__main__':
    # 强制设置系统编码(Windows特别需要)
    import sys
    import io

    sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8')
    sys.stderr = io.TextIOWrapper(sys.stderr.buffer, encoding='utf-8')

    app.run(host='0.0.0.0', port=5000, debug=True)

三、HTML模板文件

<!-- templates/index.html -->
<!DOCTYPE html>
<html>
<head>
    <title>任务提交界面</title>
    <style>
        body { max-width: 600px; margin: 20px auto; padding: 20px; }
        .form-group { margin: 15px 0; }
        label { display: block; margin-bottom: 5px; }
        input { width: 100%; padding: 8px; }
        button { padding: 10px 20px; background: #007bff; color: white; border: none; }
    </style>
</head>
<body>
    <h1>任务提交</h1>
    <form method="POST">
        <div class="form-group">
            <label>输入文件路径:</label>
            <input type="text" name="input_file" required>
        </div>
        
        <div class="form-group">
            <label>输出目录:</label>
            <input type="text" name="output_dir" required>
        </div>
        
        <button type="submit">启动任务</button>
    </form>
</body>
</html>

四、示例任务脚本(worker.py)

worker.py(模拟耗时任务)

# worker.py
import time
import argparse
import sys

if __name__ == '__main__':
    # 强制标准输出使用UTF-8编码
    sys.stdout.reconfigure(encoding='utf-8')

    parser = argparse.ArgumentParser()
    parser.add_argument("--input", required=True)
    parser.add_argument("--output", required=True)
    args = parser.parse_args()
    
    # 示例输出包含中文(测试编码)
    print(f"[{time.ctime()}] 任务启动,参数:")
    print(f"输入文件: {args.input}")
    print(f"输出目录: {args.output}")
    
    for i in range(1, 11):
        time.sleep(1)
        print(f"[{time.ctime()}] 处理进度: {i*10}% - 状态✓")
    
    print(f"[{time.ctime()}] 任务完成!")

五、运行与测试

  1. 创建文件结构:
    project/
    ├── app.py
    ├── taskrunner.py
    ├── worker.py
    └── templates/ index.html


  2. 安装依赖:
    pip install flask

  3. 启动服务:
    python app.py

  4. 访问界面:
    打开浏览器访问 http://localhost:5000,输入参数后提交。因2个参数没有真实参与生成和读取动作,页面随便输入2个参数的值。


    点击“启动任务”按钮。

    查看后台任务执行日志。

六、关键优化点说明

  1. 安全增强:
    • 使用subprocess.Popen的列表参数形式,避免shell注入
    • 对用户输入进行合法性校验(示例未展示,建议添加)
  2. 异步处理:
    • 通过threading.Thread实现请求-响应分离
    • 避免Web请求被长时间任务阻塞
  3. 日志追踪:
    • 所有任务输出记录到task.log文件
    • 提供/logs端点查看实时日志
  4. 扩展性设计:
    • 可替换worker.py为实际业务逻辑
    • 支持添加进度查询接口(需结合数据库或状态文件)
      效果演示
      输入参数提交后,后台将启动独立进程执行任务,用户可立即获得响应,并通过日志页面跟踪任务进度

七、分布式执行后台任务

以上通过提交参数在本地启动子进程执行任务。现在希望将任务分发到其他服务器上执行,正在构建一个分布式任务处理平台希望实现分布式处理。

常见的远程执行方法有SSH连接、使用消息队列(如Celery结合RabbitMQ或Redis)、或者使用专门的分布式任务框架(如Dask、Kubernetes Jobs)。

7.1 方案一:SSH远程执行(直接命令模式)

改造task_runner.py

import paramiko
from io import StringIO

def remote_execute(host: str, username: str, key_path: str, params: dict) -> dict:
    """
    通过SSH在远程服务器执行任务
    返回格式:{"status": "success/error", "output": "..."}
    """
    # 构造远程命令
    command = f"python /path/to/worker.py --input {params['input_file']} --output {params['output_dir']}"
    
    try:
        # 建立SSH连接
        private_key = paramiko.RSAKey.from_private_key_file(key_path)
        client = paramiko.SSHClient()
        client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        client.connect(hostname=host, username=username, pkey=private_key)

        # 执行远程命令
        stdin, stdout, stderr = client.exec_command(command)
        output = stdout.read().decode('utf-8')
        error = stderr.read().decode('utf-8')

        # 返回统一格式
        return {
            "status": "success" if not error else "error",
            "output": output or error
        }
    except Exception as e:
        return {"status": "error", "output": str(e)}
    finally:
        client.close()

方案二:Celery分布式任务队列
pip install celery redis
创建celery_app.py

from celery import Celery
import subprocess

app = Celery(
    'tasks',
    broker='redis://:password@remote-server:6379/0',
    backend='redis://:password@remote-server:6379/1'
)

@app.task
def remote_task(params: dict, server_ip: str):
    # 在指定服务器执行(需预先部署worker)
    command = [
        "ssh", f"user@{server_ip}",
        "python", "/app/worker.py",
        "--input", params['input_file'],
        "--output", params['output_dir']
    ]
    result = subprocess.run(command, capture_output=True, text=True)
    return {
        "returncode": result.returncode,
        "stdout": result.stdout,
        "stderr": result.stderr
    }

Flask集成。

from celery_app import remote_task

@app.route('/distribute', methods=['POST'])
def distribute():
    task = remote_task.delay(
        params=request.json, 
        server_ip="192.168.1.100"  # 可动态选择服务器
    )
    return jsonify({"task_id": task.id}), 202

@app.route('/status/<task_id>')
def task_status(task_id):
    task = remote_task.AsyncResult(task_id)
    return jsonify({
        "status": task.status,
        "result": task.result
    })

方案三:Kubernetes集群部署
创建Job模板(job_template.yaml)

apiVersion: batch/v1
kind: Job
metadata:
  name: ml-job-{{ job_id }}
spec:
  template:
    spec:
      containers:
      - name: worker
        image: your-registry/ml-worker:latest
        args:
        - "--input=$(INPUT_FILE)"
        - "--output=$(OUTPUT_DIR)"
        env:
        - name: INPUT_FILE
          value: "{{ input_file }}"
        - name: OUTPUT_DIR
          value: "{{ output_dir }}"
      restartPolicy: Never

Python控制代码

from kubernetes import client, config

def create_k8s_job(params: dict):
    config.load_kube_config(context="remote-cluster")  # 加载远程集群配置
    
    job_id = uuid.uuid4().hex
    job_manifest = render_template(
        "job_template.yaml",
        job_id=job_id,
        input_file=params['input_file'],
        output_dir=params['output_dir']
    )
    
    api = client.BatchV1Api()
    api.create_namespaced_job(
        namespace="default",
        body=yaml.safe_load(job_manifest)
    )
    return job_id
方案 优点 缺点 适用场景
SSH直连 实现简单,无需额外基础设施 安全性低,难以扩展 小规模临时任务,内部可信环境
Celery队列 支持分布式,有成熟的任务管理机制 需要维护Redis/RabbitMQ中间件 中等规模集群,需要任务队列管理的场景
Kubernetes 资源调度能力强,支持自动扩缩容 学习曲线陡峭,需要集群环境 大规模生产环境,云原生架构
API网关 标准化接口,易于集成第三方服务 依赖外部API,可能产生调用延迟 需要与现有微服务架构整合的场景
混合方案 灵活组合不同方案的优势 系统复杂度高 异构环境下的复杂任务调度需求

八、高级功能扩展

跨服务器文件同步

def sync_files(host: str, local_path: str, remote_path: str):
    transport = paramiko.Transport((host, 22))
    transport.connect(username='user', pkey=private_key)
    sftp = paramiko.SFTPClient.from_transport(transport)
    sftp.put(local_path, remote_path)
    sftp.close()
上一篇 下一篇

猜你喜欢

热点阅读