程序员程序猿阵线联盟-汇总各类技术干货

Fibonacci数列高效解法大全及时间复杂度分析 连载【9】

2018-11-24  本文已影响153人  FSS_Sosei
在数学上,斐波那契数列是以递归的方法来定义

……续上回Fibonacci数列高效解法大全及时间复杂度分析 连载【8】

在家用电脑、手机都是一堆核心,数框框的今日

自然会想到的一项提速手段是——并行运算

就以 Fibonacci数列高效解法大全及时间复杂度分析 连载【7】中第14节的“生成数列的GMP内置fib2函数解法”来说

生成从0至n的斐波那契数列是一个数算完放入结果列表,再算下一个数

这完全可以用我们的CPU多核同时算几个数,四核的话同时算四个数多好多快,完美

这就是分布在多核的多进程并行算法

四核下会是单核的4倍速吗?也就是加速比能到达4吗?

以四核为例,当实现加速比为4,那么并行算法效率就为100%

呃,那是理想状态,实际远远到不了

到不了的原因在于变成多核多进程,一个大任务要在主进程分解成几个子任务(子进程)分配到各核运算,子任务算完后再汇总回主进程,这就有了附加开销。包括任务分解、任务调度、传参、子进程上下文切换、进程间传递数据、进程间同步锁、结果合并等,这些开销很大。让效率下降不少

就生成斐波那契数列而言,实现其并行算法,中间开销最大的应该是进程间传递数据,子进程要返回主进程超大规模的数据

那么对于单机这情况,都知道通常是共享内存方式最快

可以套用时髦的概念“内存计算

那么下面我来实现一下,一个没有多少优化的原型共享内存并行算法

17.  并行共享内存解法

import time

import multiprocessing as mp

import gmpy2

import ctypes

def fib2_processes(shared_variables_for_fib_seq_results: 'array ctypes.c_ubyte', element_length_record: 'array ctypes.c_ulonglong', write_protection_pointer: 'ctypes.c_ulonglong', unread_quantity: 'ctypes.c_ulonglong', overflow_of_n: 'ctypes.c_longlong', mutexlock: 'Lock', n_iterable: 'iterable') -> None:

    start_time = time.process_time_ns()

    Fib_seq_array_size = len(shared_variables_for_fib_seq_results)

    write_pointer = 0

    for n in n_iterable:

        if (Fib_seq_array_size - write_pointer) < (n // 4):  #预估第n和n-1项两fib数总共可能占用的字节。公式是 n * 2 / 8 -> n / 4

            write_pointer = 0

            overflow_of_n.value = n

        for element in gmpy2.fib2(n):

            element_bytes = gmpy2.to_binary(element)[2:]

            element_length = len(element_bytes)

            next_write_pointer = write_pointer + element_length

            while (overflow_of_n.value > 0) and (next_write_pointer > write_protection_pointer.value):

                pass  #在溢出状态未消除情况下,如果要写入的范围高于写保护,就空转等待。

            element_length_record[n] = element_length

            with mutexlock:

                shared_variables_for_fib_seq_results[write_pointer: next_write_pointer] = element_bytes

                unread_quantity.value += 1

            write_pointer = next_write_pointer

            n -= 1

    end_time = time.process_time_ns()

    print ('计算和序列化用时 %.1f seconds.' % ((end_time - start_time)/10**9))

    return None

def Fibonacci_sequence_21 (n: int) -> list:  #参数n是表示求n项Fibonacci数列

    '多进程共享内存的返回列表的GMP内置fib函数解法'

    assert isinstance(n, int), 'n is an error of non-integer type.'

    if n>=0:

        start_time = time.process_time_ns()

        Number_of_processes = max(mp.cpu_count() - 1, 1)

        size_of_shared_variables_for_fib_processes = 1 * (1024 ** 2) // Number_of_processes  #就是占用的内存数量,单位Byte。注意太大的n也要随之加大这个尺寸

        list_of_shared_variables_for_fib_seq_results = []

        list_of_element_length_record = []

        list_of_write_protection_pointer = []

        list_of_unread_quantity = []

        list_of_overflow_n = []

        list_of_mutexlock = []

        for i in range(Number_of_processes):

            list_of_shared_variables_for_fib_seq_results.append(mp.RawArray(ctypes.c_ubyte, size_of_shared_variables_for_fib_processes))

            list_of_element_length_record.append(mp.RawArray(ctypes.c_ulonglong, n + 1))

            list_of_write_protection_pointer.append(mp.RawValue(ctypes.c_ulonglong, size_of_shared_variables_for_fib_processes - 1))

            list_of_unread_quantity.append(mp.RawValue(ctypes.c_ulonglong, 0))

            list_of_overflow_n.append(mp.RawValue(ctypes.c_longlong, -1))

            list_of_mutexlock.append(mp.Lock())

        list_of_n_iterable_for_fib2_processes = [range(n - i * 2, 0, -(Number_of_processes * 2)) for i in range(Number_of_processes)]

        fib2_process_list = [None] * Number_of_processes

        for i in range(Number_of_processes):

            fib2_process_list[i] = mp.Process(target = fib2_processes, args = (list_of_shared_variables_for_fib_seq_results[i], list_of_element_length_record[i], list_of_write_protection_pointer[i], list_of_unread_quantity[i], list_of_overflow_n[i], list_of_mutexlock[i], list_of_n_iterable_for_fib2_processes[i]))

            fib2_process_list[i].start()

        fib_list = [None] * (n + 1)

        n_list_for_fib2_processes = []

        for n_iterable in list_of_n_iterable_for_fib2_processes:

            n_list_for_fib2_processes.append(list(n_iterable))

        list_of_pointers_to_n_list = [0] * Number_of_processes

        list_of_number_of_reciprocating = [0] * Number_of_processes

        list_of_read_pointer =  [0] * Number_of_processes

        while True:

            if mp.active_children() == []:  #检测 当全部子进程都不是活的以后,进行下一个检测准备结束循环

                for unread_quantity in list_of_unread_quantity:

                    if unread_quantity.value != 0: break

                else:    #检测当所有进程结果的未读数都等于零就结束循环

                    break

            for i in range(Number_of_processes):

                if list_of_unread_quantity[i].value != 0:

                    n_for_fib2_processes = n_list_for_fib2_processes[i][list_of_pointers_to_n_list[i]] - list_of_number_of_reciprocating[i]

                    if n_for_fib2_processes != list_of_overflow_n[i].value:

                        next_read_pointer = list_of_read_pointer[i] + list_of_element_length_record[i][n_for_fib2_processes]

                        fib_list[n_for_fib2_processes] = int.from_bytes(bytes(list_of_shared_variables_for_fib_seq_results[i][list_of_read_pointer[i]: next_read_pointer]), 'little')

                        with list_of_mutexlock[i]:

                            list_of_write_protection_pointer[i].value = next_read_pointer

                            list_of_unread_quantity[i].value -= 1

                        list_of_read_pointer[i] = next_read_pointer

                        list_of_pointers_to_n_list[i] += list_of_number_of_reciprocating[i]

                        list_of_number_of_reciprocating[i] ^= 1

                    else:

                        list_of_read_pointer[i] = 0

                        with list_of_mutexlock[i]:

                            list_of_write_protection_pointer[i].value = 0

                            list_of_overflow_n[i].value = -1

        if n & 1 == 0:

            fib_list[0] = 0

        end_time = time.process_time_ns()

        print ('主进程用时 %.1f seconds.' % ((end_time - start_time)/10**9))

        return fib_list

    else:

        return None

if __name__ == '__main__':

        start_time = time.perf_counter()

        Fibonacci_sequence_21(1000000)

        end_time = time.perf_counter()

        print ('最终用时 %.1f seconds.' % (end_time - start_time))

在我的四核16GB物理内存电脑上算100万斐波那契数列(数据总占用内存40GB,所以包括虚拟内存)用时为:

1384秒

在之前14节单进程解法算100万斐波那契数列用时为:

1857秒

缩短用时25.5%

然而这是4核并行,并行效率为33.5%。并不高效的并行算法

各位大佬,以14节那个单进程解法为基础,来尝试写出你们的高效并行解法,在下方留言吧

结尾,嘿,欢迎点赞和赞赏支持

未完待续……

上一篇下一篇

猜你喜欢

热点阅读