大数据 爬虫Python AI SqlPostgreSQL

Python多进程COPY PostgreSql表实验

2017-04-28  本文已影响0人  OhBonsai

背景

因为需要进行代码优化。所以进行数据表的整表COPY
一直很好奇,多进程对于copy是否有优化呢?于是做了一些实验。
实验环境:32核I5的服务器。内存200G

实验一: 两个进程 分开copy两个表

Python代码如下

dsn = 'postgresql://postgres:pset123456@192.168.10.10/CHN_NAVINFO_2016Spr_0082_0002_108'

conn1 = psycopg2.connect(dsn=dsn)
conn2 = psycopg2.connect(dsn=dsn)

io1 = open('rdb_node.csv', 'w')
io2 = open('rdb_node_with_all_attri_view.csv', 'w')


sql1 = """copy (select * from rdb_node order by node_id_t, node_id) to STDOUT delimiter '|' csv header"""
sql2 = """copy (select * from rdb_node_with_all_attri_view order by node_id_t, node_id) to STDOUT delimiter '|' csv header"""


def table_size(table_name, c):
    cur = c.cursor()
    cur.execute("select pg_size_pretty(pg_relation_size('%s'));" % table_name)
    s = cur.fetchone()[0]
    cur.close()
    return s

print 'rdb_node size:', table_size('rdb_node', conn1)
print 'rdb_node_with_all_attri_view:', table_size('rdb_node_with_all_attri_view', conn1)


def work(conn, sql, io):
    ss = time.time()
    cur = conn.cursor()
    cur.copy_expert(sql, io)
    print 'PID {} cost: {}'.format(multiprocessing.current_process().pid, time.time() - ss)


multiprocessing.Process(target=work, args=(conn1, sql1, io1)).start()
multiprocessing.Process(target=work, args=(conn2, sql2, io2)).start()

结果

multi Process COPY multi table
rdb_node size: 2559 MB
rdb_node_with_all_attri_view: 2073 MB
PID 18489 cost: 69.7677941322
PID 18490 cost: 75.4461951256

实验二: 一个进程copy两个表

Python代码如下

dsn = 'postgresql://postgres:pset123456@192.168.10.10/CHN_NAVINFO_2016Spr_0082_0002_108'
conn = psycopg2.connect(dsn=dsn)
io1 = open('rdb_node.csv', 'w')
io2 = open('rdb_node_with_all_attri_view.csv', 'w')
sql1 = """copy (select * from rdb_node order by node_id_t, node_id) to STDOUT delimiter '|' csv header"""
sql2 = """copy (select * from rdb_node_with_all_attri_view order by node_id_t, node_id) to STDOUT delimiter '|' csv header"""


def table_size(table_name, c):
    cur = c.cursor()
    cur.execute("select pg_size_pretty(pg_relation_size('%s'));" % table_name)
    s = cur.fetchone()[0]
    cur.close()
    return s

print 'rdb_node size:', table_size('rdb_node', conn)
print 'rdb_node_with_all_attri_view:', table_size('rdb_node_with_all_attri_view', conn)

s = time.time()
cur1 = conn.cursor()
cur1.copy_expert(sql1, io1)
cur1.close()
cur2 = conn.cursor()
cur2.copy_expert(sql2, io2)


print 'cost:', time.time() - s

cur2.close()
conn.close()
io1.close()
io2.close()

结果

one Process COPY multi table
rdb_node size: 2559 MB
rdb_node_with_all_attri_view: 2073 MB
cost: 92.9935839176

实验三:多进程访问单表

代码

def main(cpu_count):
    process_num = cpu_count - 1
    dsn = 'postgresql://postgres:pset123456@192.168.10.10/CHN_NAVINFO_2016Spr_0082_0002_108'

    sql = """
    copy (select * from rdb_node where node_id_t >= {} and node_id_t <= {} order by node_id_t) to STDOUT delimiter '|' csv header 
    """

    init_conn = psycopg2.connect(dsn)
    init_cursor = init_conn.cursor()
    init_cursor.execute('select node_id_t from rdb_node order by node_id_t')
    tile_id_list = [row[0] for row in init_cursor.fetchall()]

    def table_size(table_name, c):
        cur = c.cursor()
        cur.execute("select pg_size_pretty(pg_relation_size('%s'));" % table_name)
        s = cur.fetchone()[0]
        cur.close()
        return s

    print 'rdb_node size:', table_size('rdb_node', init_conn)
    print 'process num:', process_num

    init_cursor.close()
    init_conn.close()

    conn_pool = []
    for i in range(0, process_num):
        conn_pool.append(psycopg2.connect(dsn=dsn))

    io_pool = []
    for i in range(0, process_num):
        io_pool.append(open('b{}.csv'.format(i), 'w'))
    tile_range = []
    for i in xrange(process_num):
        start_index = (len(tile_id_list) / process_num) * i
        if i == process_num - 1:
            end_index = len(tile_id_list) - 1
        else:
            end_index = (len(tile_id_list) / process_num) * (i+1) -1

        if len(tile_range) > 0 and tile_id_list[end_index] == tile_range[-1][-1]:
                tile_range.append((tile_id_list[start_index], tile_id_list[end_index]+1))
        else:
            tile_range.append((tile_id_list[start_index], tile_id_list[end_index] + 1))

    def work(conn, io, r):
        s = time.time()
        cur = conn.cursor()
        cur.copy_expert(sql.format(*r), io)
        io.close()
        cur.close()
        conn.close()
        print 'PID {} cost: {}'.format(multiprocessing.current_process().pid, time.time() - s)

    process_pool = []
    for i in xrange(0, process_num):
        process_pool.append(multiprocessing.Process(target=work, args=(conn_pool[i], io_pool[i], tile_range[i])))

    for p in process_pool:
        p.start()

    for p in process_pool:
        p.join()


if __name__ == '__main__':
    import argparse
    parser = argparse.ArgumentParser()
    parser.add_argument('cpu_core', type=int)
    args = parser.parse_args()
    main(args.cpu_core)
    time.sleep(5)

结果

multi Process COPY one table
rdb_node size: 2559 MB
process num: 30
PID 18394 cost: 29.2145748138
...
PID 18383 cost: 38.1118938923

process num: 15
PID 18409 cost: 15.162913084
...
PID 18416 cost: 38.1842360497

process num: 8
PID 18422 cost: 30.9688498974
...
PID 18424 cost: 44.9193379879

process num: 2
PID 18443 cost: 44.0086810589
PID 18442 cost: 44.4861030579

process num: 1
PID 18448 cost: 44.115489006

结论

上一篇 下一篇

猜你喜欢

热点阅读