我必须逐行处理一个庞大的pandas.DataFrame(几十GB),其中每行 *** 作都很长(几十毫秒).所以我有了将框架拆分成块并使用多处理并行处理每个块的想法.这确实加快了任务,但内存消耗是一场噩梦.
虽然每个子进程原则上只占用一小部分数据,但它需要(几乎)与包含原始DataFrame的原始父进程一样多的内存.即使删除父进程中使用过的部分也无济于事.
我写了一个复制这种行为的最小例子.它唯一能做的就是创建一个带有随机数的大型DataFrame,将其分成最多100行的小块,并在多处理期间简单地打印一些有关DataFrame的信息(这里通过大小为4的mp.Pool).
并行执行的主要功能:
def just_wait_and_print_len_and_IDx(df): """Waits for 5 seconds and prints df length and first and last index""" # Extract some info IDx_values = df.index.values first_IDx,last_IDx = IDx_values[0],IDx_values[-1] length = len(df) pID = os.getpID() # Waste some cpu cycles time.sleep(1) # Print the info print('First IDx {},last IDx {} and len {} ' 'from process {}'.format(first_IDx,last_IDx,length,pID))
帮助生成器将DataFrame分块为小块:
def df_chunking(df,chunksize): """Splits df into chunks,drops data of original df inplace""" count = 0 # Counter for chunks while len(df): count += 1 print('Preparing chunk {}'.format(count)) # Return df chunk yIEld df.iloc[:chunksize].copy() # Delete data in place because it is no longer needed df.drop(df.index[:chunksize],inplace=True)
主要例程:
def main(): # Job parameters n_jobs = 4 # Poolsize size = (10000,1000) # Size of DataFrame chunksize = 100 # Maximum size of Frame Chunk # Preparation df = pd.DataFrame(np.random.rand(*size)) pool = mp.Pool(n_jobs) print('Starting MP') # Execute the wait and print function in parallel pool.imap(just_wait_and_print_len_and_IDx,df_chunking(df,chunksize)) pool.close() pool.join() print('DONE')
标准输出如下所示:
Starting MPPreparing chunk 1Preparing chunk 2First IDx 0,last IDx 99 and len 100 from process 9913First IDx 100,last IDx 199 and len 100 from process 9914Preparing chunk 3First IDx 200,last IDx 299 and len 100 from process 9915Preparing chunk 4...DONE
问题:
主进程需要大约120MB的内存.但是,池的子进程需要相同的内存量,尽管它们只包含原始DataFame的1%(块大小为100,原始长度为10000).为什么?
我能做些什么呢?尽管我的分块,Python(3)是否将整个DataFrame发送到每个子进程?这是大熊猫内存管理的问题还是多处理和数据酸洗的错误?谢谢!
用于简单复制和粘贴的整个脚本,以防您想自己尝试:
import multiprocessing as mpimport pandas as pdimport numpy as npimport timeimport osdef just_wait_and_print_len_and_IDx(df): """Waits for 5 seconds and prints df length and first and last index""" # Extract some info IDx_values = df.index.values first_IDx,pID))def df_chunking(df,inplace=True)def main(): # Job parameters n_jobs = 4 # Poolsize size = (10000,chunksize)) pool.close() pool.join() print('DONE')if __name__ == '__main__': main()
最佳答案好的,所以我在Sebastianopałczyński的评论中暗示了这一点.问题是子进程是从父进程分叉的,所以它们都包含对原始DataFrame的引用.但是,帧在原始进程中被 *** 作,因此写时复制行为会缓慢地终止并最终在达到物理内存限制时终止.
有一个简单的解决方案:我使用多处理的新上下文功能代替pool = mp.Pool(n_jobs):
ctx = mp.get_context('spawn')pool = ctx.Pool(n_jobs)
这可以保证池进程只是生成而不是从父进程分叉.因此,他们都没有访问原始DataFrame,并且所有这些只需要父级内存的一小部分.
请注意,mp.get_context(‘spawn’)仅在Python 3.4及更高版本中可用.
总结以上是内存溢出为你收集整理的python – Pandas和多处理内存管理:将DataFrame拆分为多个块全部内容,希望文章能够帮你解决python – Pandas和多处理内存管理:将DataFrame拆分为多个块所遇到的程序开发问题。
如果觉得内存溢出网站内容还不错,欢迎将内存溢出网站推荐给程序员好友。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)