pysnmp 自定义MIB编写SNMP代理 (python 开发

2019-08-23  本文已影响0人  Joncc
简介

PySNMP是一种跨平台的纯Python SNMP 引擎实现。它具有功能齐全的SNMP引擎,能够以代理/管理器/代理角色运行,通过IPv4 / IPv6和其他网络传输来讨论SNMP v1 / v2c / v3协议版本。

尽管它的名字,SNMP并不是一个简单的协议。例如,它的第三个版本引入了复杂的开放式安全框架,多语言功能,远程配置和其他功能。PySNMP实施紧密遵循复杂的系统细节和功能,为用户带来最大的功能和灵活性。
———— pysnmp官网

  1. 安装软件
yum install net-snmp net-snmp-python  \
libsmi-devel libsmi

pip pyasn1==0.4.4
pysnmp==4.4.2
pysmi==0.3.4
  1. 编写mib, MY-MIB
-- MY-MIB这个名字到处python 代码的时候要一样。否则认不出来
MY-MIB DEFINITIONS ::= BEGIN

IMPORTS
        OBJECT-TYPE, Integer32, NOTIFICATION-TYPE, enterprises
                     FROM SNMPv2-SMI
;

myCompany       OBJECT IDENTIFIER ::= {enterprises 42}

testCount OBJECT-TYPE
    SYNTAX Integer32
    MAX-ACCESS read-only
    STATUS current
    DESCRIPTION "A sample count of something."
    ::= {myCompany 1}

testDescription OBJECT-TYPE
    SYNTAX OCTET STRING
    MAX-ACCESS read-only
    STATUS current
    DESCRIPTION "A description of something"
    ::= {myCompany 2}

testTrap NOTIFICATION-TYPE
    STATUS current
    DESCRIPTION "Test notification"
    ::= {myCompany 3}

END
  1. 把MIB文件转换为Python模块pysnmp可以读取的代码

ps : 这里MY-MIB.py 的文件名要与mib的文件MY-MIB DEFINITIONS ::= BEGIN 名字一样

build-pysnmp-mib  -o MY-MIB.py MY-MIB

这将导出一个名为的新文件MY-MIB.py。如果要查询我们的代理,您需要将MIB的副本添加到 net-snmp搜索MIB的位置我添加了MY-MIB文件/usr/lib/python2.7/site-packages/pysnmp/smi/mibs

  1. 代码
from pysnmp.entity import engine, config
from pysnmp import debug
from pysnmp.entity.rfc3413 import cmdrsp, context, ntforg
from pysnmp.carrier.asynsock.dgram import udp
from pysnmp.smi import builder

import threading
import collections
import time

#can be useful
#debug.setLogger(debug.Debug('all'))

MibObject = collections.namedtuple('MibObject', ['mibName',
                                   'objectType', 'valueFunc'])

class Mib(object):
    """Stores the data we want to serve. 
    """

    def __init__(self):
        self._lock = threading.RLock()
        self._test_count = 0

    def getTestDescription(self):
        return "My Description"

    def getTestCount(self):
        with self._lock:
            return self._test_count

    def setTestCount(self, value):
        with self._lock:
            self._test_count = value


def createVariable(SuperClass, getValue, *args):
    """This is going to create a instance variable that we can export. 
    getValue is a function to call to retreive the value of the scalar
    """
    class Var(SuperClass):
        def readGet(self, name, *args):
            return name, self.syntax.clone(getValue())
    return Var(*args)


class SNMPAgent(object):
    """Implements an Agent that serves the custom MIB and
    can send a trap.
    """

    def __init__(self, mibObjects):
        """
        mibObjects - a list of MibObject tuples that this agent
        will serve
        """

        #each SNMP-based application has an engine
        self._snmpEngine = engine.SnmpEngine()

        #open a UDP socket to listen for snmp requests
        config.addSocketTransport(self._snmpEngine, udp.domainName,
                                  udp.UdpTransport().openServerMode(('', 161)))

        #add a v2 user with the community string public
        config.addV1System(self._snmpEngine, "agent", "public")
        #let anyone accessing 'public' read anything in the subtree below,
        #which is the enterprises subtree that we defined our MIB to be in
        config.addVacmUser(self._snmpEngine, 2, "agent", "noAuthNoPriv",
                           readSubTree=(1,3,6,1,4,1))

        #each app has one or more contexts
        self._snmpContext = context.SnmpContext(self._snmpEngine)

        #the builder is used to load mibs. tell it to look in the
        #current directory for our new MIB. We'll also use it to
        #export our symbols later
        mibBuilder = self._snmpContext.getMibInstrum().getMibBuilder()
        mibSources = mibBuilder.getMibSources() + (builder.DirMibSource('.'),)
        mibBuilder.setMibSources(*mibSources)

        #our variables will subclass this since we only have scalar types
        #can't load this type directly, need to import it
        MibScalarInstance, = mibBuilder.importSymbols('SNMPv2-SMI',
                                                      'MibScalarInstance')
        #export our custom mib
        for mibObject in mibObjects:
            nextVar, = mibBuilder.importSymbols(mibObject.mibName,
                                                mibObject.objectType)
            instance = createVariable(MibScalarInstance,
                                      mibObject.valueFunc,
                                      nextVar.name, (0,),
                                      nextVar.syntax)
            #need to export as <var name>Instance
            instanceDict = {str(nextVar.name)+"Instance":instance}
            mibBuilder.exportSymbols(mibObject.mibName,
                                     **instanceDict)

        # tell pysnmp to respotd to get, getnext, and getbulk
        cmdrsp.GetCommandResponder(self._snmpEngine, self._snmpContext)
        cmdrsp.NextCommandResponder(self._snmpEngine, self._snmpContext)
        cmdrsp.BulkCommandResponder(self._snmpEngine, self._snmpContext)


    def setTrapReceiver(self, host, community):
        """Send traps to the host using community string community
        """
        config.addV1System(self._snmpEngine, 'nms-area', community)
        config.addVacmUser(self._snmpEngine, 2, 'nms-area', 'noAuthNoPriv',
                           notifySubTree=(1,3,6,1,4,1))
        config.addTargetParams(self._snmpEngine,
                               'nms-creds', 'nms-area', 'noAuthNoPriv', 1)
        config.addTargetAddr(self._snmpEngine, 'my-nms', udp.domainName,
                             (host, 162), 'nms-creds',
                             tagList='all-my-managers')
        #set last parameter to 'notification' to have it send
        #informs rather than unacknowledged traps
        config.addNotificationTarget(
            self._snmpEngine, 'test-notification', 'my-filter',
            'all-my-managers', 'trap')


    def sendTrap(self):
        print "Sending trap"
        ntfOrg = ntforg.NotificationOriginator(self._snmpContext)
        errorIndication = ntfOrg.sendNotification(
            self._snmpEngine,
            'test-notification',
            ('MY-MIB', 'testTrap'),
            ())


    def serve_forever(self):
        print "Starting agent"
        self._snmpEngine.transportDispatcher.jobStarted(1)
        try:
           self._snmpEngine.transportDispatcher.runDispatcher()
        except:
            self._snmpEngine.transportDispatcher.closeDispatcher()
            raise

class Worker(threading.Thread):
    """Just to demonstrate updating the MIB
    and sending traps
    """

    def __init__(self, agent, mib):
        threading.Thread.__init__(self)
        self._agent = agent
        self._mib = mib
        self.setDaemon(True)

    def run(self):
        while True:
            time.sleep(3)
            self._mib.setTestCount(mib.getTestCount()+1)
            self._agent.sendTrap()

if __name__ == '__main__':
    mib = Mib()
    # MY-MIB是我们自定义的mib模块名字,testDescription是对应的oid, mib.getTestDescription是获取该oid的值的方法。
    objects = [MibObject('MY-MIB', 'testDescription', mib.getTestDescription),
               MibObject('MY-MIB', 'testCount', mib.getTestCount)]
    agent = SNMPAgent(objects)
    agent.setTrapReceiver('192.168.1.14', 'traps')
    Worker(agent, mib).start()
    try:
        agent.serve_forever()
    except KeyboardInterrupt:
        print "Shutting down"

启动

python snmpAgent.py

测试

snmpget -v2c -c public {snmp服务器ip} {oid}
上一篇下一篇

猜你喜欢

热点阅读