Python运维与数据分析Linux运维

Python Ansible API 实战

2021-10-09  本文已影响0人  菩提老鹰

Python 调用 Ansible API 实现自动化管理,为后续运维平台自动化管理提供帮助,也是学习Jumpserver源码做实战演练。

代码已经做过测试,效果如下:

playbook 测试


python-ansible-playbook.png

adhoc 测试


python-ansible-adhoc.png

demo代码

#!/usr/bin/env python
# encoding: utf-8
# Author: ColinSpace.com
# Date:2021-09
# Desc: 利用Python实现ansible的Adhoc 和 playbook 两种任务方式
# 


import json
import shutil

from ansible import context
from ansible.module_utils.common.collections import ImmutableDict

# 注意: 使用namedtuple实现Options的方式,官网已经不推荐使用
#       网上很多例子都是按照Options的方式,会有报错如下:
# error_msg: "msg": "the connection plugin '<class 'ansible.utils.sentinel.Sentinel'>' was not found",
# 网上提供解决方案: https://blog.csdn.net/FMT21/article/details/103468284
# 但是目前验证报错,options 最终的tuple类型,在初始化的时候用到了 vars 系统函数,但是该函数报错
# TypeError: vars() argument must have __dict__ attribute
# from collections import namedtuple
# Options = namedtuple('Options', ['connection', 'module_path', 'forks', 'become', 'become_method', 'become_user', 'check', 'diff'])
# options = Options(connection='ssh', module_path=['/to/mymodules'], forks=10, become=None, become_method=None, become_user=None, check=False, diff=False)

from ansible.parsing.dataloader import DataLoader
from ansible.vars.manager import VariableManager
from ansible.inventory.manager import InventoryManager
from ansible.playbook.play import Play
from ansible.executor.task_queue_manager import TaskQueueManager
from ansible.plugins.callback import CallbackBase
from ansible.executor.playbook_executor import PlaybookExecutor

import ansible.constants as C

# 注意: 该配置是为了避免没有第一次公钥访问写入 know_hosts 文件问题,
# 关于如何避免该问题,详见 
C.HOST_KEY_CHECKING = False

class ResultCallback(CallbackBase):

    def v2_runner_on_ok(self, result, **kwargs):
        host = result._host
        print(json.dumps({host.name: result._result}, indent=4))

    def v2_runner_on_failed(self, result, **kwargs):
        host = result._host
        print(json.dumps({host.name: result._result}, indent=4))

    def v2_runner_on_unreachable(self, result, **kwargs):
        host = result._host
        print(json.dumps({host.name: result._result}, indent=4))


class AnsibleApi:
    """
    Description:
    """

    context.CLIARGS = ImmutableDict(
            connection='ssh', remote_user=None, listtags=None, listhosts=None, listtasks=None,
            module_path=None, verbosity=5, ask_sudo_pass=False, private_key_file=None,
            # become=None, become_method=None, become_user=None, 
            become=True, become_method='sudo', become_user='root', 
            forks=10, check=False, diff=False, syntax=None,start_at_task=None,
    )

    """

    Options = namedtuple('Options', ['connection', 'remote_user', 'listtags', 'listhosts', 'listtasks',
        'module_path', 'verbosity', 'ask_sudo_pass', 'private_key_file',
        'become', 'become_method', 'become_user',
        'forks', 'check', 'diff', 'syntax', 'start_at_task'])
    options = Options(
        connection='ssh', remote_user=None, listtags=None, listhosts=None, listtasks=None,
        module_path=None, verbosity=5, ask_sudo_pass=False, private_key_file=None,
        # become=None, become_method=None, become_user=None,
        become=True, become_method='sudo', become_user='root',
        forks=10, check=False, diff=False, syntax=None,start_at_task=None,
    )
    # 报错: TypeError: vars() argument must have __dict__ attribute
    context._init_global_context(options)
    """


    # 一般使用ansible的时候都会通过免密的方式,所以这里直接初始化passwords变为空
    def __init__(self):
        # self.name = name
        # self.host_list = host_list
        # self.task_list = task_list
        self.loader = DataLoader()
        self.result_callback = ResultCallback()
        # self.passwords = dict(vault_pass=passwords)
        self.passwords = dict()
        self.inventory = InventoryManager(loader=self.loader, sources=['/etc/ansible/inventory/hosts', '/etc/ansible/hosts'])
        self.variable_manager = VariableManager(loader=self.loader, inventory=self.inventory)

    def run_adhoc(self, name, hosts, tasks):
        play_source = dict(
            name=name,
            hosts=hosts,
            gather_facts='no',
            tasks=tasks
        )
        play = Play().load(play_source, variable_manager=self.variable_manager, loader=self.loader)
        tqm = None
        try:
            tqm = TaskQueueManager(
                inventory=self.inventory,
                variable_manager=self.variable_manager,
                loader=self.loader,
                passwords=self.passwords,
                stdout_callback=self.result_callback,  # Use our custom callback instead of the ``default`` callback plugin, which prints to stdout
                run_additional_callbacks=C.DEFAULT_LOAD_CALLBACK_PLUGINS,
                run_tree=False,
            )
            result = tqm.run(play) # most interesting data for a play is actually sent to the callback's methods
        finally:
            # we always need to cleanup child procs and the structres we use to communicate with them
            if tqm is not None:
                tqm.cleanup()

            # Remove ansible tmpdir
            shutil.rmtree(C.DEFAULT_LOCAL_TMP, True)

    def run_playhook(self, playbook):
        # self.variable_manager.extra_vars = {
        #     'customer': 'test',
        #     'disabled': 'yes'
        # }
        playbook = PlaybookExecutor(
            playbooks=playbook,
            inventory=self.inventory,
            variable_manager = self.variable_manager,
            loader=self.loader,
            passwords=self.passwords,
        )
        result = playbook.run()
        return result

if __name__ == '__main__':
    a = AnsibleApi()
    host_list = ['192.168.3.4']
    task_list = [
        dict(action=dict(module='shell', args="ls -l "))
    ]
    # a.run_adhoc(name="checkConnection", hosts=host_list, tasks=task_list)
    a.run_playhook(playbook=["/home/james.liu/test.yml"])

上一篇下一篇

猜你喜欢

热点阅读