Web UI界面启动一个Python进程执行长时间后台任务
首先,我需要分解问题。需求分为两个部分:后台的Python脚本和前端Web界面。后台脚本需要能够接收参数,执行某些操作,而Web界面需要让用户输入参数,并调用这个脚本。
对于第一部分,启动进程执行操作。Python中有多种方式可以启动子进程,比如使用subprocess模块或者os.system。但subprocess更推荐,因为它更灵活,可以处理输入输出,并且更安全。需要编写一个函数,接收参数,然后启动子进程执行命令。例如,假设用户的操作是运行一个长时间的任务,比如处理数据或者训练模型,我需要让这个任务在后台运行,而不阻塞Web服务器。
以下是分步实现方案,包括启动子进程的Python代码和基于Flask的Web界面:
一、核心Python功能实现(后台任务)
task_runner.py
# task_runner.py
import subprocess
import logging
from typing import List
# 配置日志只需执行一次(移动到模块顶层)
logging.basicConfig(
filename='task.log',
level=logging.INFO,
encoding='utf-8', # 明确指定日志文件编码
format='%(asctime)s - %(levelname)s - %(message)s'
)
def run_task(params: dict) -> int:
"""
启动子进程执行任务(解决编码问题)
"""
command = [
"python",
"worker.py",
"--input", params["input_file"],
"--output", params["output_dir"]
]
try:
process = subprocess.Popen(
command,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
text=True, # 自动解码输出
encoding='utf-8', # 强制使用UTF-8解码
errors='replace' # 替换无法解码的字符
)
# 实时捕获输出
with open('task.log', 'a', encoding='utf-8') as log_file:
for line in process.stdout:
cleaned_line = line.strip()
log_file.write(f"{cleaned_line}\n")
logging.info(cleaned_line)
return process.pid
except Exception as e:
logging.error(f"任务启动失败: {str(e)}")
return -1
二、Flask Web界面实现
app.py
# app.py
from flask import Flask, request, render_template
from task_runner import run_task
import threading
app = Flask(__name__)
@app.route('/', methods=['GET', 'POST'])
def index():
if request.method == 'POST':
# 获取表单参数
params = {
"input_file": request.form.get("input_file"),
"output_dir": request.form.get("output_dir")
}
# 使用线程异步执行任务(避免阻塞Web请求)
thread = threading.Thread(target=run_task, args=(params,))
thread.start()
return "任务已启动!查看日志:<a href='/logs'>task.log</a>"
return render_template('index.html')
@app.route('/logs')
def show_logs():
try:
# 关键修复:指定UTF-8编码打开文件
with open('task.log', 'r', encoding='utf-8', errors='replace') as f:
logs = f.read()
return f"<pre>{logs}</pre>"
except FileNotFoundError:
return "日志文件不存在"
except UnicodeDecodeError as e:
# 二次容错处理
return f"日志解码错误:{str(e)}"
if __name__ == '__main__':
# 强制设置系统编码(Windows特别需要)
import sys
import io
sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8')
sys.stderr = io.TextIOWrapper(sys.stderr.buffer, encoding='utf-8')
app.run(host='0.0.0.0', port=5000, debug=True)
三、HTML模板文件
<!-- templates/index.html -->
<!DOCTYPE html>
<html>
<head>
<title>任务提交界面</title>
<style>
body { max-width: 600px; margin: 20px auto; padding: 20px; }
.form-group { margin: 15px 0; }
label { display: block; margin-bottom: 5px; }
input { width: 100%; padding: 8px; }
button { padding: 10px 20px; background: #007bff; color: white; border: none; }
</style>
</head>
<body>
<h1>任务提交</h1>
<form method="POST">
<div class="form-group">
<label>输入文件路径:</label>
<input type="text" name="input_file" required>
</div>
<div class="form-group">
<label>输出目录:</label>
<input type="text" name="output_dir" required>
</div>
<button type="submit">启动任务</button>
</form>
</body>
</html>
四、示例任务脚本(worker.py)
worker.py(模拟耗时任务)
# worker.py
import time
import argparse
import sys
if __name__ == '__main__':
# 强制标准输出使用UTF-8编码
sys.stdout.reconfigure(encoding='utf-8')
parser = argparse.ArgumentParser()
parser.add_argument("--input", required=True)
parser.add_argument("--output", required=True)
args = parser.parse_args()
# 示例输出包含中文(测试编码)
print(f"[{time.ctime()}] 任务启动,参数:")
print(f"输入文件: {args.input}")
print(f"输出目录: {args.output}")
for i in range(1, 11):
time.sleep(1)
print(f"[{time.ctime()}] 处理进度: {i*10}% - 状态✓")
print(f"[{time.ctime()}] 任务完成!")
五、运行与测试
-
创建文件结构:
project/
├── app.py
├── taskrunner.py
├── worker.py
└── templates/ index.html
-
安装依赖:
pip install flask -
启动服务:
python app.py -
访问界面:
打开浏览器访问 http://localhost:5000,输入参数后提交。因2个参数没有真实参与生成和读取动作,页面随便输入2个参数的值。
点击“启动任务”按钮。
查看后台任务执行日志。
六、关键优化点说明
- 安全增强:
- 使用subprocess.Popen的列表参数形式,避免shell注入
- 对用户输入进行合法性校验(示例未展示,建议添加)
- 异步处理:
- 通过threading.Thread实现请求-响应分离
- 避免Web请求被长时间任务阻塞
- 日志追踪:
- 所有任务输出记录到task.log文件
- 提供/logs端点查看实时日志
- 扩展性设计:
- 可替换worker.py为实际业务逻辑
- 支持添加进度查询接口(需结合数据库或状态文件)
效果演示
输入参数提交后,后台将启动独立进程执行任务,用户可立即获得响应,并通过日志页面跟踪任务进度
七、分布式执行后台任务
以上通过提交参数在本地启动子进程执行任务。现在希望将任务分发到其他服务器上执行,正在构建一个分布式任务处理平台希望实现分布式处理。
常见的远程执行方法有SSH连接、使用消息队列(如Celery结合RabbitMQ或Redis)、或者使用专门的分布式任务框架(如Dask、Kubernetes Jobs)。
7.1 方案一:SSH远程执行(直接命令模式)
改造task_runner.py
import paramiko
from io import StringIO
def remote_execute(host: str, username: str, key_path: str, params: dict) -> dict:
"""
通过SSH在远程服务器执行任务
返回格式:{"status": "success/error", "output": "..."}
"""
# 构造远程命令
command = f"python /path/to/worker.py --input {params['input_file']} --output {params['output_dir']}"
try:
# 建立SSH连接
private_key = paramiko.RSAKey.from_private_key_file(key_path)
client = paramiko.SSHClient()
client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
client.connect(hostname=host, username=username, pkey=private_key)
# 执行远程命令
stdin, stdout, stderr = client.exec_command(command)
output = stdout.read().decode('utf-8')
error = stderr.read().decode('utf-8')
# 返回统一格式
return {
"status": "success" if not error else "error",
"output": output or error
}
except Exception as e:
return {"status": "error", "output": str(e)}
finally:
client.close()
方案二:Celery分布式任务队列
pip install celery redis
创建celery_app.py
from celery import Celery
import subprocess
app = Celery(
'tasks',
broker='redis://:password@remote-server:6379/0',
backend='redis://:password@remote-server:6379/1'
)
@app.task
def remote_task(params: dict, server_ip: str):
# 在指定服务器执行(需预先部署worker)
command = [
"ssh", f"user@{server_ip}",
"python", "/app/worker.py",
"--input", params['input_file'],
"--output", params['output_dir']
]
result = subprocess.run(command, capture_output=True, text=True)
return {
"returncode": result.returncode,
"stdout": result.stdout,
"stderr": result.stderr
}
Flask集成。
from celery_app import remote_task
@app.route('/distribute', methods=['POST'])
def distribute():
task = remote_task.delay(
params=request.json,
server_ip="192.168.1.100" # 可动态选择服务器
)
return jsonify({"task_id": task.id}), 202
@app.route('/status/<task_id>')
def task_status(task_id):
task = remote_task.AsyncResult(task_id)
return jsonify({
"status": task.status,
"result": task.result
})
方案三:Kubernetes集群部署
创建Job模板(job_template.yaml)
apiVersion: batch/v1
kind: Job
metadata:
name: ml-job-{{ job_id }}
spec:
template:
spec:
containers:
- name: worker
image: your-registry/ml-worker:latest
args:
- "--input=$(INPUT_FILE)"
- "--output=$(OUTPUT_DIR)"
env:
- name: INPUT_FILE
value: "{{ input_file }}"
- name: OUTPUT_DIR
value: "{{ output_dir }}"
restartPolicy: Never
Python控制代码
from kubernetes import client, config
def create_k8s_job(params: dict):
config.load_kube_config(context="remote-cluster") # 加载远程集群配置
job_id = uuid.uuid4().hex
job_manifest = render_template(
"job_template.yaml",
job_id=job_id,
input_file=params['input_file'],
output_dir=params['output_dir']
)
api = client.BatchV1Api()
api.create_namespaced_job(
namespace="default",
body=yaml.safe_load(job_manifest)
)
return job_id
| 方案 | 优点 | 缺点 | 适用场景 |
|---|---|---|---|
| SSH直连 | 实现简单,无需额外基础设施 | 安全性低,难以扩展 | 小规模临时任务,内部可信环境 |
| Celery队列 | 支持分布式,有成熟的任务管理机制 | 需要维护Redis/RabbitMQ中间件 | 中等规模集群,需要任务队列管理的场景 |
| Kubernetes | 资源调度能力强,支持自动扩缩容 | 学习曲线陡峭,需要集群环境 | 大规模生产环境,云原生架构 |
| API网关 | 标准化接口,易于集成第三方服务 | 依赖外部API,可能产生调用延迟 | 需要与现有微服务架构整合的场景 |
| 混合方案 | 灵活组合不同方案的优势 | 系统复杂度高 | 异构环境下的复杂任务调度需求 |
八、高级功能扩展
跨服务器文件同步
def sync_files(host: str, local_path: str, remote_path: str):
transport = paramiko.Transport((host, 22))
transport.connect(username='user', pkey=private_key)
sftp = paramiko.SFTPClient.from_transport(transport)
sftp.put(local_path, remote_path)
sftp.close()