Python进阶随笔【一】

2019-11-12  本文已影响0人  RabbitMask

这里整理了一些笔者的小轮子,可能比较杂,也可能并不基础,但是,你会回来感谢我的。

动态函数调用

应用背景:笔者有次项目,需要调几千个脚本用来做并发,我不可能每一个都要写个方法去执行,几千个文件名就能把我整懵逼了,所以只能动态函数名称来解决,自动调用。

#workindex.py
workindex={'task1':'result1'}
#task1.py
def run(username):
    if username=='rabbitmask':
        return 1
import task1
import taskindex

def work(username,workname):
    if eval(workname).run(username) == 1:
        return workindex['{}'.format(workname)]

work('rabbitmask','task1')

异步——多进程文件存储

应用背景:说到多进程并发存储,如果是数据库还好,但如果是文件,就会造成条件竞争,所以需要做异步调用。

def getinfo(j,k,q):
    pass
    q.put(j)
    return result

def saveinfo(result):
    for i in result:
        fw=open('result.txt','a')
        fw.write(i+'\n')
        fw.close()

def poolmana(keyword):
    p = Pool(10)
    q = Manager().Queue()
    for i in range(100000000/100):
        for j in range(i*100,i*100+100):
            p.apply_async(getinfo, args=(j+1, keyword,q),callback=saveinfo)
    p.close()
    p.join()

内存分批调度

应用背景:从上面看到了,简单提下,看下下面两个方案会有什么区别?或者干脆跑一下,你就晓得为什么要用前者了。

for i in range(100000000 / 100):
    for j in range(i * 100, i * 100 + 100):
        p.apply_async(getinfo, args=(j + 1, keyword, q), callback=saveinfo)

or

for i in range(100000000):
    p.apply_async(getinfo, args=(i + 1, keyword, q), callback=saveinfo)

Redis——Set存储非字符串

应用背景:笔者遇到了一个临时存储list的需求,redis自带list存储操作,但笔者的需求是整存一个复杂列表,测试之后发现极其不便,最终通过将数据json序列化之后以字符串的形式存储,读出并经过web传输后(刚好,因为列表也是没法直接传输的)后,再进行json反序列化,最终拿到的数据类型依然为list。

def redissaveinfo(timetoken,result):
    r.set(timetoken,result)

def redisgetinfo(timetoken):
    res=r.get(timetoken)
    return res

def save(timetoken): 
    res=['rabbit','carrot']
    list_str = json.dumps(res)
    redissaveinfo(timetoken,list_str)

def get(timetoken):
    res=redisgetinfo(timetoken)
    return res

def run(timetoken):
    res=get(timetoken)
    str_list =json.loads(res)
    return set_list

list-str互转

应用背景:显然,这个标题也是因为上面的需求拓展的,list_str众所周知可以说极其简单,为什么中间要在引入序列化与反序列化的机制呢?原因上面提到了,复杂列表,如果列表二维或三维,穿插着各种特殊字符和字典,下面的公知做法还能解决吗?

list = [1, 2, 3, 4, 5]
list_str=''.join(list)

str = '1,2,3,4,5'
str_list = str.split(",") 

异步——拉起非阻塞线程

应用背景:有的时候我们通过传参执行一项作业,至少对于当前请求并不关注他有没有结果,当前请求的任务就是把参数传达到即可,但如果因为后端作业复杂,并不能短时间内给出response,这我们并不能接受,因为我们并不想阻塞在这里等待回应。以上场景再前后端分离开发中Ajax常会遇到。

from concurrent.futures import ThreadPoolExecutor

def run(keyword):
    pass

@app.route('/API', methods=['POST'])
def API():
    username = request.form['keyword']
    executor.submit(run, keyword)
    result = {'status': 'Task Start!'}
    return jsonify(result)

多线程——返回值获取

应用背景:原生的多线程库并没有提供结果返回读取,而有的情景我们需要实时获取每个线程的return,可以通过方法重构完成。

#ThreadPlus.py
from threading import Thread

class MyThread(Thread):
    def __init__(self, func, args):
        super(MyThread, self).__init__()
        self.func = func
        self.args = args

    def run(self):
        self.result = self.func(*self.args)

    def get_result(self):
        try:
            return self.result
        except Exception:
            return None
from ThreadPlus import MyThread

def scan(keylword,timetoken):
    t1 = MyThread(task1.run, args=(keylword,))
    t2 = MyThread(task2.run, args=(keylword,))
    t3 = MyThread(task3.run, args=(keylword,))

    t1.setDaemon(True)
    t2.setDaemon(True)
    t3.setDaemon(True)

    for i in [t1, t2, t3]:
        i.start()

    for i in [t1, t2, t3]:
        i.join()

    res={'task1':t1.get_result() , 'task2':t2.get_result() , 'task3':t3.get_result()}
    return res

进程间通信——全局变量

应用背景:先吐槽一句,并发这个东西真的有意思,多线程需要防止线程间干扰所以需要线程同步,多进程因为相互独立又要实现进程间通信,难搞哦。笔者现在有一个索引值,需要搜集每个进程间的结果。

from multiprocessing import Manager,Pool

def getinfo(j,k,res,q):
    res.append('hello'+j+k)
    q.put(j)

def poolmana(keyword):
    res = Manager().list([])
    p = Pool(10)
    q = Manager().Queue()
    for i in range(10000/100):
        for j in range(i*100,i*100+100):
            p.apply_async(getinfo, args=(j+1, keyword,res,q))
    p.close()
    p.join()
    return res

END

以上是可以看作是笔者的近期随笔,所以顺序和知识点很随意,但干的一匹,如果有幸的话,很高兴能帮得到您。

上一篇 下一篇

猜你喜欢

热点阅读