Python | 一些奇怪但有用的处理技巧

2018-08-02  本文已影响0人  cathy1997

列举出文件夹下需要的所有xx格式文档

import os
# 第一种情况:当前文件夹下存在,不遍历子文件夹
docLabels = [f for f in os.listdir(filepath) if f.endswith('.xml')]

#第二种情况:遍历当前文件夹及其所有子文件夹
def print_dir(path_list, filepath, pattern):
    for i in os.listdir(filepath):
        path = os.path.join(filepath, i)
        if os.path.isdir(path):
            print_dir(path_list, path, pattern)
        if path.endswith(pattern):
            path_list.append(path)

    return path_list

path_list = []
path_list = print_dir(path_list, filepath, ".xxx")
for path in tqdm(path_list, ncols=70):
    xxx

pandas追加写csv

data = {'vid':data['vid'], 'text':data['token'], 'final_label':json.dumps(final_label)}
df = pd.DataFrame(data, index = [0]) # data是json数据
if os.path.exists(filepath):
    df.to_csv(filepath, header=0, mode='a', index=False, sep=',', encoding='utf-8-sig')
else:
    df.to_csv(filepath, mode='a', index=False, sep=',', encoding='utf-8-sig')

dataframe追加写列

data.loc[index, 'celebrity'] = str(name_list)

读tsv文件

data = pd.read_csv(path, sep='\t', dtype=object, header=None, error_bad_lines=False)
data.columns = ['gid', 'uid', 'uri', 'label', 'inference', 'result', 'msg']

json转tsv

with open('test.json', 'r') as f:
    json_data = f.readlines()

for data in tqdm(json_data, ncols=70):
    data = json.loads(json.loads(data))
        
    data['label'] = label_dic[data['poi_id']]
    del data['poi_id']

    head = ['uri','ai_ocr_sentence','backend_type_name','comment_cnt_all']
    # 第一次打开文件时,第一行写入表头
    path = 'train.tsv'
    if not os.path.exists(path):
        with open(path, "w", newline='', encoding='utf-8') as csvfile:  # newline='' 去除空白行
            writer = csv.DictWriter(csvfile, fieldnames=head, delimiter='\t')  # 写字典的方法
            writer.writeheader()  # 写表头的方法

    # 接下来追加写入内容
    with open(path, "a", newline='', encoding='utf-8') as csvfile:  # newline='' 一定要写,否则写入数据有空白行
        writer = csv.DictWriter(csvfile, fieldnames=head, delimiter='\t')
        writer.writerow(data)  # 按行写入数据

tsv转json

lines = pd.read_csv(path, sep='\t', dtype=object, header=None, error_bad_lines=False)
lines.columns = ['vid', 'cid', 'album', 'cid_title', 'score', 'item_id', 'item_title', 'vv_all']
lines = lines.drop_duplicates()
for i in tqdm(range(len(lines)), ncols=70):
    line = lines.iloc[i].to_json()
    line = json.loads(line)

pickle读写

with open('vid_dic.pkl','wb') as in_data:
    pickle.dump(vid_dic, in_data, pickle.HIGHEST_PROTOCOL)
    
with open('vid_dic.pkl','rb') as out_data:
    vid_dic = pickle.load(out_data)

参数

import argparse

parser = argparse.ArgumentParser()
parser.add_argument("--hdfs_path", type=str, default='hdfs://haruna/home/byte_arnold_hl_vc/user/name.1997/')
parser.add_argument("--save_path", type=str, default = '')
args = parser.parse_args()

os.system('hdfs dfs -get {}'.format(args.hdfs_path))
os.system('hdfs dfs -put {} {}'.format(item, args.save_path))

字典(dict)按键(key)和值(value)排序

>>> dic = {'a':2, 'b':1}
>>> d = sorted(dic.items(), key = lambda k: k[0])
>>> print(d)
[('a', 2), ('b', 1)]
>>> dic = {'a':2, 'b':1}
>>> d = sorted(dic.items(), key = lambda k: k[1])
>>> print(d)
[('b', 1), ('a', 2)]

打印进度条

方法一:print()函数实现

print("\r", "---- 处理到第" + str(j) + "个", end="", flush=True)

方法二:tqdm

for patent in tqdm(patent_list, ncols=10):
    pass

数据分布统计

def status(x) : 
    return pd.Series([x.min(),x.quantile(.25),x.median(),x.quantile(.75),x.mean(),x.max(),x.var(),x.std(),x.skew(),x.kurt()],
                      index=['最小值','25%分位数','中位数','75%分位数','均值','最大值','方差','标准差','偏度','峰度'])

字符串与日期转换

# 输出时间戳对应的年月日信息
test['日期'].apply(lambda x: print(x.year, x.month, x.day))

# 将时间戳转换为字符串
myString = test['日期'].apply(lambda x: x.strftime('%Y-%m-%d'))
published_time = "20" + pub_time[0]
published_time = datetime.datetime.strptime(published_time,'%Y%m%d').date()

写json

with open('train_data.json', 'a') as f:
    json.dump(info_dic, f)
    f.write('\n')
    # f.write(json.dumps(info_dic, ensure_ascii=False)+'\n')

统计出现次数并存入字典

date_dic = {}

if not date in date_dic:
    date_dic[date]=1
else:
    date_dic[date]=date_dic[date]+1

使用RandomForestClassifier查看不同特征的重要程度

from sklearn.ensemble import RandomForestClassifier

clf = RandomForestClassifier(n_estimators=200, criterion='entropy', max_depth=4)
rf_clf = clf.fit(x, y)
rf_clf.feature_importances_

读excel

import xlrd

# 读取训练集
def read_excel(filename):
    data = xlrd.open_workbook(filename)
    table = data.sheet_by_name(u'Sheet1')
    abstract_list = table.col_values(0) #第一列内容
    title_list = table.row_values(0) #第一行内容
    
    train = [abstract_list, title_list]
    
    return train

写excel

import xlrd
from xlrd import open_workbook
import xlwt
from xlutils.copy import copy
import os

# 写到excel中
def write_excel_xls_append(path, value):
    index = len(value)  # 获取需要写入数据的行数
    workbook = open_workbook(path)  # 打开工作簿
    sheet = workbook.sheet_by_index(0)
    rows_old = sheet.nrows
    new_workbook = copy(workbook)  # 将xlrd对象拷贝转化为xlwt对象
    new_worksheet = new_workbook.get_sheet(0)  # 获取转化后工作簿中的第一个表格
    for i in range(0, index):
        for j in range(0, len(value[i])):
            new_worksheet.write(i + rows_old, j,value[i][j])  # 追加写入数据,注意是从i+rows_old行开始写入
    new_workbook.save(path)  # 保存工作簿

if __name__ == "__main__":
    filename = " "
    content_list = []
    if os.path.exists(filename):
        write_excel_xls_append(filename, content_list)
    else:
        excel = xlwt.Workbook()
        sheet = excel.add_sheet("Sheet1")
        head = ["abstract", "claims_n", "claim", "result"]
        for index, value in enumerate(head):
            sheet.write(0, index, value)
        for index, value_list in enumerate(content_list, 1):
            for i, value in enumerate(value_list):
                sheet.write(index, i, value)
        excel.save(filename)

读数据库

import pymysql

def read_content(path):
    con = pymysql.connect(host="",user="",password="",port=3306,charset="utf8",db="")
    cursor = con.cursor()
    sql = "SELECT result FROM table_name WHERE id= '' limit 1;"
    try:
        # 执行sql语句
        cursor.execute(sql)
        # 获取记录列表
        result = cursor.fetchone()
    except:
        # 如果发生错误则回滚
        con.rollback()
    cursor.close()
    # 关闭数据库连接
    con.close()

写数据库

import pymysql

# 保存到数据库
def save_data_to_mysql(val):
    con = pymysql.connect(host="",user="",password="",port=3306,charset="utf8",db="")
    cursor = con.cursor()
    sql="insert into match_result(application_id,filed_time,published_time,location,result)\
        VALUES('"+val[0]+"','"+val[1]+"','"+val[2]+"','"+val[3]+"','"+val[4]+"')"
    # print(sql)
    try:
        # 执行sql语句
        cursor.execute(sql)
        # 提交到数据库执行
        con.commit()
    except:
        # 如果发生错误则回滚
        con.rollback()
    cursor.close()

    # 关闭数据库连接
    con.close()

自动解压当前文件夹下所有zip包

import zipfile
import os

def unzip(path, zfile):
    file_path = path + os.sep + zfile
    desdir = path + os.sep + zfile[:zfile.index('.zip')]
    srcfile = zipfile.ZipFile(file_path)
    for filename in srcfile.namelist():
        srcfile.extract(filename, desdir)
        if filename.endswith('.zip'):
            # if zipfile.is_zipfile(filename):
            path = desdir
            zfile = filename
            unzip(path, zfile)

# 定位到每个zip文件
def print_dir(filepath):
    for i in os.listdir(filepath):
        path = os.path.join(filepath, i)
        if os.path.isdir(path):
            print_dir(path)
        if path.endswith(".zip"):
            unzip(file_path, path.split('/')[-1])

file_path = "D:/Pythonworkspace/patent/data/Application/2010/"
print_dir(file_path)

读取xml文本

import  xml.dom.minidom

#打开xml文档
dom = xml.dom.minidom.parse('C:/Users/asus/Desktop/1.xml')
#得到文档元素对象
root = dom.documentElement

urls = dom.getElementsByTagName('url')
copus = ""
for url in urls:
    copus = copus + url.firstChild.data + ";"
    # copus.append(url.firstChild.data)

text = "https://www.drugs.com/sfx/nytol-quickcaps-side-effects.html"

if copus.find(text) == 0:
    print("已经存在")

python list 和dict的查找效率比较

import time

query_lst = [-60000,-6000,-600,-60,-6,0,6,60,600,6000,60000]

lst = []
dic = {}
for i in range(100000000):
    lst.append(i)
    dic[i] = 1 
start = time.time()
for v in query_lst:
    if v in lst:
        continue
end1 = time.time()
for v in query_lst:
    if v in dic:
        continue
end2 = time.time()
print "list search time : %f"%(end1-start)
print "dict search time : %f"%(end2-end1)

运行结果:
list search time : 11.836798
dict search time : 0.000007
list的查找效率远远低于dict的效率,原因在于:python中list对象的存储结构采用的是线性表,因此其查询复杂度为O(n),而dict对象的存储结构采用的是散列表(hash表),其在最优情况下查询复杂度为O(1)。

效率:set>dic>list

上一篇下一篇

猜你喜欢

热点阅读