Scale a CPU-intensive Python task

2020-06-18  李传亮

This article tries out Ray to turn a CPU-intensive, long-running algorithm task that runs on a single machine into a program that runs in a distributed environment, both lowering the load on any single machine and speeding up the task as a whole.

1.Setup environment

1.1 Miniconda

Python already ships with pip for package management, so why do we need conda at all, and how do miniconda and conda relate? See:

http://blog.sina.com.cn/s/blog_8a122dcf0102x9vn.html

To install miniconda, refer to:

https://developers.google.com/earth-engine/python_install-conda
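
On Linux the install itself boils down to two commands (a minimal sketch using the official installer URL; adjust for your platform):

# download and run the official installer (Linux x86_64)
wget https://repo.anaconda.com/miniconda/Miniconda3-latest-Linux-x86_64.sh
bash Miniconda3-latest-Linux-x86_64.sh -b -p $HOME/miniconda3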

To speed up conda downloads inside China, add the USTC mirrors:

conda config --add channels https://mirrors.ustc.edu.cn/anaconda/pkgs/free/
conda config --add channels https://mirrors.ustc.edu.cn/anaconda/pkgs/main/
conda config --set show_channel_urls yes
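
To confirm the mirror configuration took effect, list the configured channels:

conda config --show channels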

Create a conda environment dedicated to Ray:

conda create -n ray python=3.7
conda activate ray

# if the installed Python version is not the one you want
conda uninstall python
conda install python=3.7

# make the ray environment the default conda environment at login
printf '\n# add path to conda\nexport PATH="$HOME/miniconda3/bin:$PATH"\n' >> ~/.bashrc
echo 'source activate ray' >> ~/.bashrc

1.2 Ray

pip install -U ray
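
To sanity-check the installation, a minimal smoke test (hypothetical snippet, using the Ray 0.8-era API this post targets):

import ray

ray.init()

# a trivial remote function: Ray runs each call in a worker process
@ray.remote
def square(x):
    return x * x

# launch four tasks in parallel and collect the results
futures = [square.remote(i) for i in range(4)]
print(ray.get(futures))  # [0, 1, 4, 9]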

1.3 Pandas

pip install -U pandas

1.4 Facebook Faiss

# CPU version only
conda install faiss-cpu -c pytorch

# GPU version
conda install faiss-gpu cudatoolkit=8.0 -c pytorch # For CUDA8
conda install faiss-gpu cudatoolkit=9.0 -c pytorch # For CUDA9
conda install faiss-gpu cudatoolkit=10.0 -c pytorch # For CUDA10
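
Before wiring Faiss into Ray, it helps to see the core API on its own. A minimal sketch with an exact L2 index over random vectors (illustrative only):

import numpy as np
import faiss

d = 64                                               # vector dimension
xb = np.random.random((10000, d)).astype('float32')  # database vectors
xq = np.random.random((5, d)).astype('float32')      # query vectors

index = faiss.IndexFlatL2(d)  # exact L2 search, no training required
index.add(xb)                 # index the database vectors
D, I = index.search(xq, 10)   # top-10 distances and ids per query
print(I[0])                   # neighbor ids for the first query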

1.5 Ray cluster

# start head
ray start --head --redis-port=6379
# add worker
ray start --address='xxxxxxxx:6379' --redis-password='5241590000000000'
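
Note that a driver script only uses this cluster if it connects to it; by default ray.init() starts a fresh local instance. To attach to the cluster started above (password taken from the command line above):

import ray

# 'auto' locates the head node started with `ray start --head`
ray.init(address='auto', redis_password='5241590000000000')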

2.Rewrite the Python script

faiss_query_actor_pool.py

import ray
import load
from ray.util import ActorPool
import faiss_index
import sys
import pandas as pd

# starts a local Ray instance; pass address='auto' instead to attach
# to the cluster from section 1.5
ray.init(include_webui=False)

@ray.remote(memory=1500 * 1024 * 1024)
class FaissQuery(object):
    def __init__(self, goods_embed):
        self.TOPN = 1000
        self.index = faiss_index.FaissIndex(goods_embed)

    def search(self, rows):
        # CPU-heavy call; toVectors and extractResult are helpers from the
        # author's codebase that are not shown in this post
        rs = self.index.search_by_vectors(toVectors(rows), self.TOPN)
        return extractResult(rs)

if __name__ == "__main__":
    goods_file = sys.argv[1]
    query_file = sys.argv[2]
    output_file = sys.argv[3]
    BATCH_SIZE = int(sys.argv[4])
    POOL_SIZE = int(sys.argv[5])

    goods_embed = load.load_embedding(goods_file)
    print('Load goods  ' + str(len(goods_embed)))
    query_embed = pd.read_csv(query_file, sep='\t', header=None, names=['query','embed']).to_dict("records")
    print('Load queries ' + str(len(query_embed)))

    print(f'BATCH_SIZE={BATCH_SIZE}, RAY_ACTOR_POOL_SIZE={POOL_SIZE}')

    # one actor per pool slot, each holding its own copy of the Faiss index
    actorPool = ActorPool([FaissQuery.remote(goods_embed) for _ in range(POOL_SIZE)])

    index = 0
    while index < len(query_embed):
        rows = query_embed[index : index + BATCH_SIZE]

        # use the lambda's second argument: closing over `rows` directly binds
        # late, so any submit queued behind a busy actor would run with a
        # later batch instead of this one
        actorPool.submit(lambda a, v: a.search.remote(v), rows)

        index += BATCH_SIZE

    with open(output_file, "a") as f:
        # get_next() returns results in the order the batches were submitted
        while actorPool.has_next():
            f.write(actorPool.get_next())
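
The load and faiss_index modules (and the toVectors / extractResult helpers) are not shown in the post. As a rough idea of the shape of faiss_index.FaissIndex, here is a hypothetical reconstruction, assuming goods_embed maps a goods id to its embedding vector; the author's actual code may differ:

# faiss_index.py -- hypothetical sketch, not the author's code
import numpy as np
import faiss

class FaissIndex(object):
    def __init__(self, goods_embed):
        # goods_embed: assumed {goods_id: embedding vector}
        self.ids = list(goods_embed.keys())
        vectors = np.array([goods_embed[i] for i in self.ids], dtype='float32')
        self.index = faiss.IndexFlatIP(vectors.shape[1])  # inner-product similarity
        self.index.add(vectors)

    def search_by_vectors(self, vectors, topn):
        # returns (scores, positions); positions map back into self.ids
        xq = np.asarray(vectors, dtype='float32')
        return self.index.search(xq, topn)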

3.Testing

As the number of actors grows, the wall-clock time drops:

$ time python3 faiss_query_actor_pool.py  goods.csv quries.csv result 1000 1

Load goods  10000
Load queries 10000
BATCH_SIZE=1000, RAY_ACTOR_POOL_SIZE=1

real    0m21.716s
user    0m4.979s
sys 0m1.908s
$ time python3 faiss_query_actor_pool.py  goods.csv quries.csv result 1000 3

Load goods  10000
Load queries 10000
BATCH_SIZE=1000, RAY_ACTOR_POOL_SIZE=3

real    0m11.756s
user    0m4.168s
sys 0m1.539s

$ time python3 faiss_query_actor_pool.py  goods.csv quries.csv result 1000 6

2020-06-18 17:01:27,406 INFO resource_spec.py:212 -- Starting Ray with 12.84 GiB memory available for workers and up to 6.44 GiB for objects. You can adjust these settings with ray.init(memory=<bytes>, object_store_memory=<bytes>).
Load goods  10000
Load queries 10000
BATCH_SIZE=1000, RAY_ACTOR_POOL_SIZE=6

real    0m8.698s
user    0m4.129s
sys 0m1.414s