【4】进程管理-10-并行化运行--pathos

pathos.multiprocesssing是multiprocesssing的替代,它fork的multiprocesssing,但是用dill来进行序列化处理,因此也能在类的方法中来进行多进程的处理

一、介绍

二、案例

例一

类中进行多线程运算

#self.do_sample_qc(sample):
   pass

from pathos.multiprocessing import ProcessingPool as Pool
Pool(20).map(self.do_sample_qc, [sample for sample in self.chip])

例二

多线程运算,汇总得到一个结果

from pathos import multiprocessing as mp
def analysis_pseudogene(args):   #以List的形式,把数据传进来
	args_1 = args[0]
	args_2 = args[1]
	pass

args_list = []
results = []
for k1 in primers_locs.keys():
   args_list.append([args_1,args_2]]) 

pool = mp.ProcessingPool(mp.cpu_count()-5)
results.extend(pool.map(analysis_pseudogene, args_list))
pool.close()
pool.join()
pool.clear()  ## 为了同一个脚本中能接着使用Pool

例三

多线程的处理,分开保存数据(避免数据的混乱)

import multiprocessing as mp
from pathos import multiprocessing as pathos_mp
import gzip

example_data = range(100)
def process_point(point):
    output = "output-%d.gz" % mp.current_process().pid
    with gzip.open(output, "a+") as fout:
        fout.write('%d\n' % point**2)

pool = pathos_mp.Pool(8)
pool.map(process_point, example_data)

三、报错

四、讨论

4.1 报错1 Pool not running

square = lambda x:x*x

import pathos.pools as pp
pool = pp.ProcessPool()
results = pool.map(square, range(4))

pool.close()
pool.join()

print(results)

[0, 1, 4, 9]

再使用pool,就会报错

square = lambda x:x*x

import pathos.pools as pp
pool = pp.ProcessPool()
results = pool.map(square, range(4))

pool.close()
pool.join()

print(results)

报错内容:

ValueError                                Traceback (most recent call last)
<ipython-input-5-81cfae6e96f9> in <module>()
      3 import pathos.pools as pp
      4 pool = pp.ProcessPool()
----> 5 results = pool.map(square, range(4))
      6 
      7 pool.close()

/data/software/anaconda3/lib/python3.6/site-packages/pathos/multiprocessing.py in map(self, f, *args, **kwds)
    135         AbstractWorkerPool._AbstractWorkerPool__map(self, f, *args, **kwds)
    136         _pool = self._serve()
--> 137         return _pool.map(star(f), zip(*args)) # chunksize
    138     map.__doc__ = AbstractWorkerPool.map.__doc__
    139     def imap(self, f, *args, **kwds):

/data/software/anaconda3/lib/python3.6/site-packages/multiprocess/pool.py in map(self, func, iterable, chunksize)
    264         in a list that is returned.
    265         '''
--> 266         return self._map_async(func, iterable, mapstar, chunksize).get()
    267 
    268     def starmap(self, func, iterable, chunksize=None):

/data/software/anaconda3/lib/python3.6/site-packages/multiprocess/pool.py in _map_async(self, func, iterable, mapper, chunksize, callback, error_callback)
    372         '''
    373         if self._state != RUN:
--> 374             raise ValueError("Pool not running")
    375         if not hasattr(iterable, '__len__'):
    376             iterable = list(iterable)

ValueError: Pool not running

当然,可以重启启动pool,记得join和clear一下,下次就可以正常使用了

pool.restart()

results=pool.map(square, range(4))

# the other option is to remove the old pool instance
pool.close()
pool.join() 
pool.clear()

print(results)

[0, 1, 4, 9]

这会就可以了

pool = pp.ProcessPool()
results = pool.map(square, range(4))

pool.close()
pool.join()
pool.clear()
print(results)

[0, 1, 4, 9]

参考资料

药企,独角兽,苏州。团队长期招人,感兴趣的都可以发邮件聊聊:tiehan@sina.cn
个人公众号,比较懒,很少更新,可以在上面提问题,如果回复不及时,可发邮件给我: tiehan@sina.cn