python调用系统命令的简单小案例

2023-11-04  本文已影响0人  Joening

subprocess小案例

调用命令

import subprocess


def run_cmd(cmd):
    process = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    out, error = process.communicate()
    out = out.decode('utf-8')
    error = error.decode('utf-8')
    r = process.returncode
    return r, str(out), str(error)


def command():
    cmd = "docker images "
    r, out, error = run_cmd(cmd)
    if r == 0:
        print(out)
    else:
        print(error)


if __name__ == '__main__':
    command()

push chart 小案例

import os
import sys
import subprocess


def run_cmd(cmd):
    process = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    out, error = process.communicate()
    r = process.returncode
    return r, out, error


def for_chart_name(chart_file_name):
    if not os.path.exists(chart_file_name):
        exit(1)
    with open(chart_file_name, mode='rt') as chart_name:
        chart_lst = list()
        for line in chart_name:
            if line.startswith("#") or not line:
                continue
            line = line.strip()
            chart_lst.append(line)
        return chart_lst


def command():
    chart_names = for_chart_name(chart_file_name)
    for chart_name in chart_names:
        error_list = list()
        chartName = chart_name.split(',')[0]
        chartVersion = chart_name.split(',')[-1]
        helmpull_cmd = "helm pull {}/{} --version {}".format("itg", chartName, chartVersion)
        r, out, error = run_cmd(helmpull_cmd)
        if r == 0:
            print("{} pull ok".format(chartName))
        else:
            print("{} pull error 失败原因是: {}".format(chartName, error.decode('utf-8')))
            error_list.append(error.decode('utf-8'))
            exit(-1)
        return error_list


if __name__ == '__main__':
    basedir = os.path.dirname(os.path.realpath(sys.argv[0]))
    chart_file = "chart.txt"
    chart_file_name = os.path.join(basedir, chart_file)
    ret = command()
    print(ret)

import subprocess
import os

images = '''
flannel/flannel:v0.21.5
flannel/flannel-cni-plugin:v1.1.2
registry.aliyuncs.com/google_containers/kube-apiserver:v1.23.5
registry.aliyuncs.com/google_containers/kube-proxy:v1.23.5
registry.aliyuncs.com/google_containers/kube-controller-manager:v1.23.5
registry.aliyuncs.com/google_containers/kube-scheduler:v1.23.5
registry.aliyuncs.com/google_containers/etcd:3.5.1-0
registry.aliyuncs.com/google_containers/coredns:v1.8.6
registry.aliyuncs.com/google_containers/pause:3.6
'''
dest_dir = "/tmp/qiaoning"


def run_cmd(cmd):
    process = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    out, error = process.communicate()
    r = process.returncode
    return r, out, error


def image(images, dest_dir):
    image_list = []
    for imagename in images.split():
        imagename = imagename.strip()
        image_list.append(imagename)
    if not os.path.exists(dest_dir):
        os.makedirs(dest_dir)
    if len(os.listdir(dest_dir)) > 1:
        print("请清理磁盘{}目录下的内容".format(dest_dir))
        exit(1)
    return image_list, dest_dir


def command():
    image_name, destdir = image(images, dest_dir)
    for name in image_name:
        docker_pull_image_cmd = 'docker pull {}'.format(name)
        r, out, error = run_cmd(docker_pull_image_cmd)
        if r != 0:
            print(name, '错误原因是:', error.encode('utf-8'))
        newname = name.split('/')[-1].replace(':', '-')
        docker_save_image_cmd = 'docker save {} | gzip > {}/{}.tar.gz'.format(name, destdir, newname)
        r, out, error = run_cmd(docker_save_image_cmd)
        if r != 0:
            print(name, '错误原因是:', error.encode('utf-8'))


if __name__ == '__main__':
    command()

小案例

import subprocess
import os
import argparse


def commad(cmd):
    porcess = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    out, error = porcess.communicate()
    r = porcess.returncode
    return r, out, error


def check_directory(dest_directory):
    if not os.path.exists(dest_directory):
        os.mkdir(dest_directory)
    if len(os.listdir(dest_directory)) > 1:
        print(dest_directory, "目录内容请清理")
        exit(2)
    return dest_directory


def get_image_file_list(image_file_svc):
    if not os.path.exists(image_file_svc):
        print(image_file_svc, 'no such file or directory')
        exit(1)
    lst = list()
    with open(image_file_svc, mode='rt', encoding='utf-8') as file_name:
        for item in file_name:
            if item.startswith('#') or not item:
                continue
            item = item.strip()
            lst.append(item)
    return lst


def pull_image(dockerurl, username, password):
    print("[*** 检查docker二进制命令是否存在 ***]")
    images_list = get_image_file_list(image_file_svc)
    check_docker = "which %s" % 'docker'
    r, out, error = commad(check_docker)
    if r != 0:
        print('docker is not command')
        exit(1)
    print("[*** 登录docker私有仓库地址 地址为 %s ***]" % dockerurl)
    docker_login = f'docker login --username "{username}" --password "{password}" {dockerurl}'
    print(docker_login)
    # r, out, error = commad(docker_login)
    # if r != 0:
    #     print('docker login error 错误原因是:', error.decode('utf-8'))
    #     exit(1)
    print("[*** 开始pull镜像 ***]")
    for images in images_list:
        docke_pull_cmd = "docker pull {}".format(images)
        r, out, error = commad(docke_pull_cmd)
        if r == 0:
            print(images, 'is pull ok')
        else:
            print('错误原因是:', error.decode('utf8'))


def save_image():
    dest_dir = check_directory(dest_directory)
    image_list = get_image_file_list(image_file_svc)
    print('[*** 开始打包镜像 打包到 {} ***]'.format(dest_dir))
    for image in image_list:
        new_image = image.split('/')[-1].replace(':', '-')
        docker_tag = f'docker save {image} | gzip > {dest_dir}{new_image}.tar.gz'
        r, out, error = commad(docker_tag)
        if r == 0:
            print(image, "save ok")
        else:
            print(image, 'save 失败原因是:', error.decode('utf-8'))


if __name__ == '__main__':
    argars = argparse.ArgumentParser(description="this is python file")
    argars.add_argument('--image_file_svc', required=True, help='--image_file_svc')
    argars.add_argument('--username', required=True, help='--username')
    argars.add_argument('--password', required=True, help='--password')
    argars.add_argument('--dockerurl', required=True, help='--dockerurl')
    argars.add_argument('--dest_directory', required=True, help='--dest_directory')
    argars = argars.parse_args()
    username = argars.username
    password = argars.password
    dockerurl = argars.dockerurl
    dest_directory = argars.dest_directory
    image_file_svc = argars.image_file_svc
    print('========================================= 开始执行 ==============================')
    check_directory(dest_directory)
    pull_image(dockerurl, username, password)
    save_image()
    print('========================================= 执行结束 ===============================')

改tag 推送镜像小案例

#!/usr/bin/python3
# -*- coding: utf-8 -*-
import os
import subprocess
import sys


def run_cmd(cmd):
    process = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    out, error = process.communicate()
    r = process.returncode
    return r, str(out), str(error)


def for_image_list(image_svc):
    with open(image_svc, mode='rt') as images:
        for image in images:
            image = image.strip()
            new_image = image.replace(old_suffix.strip(), new_suffix.strip())
            new_image = new_image.strip()
            print("docker tag {} {}".format(image, new_image))
            docker_tag_cmd = "docker tag {} {}".format(image, new_image)
            r, out, error = run_cmd(docker_tag_cmd)
            if r == 0:
                print("{}执行成功".format(image))
            else:
                print("{}执行失败 错误原因是: {}".format(image, error))
                exit(1)

            print("=========================== 推送镜像 {}========================".format(new_image))
            print("docker push {}".format(new_image))
            docker_push_cmd = "docker push {}".format(new_image)
            r, out, error = run_cmd(docker_push_cmd)
            if r == 0:
                print("{} 推送成功".format(new_image))
            else:
                print("{} 推送失败".format(new_image))
                exit(1)


if __name__ == '__main__':
    basedir = os.path.dirname(os.path.realpath(sys.argv[0]))
    imageslist = "imagelist.txt"
    image_csv = os.path.join(basedir, imageslist)
    old_suffix = "hub-vpc.jdcloud.com"
    new_suffix = "xxxxx"
    for_image_list(image_csv)



#!/usr/bin/python
# -*- coding: utf-8 -*-
import os
import sys
import subprocess
import argparse


def run_cmd(cmd):
    process = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    out, error = process.communicate()
    r = process.returncode
    return r, str(out), str(error)


def for_image_list(image_csv):
    if not os.path.exists(image_csv):
        print("{}文件不存在".format(image_csv))
        exit(1)
    image_list = list()
    with open(image_csv, mode='rt') as images:
        for image in images:
            if image.startswith('#') or not image:
                continue
            image = image.strip()
            image_list.append(image)
    return image_list


def image_tag_push_cmd(old_suffix, new_suffix):
    images = for_image_list(image_svc)
    for image in images:
        image = image.strip()
        new_image = image.replace(old_suffix, new_suffix)
        new_image = new_image.strip()
        image_tag_cmd = "docker tag {} {}".format(image, new_image)
        r, out, error = run_cmd(image_tag_cmd)
        if r != 0:
            print("{} tag失败 失败原因是: {}".format(image, error))
            exit(1)
        image_push_cmd = "docker push {}".format(new_image)
        r, out, error = run_cmd(image_push_cmd)
        if r != 0:
            print("{} push失败 失败原因是: {}".format(image, error))


if __name__ == '__main__':
    basedir = os.path.dirname(os.path.realpath(sys.argv[0]))
    old_suffix = "hub-vpc.jdcloud.com"
    new_suffix = "xxxxxxxxx"
    argars = argparse.ArgumentParser(description="this is python file")
    argars.add_argument('--image_svc', required=True, help="请输入你要推送的镜像列表")
    argars = argars.parse_args()
    image_svc = argars.image_svc
    image_tag_push_cmd(old_suffix, new_suffix)

python脚本的编写思路

面向过程编程

1. 导入各种外部库

2. 设计各种全局变量

3. 写一个函数完成某个功能
4. 写一个函数完成某个功能
5. 写一个函数完成某个功能
6. 写一个函数完成某个功能
7. 写一个函数完成某个功能
8. ......

9. 写一个main函数作为程序入口

在面向过程编程中,许多重要的数据被放置在全局变量区,这样它们就可以被所有函数访问。每个函数都可以具有自己的局部变量数据,封装某些功能代码,无需重复编写,日后仅需调用函数即可。从代码的组织形式来看,面向函数编程的一般套路就是根据业务逻辑从上到下垒代码。

面向对象编程

1. 导入各种外部库

2. 设计各种全局变量

3. 决定你要的类
4. 给每个类提供完整的一组操作
5. 明确地使用继承来表现不同类之间的共同点
6. ......

7. 根据需要,决定是否写一个main函数作为程序入口

在面向对象编程中,将函数和变量进一步封装成类。类是面向对象程序的基本元素,它将数据(类属性)和操作(类方法)紧密地连接在一起,并保护数据不会被外界的函数意外地改变。类和和类的实例(也称对象)是面向对象的核心概念,是与面向过程编程的根本区别。面向对象编程并非必须,而要看你的程序怎么设计方便,但是就目前来说,基本上都是使用面向对象编程。
面向过程的编程思路通常包括以下几个关键概念:
分解问题:首先需要将整个问题分解为一系列可以顺序执行的子任务或操作,每个子任务对应一个具体的处理过程。
定义函数:针对每个子任务或操作,可以定义相应的函数来实现具体的逻辑。函数可以接受参数并返回结果,有助于模块化代码和提高重用性。
流程控制:使用条件语句(如if-else语句)和循环结构(如for循环、while循环)来控制程序的执行流程,根据不同的条件执行不同的操作。
数据处理:在面向过程的编程中,通常会对数据进行操作和处理,这可能涉及到数据的读取、修改、存储等操作。
顺序执行:最终,整个程序按照一定的顺序执行各个子任务或操作,以完成整体的问题解决过程。

面向过程的编程思路注重过程和操作的顺序和逻辑,通过将整个问题分解为可执行的步骤,并按照特定的顺序执行这些步骤来实现程序的功能。与面向对象编程相比,面向过程更加直观和直接,适用于一些简单的脚本和小型程序的编写。

面向过程的简单小案例

import os

# 定义函数:获取指定目录下的所有文件
def get_files_in_directory(directory):
    files = []
    for filename in os.listdir(directory):
        if os.path.isfile(os.path.join(directory, filename)):
            files.append(filename)
    return files

# 定义函数:处理文件并输出结果
def process_files(files):
    results = []
    for file in files:
        # 处理文件的具体操作(这里只是简单示意)
        result = f"Processed {file}"
        results.append(result)
    return results

# 主程序
def main():
    directory = "/path/to/directory"
    files = get_files_in_directory(directory)
    results = process_files(files)
    
    # 输出结果
    for result in results:
        print(result)

# 调用主程序
main()
上一篇下一篇

猜你喜欢

热点阅读