python调用系统命令的简单小案例
2023-11-04 本文已影响0人
Joening
subprocess小案例
调用命令
import subprocess
def run_cmd(cmd):
process = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
out, error = process.communicate()
out = out.decode('utf-8')
error = error.decode('utf-8')
r = process.returncode
return r, str(out), str(error)
def command():
cmd = "docker images "
r, out, error = run_cmd(cmd)
if r == 0:
print(out)
else:
print(error)
if __name__ == '__main__':
command()
push chart 小案例
import os
import sys
import subprocess
def run_cmd(cmd):
process = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
out, error = process.communicate()
r = process.returncode
return r, out, error
def for_chart_name(chart_file_name):
if not os.path.exists(chart_file_name):
exit(1)
with open(chart_file_name, mode='rt') as chart_name:
chart_lst = list()
for line in chart_name:
if line.startswith("#") or not line:
continue
line = line.strip()
chart_lst.append(line)
return chart_lst
def command():
chart_names = for_chart_name(chart_file_name)
for chart_name in chart_names:
error_list = list()
chartName = chart_name.split(',')[0]
chartVersion = chart_name.split(',')[-1]
helmpull_cmd = "helm pull {}/{} --version {}".format("itg", chartName, chartVersion)
r, out, error = run_cmd(helmpull_cmd)
if r == 0:
print("{} pull ok".format(chartName))
else:
print("{} pull error 失败原因是: {}".format(chartName, error.decode('utf-8')))
error_list.append(error.decode('utf-8'))
exit(-1)
return error_list
if __name__ == '__main__':
basedir = os.path.dirname(os.path.realpath(sys.argv[0]))
chart_file = "chart.txt"
chart_file_name = os.path.join(basedir, chart_file)
ret = command()
print(ret)
import subprocess
import os
images = '''
flannel/flannel:v0.21.5
flannel/flannel-cni-plugin:v1.1.2
registry.aliyuncs.com/google_containers/kube-apiserver:v1.23.5
registry.aliyuncs.com/google_containers/kube-proxy:v1.23.5
registry.aliyuncs.com/google_containers/kube-controller-manager:v1.23.5
registry.aliyuncs.com/google_containers/kube-scheduler:v1.23.5
registry.aliyuncs.com/google_containers/etcd:3.5.1-0
registry.aliyuncs.com/google_containers/coredns:v1.8.6
registry.aliyuncs.com/google_containers/pause:3.6
'''
dest_dir = "/tmp/qiaoning"
def run_cmd(cmd):
process = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
out, error = process.communicate()
r = process.returncode
return r, out, error
def image(images, dest_dir):
image_list = []
for imagename in images.split():
imagename = imagename.strip()
image_list.append(imagename)
if not os.path.exists(dest_dir):
os.makedirs(dest_dir)
if len(os.listdir(dest_dir)) > 1:
print("请清理磁盘{}目录下的内容".format(dest_dir))
exit(1)
return image_list, dest_dir
def command():
image_name, destdir = image(images, dest_dir)
for name in image_name:
docker_pull_image_cmd = 'docker pull {}'.format(name)
r, out, error = run_cmd(docker_pull_image_cmd)
if r != 0:
print(name, '错误原因是:', error.encode('utf-8'))
newname = name.split('/')[-1].replace(':', '-')
docker_save_image_cmd = 'docker save {} | gzip > {}/{}.tar.gz'.format(name, destdir, newname)
r, out, error = run_cmd(docker_save_image_cmd)
if r != 0:
print(name, '错误原因是:', error.encode('utf-8'))
if __name__ == '__main__':
command()
小案例
import subprocess
import os
import argparse
def commad(cmd):
porcess = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
out, error = porcess.communicate()
r = porcess.returncode
return r, out, error
def check_directory(dest_directory):
if not os.path.exists(dest_directory):
os.mkdir(dest_directory)
if len(os.listdir(dest_directory)) > 1:
print(dest_directory, "目录内容请清理")
exit(2)
return dest_directory
def get_image_file_list(image_file_svc):
if not os.path.exists(image_file_svc):
print(image_file_svc, 'no such file or directory')
exit(1)
lst = list()
with open(image_file_svc, mode='rt', encoding='utf-8') as file_name:
for item in file_name:
if item.startswith('#') or not item:
continue
item = item.strip()
lst.append(item)
return lst
def pull_image(dockerurl, username, password):
print("[*** 检查docker二进制命令是否存在 ***]")
images_list = get_image_file_list(image_file_svc)
check_docker = "which %s" % 'docker'
r, out, error = commad(check_docker)
if r != 0:
print('docker is not command')
exit(1)
print("[*** 登录docker私有仓库地址 地址为 %s ***]" % dockerurl)
docker_login = f'docker login --username "{username}" --password "{password}" {dockerurl}'
print(docker_login)
# r, out, error = commad(docker_login)
# if r != 0:
# print('docker login error 错误原因是:', error.decode('utf-8'))
# exit(1)
print("[*** 开始pull镜像 ***]")
for images in images_list:
docke_pull_cmd = "docker pull {}".format(images)
r, out, error = commad(docke_pull_cmd)
if r == 0:
print(images, 'is pull ok')
else:
print('错误原因是:', error.decode('utf8'))
def save_image():
dest_dir = check_directory(dest_directory)
image_list = get_image_file_list(image_file_svc)
print('[*** 开始打包镜像 打包到 {} ***]'.format(dest_dir))
for image in image_list:
new_image = image.split('/')[-1].replace(':', '-')
docker_tag = f'docker save {image} | gzip > {dest_dir}{new_image}.tar.gz'
r, out, error = commad(docker_tag)
if r == 0:
print(image, "save ok")
else:
print(image, 'save 失败原因是:', error.decode('utf-8'))
if __name__ == '__main__':
argars = argparse.ArgumentParser(description="this is python file")
argars.add_argument('--image_file_svc', required=True, help='--image_file_svc')
argars.add_argument('--username', required=True, help='--username')
argars.add_argument('--password', required=True, help='--password')
argars.add_argument('--dockerurl', required=True, help='--dockerurl')
argars.add_argument('--dest_directory', required=True, help='--dest_directory')
argars = argars.parse_args()
username = argars.username
password = argars.password
dockerurl = argars.dockerurl
dest_directory = argars.dest_directory
image_file_svc = argars.image_file_svc
print('========================================= 开始执行 ==============================')
check_directory(dest_directory)
pull_image(dockerurl, username, password)
save_image()
print('========================================= 执行结束 ===============================')
改tag 推送镜像小案例
#!/usr/bin/python3
# -*- coding: utf-8 -*-
import os
import subprocess
import sys
def run_cmd(cmd):
process = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
out, error = process.communicate()
r = process.returncode
return r, str(out), str(error)
def for_image_list(image_svc):
with open(image_svc, mode='rt') as images:
for image in images:
image = image.strip()
new_image = image.replace(old_suffix.strip(), new_suffix.strip())
new_image = new_image.strip()
print("docker tag {} {}".format(image, new_image))
docker_tag_cmd = "docker tag {} {}".format(image, new_image)
r, out, error = run_cmd(docker_tag_cmd)
if r == 0:
print("{}执行成功".format(image))
else:
print("{}执行失败 错误原因是: {}".format(image, error))
exit(1)
print("=========================== 推送镜像 {}========================".format(new_image))
print("docker push {}".format(new_image))
docker_push_cmd = "docker push {}".format(new_image)
r, out, error = run_cmd(docker_push_cmd)
if r == 0:
print("{} 推送成功".format(new_image))
else:
print("{} 推送失败".format(new_image))
exit(1)
if __name__ == '__main__':
basedir = os.path.dirname(os.path.realpath(sys.argv[0]))
imageslist = "imagelist.txt"
image_csv = os.path.join(basedir, imageslist)
old_suffix = "hub-vpc.jdcloud.com"
new_suffix = "xxxxx"
for_image_list(image_csv)
#!/usr/bin/python
# -*- coding: utf-8 -*-
import os
import sys
import subprocess
import argparse
def run_cmd(cmd):
process = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
out, error = process.communicate()
r = process.returncode
return r, str(out), str(error)
def for_image_list(image_csv):
if not os.path.exists(image_csv):
print("{}文件不存在".format(image_csv))
exit(1)
image_list = list()
with open(image_csv, mode='rt') as images:
for image in images:
if image.startswith('#') or not image:
continue
image = image.strip()
image_list.append(image)
return image_list
def image_tag_push_cmd(old_suffix, new_suffix):
images = for_image_list(image_svc)
for image in images:
image = image.strip()
new_image = image.replace(old_suffix, new_suffix)
new_image = new_image.strip()
image_tag_cmd = "docker tag {} {}".format(image, new_image)
r, out, error = run_cmd(image_tag_cmd)
if r != 0:
print("{} tag失败 失败原因是: {}".format(image, error))
exit(1)
image_push_cmd = "docker push {}".format(new_image)
r, out, error = run_cmd(image_push_cmd)
if r != 0:
print("{} push失败 失败原因是: {}".format(image, error))
if __name__ == '__main__':
basedir = os.path.dirname(os.path.realpath(sys.argv[0]))
old_suffix = "hub-vpc.jdcloud.com"
new_suffix = "xxxxxxxxx"
argars = argparse.ArgumentParser(description="this is python file")
argars.add_argument('--image_svc', required=True, help="请输入你要推送的镜像列表")
argars = argars.parse_args()
image_svc = argars.image_svc
image_tag_push_cmd(old_suffix, new_suffix)
python脚本的编写思路
面向过程编程
1. 导入各种外部库
2. 设计各种全局变量
3. 写一个函数完成某个功能
4. 写一个函数完成某个功能
5. 写一个函数完成某个功能
6. 写一个函数完成某个功能
7. 写一个函数完成某个功能
8. ......
9. 写一个main函数作为程序入口
在面向过程编程中,许多重要的数据被放置在全局变量区,这样它们就可以被所有函数访问。每个函数都可以具有自己的局部变量数据,封装某些功能代码,无需重复编写,日后仅需调用函数即可。从代码的组织形式来看,面向函数编程的一般套路就是根据业务逻辑从上到下垒代码。
面向对象编程
1. 导入各种外部库
2. 设计各种全局变量
3. 决定你要的类
4. 给每个类提供完整的一组操作
5. 明确地使用继承来表现不同类之间的共同点
6. ......
7. 根据需要,决定是否写一个main函数作为程序入口
在面向对象编程中,将函数和变量进一步封装成类。类是面向对象程序的基本元素,它将数据(类属性)和操作(类方法)紧密地连接在一起,并保护数据不会被外界的函数意外地改变。类和和类的实例(也称对象)是面向对象的核心概念,是与面向过程编程的根本区别。面向对象编程并非必须,而要看你的程序怎么设计方便,但是就目前来说,基本上都是使用面向对象编程。
面向过程的编程思路通常包括以下几个关键概念:
分解问题:首先需要将整个问题分解为一系列可以顺序执行的子任务或操作,每个子任务对应一个具体的处理过程。
定义函数:针对每个子任务或操作,可以定义相应的函数来实现具体的逻辑。函数可以接受参数并返回结果,有助于模块化代码和提高重用性。
流程控制:使用条件语句(如if-else语句)和循环结构(如for循环、while循环)来控制程序的执行流程,根据不同的条件执行不同的操作。
数据处理:在面向过程的编程中,通常会对数据进行操作和处理,这可能涉及到数据的读取、修改、存储等操作。
顺序执行:最终,整个程序按照一定的顺序执行各个子任务或操作,以完成整体的问题解决过程。
面向过程的编程思路注重过程和操作的顺序和逻辑,通过将整个问题分解为可执行的步骤,并按照特定的顺序执行这些步骤来实现程序的功能。与面向对象编程相比,面向过程更加直观和直接,适用于一些简单的脚本和小型程序的编写。
面向过程的简单小案例
import os
# 定义函数:获取指定目录下的所有文件
def get_files_in_directory(directory):
files = []
for filename in os.listdir(directory):
if os.path.isfile(os.path.join(directory, filename)):
files.append(filename)
return files
# 定义函数:处理文件并输出结果
def process_files(files):
results = []
for file in files:
# 处理文件的具体操作(这里只是简单示意)
result = f"Processed {file}"
results.append(result)
return results
# 主程序
def main():
directory = "/path/to/directory"
files = get_files_in_directory(directory)
results = process_files(files)
# 输出结果
for result in results:
print(result)
# 调用主程序
main()