14. python数据存储

2018-09-05  本文已影响0人  WESTWALL

读取与检索XML文件

parse(), doc.iterfind(), findtext(), get

from xml.etree.ElementTree import parse
doc = parse('files/products.xml')
for item in doc.iterfind('products/product'):
    # 读取节点:findtext
    id = item.findtext('id')
    name = item.findtext('name')
    price = item.findtext('price')
    # 读取属性:get
    uuid = item.get('uuid')
    print('uuid={uuid}, id={id}, name={name}, price={price}'
          .format(uuid=uuid, id=id, name=name, price=price))

把字典转为XML

模块:dicttoxml

import dicttoxml
from xml.dom.minidom import parseString
import os

d = [20, 'names', {'name':'Bill','age':30,'salary':2000},
                  {'name':'李宁','age':123,'salary':20004},
                  {'name':'John','age':20,'salary':1234}]

# 将字典转为xml格式
bxml = dicttoxml.dicttoxml(d, custom_root = 'persons')
# print(bxml)
xml = bxml.decode('utf-8')  # 字典中的中文:\xe6\x9d\x8e\xe5\xae\x81
# print(xml)

# 美化xml的格式
dom = parseString(bxml)
prettyxml = dom.toprettyxml(indent = '    ')
print(prettyxml)

把XML转成字典

模块:xmltodict
pprint:格式化字典

import xmltodict
f = open('./files/products.xml', 'r', encoding = 'utf-8')
xml = f.read()
# print(xml)

d = xmltodict.parse(xml)
# print(d)
for v in d['root']['products']['product']:
    print('uuid={}, id={}, name={}, price={}'.format(v['@uuid'], v['id'], v['name'], v['price']))

print(d['root']['products']['product'][0]['@uuid'])

# 格式化字典
import pprint
pp = pprint.PrettyPrinter(indent = 4)
pp.pprint(d)

JSON字符串与字典互转

import json
# 这是一个字典
data = {
    'name': "Bill",
    'company': "Microsoft",
    'age': 18,
    }

# 将字典转为json字符串
jsonstr = json.dumps(data)
print(jsonstr)
print(type(jsonstr))

# 将json字符串转为字典
data = json.loads(jsonstr)
print(data)
print(type(data))

# json字符串
s = jsonstr

# 使用eval函数将json字符串转成字典
data = eval(s)
print(type(data))
print(data)

# 使用loads函数
data1 = json.loads(s)
print(data1)
print(data1['company'])

将json字符串转为对象

'''
loads

loads函数的object_hook关键字参数指定一个类或一个回调函数

1. 指定类:loads函数会自动创建指定类的实例,并将有json字符串转换成的
字典通过类的构造方法传入类实例,也就是说,指定的类必须有一个可以接收字典
的构造方法。

2. 指定为回调函数:loads函数会调用回调函数返回类的实例,并将由JSON字符串转换成
的字典传入回调函数,也就是说,回调函数必须有一个参数可以接收字典。
'''

import json

class Product:
    def __init__(self, d):
        self.__dict__ = d
        
f = open('files/product.json', 'r')
jsonStr = f.read()

# 通过字典
my1 = json.loads(jsonStr, object_hook = Product)
print(type(my1))
print('name','=',my1.name)
print('price','=',my1.price)
print('count','=',my1.count)
print('----------')

def json2Product(d):
    return Product(d)

# 通过一个回调函数
my2 = json.loads(jsonStr, object_hook = json2Product)
print('name','=',my2.name)
print('price','=',my2.price)
print('count','=',my2.count)
f.close()

将对象转成JSON字符串

# dumps:将字典转换为JSON字符串
# default关键字参数指定一个回调函数,该回调函数会接收一个类实例
# 回调函数需要返回一个字典,最后,dumps函数会将这个字典转换为JSON字符串
# object -> dict -> JSON

import json
class Product:
    def __init__(self, name, price, count):
        self.name = name
        self.price = price
        self.count = count
def product2Dict(obj):
    return {
        'name': obj.name,
        'price': obj.price,
        'count': obj.count}

product = Product('特斯拉',1000000,20)
# dumps
jsonStr = json.dumps(product, default = product2Dict,ensure_ascii=False)
print(jsonStr)

类实例列表和JSON互转

import json
from nltk.parse.bllip import _ensure_ascii

class Product:
    def __init__(self, d):
        self.__dict__ = d
f = open('./files/products.json', 'r')
jsonStr = f.read()

products = json.loads(jsonStr, object_hook = Product)
print(type(products))
for product in products:
    print('name','=', product.name)
    print('price','=',product.price)
    print('count', '=', product.count)
    
def product2Dict(product):
    return {
        'name': product.name,
        'price': product.price,
        'count': product.count}
jsonStr = json.dumps(products, default = product2Dict, ensure_ascii=False)
print(jsonStr)

JSON和XML互转

import json
import dicttoxml

f = open('./files/products.json', 'r')
jsonStr = f.read()
d = json.loads(jsonStr)
print(d)
xmlStr = dicttoxml.dicttoxml(d).decode('utf-8')
print(xmlStr)
f.close()

python连接MySQL数据库

# import json
import pymysql
import json

def connectDB():
    db = pymysql.connect(host="192.168.99.100", user="root", password="123456", db="test", port=32777, charset = 'utf8')
    return db
# 创建persons表
def createTable(db):
    cursor = db.cursor()
    sql = '''CREATE TABLE persons
               (id INT PRIMARY KEY NOT NULL,
                name TEXT NOT NULL,
                age INT NOT NULL,
                address CHAR(50),
                salary REAL);'''
    try:
        cursor.execute(sql)
        db.commit()
        return True
    except Exception as e:
        print(e)
        db.rollback()
    return False
# 向persons表插入记录
def insertRows(db):
    c = db.cursor()
    try:
        c.execute('delete from persons')
        c.execute("insert into persons(id,name,age,address,salary) values(1,'Bill', 32, 'XA City', 5000)")
        c.execute("insert into persons(id,name,age,address,salary) values(2,'Mike', 23, 'Beijing City', 4000)")
        db.commit()
        return True
    except Exception as e:
        print(e)
        db.rollback()
    return False
# 查询
def selectRows(db):
    c = db.cursor()
    sql = 'select name,age,salary from persons order by age desc'
    c.execute(sql)
    results = c.fetchall()
    fields = {'name', 'age', 'salary'}
    records = []
    for row in results:
        records.append(dict(zip(fields, row)))
    return json.dumps(records)
    
db = connectDB()
print('建表:', createTable(db))
print('插入数据:', insertRows(db))
print('查询:', selectRows(db))
db.close()

ORM框架:SQLAlchemy

ORM: ORM(Object Relational Mapping)
安装:pip install SQLAlchemy

from sqlalchemy import create_engine, MetaData, Table, Column, Integer, String, Float, exc, orm
from sqlalchemy.ext.declarative import declarative_base
from flask.globals import session

mysql = 'mysql+pymysql://root:123456@192.168.99.100:32777/test?charset=utf8'
tableName = 'persons1'

engin = create_engine(mysql, encoding='utf-8')
engin.connect()

# 建表
metadata = MetaData(engin)
person = Table(tableName, metadata,
               Column('id', Integer, primary_key=True),
               Column('name', String(30)),
               Column('age', Integer),
               Column('address', String(100)),
               Column('salary', Float))
metadata.create_all(engin)

# 映射到对象
Base = declarative_base()
class Person(Base):
    __tablename__ = tableName
    id = Column(Integer, primary_key = True)
    name = Column(String(30))
    age = Column(Integer)
    address = Column(String(100))
    salary = Column(Float)
Session = orm.sessionmaker(bind=engin)
session = Session()

# 删除记录
session.query(Person).delete()
session.commit()

# 插入记录
p1 = Person(id=10, name='2Dog', age=30, address='XA City', salary=100000)
p2 = Person(id=20, name='胖', age=18, address='Beijing City', salary=900000)
p3 = Person(id=30, name='Joe', age=6, address='Shanghai City', salary=1)
session.add(p1)
session.add(p2)
session.add(p3)
session.commit()
print('插入记录!')

# update
session.query(Person).filter(Person.name == '胖').update({'address': 'New York'})

# 也是update
query = session.query(Person).filter(Person.name == '2Dog')
print(query)  # 是个SQL
person = query.scalar()
print(person.name)
person.age = 55
person.salary = 1111

session.commit()
print('更新记录')

# 查询
persons = session.query(Person).filter((Person.age >= 3) & (Person.salary >= 1))
for person in persons:
    print('name =', person.name, end=' ')
    print('age =', person.age)

# 玩玩persons    
print(type(persons))  # <class 'sqlalchemy.orm.query.Query'>
print(persons.first().name)
print(persons.offset(2).scalar().name)

# delete
session.delete(p2)
session.commit()

# 关闭session
session.close()

ORM框架:SQLObject

from sqlobject import *
from sqlobject.mysql import builder
import json

mysql = 'mysql://root:123456@192.168.99.100:32777/test?charset=utf8'
sqlhub.processConnection = connectionForURI(mysql, driver='pymysql')

# 创建对象
class Person(SQLObject):
    class sqlmeta:
        table = 'persons'
    name = StringCol(length = 30)
    age = IntCol()
    address = StringCol(length = 100)
    salary = FloatCol()
    
# 删除映射表
try:
    Person.dropTable()
except Exception as e:
    print(e)
    
Person.createTable()
print('已创建persons表。')

# 直接创建对象
person1 = Person(name='Bill',age=55,address='Earth', salary=1234)
person2 = Person(name='Mike',age=23,address='Math', salary=4321)
person3 = Person(name='John',age=45,address='Sun', salary=4000)

# update
person2.name = '2Dog'
person2.address = 'Moon'

# 查询
persons = Person.selectBy(name = 'Bill')
print(type(persons))  # <class 'sqlobject.sresults.SelectResults'>
print(persons.count()) # 1
print("Bill的地址:", persons[0].address)
for person in persons:
    print(type(person))  # <class '__main__.Person'>  迭代的结果就是person对象了

def person2Dict(obj):
    return {
        'id': obj.id,
        'name': obj.name,
        'address': obj.address,
        'salary': obj.salary
        }

# 将查询结果导出为json    

jsonStr = json.dumps(persons[0], default = person2Dict, ensure_ascii=False)
print(jsonStr)

# 删除对象
persons[0].destroySelf()

操作MongoDB

安装:pip install pymongo

from pymongo import *

# 连接
client = MongoClient('192.168.99.100', 32778)
# 打开或创建名为test的Collection,相当于mysql里的database
db = client.test

person1 = {"name":"Bill","age":25,"address":"地球","salary":123.0}
person2 = {"name":"Mary","age":22,"address":"火星","salary":6424}
# 打开或创建名为persons的文档,相当于表
persons = db.persons

# delete
persons.delete_many({'age': {'$gt': 0}})  # 删除岁数大于0的

# insert
# personId1 = persons.insert_one(person1).inserted_id
# print(personId1)
# personId2 = persons.insert_one(person2).inserted_id
# print(personId2)

# 批量insert
personList = [person1, person2]
result = persons.insert_many(personList)
print(result.inserted_ids)

# 查询
print(persons.find_one()) # 查第一条
print(persons.find_one({'name': {'$eq': 'Mary'}}))
print(persons.find_one()['name'])

# 搜索所有的数据
for person in persons.find():
    print(person)
    
# 更新数据
persons.update_one({'age': {'$lt': 23}}, {'$set': {'name': 'superme'}})
for person in persons.find():
    print(person)
    
# 删除
persons.delete_one({'age': {'$gt': 23}})
for person in persons.find({'age':{'$lt': 30}}):
    print(person)
    
# 总数
print(persons.count())

练习1

'''
1.  编写一个Python程序,将products.xml文件的内容保存到MongoDB中,
并且可以查找每一个product。
'''

import xmltodict
from pymongo import *

# step1: 读XML转成字典
xml = open('./files/products.xml', 'r', encoding='utf-8').read()

d = xmltodict.parse(xml)

# productList = []
# for v in d['root']['products']['product']:
# #     print(v)
#     d = {
#         'uuid': v['@uuid'],
#         'id': v['id'],
#         'name': v['name'],
#         'price': v['price'],
#         }
#     productList.append(d)
# print(productList)
productList = d['root']['products']['product']

# step2: 把字典存到MongoDB里面
client = MongoClient('192.168.99.100', 32778)
db = client.test
products = db.products

# delete  全部删除,也要传一个空的字典
products.delete_many({})

result = products.insert_many(productList)
# result = products.insert_many(d['root']['products']['product'])
print('插入结果:', result.inserted_ids)

# 查询一下MongoDB里面的结果
for p in products.find():
    print('查询:', p)
    
# 查看总数
print('总数:', products.count())

练习2

'''
编写一个Python程序,通过循环向SQLite数据库的
persons表中录入数据。从控制台输入“exit:”后退出循环,
然后输出persons表中的所有数据。
'''
import json
from sqlobject import *
from sqlobject.mysql import builder
import json

mysql = 'mysql://root:123456@192.168.99.100:32780/test?charset=utf8'
sqlhub.processConnection = connectionForURI(mysql, driver='pymysql')

# step1 输入json,转对象
class Person(SQLObject):
    class sqlmeta:
        table = 'person111'
    pid = StringCol(length = 10)
    name = StringCol(length = 30)
    age = IntCol()
#     def __init__(self, d):
#         self.__dict__ = d
        
def json2Person(d):
    '''
使用函数进行转换,进行必要的判断
    '''
    if d.get('pid') == None: raise Exception('必须包含id信息!')
    if d.get('name') == None: raise Exception('必须包含name信息!')
    if d.get('age') == None: 
        raise Exception('必须包含age信息,或age必须为数字!')
    return Person(pid=d['pid'], name=d['name'], age=d['age'])

# 如果不存在,建表
Person.createTable(ifNotExists=True)

while True:
    # {"id": "1", "name": "Bill", "age": 19}
    str = input('输入JSON字符串(exp:{"pid": "1", "name": "Bill", "age": 19}):')
    if str == 'exit()': break
    try:
        p = json.loads(str, object_hook = json2Person)
        print('已保存到mysql数据库:', p, ', 记录总数为:', Person.select().count())
    except Exception as e:
        print('请重新输入!错误信息:', e)
上一篇下一篇

猜你喜欢

热点阅读