python 脚本辅助运行脚本
2020-11-19 本文已影响0人
Lupino
在一个 python 项目中我们会写很多脚本,不管是一次性的还是定期运行的,尤其在子目录里面,常规添加 path 到头上,如下:
import sys
sys.path.append('..')
动态导入模块
每次加这个也挺麻烦的。
我们是不是有更好的方法,答案是有的。
我们看看python是如何动态导入库的
from importlib import import_module
module = import_module('scripts.run_once')
我们从 importlib
引入一个叫 import_module
的函数,
然后导入 scripts.run_once
这个模块。
我们可以通过 sys.argv
来获取模块名字
import sys
from importlib import import_module
module = import_module(sys.argv[1])
保存为文件 script.py
, 然后运行 python script.py scripts.run_once
我们约定每个脚本以 main
作为入口。
module.main(*sys.argv[2:])
接受模块文件路径
我们让 script.py
接受文件路径如 python script.py scripts/run_once.py
我们需要把文件名变成模块名字
import sys
from importlib import import_module
import os
import os.path
def fixed_module_name(module_name):
if os.path.isfile(module_name):
if module_name.endswith('.py'):
module_name = module_name[:-3]
if module_name.startswith('./'):
module_name = module_name[2:]
return module_name.replace('/', '.')
return module_name
module = import_module(fixed_module_name(sys.argv[1]))
module.main(*sys.argv[2:])
更完善的辅助运行脚本
在正常使用过程中,我们的脚本,可以是同步的,可以是异步,可以是单进程的,也可以是多进程的,脚本可以接收参数,辅助脚本也要参数,下面是我在生产环境上用的启动脚本。
脚本需要根据实际情况添加更多新初始化功能。
import asyncio
from importlib import import_module
import os
import os.path
import argparse
from multiprocessing import Process
import sys
import logging
formatter = "[%(asctime)s] %(name)s {%(filename)s:%(lineno)d} %(levelname)s"
formatter += " - %(message)s"
logging.basicConfig(level=logging.DEBUG, format=formatter)
logger = logging.getLogger(__name__)
def fixed_module_name(module_name):
if os.path.isfile(module_name):
if module_name.endswith('.py'):
module_name = module_name[:-3]
if module_name.startswith('./'):
module_name = module_name[2:]
return module_name.replace('/', '.')
return module_name
def start(module_name, argv, process_id=None):
logger.info('Start running module {}'.format(module_name))
module = import_module(fixed_module_name(module_name))
if process_id is not None:
os.environ['PROCESS_ID'] = str(process_id)
if hasattr(module, 'parse_args'):
argv = [module.parse_args(argv)]
if asyncio.iscoroutinefunction(module.main):
loop = asyncio.get_event_loop()
task = module.main(*argv)
if getattr(module, 'run_forever', False):
loop.create_task(task)
loop.run_forever()
else:
loop.run_until_complete(task)
else:
module.main(*argv)
logger.info('Finish running module {}'.format(module_name))
def main(script, *argv):
parser = argparse.ArgumentParser(description='Prepare and Run command.')
parser.add_argument('-p',
'--processes',
dest='processes',
default=1,
type=int,
help='process size. default is 1')
parser.add_argument('module_name',
type=str,
help='module name or module file')
parser.add_argument('argv', nargs='*', help='module arguments')
script_argv = []
is_module_argv = False
module_argv = []
argv = list(argv)
argv.reverse()
while True:
if len(argv) == 0:
break
arg = argv.pop()
if is_module_argv:
module_argv.append(arg)
else:
script_argv.append(arg)
if arg.startswith('-'):
if arg.find('=') == -1:
script_argv.append(argv.pop())
else:
is_module_argv = True
args = parser.parse_args(script_argv)
if args.processes > 1:
processes = []
for i in range(args.processes):
p = Process(target=start,
args=(args.module_name, module_argv, i + 1))
p.start()
processes.append(p)
for p in processes:
p.join()
else:
start(args.module_name, module_argv)
if __name__ == '__main__':
main(*sys.argv)