使用Python调用Nessus 接口实现自动化扫描

2020-01-12  本文已影响0人  一叶障目

之前在项目中需要接入nessus扫描器,研究了一下nessus的api,现在将自己的成果分享出来。
Nessus提供了丰富的二次开发接口,无论是接入其他系统还是自己实现自动化扫描,都十分方便。
同时Nessus也提供了完备的API文档,可以在 Settings->My Account->API Keys->API documentation


nessus图片

认证

nessus提供两种认证方式,第一种采用常规的登录后获取token的方式,在https://localhost:8834/api#/resources/session条目中可以找到这种方式,它的接口定义如下:

POST /session

{
    "username":{string},
    "password":{string}
}

输入正确的用户名和密码,登录成功后会返回一个token
···
{
"token": {string}
}
···

在后续请求中,将token放入请求头信息中请求头的key为X-Cookie,值为 token=xxxx,例如 :X-Cookie: token=5fa3d3fd97edcf40a41bb4dbdfd0b470ba45dde04ebc37f8;,下面是获取任务列表的例子

import requests
import json
def get_token(ip, port, username, password):
    url = "https://{0}:{1}/session".format(ip, port)
    post_data = {
        'username': username,
        'password': password
    }
    
    respon = requests.post(url, data=post_data, verify=False)
    if response.status_code == 200:
        data = json.loads(response.text)
        return data["token"]

def get_scan_list()
    # 这里ip和port可以从配置文件中读取或者从数据库读取,这里我省略了获取这些配置值得操作
    url = "https://{ip}:{port}/scans".format(ip, port)
    token = get_token(ip, port, username, password)
    if token:
        header = {
            "X-Cookie":"token={0}".format(token),
            "Content-Type":"application/json"
        }
        response = requests.get(url, headers=header, verify=False)
        if response.status_code == 200:
            result = json.loads(respon.text)
            return result

第二种方式是使用Nessus生成的API Key,这里我们可以依次点击 Settings->My Account->API Keys-->Generate按钮,生成一个key,后续使用时填入头信息中,还是以获取扫描任务列表作为例子

def get_scan_list()
    accessKey = "XXXXXX" #此处填入真实的内容
    secretKey = "XXXXXX" #此处填入真实内容
    
    url = "https://{ip}:{port}/scans".format(ip, port)
    token = get_token(ip, port, username, password)
    if token:
        header = {
            'X-ApiKeys': 'accessKey={accesskey};secretKey={secretkey}'.format(accesskey=accessKey, secretkey=secretKey)
            "Content-Type":"application/json"
        }
        response = requests.get(url, headers=header, verify=False)
        if response.status_code == 200:
            result = json.loads(respon.text)
            return result

对比来看使用第二种明显方便一些,因此后续例子都采用第二种方式来呈现

策略模板配置

策略模板的接口文档在 https://localhost:8834/api#/resources/policies 中。

创建策略模板

创建策略模板使用的是 策略模板的create接口,它里面有一个必须填写的参数 uuid 这个参数是一个uuid值,表示以哪种现有模板进行创建。在创建之前需要先获取系统中可用的模板。获取的接口是 /editor/{type}/templates,type 可以选填policy或者scan。这里我们填policy

一般我们都是使用模板中的 Advanced 来创建,如下图

在这里插入图片描述
下面是获取该模板uuid的方法,主要思路是获取系统中所有模板,然后根据模板名称返回对应的uuid值
def get_nessus_template_uuid(ip, port, template_name = "advanced"):
    header = {
        'X-ApiKeys': 'accessKey={accesskey};secretKey={secretkey}'.format(accesskey=accesskey,
                                                                          secretkey=secretkey),
        'Content-type': 'application/json',
        'Accept': 'text/plain'}

    api = "https://{ip}:{port}/editor/scan/templates".format(ip=ip, port=port)
    response = requests.get(api, headers=header, verify=False)
    templates = json.loads(response.text)['templates']

    for template in templates:
        if template['name'] == template_name:
            return template['uuid']
    return None

有了这个id之后,下面来创建策略模板,这个接口的参数较多,但是很多参数都是选填项。

这个部分文档写的很简陋,很多参数不知道是干嘛用的,当时我为了搞清楚每个参数的作用,一个个的尝试,然后去界面上看它的效果,最后终于把我感兴趣的给弄明白了。
它的主体部分如下:

{
   "uuid": {template_uuid},
   "audits": {
       "feed": {
           "add": [
               {
                   "id": {audit_id},
                   "variables": {
                       "1": {audit_variable_value},
                       "2": {audit_variable_value},
                       "3": {audit_variable_value}
                   }
               }
           ]
       }
   },
   "credentials": {
       "add": {
           {credential_category}: {
               {credential_name}: [
                   {
                       {credential_input_name}: {string}
                   }
               ]
           }
       }
   },
   "plugins": {
       {plugin_family_name}: {
           "status": {string},
           "individual": {
               {plugin_id}: {string}
           }
       }
   },
   "scap": {
       "add": {
           {scap_category}: [
               {
                   {scap_input_name}: {string}
               }
           ]
       }
   },
   "settings": {
       "acls": [
           permission Resource
       ],
       //其他的减值对,这里我将他们都省略了
}

他们与界面上配置的几个大项有对应关系,能对应的上的我给做了标记,但是有的部分对应不上。


在这里插入图片描述

settings 是给策略模板做基础配置的,包括配置扫描的端口范围,服务检测范围等等。
credentials 是配置登录扫描的,主要包括 windows、ssh、telnet等等
plugins 配置扫描使用的插件,例如服务扫描版本漏洞等等

在settings中,对应关系如下图所示


host scan
port scan
在这里插入图片描述

下面是创建扫描策略模板的实际例子:

def create_template(ip, port, **kwargs): # kwargs 作为可选参数,用来配置settings和其他项
    header = {
        "X-ApiKeys": "accessKey={accesskey};secretKey={secretkey}".format(accesskey=accesskey,
                                                                          secretkey=secretkey),
        "Content-Type": "application/json",
        "Accept": "text/plain"
    }
    policys = {}

    # 这里 grouppolicy_set 存储的是策略模板中各个脚本名称以及脚本是否启用的信息
    for policy in grouppolicy_set:
        enabled = "enabled" if policy.enable else "disabled"
        policys[policy.name] = {
            "status": enabled
        }
    
    # settings里面的各小项必须得带上,否则会创建不成功
    "settings": {
        "name": template.name,
        "watchguard_offline_configs": "",
        "unixfileanalysis_disable_xdev": "no",
        "unixfileanalysis_include_paths": "",
        "unixfileanalysis_exclude_paths": "",
        "unixfileanalysis_file_extensions": "",
        "unixfileanalysis_max_size": "",
        "unixfileanalysis_max_cumulative_size": "",
        "unixfileanalysis_max_depth": "",
        "unix_docker_scan_scope": "host",
        "sonicos_offline_configs": "",
        "netapp_offline_configs": "",
        "junos_offline_configs": "",
        "huawei_offline_configs": "",
        "procurve_offline_configs": "",
        "procurve_config_to_audit": "Saved/(show config)",
        "fortios_offline_configs": "",
        "fireeye_offline_configs": "",
        "extremeos_offline_configs": "",
        "dell_f10_offline_configs": "",
        "cisco_offline_configs": "",
        "cisco_config_to_audit": "Saved/(show config)",
        "checkpoint_gaia_offline_configs": "",
        "brocade_offline_configs": "",
        "bluecoat_proxysg_offline_configs": "",
        "arista_offline_configs": "",
        "alcatel_timos_offline_configs": "",
        "adtran_aos_offline_configs": "",
        "patch_audit_over_telnet": "no",
        "patch_audit_over_rsh": "no",
        "patch_audit_over_rexec": "no",
        "snmp_port": "161",
        "additional_snmp_port1": "161",
        "additional_snmp_port2": "161",
        "additional_snmp_port3": "161",
        "http_login_method": "POST",
        "http_reauth_delay": "",
        "http_login_max_redir": "0",
        "http_login_invert_auth_regex": "no",
        "http_login_auth_regex_on_headers": "no",
        "http_login_auth_regex_nocase": "no",
        "never_send_win_creds_in_the_clear": "yes" if kwargs["never_send_win_creds_in_the_clear"] else "no",
        "dont_use_ntlmv1": "yes" if kwargs["dont_use_ntlmv1"] else "no",
        "start_remote_registry": "yes" if kwargs["start_remote_registry"] else "no",
        "enable_admin_shares": "yes" if kwargs["enable_admin_shares"] else "no",
        "ssh_known_hosts": "",
        "ssh_port": kwargs["ssh_port"],
        "ssh_client_banner": "OpenSSH_5.0",
        "attempt_least_privilege": "no",
        "region_dfw_pref_name": "yes",
        "region_ord_pref_name": "yes",
        "region_iad_pref_name": "yes",
        "region_lon_pref_name": "yes",
        "region_syd_pref_name": "yes",
        "region_hkg_pref_name": "yes",
        "microsoft_azure_subscriptions_ids": "",
        "aws_ui_region_type": "Rest of the World",
        "aws_us_east_1": "",
        "aws_us_east_2": "",
        "aws_us_west_1": "",
        "aws_us_west_2": "",
        "aws_ca_central_1": "",
        "aws_eu_west_1": "",
        "aws_eu_west_2": "",
        "aws_eu_west_3": "",
        "aws_eu_central_1": "",
        "aws_eu_north_1": "",
        "aws_ap_east_1": "",
        "aws_ap_northeast_1": "",
        "aws_ap_northeast_2": "",
        "aws_ap_northeast_3": "",
        "aws_ap_southeast_1": "",
        "aws_ap_southeast_2": "",
        "aws_ap_south_1": "",
        "aws_me_south_1": "",
        "aws_sa_east_1": "",
        "aws_use_https": "yes",
        "aws_verify_ssl": "yes",
        "log_whole_attack": "no",
        "enable_plugin_debugging": "no",
        "audit_trail": "use_scanner_default",
        "include_kb": "use_scanner_default",
        "enable_plugin_list": "no",
        "custom_find_filepath_exclusions": "",
        "custom_find_filesystem_exclusions": "",
        "reduce_connections_on_congestion": "no",
        "network_receive_timeout": "5",
        "max_checks_per_host": "5",
        "max_hosts_per_scan": "100",
        "max_simult_tcp_sessions_per_host": "",
        "max_simult_tcp_sessions_per_scan": "",
        "safe_checks": "yes",
        "stop_scan_on_disconnect": "no",
        "slice_network_addresses": "no",
        "allow_post_scan_editing": "yes",
        "reverse_lookup": "no",
        "log_live_hosts": "no",
        "display_unreachable_hosts": "no",
        "report_verbosity": "Normal",
        "report_superseded_patches": "yes",
        "silent_dependencies": "yes",
        "scan_malware": "no",
        "samr_enumeration": "yes",
        "adsi_query": "yes",
        "wmi_query": "yes",
        "rid_brute_forcing": "no",
        "request_windows_domain_info": "no",
        "scan_webapps": "no",
        "start_cotp_tsap": "8",
        "stop_cotp_tsap": "8",
        "modbus_start_reg": "0",
        "modbus_end_reg": "16",
        "hydra_always_enable": "yes" if kwargs["hydra_always_enable"] else "no",
        "hydra_logins_file": "" if kwargs["hydra_logins_file"] else kwargs["hydra_logins_file"], # 弱口令文件需要事先上传,后面会提到上传文件接口
        "hydra_passwords_file": "" if kwargs["hydra_passwords_file"] else kwargs["hydra_passwords_file"],
        "hydra_parallel_tasks": "16",
        "hydra_timeout": "30",
        "hydra_empty_passwords": "yes",
        "hydra_login_as_pw": "yes",
        "hydra_exit_on_success": "no",
        "hydra_add_other_accounts": "yes",
        "hydra_postgresql_db_name": "",
        "hydra_client_id": "",
        "hydra_win_account_type": "Local accounts",
        "hydra_win_pw_as_hash": "no",
        "hydra_cisco_logon_pw": "",
        "hydra_web_page": "",
        "hydra_proxy_test_site": "",
        "hydra_ldap_dn": "",
        "test_default_oracle_accounts": "no",
        "provided_creds_only": "yes",
        "smtp_domain": "example.com",
        "smtp_from": "nobody@example.com",
        "smtp_to": "postmaster@[AUTO_REPLACED_IP]",
        "av_grace_period": "0",
        "report_paranoia": "Normal",
        "thorough_tests": "no",
        "detect_ssl": "yes",
        "tcp_scanner": "no",
        "tcp_firewall_detection": "Automatic (normal)",
        "syn_scanner": "yes",
        "syn_firewall_detection": "Automatic (normal)",
        "wol_mac_addresses": "",
        "wol_wait_time": "5",
        "scan_network_printers": "no",
        "scan_netware_hosts": "no",
        "scan_ot_devices": "no",
        "ping_the_remote_host": "yes",
        "tcp_ping": "yes",
        "icmp_unreach_means_host_down": "no",
        "test_local_nessus_host": "yes",
        "fast_network_discovery": "no",

        "arp_ping": "yes" if kwargs["arp_ping"] else "no",
        "tcp_ping_dest_ports": kwargs["tcp_ping_dest_ports"],
        "icmp_ping": "yes" if kwargs["icmp_ping"] else "no",
        "icmp_ping_retries": kwargs["icmp_ping_retries"],
        "udp_ping": "yes" if kwargs["udp_ping"] else "no",
        "unscanned_closed": "yes" if kwargs["unscanned_closed"] else "no",
        "portscan_range": kwargs["portscan_range"],
        "ssh_netstat_scanner": "yes" if kwargs["ssh_netstat_scanner"] else "no",
        "wmi_netstat_scanner": "yes" if kwargs["wmi_netstat_scanner"] else "no",
        "snmp_scanner": "yes" if kwargs["snmp_scanner"] else "no",
        "only_portscan_if_enum_failed": "yes" if kwargs["only_portscan_if_enum_failed"] else "no",
        "verify_open_ports": "yes" if kwargs["verify_open_ports"] else "no",
        "udp_scanner": "yes" if kwargs["udp_scanner"] else "no",
        "svc_detection_on_all_ports": "yes" if kwargs["svc_detection_on_all_ports"] else "no",
        "ssl_prob_ports": "Known SSL ports" if kwargs["ssl_prob_ports"] else "All ports",
        "cert_expiry_warning_days": kwargs["cert_expiry_warning_days"],
        "enumerate_all_ciphers": "yes" if kwargs["enumerate_all_ciphers"] else "no",
        "check_crl": "yes" if kwargs["check_crl"] else "no",
   }
    
    credentials = {
            "add": {
                "Host": {
                    "SSH": [],
                    "SNMPv3": [],
                    "Windows": [],
                },
                "Plaintext Authentication": {
                    "telnet/rsh/rexec": []
                }
            }
        }
        try:
            if kwargs["snmpv3_username"] and kwargs["snmpv3_port"] and kwargs["snmpv3_level"]:
                level = kwargs["snmpv3_level"]
                if level == NessusSettings.LOW:
                    credentials["add"]["Host"]["SNMPv3"].append({
                        "security_level": "No authentication and no privacy",
                        "username": kwargs["snmpv3_username"],
                        "port": kwargs["snmpv3_port"]
                    })
                elif level == NessusSettings.MID:
                    credentials["add"]["Host"]["SNMPv3"].append({
                        "security_level": "Authentication without privacy",
                        "username": kwargs["snmpv3_username"],
                        "port": kwargs["snmpv3_port"],
                        "auth_algorithm": NessusSettings.AUTH_ALG[kwargs["snmpv3_auth"][1]],
                        "auth_password": kwargs["snmpv3_auth_psd"]
                    })
                elif level == NessusSettings.HIGH:
                    credentials["add"]["Host"]["SNMPv3"].append({
                        "security_level": "Authentication and privacy",
                        "username": kwargs["snmpv3_username"],
                        "port": kwargs["snmpv3_port"],
                        "auth_algorithm": NessusSettings.AUTH_ALG[kwargs["snmpv3_auth"]][1],
                        "auth_password": kwargs["snmpv3_auth_psd"],
                        "privacy_algorithm": NessusSettings.PPIVACY_ALG[kwargs["snmpv3_hide"]][1],
                        "privacy_password": kwargs["snmpv3_hide_psd"]
                    })

            if kwargs["ssh_username"] and kwargs["ssh_psd"]:
                credentials["add"]["Host"]["SSH"].append(
                    {
                        "auth_method": "password",
                        "username": kwargs["ssh_username"],
                        "password": kwargs["ssh_psd"],
                        "elevate_privileges_with": "Nothing",
                        "custom_password_prompt": "",
                    })

            if kwargs["windows_username"] and kwargs["windows_psd"]:
                credentials["add"]["Host"]["Windows"].append({
                    "auth_method": "Password",
                    "username": kwargs["windows_username"],
                    "password": kwargs["windows_psd"],
                    "domain": kwargs["ssh_host"]
                })

            if kwargs["telnet_username"] and kwargs["telnet_password"]:
                credentials["add"]["Plaintext Authentication"]["telnet/rsh/rexec"].append({
                    "username": kwargs["telnet_username"],
                    "password": kwargs["telnet_password"]
                })
    
    data = {
            "uuid": get_nessus_template_uuid(terminal, "advanced"),
            "settings": settings,
            "plugins": policys,
            "credentials": credentials
        }
    
    api = "https://{0}:{1}/policies".format(ip, port)
    response = requests.post(api, headers=header, data=json.dumps(data, ensure_ascii=False).encode("utf-8"), # 这里做一个转码防止在nessus端发生中文乱码
                             verify=False)
    if response.status_code == 200:
        data = json.loads(response.text)
        return data["policy_id"] # 返回策略模板的id,后续可以在创建任务时使用
    else:
        return None

策略还有copy、delete、config等操作,这里就不再介绍了,这个部分主要弄清楚各参数的作用,后面的这些接口使用的参数都是一样的

任务

任务部分的API 在https://localhost:8834/api#/resources/scans

创建任务

创建任务重要的参数如下说明如下:

  1. uuid: 创建任务时使用的模板id,这个id同样是我们上面说的系统自带的模板id
  2. name:任务名称
  3. policy_id:策略模板ID,这个是可选的,如果要使用上面我们自己定义的扫描模板,需要使用这个参数来指定,并且设置上面的uuid为 custom 的uuid,这个值表示使用用户自定义模板;当然如果就想使用系统提供的,这个字段可以不填
  4. text_targets:扫描目标地址,这个参数是一个数组,可以填入多个目标地址,用来一次扫描多个主机

创建任务的例子如下:

def create_task(task_name, policy_id, hosts): # host 是一个列表,存放的是需要扫描的多台主机
    uuid = get_nessus_template_uuid(terminal, "custom") # 获取自定义策略的uuid
    if uuid is None:
        return False

    data = {"uuid": uuid, "settings": {
        "name": name,
        "policy_id": policy_id,
        "enabled": True,
        "text_targets": hosts,
        "agent_group_id": []
    }}

    header = {
        'X-ApiKeys': 'accessKey={accesskey};secretKey={secretkey}'.format(accesskey=accesskey,
                                                                          secretkey=secretkey),
        'Content-type': 'application/json',
        'Accept': 'text/plain'}

    api = "https://{ip}:{port}/scans".format(ip=terminal.ip, port=terminal.port)
    response = requests.post(api, headers=header, data=json.dumps(data, ensure_ascii=False).encode("utf-8"),
                             verify=False)
    if response.status_code == 200:
        data = json.loads(response.text)
        if data["scan"] is not None:
            scan = data["scan"]
            # 新增任务扩展信息记录

            return scan["id"] # 返回任务id

启动/停止任务

启动任务的接口为 POST /scans/{scan_id}/launch scan_id 是上面创建任务返回的任务ID, 它有个可选参数 alt_targets,如果这个参数被指定,那么该任务可以扫描这个参数中指定的主机,而之前创建任务时指定的主机将被替代

停止任务的接口为: POST /scans/{scan_id}/stop
下面给出启动和停止任务的方法

def start_task(task_id, hosts):
    header = {
        'X-ApiKeys': 'accessKey={accesskey};secretKey={secretkey}'.format(accesskey=accesskey,
                                                                          secretkey=secretkey),
        'Content-type': 'application/json',
        'Accept': 'text/plain'}

    data = {
        "alt_targets": [hosts] # 重新指定扫描地址
    }

    api = "https://{ip}:{port}/scans/{scan_id}/launch".format(ip=ip, port=port, scan_id=scan_id)
    response = requests.post(api, data=data, verify=False, headers=header)
    if response.status_code != 200:
        return False
    else:
        return True

def stop_task(task_id):
    header = {
        'X-ApiKeys': 'accessKey={accesskey};secretKey={secretkey}'.format(accesskey=terminal.reserved1,
                                                                          secretkey=terminal.reserved2),
        'Content-type': 'application/json',
        'Accept': 'text/plain'}
    
    api = "https://{ip}:{port}/scans/{scan_id}/stop".format(ip=ip, port=port, task_id)
    response = requests.post(api, headers=header, verify=False)
    if response.status_code == 200 or response.status_code == 409: # 根据nessus api文档可以知道409 表示任务已结束
        return True

    return False

获取扫描结果

使用接口 GET /scans/{scan_id} 可以获取最近一次扫描的任务信息,从接口文档上看,它还可以获取某次历史扫描记录的信息,如果不填这个参数,接口中会返回所有历史记录的id。如果不填历史记录id,那么会返回最近一次扫描到的漏洞信息,也就是说新扫描到的信息会把之前的信息给覆盖

下面是返回信息的部分说明

{
    "info": {
        "edit_allowed": {boolean},
        "status": {string}, //当前状态 completed 字符串表示结束,cancel表示停止
        "policy": {string},
        "pci-can-upload": {boolean},
        "hasaudittrail": {boolean},
        "scan_start": {string},
        "folder_id": {integer},
        "targets": {string},
        "timestamp": {integer},
        "object_id": {integer},
        "scanner_name": {string},
        "haskb": {boolean},
        "uuid": {string},
        "hostcount": {integer},
        "scan_end": {string},
        "name": {string},
        "user_permissions": {integer},
        "control": {boolean}
    },
    "hosts": [ //按主机区分的漏洞信息
        host Resource
    ],
    "comphosts": [
        host Resource
    ],
    "notes": [
        note Resource
    ],
    "remediations": {
        "remediations": [
            remediation Resource
        ],
        "num_hosts": {integer},
        "num_cves": {integer},
        "num_impacted_hosts": {integer},
        "num_remediated_cves": {integer}
    },
    "vulnerabilities": [
        vulnerability Resource //本次任务扫描到的漏洞信息
    ],
    "compliance": [
        vulnerability Resource
    ],
    "history": [
        history Resource //历史扫描信息,可以从这个信息中获取历史记录的id
    ],
    "filters": [
        filter Resource
    ]
}

这个信息里面vulnerabilities和host里面都可以拿到漏洞信息,但是 vulnerabilities中是扫描到的所有漏洞信息,而host则需要根据id再次提交请求,也就是需要额外一次请求,但它是按照主机对扫描到的漏洞进行了分类。而使用vulnerabilities则需要根据漏洞信息中的host_id 手工进行分类

下面是获取任务状态的示例:

def get_task_status(task_id):
    header = {
        "X-ApiKeys": "accessKey={accesskey};secretKey={secretkey}".format(accesskey=accesskey,
                                                                          secretkey=secretkey),
        "Content-Type": "application/json",
        "Accept": "text/plain"
    }

    api = "https://{ip}:{port}/scans/{task_id}".format(ip=ip, port=port,
                                                       task_id=task_id)
    response = requests.get(api, headers=header, verify=False)
    if response.status_code != 200:
        return 2, "Data Error"

    data = json.loads(response.text)
    hosts = data["hosts"]
    for host in hosts:
        get_host_vulnerabilities(scan_id, host["host_id"]) # 按主机获取漏洞信息

    if data["info"]["status"] == "completed" or data["info"]["status"] =='canceled':
        # 已完成,此时更新本地任务状态
    return 1, "OK"

获取漏洞信息

在获取任务信息中,已经得到了本次扫描中发现的弱点信息了,只需要我们解析这个json。它具体的内容如下:

"host_id": {integer}, //主机id
"host_index": {string}, 
"hostname": {integer},//主机名称
"progress": {string}, //扫描进度
"critical": {integer}, //危急漏洞数
"high": {integer}, //高危漏洞数
"medium": {integer}, //中危漏洞数
"low": {integer}, //低危漏洞数
"info": {integer}, //相关信息数目
"totalchecksconsidered": {integer},
"numchecksconsidered": {integer},
"scanprogresstotal": {integer},
"scanprogresscurrent": {integer},
"score": {integer}

根据主机ID可以使用 GET /scans/{scan_id}/hosts/{host_id} 接口获取主机信息,它需要两个参数,一个是扫描任务id,另一个是主机id。
下面列举出来的是返回值得部分内容,只列举了我们感兴趣的部分:

{
    "info": {
        "host_start": {string},
        "mac-address": {string},
        "host-fqdn": {string},
        "host_end": {string},
        "operating-system": {string},
        "host-ip": {string}
    },
    
    "vulnerabilities": [
        {
            "host_id": {integer}, //主机id
            "hostname": {string}, //主机名称
            "plugin_id": {integer}, //策略id
            "plugin_name": {string}, //策略名称
            "plugin_family": {string}, //所属策略组
            "count": {integer}, //该种漏洞数
            "vuln_index": {integer}, 
            "severity_index": {integer},
            "severity": {integer}
        }
    ],
}

根据上面获取任务信息中得到的主机id和任务id,我们可以实现这个功能

def get_host_vulnerabilities(scan_id, host_id):
    header = {
        "X-ApiKeys": "accessKey={accesskey};secretKey={secretkey}".format(accesskey=accesskey,
                                                                          secretkey=secretkey),
        "Content-Type": "application/json",
        "Accept": "text/plain"
    }

    scan_history = ScanHistory.objects.get(id=scan_id)
    api = "https://{ip}:{port}/scans/{task_id}/hosts/{host_id}".format(ip=ip, port=port, task_id=scan_id, host_id=host_id)
    response = requests.get(api, headers=header, verify=False)
    if response.status_code != 200:
        return 2, "Data Error"

    data = json.loads(response.text)
    vulns = data["vulnerabilities"]
    for vuln in vulns:
        vuln_name = vuln["plugin_name"]
        plugin_id = vuln["plugin_id"] #插件id,可以获取更详细信息,包括插件自身信息和扫描到漏洞的解决方案等信息
        #保存漏洞信息

获取漏洞输出信息与漏洞知识库信息

我们在nessus web页面中可以看到每条被检测到的漏洞在展示时会有输出信息和知识库信息,这些信息也可以根据接口来获取

在这里插入图片描述
获取漏洞的知识库可以通过接口 GET /scans/{scan_id}/hosts/{host_id}/plugins/{plugin_id} , 它的路径为: https://localhost:8834/api#/resources/scans/plugin-output

它返回的值如下:

{
    "info": {
        "plugindescription": {
            "severity": {integer}, //危险等级,从info到最后的critical依次为1,2,3,4,5
            "pluginname": {string}, 
            "pluginattributes": {
                "risk_information": {
                    "risk_factor": {string}
                },
                "plugin_name": {string}, //插件名称
                "plugin_information": {
                    "plugin_id": {integer},
                    "plugin_type": {string},
                    "plugin_family": {string},
                    "plugin_modification_date": {string}
                },
                "solution": {string}, //漏洞解决方案
                "fname": {string},
                "synopsis": {string},
                "description": {string} //漏洞描述
            },
            "pluginfamily": {string},
            "pluginid": {integer}
        }
    },
    "output": [
        plugin_output:{
           "plugin_output": {string}, //输出信息
           "hosts": {string}, //主机信息
           "severity": {integer}, 
           "ports": {} //端口信息
        }
    ]
}

有了这些信息,我们可以通过下面的代码获取这些信息:

def get_vuln_detail(scan_id, host_id, plugin_id)
    header = {
        "X-ApiKeys": "accessKey={accesskey};secretKey={secretkey}".format(accesskey=accesskey,
                                                                          secretkey=secretkey),
        "Content-Type": "application/json",
        "Accept": "text/plain"
    }

    api = "https://{ip}:{port}/scans/{scan_id}/hosts/{host_id}/plugins/{plugin_id}".format(ip=ip, port=port, scan_id=scan_id, host_id=host_id, plugin_id=plugin_id)
    response = requests.get(api, headers=header, verify=False)
    data = json.loads(response.text)
    outputs = data["outputs"]
    return outputs

最后总结

这篇文章我们主要介绍了nessus API从扫描设置到扫描任务创建、启动、停止、以及结果的获取的内容,当然nessus的api不止这些,但是最重要的应该是这些,如果能帮助各位解决手头上的问题自然是极好的,如果不能或者说各位朋友需要更细致的控制,可以使用浏览器抓包的方式来分析它的请求和响应包。
在摸索时最好的两个帮手是浏览器 F12工具栏中的 network和nessus api文档页面上的test工具了。

我们可以先按 <kbd>f12</kbd> 打开工具并切换到network,然后在页面上执行相关操作,观察发包即可发现该如何使用这些API,因为Nessus Web端在操作时也是使用API。如下图:


在这里插入图片描述

或者可以使用文档中的test工具,例如下面是测试 获取插件输出信息的接口


在这里插入图片描述
上一篇下一篇

猜你喜欢

热点阅读