Python学习笔记-第9天:项目实战练习(4)+数据库编程
第九天 项目实战练习(4)+数据库编程
今天计划继续坦克大战练习,学习项目及练习源码地址:
GitHub源码
坦克大战继续
碰撞检测
在游戏开发中,通常把显示图像的对象叫做精灵Spire精灵需要有两个属性 image要显示的图像,rect图像要显示在屏幕的位置。
在Pygame框架中使用pygame.sprite模块中的内置函数可以实现碰撞检测。
pygame.sprite.collide_rect(first, second) #返回布尔值
pygame.sprite.Sprite是pygame精灵的基类,一般来说,总是需要写一个自己的精灵类继承pygame.sprite.Sprite。让坦克类、子弹类都继承编写的精灵类。
音效处理
music是pygame中控制流音频的pygame模块,音乐模块与pygame.mixer紧密相连, pygame.mixer是一个用来处理声音的模块,其含义为“混音器”。游戏中对声音的处理一般包括制造声音和播放声音两部分。使用pygame.mixer.music.load()加载一个播放音乐的文件pygame.mixer.music.play()开始播放音乐流。
小结
总算完成了,留有一点遗憾,就是敌方坦克在初始化的时候可能重叠坐标,这样会导致其无法移动(穿墙限制),应该很好处理,只是练习就愉快的跳过去了,我让敌方坦克可以穿透任意坦克,就行了,感觉这样挺有意思:)。需要看源代码的请在上一天的目录里面找地址,我就不贴出来了。
花絮
想发布自己的小程序怎么办?前面讲过如何打包,但是像游戏这样的有资源文件,Pyinstaller如何将资源文件一起打包到一个app中呢?
基本原理:Pyinstaller 可以将资源文件一起bundle到exe中,当exe在运行时,会生成一个临时文件夹,程序可通过sys._MEIPASS访问临时文件夹中的资源
官方说明:文档
测试案例功能描述,访问资源文件夹res/a.txt,并打印其内容。实现方法如下:
源码如下,比较简单,resource_path方法说明了如何使用sys._MEIPASS变量来访问临时文件夹中的资源
#coding:utf-8
import sys
import os
#生成资源文件目录访问路径
def resource_path(relative_path):
if getattr(sys, 'frozen', False): #是否Bundle Resource
base_path = sys._MEIPASS
else:
base_path = os.path.abspath(".")
return os.path.join(base_path, relative_path)
#访问res文件夹下a.txt的内容
filename = resource_path(os.path.join("res","a.txt"))
print(filename)
with open(filename) as f:
lines = f.readlines()
print(lines)
f.close()
结下来介绍如何生成可执行文件:
首先需要生成spec文件,pyi-makespec -F test.py (如果要添加Icon等可以在这里就使用pyi-makespec --icon abc.jpg -F test.py语句生成spec文件)。编辑spec文件,在datas选项中说明需要将哪些文件加入exe,在零时文件夹中命名成什么即可。
然后:pyinstaller -F test.spec
数据库编程
数据库的重要性不言而喻!
Python支持多种数据库,从Python3.x版本开始,在标准库中已经内置了SQLlite3模块,它可以支持SQLite3数据库的访问和相关的数据库操作。在需要操作SQLite3数据库数据时,只须在程序中导入SQLite3模块即可。虽然SQLlite3用的比较广泛,但实际遇到更多的还是Mysql,MongoDB,Redis等
SQLite3
Python语言操作SQLite3数据库的基本流程如下所示。
- 导入相关库或模块(SQLite3)。
- 使用connect()连接数据库并获取数据库连接对象。它提供了以下方法:
- .cursor() 方法来创建一个游标对象
- .commit() 方法来处理事务提交
- .rollback() 方法来处理事务回滚
- .close() 方法来关闭一个数据库连接
- 使用con.cursor()获取游标对象。
- 使用游标对象的方法(execute()、executemany()、fetchall()等)来操作数据库,实现插入、修改和删除操作,并查询获取显示相关的记录。
在Python程序中,连接函数sqlite3.connect()有如下两个常用参数。
- database:表示要访问的数据库名。
- timeout:表示访问数据的超时设定。
- 使用close()关闭游标对象和数据库连接。数据库操作完成之后,必须及时调用其close()方法关闭数据库连接,这样做的目的是减轻数据库服务器的压力。
一个样例代码: 示例
Mysql
在Python3中大家用的比较多的还是pymysql模块
安装
pip3 install pymysql
基本使用
# coding:utf-8
import pymysql
#关于中文问题
#1. mysql命令行创建数据库,设置编码为gbk:create databse demo2 character set utf8;
#2. python代码中连接时设置charset="gbk"
#3. 创建表格时设置default charset=utf8
#连接数据库
#charset和mysql服务端设置格式一样(还可设置为gbk, gb2312)
conn = pymysql.connect(host="localhost", user="root", passwd="", db='test', charset='utf8', port=3306)
#创建游标
cursor = conn.cursor()
try:
#执行sql语句
cursor.execute("""create table if not exists t_sales(
id int primary key auto_increment not null,
nickName varchar(128) not null,
color varchar(128) not null,
size varchar(128) not null,
comment text not null,
saledate varchar(128) not null)engine=InnoDB default charset=utf8;""")
# cursor.execute("""insert into t_sales(nickName,color,size,comment,saledate)
# values('%s','%s','%s','%s','%s');""" % ("zack", "黑色", "L", "大小合适", "2019-04-20"))
cursor.execute("""insert into t_sales(nickName,color,size,comment,saledate)
values(%s,%s,%s,%s,%s);""" , ("zack", "黑色", "L", "大小合适", "2019-04-20"))
#提交
conn.commit()
### 插入数据
insert_sql = "insert into t_sales(nickName,color,size,comment,saledate) values(%s,%s,%s,%s,%s);"
#返回受影响的行数
row1 = cursor.execute(insert_sql,("Bob", "黑色", "XL", "便宜实惠", "2019-04-20"))
update_sql = "update t_sales set color='白色' where id=%s;"
#返回受影响的行数
row2 = cursor.execute(update_sql,(1,))
select_sql = "select * from t_sales where id>%s;"
#返回受影响的行数
row3 = cursor.execute(select_sql,(1,))
delete_sql = "delete from t_sales where id=%s;"
#返回受影响的行数
row4 = cursor.execute(delete_sql,(4,))
#提交,不然无法保存新建或者修改的数据(增删改得提交)
conn.commit()
### 批量插入
insert_sql = "insert into t_sales(nickName,color,size,comment,saledate) values(%s,%s,%s,%s,%s);"
data = [("Bob", "黑色", "XL", "便宜实惠", "2019-04-20"),("Ted", "黄色", "M", "便宜实惠", "2019-04-20"),("Gary", "黑色", "M", "穿着舒服", "2019-04-20")]
row1 = cursor.executemany(insert_sql, data)
conn.commit()
## 查询数据
select_sql = "select id,nickname,size from t_sales where id>%s;"
cursor.execute(select_sql, (3,))
row1 = cursor.fetchone() #获取第一条数据,获取后游标会向下移动一行
row_n = cursor.fetchmany(3) #获取前n条数据,获取后游标会向下移动n行
row_all = cursor.fetchall() #获取所有数据,获取后游标会向下移动到末尾
print(row1)
print(row_n)
print(row_all)
# fetch获取的数据默认为元组格式,还可以获取字典类型的,需要修改cursor如下如下:
# cursor = conn.cursor(cursor=pymysql.cursors.DictCursor)
#conn.commit()
except BaseException as e:
print(e)
finally:
#关闭游标
cursor.close()
#关闭连接
conn.close()
注意execute执行sql语句参数的两种情况:
execute("insert into t_sales(nickName, size) values('%s','%s');" % ("zack","L") )
#此时的%s为字符窜拼接占位符,需要引号加'%s' (有sql注入风险)
execute("insert into t_sales(nickName, size) values(%s,%s);" , ("zack","L"))
#此时的%s为sql语句占位符,不需要引号%s
多线程操控pymysql - 加锁
对于pymysql模块,通过多线程操控数据库容易出错,得加锁串行执行。进行并发时,可以利用DBUtils模块来维护数据库连接池。
import pymysql
import threadind
#**************************无连接池*******************************
# 每个线程都要创立一次连接,线程并发操作间可能有问题?
def func():
conn = pymysql.connect(host="127.0.0.1",port=3306,db="test",user="root",passwd="",charset="utf8")
cursor = conn.cursor()
cursor.execute("select * from user where nid>1;")
result = cursor.fetchone()
print(result)
cursor.close()
conn.close()
if __name__=="__main__":
for i in range(5):
t = threading.Thread(target=func,name="thread-%s"%i)
t.start()
#**************************无连接池*******************************
#创建一个连接,加锁串行执行
from threading import Lock
import pymysql
import threading
conn = pymysql.connect(host="127.0.0.1",port=3306,db="test",user="root",passwd="",charset="utf8")
lock = Lock()
def func():
with lock:
cursor = conn.cursor()
cursor.execute("select * from user where nid>1;")
result = cursor.fetchone()
print(result)
cursor.close()
#conn.close()不能在线程中关闭连接,否则其他线程不可用了
if __name__=="__main__":
threads = []
for i in range(5):
t = threading.Thread(target=func,name="thread-%s"%i)
threads.append(t)
t.start()
for t in threads:
t.join()
conn.close()
多线程操作 - DBUtils连接池
DBUtils连接池有两种连接模式:PersistentDB和PooledDB (官网文档:https://cito.github.io/DBUtils/UsersGuide.html)
模式一(DBUtils.PersistentDB):为每个线程创建一个连接,线程即使调用了close方法,也不会关闭,只是把连接重新放到连接池,供自己线程再次使用。当线程终止时,连接自动关闭。
PersistentDB使用代码如下:
#coding:utf-8
from DBUtils.PersistentDB import PersistentDB
import pymysql
import threading
pool = PersistentDB(
creator = pymysql, # 使用链接数据库的模块
maxusage = None, # 一个链接最多被重复使用的次数,None表示无限制
setsession=[], # 开始会话前执行的命令列表。如:["set datestyle to ...", "set time zone ..."]
ping = 0, # ping MySQL服务端,检查是否服务可用。# 如:0 = None = never, 1 = default = whenever it is requested, 2 = when a cursor is created, 4 = when a query is executed, 7 = always
closeable = False, # 如果为False时, conn.close() 实际上被忽略,供下次使用,再线程关闭时,才会自动关闭链接。如果为True时, conn.close()则关闭链接,那么再次调用pool.connection时就会报错,因为已经真的关闭了连接(pool.steady_connection()可以获取一个新的链接)
threadlocal = None, # 本线程独享值得对象,用于保存链接对象,如果链接对象被重置
host="127.0.0.1",
port = 3306,
user = "root",
password="",
database="test",
charset = "utf8"
)
def func():
conn = pool.connection()
cursor = conn.cursor()
cursor.execute("select * from user where nid>1;")
result = cursor.fetchone()
print(result)
cursor.close()
conn.close()
if __name__ == "__main__":
for i in range(5):
t = threading.Thread(target=func,name="thread-%s"%i)
t.start()
模式二(DBUtils.PooledDB):创建一批连接到连接池,供所有线程共享使用。
(由于pymysql、MySQLdb等threadsafety值为1,所以该模式连接池中的线程会被所有线程共享。)
PooledDB使用代码如下:
from DBUtils.PooledDB import PooledDB
import pymysql
import threading
import time
pool = PooledDB(
creator = pymysql, # 使用链接数据库的模块
maxconnections = 6, # 连接池允许的最大连接数,0和None表示不限制连接数
mincached = 2, # 初始化时,链接池中至少创建的空闲的链接,0表示不创建
maxcached = 5, # 链接池中最多闲置的链接,0和None不限制
maxshared = 3, # 链接池中最多共享的链接数量,0和None表示全部共享。PS: 无用,因为pymysql和MySQLdb等模块的 threadsafety都为1,所有值无论设置为多少,_maxcached永远为0,所以永远是所有链接都共享。
blocking = True, # 连接池中如果没有可用连接后,是否阻塞等待。True,等待;False,不等待然后报错
maxusage = None, # 一个链接最多被重复使用的次数,None表示无限制
setsession = [], # 开始会话前执行的命令列表。如:["set datestyle to ...", "set time zone ..."]
ping = 0, # ping MySQL服务端,检查是否服务可用。# 如:0 = None = never, 1 = default = whenever it is requested, 2 = when a cursor is created, 4 = when a query is executed, 7 = always
host="127.0.0.1",
port = 3306,
user="root",
password="",
database = "test",
charset = "utf8"
)
def func():
conn = pool.connection()
cursor = conn.cursor()
cursor.execute("select * from user where nid>1;")
result = cursor.fetchone()
print(result)
time.sleep(5) #为了查看mysql端的线程数量
cursor.close()
conn.close()
if __name__=="__main__":
for i in range(5):
t = threading.Thread(target=func,name="thread-%s"%i)
t.start()
MongoDB
暂时不学
Redis
暂时不学
ORM
暂时不学
- sqlalchemy
- async-sqlalchemy