您的代码中的这一行:
pool.map(calc_dist, ['lat','lon'])
产生2个进程-
一个运行
calc_dist('lat'),另一个运行
calc_dist('lon')。比较doc中的第一个示例。(基本上,
pool.map(f,[1,2,3])调用
f在下面的列表中给出的参数三次
f(1),
f(2)和
f(3))。如果我没有记错的话,你的函数
calc_dist只能叫
calc_dist('lat','lon')。而且它不允许并行处理。解
我相信您希望将工作拆分到多个进程之间,可能将每个元组发送
(grp, lst)到一个单独的进程。以下代码正是这样做的。
首先,让我们准备拆分:
grp_lst_args = list(df.groupby('co_nm').groups.items())print(grp_lst_args)[('aa', [0, 1, 2]), ('cc', [7, 8, 9]), ('bb', [3, 4, 5, 6])]
我们将在单独的进程中将每个元组(这里有三个)作为参数传递给函数。我们需要重写函数,我们称它为
calc_dist2。为了方便起见,它的参数是一个元组,如
calc_dist2(('aa',[0,1,2]))
def calc_dist2(arg): grp, lst = arg return pd.Dataframe( [ [grp, df.loc[c[0]].ser_no, df.loc[c[1]].ser_no, vincenty(df.loc[c[0], ['lat','lon']], df.loc[c[1], ['lat','lon']]) ] for c in combinations(lst, 2) ], columns=['co_nm','machineA','machineB','distance'])
现在是多重处理:
pool = mp.Pool(processes = (mp.cpu_count() - 1))results = pool.map(calc_dist2, grp_lst_args)pool.close()pool.join()results_df = pd.concat(results)
results是结果的通话清单(此处数据帧)
calc_dist2((grp,lst))为
(grp,lst)在
grp_lst_args。的元素
results随后被连接到一个数据帧。
print(results_df) co_nm machineA machineB distance0 aa 1 2 156.876149391 km1 aa 1 3 313.705445447 km2 aa 2 3 156.829329105 km0 cc 8 9 156.060165391 km1 cc 8 0 311.910998169 km2 cc 9 0 155.851498134 km0 bb 4 5 156.665641837 km1 bb 4 6 313.214333025 km2 bb 4 7 469.622535339 km3 bb 5 6 156.548897414 km4 bb 5 7 312.957597466 km5 bb 6 7 156.40899677 km
顺便说一句,在Python 3中,我们可以使用以下
with构造:
with mp.Pool() as pool: results = pool.map(calc_dist2, grp_lst_args)
更新资料
我仅在linux上测试了此代码。在Linux上,
df子进程可以访问只读数据帧,并且不会将其复制到其内存空间中,但是我不确定它在Windows上如何正常工作。您可以考虑将其拆分
df为多个块(按分组
co_nm),然后将这些块作为参数发送给的其他版本的
calc_dist。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)