14. python数据存储
2018-09-05 本文已影响0人
WESTWALL
读取与检索XML文件
parse()、doc.iterfind()、findtext()、get()
from xml.etree.ElementTree import parse
# Parse the catalogue and visit every <product> element found under <products>.
doc = parse('files/products.xml')
for product in doc.iterfind('products/product'):
    # Child elements are read with findtext(); uuid is an attribute, so it is read with get().
    fields = {
        'uuid': product.get('uuid'),
        'id': product.findtext('id'),
        'name': product.findtext('name'),
        'price': product.findtext('price'),
    }
    print('uuid={uuid}, id={id}, name={name}, price={price}'
          .format(**fields))
把字典转为XML
模块:dicttoxml
import dicttoxml
from xml.dom.minidom import parseString
import os
# Sample payload: a list mixing scalars with per-person dictionaries.
d = [
    20,
    'names',
    {'name': 'Bill', 'age': 30, 'salary': 2000},
    {'name': '李宁', 'age': 123, 'salary': 20004},
    {'name': 'John', 'age': 20, 'salary': 1234},
]

# Convert to XML; the result is bytes and custom_root names the outermost element.
bxml = dicttoxml.dicttoxml(d, custom_root='persons')
# Decoded copy: as raw bytes the Chinese name shows up as \xe6\x9d\x8e\xe5\xae\x81.
xml = bxml.decode('utf-8')

# Re-parse with minidom purely to pretty-print the document.
dom = parseString(bxml)
print(dom.toprettyxml(indent=' '))
把XML转成字典
模块:xmltodict
pprint:格式化字典
import xmltodict
# Read the whole document. The context manager closes the file even if read()
# fails; the original never closed it.
with open('./files/products.xml', 'r', encoding='utf-8') as f:
    xml = f.read()

# Convert the XML text into nested dictionaries.
d = xmltodict.parse(xml)

# Attributes are looked up with an '@' prefix ('@uuid'); child elements use their tag name.
for v in d['root']['products']['product']:
    print('uuid={}, id={}, name={}, price={}'.format(v['@uuid'], v['id'], v['name'], v['price']))
print(d['root']['products']['product'][0]['@uuid'])

# Pretty-print the parsed structure.
import pprint
pp = pprint.PrettyPrinter(indent=4)
pp.pprint(d)
JSON字符串与字典互转
import json
# A plain dictionary to round-trip through JSON.
data = dict(name="Bill", company="Microsoft", age=18)

# dict -> JSON text
jsonstr = json.dumps(data)
print(jsonstr, type(jsonstr), sep='\n')

# JSON text -> dict
data = json.loads(jsonstr)
print(data, type(data), sep='\n')

# The same JSON text is parsed twice more below, in two different ways.
s = jsonstr

# Way 1: eval().
# NOTE(review): this only works while the JSON text also happens to be a valid
# Python literal (no true/false/null), and eval must never see untrusted input.
data = eval(s)
print(type(data), data, sep='\n')

# Way 2: json.loads().
data1 = json.loads(s)
print(data1)
print(data1['company'])
将json字符串转为对象
'''
loads
loads函数的object_hook关键字参数指定一个类或一个回调函数
1. 指定类:loads函数会自动创建指定类的实例,并将由json字符串转换成的
字典通过类的构造方法传入类实例,也就是说,指定的类必须有一个可以接收字典
的构造方法。
2. 指定为回调函数:loads函数会调用回调函数返回类的实例,并将由JSON字符串转换成
的字典传入回调函数,也就是说,回调函数必须有一个参数可以接收字典。
'''
import json
class Product:
    """Attribute-style view of a dictionary decoded from JSON."""

    def __init__(self, d):
        # Adopt the dictionary itself as the instance namespace, so every key
        # becomes an attribute; no copy is made.
        setattr(self, '__dict__', d)
# Read the JSON text up front. The context manager closes the file even when a
# later step raises; the original only reached f.close() on the success path.
with open('files/product.json', 'r') as f:
    jsonStr = f.read()

# Variant 1: pass the class itself as object_hook.
my1 = json.loads(jsonStr, object_hook=Product)
print(type(my1))
print('name', '=', my1.name)
print('price', '=', my1.price)
print('count', '=', my1.count)
print('----------')


def json2Product(d):
    """Wrap the decoded dictionary *d* in a Product and return it."""
    return Product(d)


# Variant 2: pass a callback function as object_hook.
my2 = json.loads(jsonStr, object_hook=json2Product)
print('name', '=', my2.name)
print('price', '=', my2.price)
print('count', '=', my2.count)
将对象转成JSON字符串
# dumps:将字典转换为JSON字符串
# default关键字参数指定一个回调函数,该回调函数会接收一个类实例
# 回调函数需要返回一个字典,最后,dumps函数会将这个字典转换为JSON字符串
# object -> dict -> JSON
import json
class Product:
    """Record holding a product's name, price and count."""

    def __init__(self, name, price, count):
        self.name, self.price, self.count = name, price, count
def product2Dict(obj):
    """Return the name, price and count attributes of *obj* as a plain dict.

    The result is JSON-serialisable whenever the three attribute values are.
    """
    return {key: getattr(obj, key) for key in ('name', 'price', 'count')}
# Serialise one Product instance; product2Dict turns it into a dict first.
product = Product(name='特斯拉', price=1000000, count=20)
# ensure_ascii=False keeps the Chinese name as-is instead of \uXXXX escapes.
jsonStr = json.dumps(product, ensure_ascii=False, default=product2Dict)
print(jsonStr)
类实例列表和JSON互转
import json
from nltk.parse.bllip import _ensure_ascii
class Product:
    """Expose the keys of a decoded JSON object as instance attributes."""

    def __init__(self, d):
        # The dictionary is used directly as the instance's attribute storage.
        setattr(self, '__dict__', d)
# Read the JSON text; the original opened the file and never closed it.
with open('./files/products.json', 'r') as f:
    jsonStr = f.read()

# object_hook is applied to every decoded JSON object, so each one becomes a Product.
products = json.loads(jsonStr, object_hook=Product)
print(type(products))
for product in products:
    print('name', '=', product.name)
    print('price', '=', product.price)
    print('count', '=', product.count)
def product2Dict(product):
    """Return *product*'s name, price and count attributes as a plain dict."""
    wanted = ('name', 'price', 'count')
    return {attr: getattr(product, attr) for attr in wanted}
# Serialise the list of instances back to JSON, keeping non-ASCII text readable.
jsonStr = json.dumps(products, ensure_ascii=False, default=product2Dict)
print(jsonStr)
JSON和XML互转
import json
import dicttoxml
# Read the JSON text. The context manager closes the file even if a later step
# raises; the original only closed it after everything had succeeded.
with open('./files/products.json', 'r') as f:
    jsonStr = f.read()

# JSON text -> Python objects -> XML (bytes) -> str
d = json.loads(jsonStr)
print(d)
xmlStr = dicttoxml.dicttoxml(d).decode('utf-8')
print(xmlStr)
python连接MySQL数据库
# import json
import pymysql
import json
def connectDB(host="192.168.99.100", user="root", password="123456",
              db="test", port=32777, charset='utf8'):
    """Open and return a PyMySQL connection.

    Every setting defaults to the value that used to be hard-coded, so
    ``connectDB()`` behaves as before while callers can now point it at
    another server or schema.

    Args:
        host: server host name or IP address.
        user: login name.
        password: login password.
        db: name of the database to use.
        port: TCP port of the server.
        charset: connection character set.

    Returns:
        The connection object returned by ``pymysql.connect``.
    """
    return pymysql.connect(host=host, user=user, password=password,
                           db=db, port=port, charset=charset)
# Create the persons table.
def createTable(db):
    """Create the ``persons`` table on connection *db*.

    Args:
        db: an open connection providing ``cursor()``, ``commit()`` and
            ``rollback()``.

    Returns:
        True if the statement ran and was committed; False if anything
        raised (the error is printed and the transaction rolled back).
    """
    sql = '''CREATE TABLE persons
    (id INT PRIMARY KEY NOT NULL,
    name TEXT NOT NULL,
    age INT NOT NULL,
    address CHAR(50),
    salary REAL);'''
    cursor = db.cursor()
    try:
        cursor.execute(sql)
        db.commit()
        return True
    except Exception as e:
        print(e)
        db.rollback()
        return False
    finally:
        # The original never released the cursor.
        cursor.close()
# Insert rows into the persons table.
def insertRows(db):
    """Replace the contents of ``persons`` with two sample rows.

    Args:
        db: an open connection providing ``cursor()``, ``commit()`` and
            ``rollback()``.

    Returns:
        True if all statements ran and were committed; False if anything
        raised (the error is printed and the transaction rolled back).
    """
    statements = (
        'delete from persons',
        "insert into persons(id,name,age,address,salary) values(1,'Bill', 32, 'XA City', 5000)",
        "insert into persons(id,name,age,address,salary) values(2,'Mike', 23, 'Beijing City', 4000)",
    )
    c = db.cursor()
    try:
        for statement in statements:
            c.execute(statement)
        db.commit()
        return True
    except Exception as e:
        print(e)
        db.rollback()
        return False
    finally:
        # The original never released the cursor.
        c.close()
# Query.
def selectRows(db):
    """Return name, age and salary of every person as a JSON array string.

    Rows are ordered by age, descending; each row becomes one JSON object.

    Args:
        db: an open connection providing ``cursor()``.

    Returns:
        A JSON string such as ``[{"name": ..., "age": ..., "salary": ...}]``.
    """
    # Must be an ordered sequence that matches the SELECT column order. The
    # original used a set, whose iteration order is arbitrary, so zip() could
    # attach column names to the wrong values.
    fields = ('name', 'age', 'salary')
    c = db.cursor()
    try:
        c.execute('select name,age,salary from persons order by age desc')
        results = c.fetchall()
    finally:
        # The original never released the cursor.
        c.close()
    return json.dumps([dict(zip(fields, row)) for row in results])
# Run the three steps against one connection. try/finally closes the connection
# even if a step raises; the original leaked it in that case.
db = connectDB()
try:
    print('建表:', createTable(db))
    print('插入数据:', insertRows(db))
    print('查询:', selectRows(db))
finally:
    db.close()
ORM框架:SQLAlchemy
ORM:Object Relational Mapping(对象关系映射)
安装:pip install SQLAlchemy
from sqlalchemy import create_engine, MetaData, Table, Column, Integer, String, Float, exc, orm
from sqlalchemy.ext.declarative import declarative_base
from flask.globals import session
mysql = 'mysql+pymysql://root:123456@192.168.99.100:32777/test?charset=utf8'
tableName = 'persons1'

# The encoding= argument is omitted: utf-8 was already its default in
# SQLAlchemy 1.x and the parameter no longer exists in 2.0.
engin = create_engine(mysql)
# Connectivity check: open a connection and release it straight away
# (the original left this connection open for the life of the script).
with engin.connect():
    pass

# Create the table.
# MetaData is left unbound (binding it to an engine was removed in 2.0);
# the engine is passed explicitly to create_all() instead.
metadata = MetaData()
person = Table(tableName, metadata,
               Column('id', Integer, primary_key=True),
               Column('name', String(30)),
               Column('age', Integer),
               Column('address', String(100)),
               Column('salary', Float))
metadata.create_all(engin)
# Map the table onto a Python class.
Base = declarative_base()


class Person(Base):
    """Declarative ORM class for the table named by ``tableName``."""

    __tablename__ = tableName

    # Column definitions: id is the primary key.
    id = Column(Integer, primary_key=True)
    name = Column(String(30))
    age = Column(Integer)
    address = Column(String(100))
    salary = Column(Float)
Session = orm.sessionmaker(bind=engin)
# NOTE: this rebinds the name `session`, shadowing any earlier import of that name.
session = Session()
try:
    # Delete all existing rows.
    session.query(Person).delete()
    session.commit()

    # Insert three rows.
    p1 = Person(id=10, name='2Dog', age=30, address='XA City', salary=100000)
    p2 = Person(id=20, name='胖', age=18, address='Beijing City', salary=900000)
    p3 = Person(id=30, name='Joe', age=6, address='Shanghai City', salary=1)
    for new_person in (p1, p2, p3):
        session.add(new_person)
    session.commit()
    print('插入记录!')

    # Update, variant 1: call update() on a filtered query.
    session.query(Person).filter(Person.name == '胖').update({'address': 'New York'})

    # Update, variant 2: load the object, change attributes, commit.
    query = session.query(Person).filter(Person.name == '2Dog')
    print(query)  # printing a Query shows its SQL
    person = query.scalar()
    print(person.name)
    person.age = 55
    person.salary = 1111
    session.commit()
    print('更新记录')

    # Query: iterate over the matching rows.
    persons = session.query(Person).filter((Person.age >= 3) & (Person.salary >= 1))
    for person in persons:
        print('name =', person.name, end=' ')
        print('age =', person.age)

    # The Query object can be reused and refined further.
    print(type(persons))  # <class 'sqlalchemy.orm.query.Query'>
    print(persons.first().name)
    print(persons.offset(2).scalar().name)

    # Delete one row.
    session.delete(p2)
    session.commit()
finally:
    # Close the session even if a step above raised; the original only
    # reached close() when everything succeeded.
    session.close()
ORM框架:SQLObject
from sqlobject import *
from sqlobject.mysql import builder
import json
# Connection URI for the MySQL server, opened through the pymysql driver and
# installed as the connection SQLObject uses for this process.
mysql = 'mysql://root:123456@192.168.99.100:32777/test?charset=utf8'
connection = connectionForURI(mysql, driver='pymysql')
sqlhub.processConnection = connection
# Model definition.
class Person(SQLObject):
    """SQLObject model with name, age, address and salary columns."""

    class sqlmeta:
        # Table name used for this model.
        table = 'persons'

    name = StringCol(length=30)
    age = IntCol()
    address = StringCol(length=100)
    salary = FloatCol()
# Recreate the mapped table from scratch. ifExists=True makes the drop a no-op
# when the table is absent, replacing the original catch-all try/except that
# printed and ignored every kind of error.
Person.dropTable(ifExists=True)
Person.createTable()
print('已创建persons表。')

# Create objects directly.
person1 = Person(name='Bill', age=55, address='Earth', salary=1234)
person2 = Person(name='Mike', age=23, address='Math', salary=4321)
person3 = Person(name='John', age=45, address='Sun', salary=4000)

# Update by assigning attributes.
person2.name = '2Dog'
person2.address = 'Moon'

# Query.
persons = Person.selectBy(name='Bill')
print(type(persons))    # <class 'sqlobject.sresults.SelectResults'>
print(persons.count())  # 1
print("Bill的地址:", persons[0].address)
for person in persons:
    # Iterating the result yields Person objects: <class '__main__.Person'>
    print(type(person))
def person2Dict(obj):
    """Return the id, name, address and salary attributes of *obj* as a dict.

    No other attribute of *obj* is copied.
    """
    exported = ('id', 'name', 'address', 'salary')
    return {attr: getattr(obj, attr) for attr in exported}
# Export the first query result as JSON; person2Dict converts the object to a dict.
jsonStr = json.dumps(persons[0], ensure_ascii=False, default=person2Dict)
print(jsonStr)

# Delete that object.
persons[0].destroySelf()
操作MongoDB
安装:pip install pymongo
from pymongo import *
# Connect to the MongoDB server.
client = MongoClient('192.168.99.100', 32778)
# Open or create the database named "test" (the counterpart of a MySQL database).
db = client.test
person1 = {"name": "Bill", "age": 25, "address": "地球", "salary": 123.0}
person2 = {"name": "Mary", "age": 22, "address": "火星", "salary": 6424}
# Open or create the collection named "persons" (the counterpart of a table).
persons = db.persons

# Delete every document whose age is greater than 0.
persons.delete_many({'age': {'$gt': 0}})

# Bulk insert. For a single document, persons.insert_one(doc).inserted_id
# is the equivalent call.
personList = [person1, person2]
result = persons.insert_many(personList)
print(result.inserted_ids)

# Query.
print(persons.find_one())  # first document
print(persons.find_one({'name': {'$eq': 'Mary'}}))
print(persons.find_one()['name'])

# Fetch all documents.
for person in persons.find():
    print(person)

# Update one document with age < 23.
persons.update_one({'age': {'$lt': 23}}, {'$set': {'name': 'superme'}})
for person in persons.find():
    print(person)

# Delete one document with age > 23, then list those with age < 30.
persons.delete_one({'age': {'$gt': 23}})
for person in persons.find({'age': {'$lt': 30}}):
    print(person)

# Total number of documents. Collection.count() was deprecated in PyMongo 3.7
# and removed in 4.0; count_documents({}) is its replacement.
print(persons.count_documents({}))
练习1
'''
1. 编写一个Python程序,将products.xml文件的内容保存到MongoDB中,
并且可以查找每一个product。
'''
import xmltodict
from pymongo import *
# Step 1: read the XML file and convert it to nested dictionaries.
# The context manager closes the file; the original left it open.
with open('./files/products.xml', 'r', encoding='utf-8') as xml_file:
    xml = xml_file.read()
d = xmltodict.parse(xml)

# The parsed product entries are stored as-is, '@uuid' key included, rather
# than being copied into hand-built dictionaries.
productList = d['root']['products']['product']
# With exactly one <product>, xmltodict yields a single mapping instead of a
# list, but insert_many() needs a list.
if isinstance(productList, dict):
    productList = [productList]

# Step 2: store the dictionaries in MongoDB.
client = MongoClient('192.168.99.100', 32778)
db = client.test
products = db.products
# Deleting everything still requires a filter argument: an empty dict.
products.delete_many({})
result = products.insert_many(productList)
print('插入结果:', result.inserted_ids)

# Read the stored documents back.
for p in products.find():
    print('查询:', p)

# Total number of documents. Collection.count() was removed in PyMongo 4.0;
# count_documents({}) is its replacement.
print('总数:', products.count_documents({}))
练习2
'''
编写一个Python程序,通过循环向SQLite数据库的
persons表中录入数据。从控制台输入“exit:”后退出循环,
然后输出persons表中的所有数据。
'''
import json
from sqlobject import *
from sqlobject.mysql import builder
import json
# Connection URI for the MySQL server, opened through the pymysql driver and
# installed as the connection SQLObject uses for this process.
mysql = 'mysql://root:123456@192.168.99.100:32780/test?charset=utf8'
connection = connectionForURI(mysql, driver='pymysql')
sqlhub.processConnection = connection
# Step 1: model that the JSON input is converted into.
class Person(SQLObject):
    """SQLObject model with pid, name and age columns."""

    class sqlmeta:
        # Table name used for this model.
        table = 'person111'

    pid = StringCol(length=10)
    name = StringCol(length=30)
    age = IntCol()
    # An __init__ that assigned a dict to self.__dict__ was tried here and
    # left disabled.
def json2Person(d):
    """Validate the decoded JSON object *d* and build a Person from it.

    Args:
        d: dictionary expected to carry the keys 'pid', 'name' and 'age'.

    Returns:
        The Person created from the three values.

    Raises:
        Exception: if 'pid', 'name' or 'age' is missing or None.
    """
    # Identity comparison is the correct test for None (PEP 8).
    if d.get('pid') is None:
        raise Exception('必须包含id信息!')
    if d.get('name') is None:
        raise Exception('必须包含name信息!')
    if d.get('age') is None:
        raise Exception('必须包含age信息,或age必须为数字!')
    return Person(pid=d['pid'], name=d['name'], age=d['age'])
# Create the table only if it does not exist yet.
Person.createTable(ifNotExists=True)

while True:
    # Example input: {"pid": "1", "name": "Bill", "age": 19}
    # The variable is named `line`; the original called it `str`, which
    # shadowed the builtin for the rest of the module.
    line = input('输入JSON字符串(exp:{"pid": "1", "name": "Bill", "age": 19}):')
    if line == 'exit()':
        break
    try:
        # object_hook hands the decoded object to json2Person.
        p = json.loads(line, object_hook=json2Person)
        print('已保存到mysql数据库:', p, ', 记录总数为:', Person.select().count())
    except Exception as e:
        # Top-level boundary of the interactive loop: report the error and prompt again.
        print('请重新输入!错误信息:', e)