jmeter

Python实现数据库连接池

2020-08-06  本文已影响0人  李白开水

1.初始化

def __init__(self, **kwargs):
        self.size = kwargs.get('size', 10)
        self.kwargs = kwargs
        self.conn_queue = queue.Queue(maxsize=self.size)
        for i in range(self.size):
            self.conn_queue.put(self._create_new_conn())

2.私有方法:连接数据库

    def _create_new_conn(self):
        return MySQLdb.connect(host=self.kwargs.get('host', '127.0.0.1'),
                               user=self.kwargs.get('user'),
                               passwd=self.kwargs.get('password'),
                               port=self.kwargs.get('port', 3306),
                               connect_timeout=5)

3.私有方法:put

    def _put_conn(self, conn):
        self.conn_queue.put(conn)

4.私有方法:获取连接

    def _get_conn(self):
        conn = self.conn_queue.get()
        if conn is None:
            self._create_new_conn()
        return conn

5.执行SQL语句函数

    def exec_sql(self, sql):
        conn = self._get_conn()
        try:
            with conn as cur:
                cur.execute(sql)
                return cur.fetchall()
        except MySQLdb.ProgrammingError as e:
            LOG.error("execute sql ({0}) error {1}".format(sql, e))
            raise e
        except MySQLdb.OperationalError as e:
            conn = self._create_new_conn()
            raise e
        finally:
            self._put_conn(conn)

6.删除连接

    def __del__(self):
        try:
            while True:
                conn = self.conn_queue.get_nowait()
                if conn:
                    conn.close()
        except queue.Empty:
            pass

7.完整代码

# -*- coding:UTF-8 -*-
import queue
import MySQLdb

class ConnectionPool(object):
    def __init__(self, **kwargs):
        self.size = kwargs.get('size', 10)
        self.kwargs = kwargs
        self.conn_queue = queue.Queue(maxsize=self.size)
        for i in range(self.size):
            self.conn_queue.put(self._create_new_conn())

    def _create_new_conn(self):
        return MySQLdb.connect(host=self.kwargs.get('host', '127.0.0.1'),
                               user=self.kwargs.get('user'),
                               passwd=self.kwargs.get('password'),
                               port=self.kwargs.get('port', 3306),
                               connect_timeout=5)

    def _put_conn(self, conn):
        self.conn_queue.put(conn)

    def _get_conn(self):
        conn = self.conn_queue.get()
        if conn is None:
            self._create_new_conn()
        return conn

    def exec_sql(self, sql):
        conn = self._get_conn()
        try:
            with conn as cur:
                cur.execute(sql)
                return cur.fetchall()
        except MySQLdb.ProgrammingError as e:
            # 可以加一行将异常记录到日志中
            raise e
        except MySQLdb.OperationalError as e:
            conn = self._create_new_conn()
            raise e
        finally:
            self._put_conn(conn)

    def __del__(self):
        try:
            while True:
                conn = self.conn_queue.get_nowait()
                if conn:
                    conn.close()
        except queue.Empty:
            pass
上一篇下一篇

猜你喜欢

热点阅读