使用numpyscipy最大限度地减少Python multiprocessing.Pool的开销

使用numpyscipy最大限度地减少Python multiprocessing.Pool的开销,第1张

概述我花了几个小时来尝试并行化我的数字运算代码,但是当我这样做时它只会变慢.不幸的是,当我尝试将其减少到下面的示例时,问题就消失了,我真的不想在这里发布整个程序.所以问题是:在这类程序中我应该避免哪些陷阱? (注意:Unutbu的答案在底部后跟进.) 以下是情况: >它是关于一个模块,它定义了一个包含大量内部数据的类BigData.在该示例中,存在一个插值函数列表ff;在实际程序中,还有更多,例如ff 我花了几个小时来尝试并行化我的数字运算代码,但是当我这样做时它只会变慢.不幸的是,当我尝试将其减少到下面的示例时,问题就消失了,我真的不想在这里发布整个程序.所以问题是:在这类程序中我应该避免哪些陷阱?

(注意:Unutbu的答案在底部后跟进.)

以下是情况:

>它是关于一个模块,它定义了一个包含大量内部数据的类BigData.在该示例中,存在一个插值函数列表ff;在实际程序中,还有更多,例如ffA [k],ffB [k],ffC [k].
>计算将被归类为“令人尴尬的并行”:可以一次在较小的数据块上完成工作.在示例中,这是do_chunk().
>在我的实际程序中,示例中显示的方法将导致最差的性能:每个块约1秒(在单个线程中完成实际计算时间的0.1秒左右).因此,对于n = 50,do_single()将在5秒内运行,do_multi()将在55秒内运行.
>我还尝试通过将xi和yi数组切割成连续的块并迭代每个块中的所有k值来分解工作.这工作得更好一点.现在,无论是使用1,2,3或4个线程,总执行时间都没有差别.但当然,我希望看到实际的加速!
>这可能是相关的:Multiprocessing.Pool makes Numpy matrix multiplication slower.但是,在程序的其他地方,我使用多处理池进行更加孤立的计算:一个看起来像def do_chunk(array1,array2,array3)的函数(未绑定到类)并对该数组进行仅限numpy计算.在那里,有显着的速度提升.
> cpu使用率随预期的并行进程数量而变化(三个线程的cpu使用率为300%).

#!/usr/bin/python2.7import numpy as np,time,sysfrom multiprocessing import Poolfrom scipy.interpolate import RectBivariateSpline_tm=0def stopwatch(msg=''):    tm = time.time()    global _tm    if _tm==0: _tm = tm; return    print("%s: %.2f seconds" % (msg,tm-_tm))    _tm = tmclass BigData:    def __init__(self,n):        z = np.random.uniform(size=n*n*n).reshape((n,n,n))        self.ff = []        for i in range(n):            f = RectBivariateSpline(np.arange(n),np.arange(n),z[i],kx=1,ky=1)            self.ff.append(f)        self.n = n    def do_chunk(self,k,xi,yi):        s = np.sum(np.exp(self.ff[k].ev(xi,yi)))        sys.stderr.write(".")        return s    def do_multi(self,numproc,yi):        procs = []        pool = Pool(numproc)        stopwatch('Pool setup')        for k in range(self.n):            p = pool.apply_async( _do_chunk_wrapper,(self,yi))            procs.append(p)        stopwatch('Jobs queued (%d processes)' % numproc)        sum = 0.0        for k in range(self.n):            # Edit/BUGfix: replaced p.get by procs[k].get            sum += np.sum(procs[k].get(timeout=30)) # timeout allows ctrl-C interrupt            if k == 0: stopwatch("\nFirst get() done")        stopwatch('Jobs done')        pool.close()        pool.join()        return sum    def do_single(self,yi):        sum = 0.0        for k in range(self.n):            sum += self.do_chunk(k,yi)        stopwatch('\nAll in single process')        return sumdef _do_chunk_wrapper(bd,yi): # must be outsIDe class for apply_async to chunk    return bd.do_chunk(k,yi)        if __name__ == "__main__":    stopwatch()    n = 50    bd = BigData(n)    m = 1000*1000    xi,yi = np.random.uniform(0,size=m*2).reshape((2,m))    stopwatch('Initialized')    bd.do_multi(2,yi)    bd.do_multi(3,yi)    bd.do_single(xi,yi)

输出:

Initialized: 0.06 secondsPool setup: 0.01 secondsJobs queued (2 processes): 0.03 seconds..First get() done: 0.34 seconds................................................Jobs done: 7.89 secondsPool setup: 0.05 secondsJobs queued (3 processes): 0.03 seconds..First get() done: 0.50 seconds................................................Jobs done: 6.19 seconds..................................................All in single process: 11.41 seconds

计时采用Intel Core i3-3227 cpu,具有2个内核,4个线程,运行64位linux.对于实际程序,多处理版本(池机制,即使只使用一个核心)比单进程版本慢10倍.

跟进

Unutbu的回答让我走上正轨.在实际的程序中,self被腌制成一个需要传递给工作进程的37到140 MB的对象.更糟糕的是,Python酸洗非常缓慢;酸洗本身花了几秒钟,这发生在传递给工人流程的每一块工作中.除了挑选和传递大数据对象之外,linux中apply_async的开销非常小;对于一个小函数(添加几个整数参数),每个apply_async / get对只需0.2 ms.因此,以非常小的块分割工作本身并不是问题.所以,我将所有大数组参数作为索引传递给全局变量.为了cpu缓存优化,我保持小块大小.

全局变量存储在全局字典中;在设置工作池之后,将立即在父进程中删除这些条目.只有dict的密钥才会传送给工作人员.酸洗/ IPC唯一的大数据是工人创建的新数据.

#!/usr/bin/python2.7import numpy as np,sysfrom multiprocessing import Pool_mproc_data = {}  # global storage for objects during multiprocessing.class BigData:    def __init__(self,size):        self.blah = np.random.uniform(0,1,size=size)    def do_chunk(self,yi):        # do the work and return an array of the same shape as xi,yi        zi = k*np.ones_like(xi)        return zi    def do_all_work(self,yi,num_proc):        global _mproc_data        mp_key = str(ID(self))        _mproc_data['bd'+mp_key] = self # BigData        _mproc_data['xi'+mp_key] = xi        _mproc_data['yi'+mp_key] = yi        pool = Pool(processes=num_proc)        # processes have Now inherited the global variabele; clean up in the parent process        for v in ['bd','xi','yi']:            del _mproc_data[v+mp_key]        # setup indices for the worker processes (placeholder)        n_chunks = 45        n = len(xi)        chunk_len = n//n_chunks        i1List = np.arange(0,chunk_len)        i2List = i1List + chunk_len        i2List[-1] = n        kList = range(n_chunks) # placeholder        procs = []        for i in range(n_chunks):            p = pool.apply_async( _do_chunk_wrapper,(mp_key,i1List[i],i2List[i],kList[i]) )            sys.stderr.write(".")            procs.append(p)        sys.stderr.write("\n")        # allocate space for combined results        zi = np.zeros_like(xi)        # get data from workers and finish          for i,p in enumerate(procs):            zi[i1List[i]:i2List[i]] = p.get(timeout=30) # timeout allows ctrl-C handling        pool.close()        pool.join()        return zidef _do_chunk_wrapper(key,i1,i2,k):    """All arguments are small objects."""    global _mproc_data    bd = _mproc_data['bd'+key]    xi = _mproc_data['xi'+key][i1:i2]    yi = _mproc_data['yi'+key][i1:i2]    return bd.do_chunk(k,yi)if __name__ == "__main__":    xi,yi = np.linspace(1,100,100001),np.linspace(1,100001)    bd = BigData(int(1e7))    bd.do_all_work(xi,4)

以下是速度测试的结果(同样,2个内核,4个线程),改变了工作进程的数量和块中的内存量(xi,zi数组切片的总字节数).这些数字是“每秒百万结果值”,但这对比较并不重要. “1 process”的行是带有完整输入数据的do_chunk的直接调用,没有任何子进程.

#Proc   125K    250K    500K   1000K   unlimited1                                      0.82 2       4.28    1.96    1.3     1.31 3       2.69    1.06    1.06    1.07 4       2.17    1.27    1.23    1.28

数据大小对内存的影响非常大. cpu具有3 MB共享L3缓存,每个核心具有256 KB L2缓存.请注意,计算还需要访问BigData对象的几MB内部数据.因此,我们从中学到的是进行这种速度测试很有用.对于这个程序,2个进程最快,其次是4个,3个是最慢的.

解决方法 尝试减少进程间通信.
在多处理模块中,通过队列完成所有(单机)进程间通信.通过队列传递的对象
被腌制.因此,尝试通过队列发送更少和/或更小的对象.

>不要通过队列发送自我,BigData的实例.它相当大,随着自我数据量的增加而变大:

In [6]: import pickleIn [14]: len(pickle.dumps(BigData(50)))Out[14]: 1052187

一切
调用time pool.apply_async(_do_chunk_wrapper,yi)),
self在主进程中被腌制并在工作进程中被取消.该
len的大小(pickle.dumps(BigData(N)))增加N增加.
>让数据从全局变量中读取.在linux上,您可以利用copy-on-Write.如Jan-Philip Gehrcke explains:

After fork(),parent and child are in an equivalent state. It would be stupID to copy the entire memory of the parent to another place in the RAM. That’s [where] the copy-on-write principle [comes] in. As long as the child does not change its memory state,it actually accesses the parent’s memory. Only upon modification,the corresponding bits and pIEces are copIEd into the memory space of the child.

因此,您可以避免通过Queue传递BigData实例
通过简单地将实例定义为全局,bd = BigData(n),(正如您已经在做的那样)并在工作进程中引用它的值(例如_do_chunk_wrapper).它基本上等于从对pool.apply_async的调用中删除self:

p = pool.apply_async(_do_chunk_wrapper,(k_start,k_end,yi))

并将bd作为全局访问,并对do_chunk_wrapper的呼叫签名进行必要的附加更改.
>尝试将运行时间较长的函数func传递给pool.apply_async.
如果你有很多快速完成对pool.apply_async的调用,那么传递参数和通过队列返回值的开销将成为整个时间的重要部分.相反,如果你对pool.apply_async进行较少的调用,并在返回结果之前给每个func做更多工作,那么进程间通信将占总时间的一小部分.

下面,我修改了_do_chunk_wrapper以接受k_start和k_end参数,这样每次调用pool.apply_async都会在返回结果之前计算k的许多值的总和.

import mathimport numpy as npimport timeimport sysimport multiprocessing as mpimport scipy.interpolate as interpolate_tm=0def stopwatch(msg=''):    tm = time.time()    global _tm    if _tm==0: _tm = tm; return    print("%s: %.2f seconds" % (msg,n))        self.ff = []        for i in range(n):            f = interpolate.RectBivariateSpline(                np.arange(n),yi):        n = self.n        s = np.sum(np.exp(self.ff[k].ev(xi,yi)))        sys.stderr.write(".")        return s    def do_chunk_of_chunks(self,k_start,yi):        s = sum(np.sum(np.exp(self.ff[k].ev(xi,yi)))                    for k in range(k_start,k_end))        sys.stderr.write(".")        return s    def do_multi(self,yi):        procs = []        pool = mp.Pool(numproc)        stopwatch('\nPool setup')        ks = List(map(int,np.linspace(0,self.n,numproc+1)))        for i in range(len(ks)-1):            k_start,k_end = ks[i:i+2]            p = pool.apply_async(_do_chunk_wrapper,yi))            procs.append(p)        stopwatch('Jobs queued (%d processes)' % numproc)        total = 0.0        for k,p in enumerate(procs):            total += np.sum(p.get(timeout=30)) # timeout allows ctrl-C interrupt            if k == 0: stopwatch("\nFirst get() done")        print(total)        stopwatch('Jobs done')        pool.close()        pool.join()        return total    def do_single(self,yi):        total = 0.0        for k in range(self.n):            total += self.do_chunk(k,yi)        stopwatch('\nAll in single process')        return totaldef _do_chunk_wrapper(k_start,yi):     return bd.do_chunk_of_chunks(k_start,yi)

产量

Initialized: 0.15 secondsPool setup: 0.06 secondsJobs queued (2 processes): 0.00 secondsFirst get() done: 6.56 seconds83963796.0404Jobs done: 0.55 seconds..Pool setup: 0.08 secondsJobs queued (3 processes): 0.00 secondsFirst get() done: 5.19 seconds83963796.0404Jobs done: 1.57 seconds...All in single process: 12.13 seconds

与原始代码相比:

Initialized: 0.10 secondsPool setup: 0.03 secondsJobs queued (2 processes): 0.00 secondsFirst get() done: 10.47 secondsJobs done: 0.00 seconds..................................................Pool setup: 0.12 secondsJobs queued (3 processes): 0.00 secondsFirst get() done: 9.21 secondsJobs done: 0.00 seconds..................................................All in single process: 12.12 seconds
总结

以上是内存溢出为你收集整理的使用numpy / scipy最大限度地减少Python multiprocessing.Pool的开销全部内容,希望文章能够帮你解决使用numpy / scipy最大限度地减少Python multiprocessing.Pool的开销所遇到的程序开发问题。

如果觉得内存溢出网站内容还不错,欢迎将内存溢出网站推荐给程序员好友。

欢迎分享,转载请注明来源:内存溢出

原文地址: https://outofmemory.cn/langs/1207042.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-06-04
下一篇 2022-06-04

发表评论

登录后才能评论

评论列表(0条)

保存