[4] Process Management - 10 - Parallel Execution -- pathos
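pathos.multiprocessing is an alternative to multiprocessing. It is built on a fork of multiprocessing (the multiprocess package), but it uses dill for serialization, so it can also run multiprocess work on methods of a class.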
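The serialization point can be illustrated with the standard library alone. The sketch below only shows the limitation of the standard pickle module (functions are pickled by reference to an importable name, which a lambda does not have). It does not call dill or pathos, and the variable names are illustrative.

```python
import math
import pickle

square = lambda x: x * x  # a lambda has no importable name

try:
    pickle.dumps(square)
    lambda_pickled = True
except (pickle.PicklingError, AttributeError) as err:
    # The standard pickler cannot look the lambda up by name.
    lambda_pickled = False
    print("pickle failed:", type(err).__name__)

# A function with an importable name is pickled by reference,
# so loading it gives back the very same function object.
restored = pickle.loads(pickle.dumps(math.sqrt))
print(restored is math.sqrt)
```

Since a pool has to send the mapped function to its workers, anything the serializer rejects cannot be mapped, which is the gap dill is meant to close.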
1. Introduction
2. Examples
Example 1
Multiprocess computation inside a class
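from pathos.multiprocessing import ProcessingPool as Pool

# Inside the class, do_sample_qc is an ordinary method:
#     def do_sample_qc(self, sample):
#         pass
# From another method of the same class, map it over the samples in self.chip:
Pool(20).map(self.do_sample_qc, list(self.chip))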
Example 2
Multiprocess computation, with the results collected into a single list
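from pathos import multiprocessing as mp

def analysis_pseudogene(args):  # the arguments are passed in packed as one list
    args_1 = args[0]
    args_2 = args[1]
    pass

args_list = []
results = []
for k1 in primers_locs.keys():
    # args_1 and args_2 stand for the values prepared for key k1
    args_list.append([args_1, args_2])

# leave 5 cores free, but always keep at least one worker
pool = mp.ProcessingPool(max(1, mp.cpu_count() - 5))
results.extend(pool.map(analysis_pseudogene, args_list))
pool.close()
pool.join()
pool.clear()  # so that a Pool can be used again later in the same script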
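Two details of Example 2 can be checked without starting any processes: the worker count must stay positive on machines with few cores, and each task receives its arguments packed into one item. The following is a minimal sketch using only the standard library. The names worker_count and add_pair are hypothetical, and the built-in map stands in for pool.map.

```python
import os


def worker_count(reserve=5):
    """Leave `reserve` cores free, but never return fewer than 1."""
    total = os.cpu_count() or 1  # os.cpu_count() may return None
    return max(1, total - reserve)


def add_pair(args):
    # Unpack the single packed argument, as analysis_pseudogene does.
    a, b = args
    return a + b


# One packed item per task.
args_list = [(k, k * 10) for k in range(4)]

# Serial stand-in: pool.map(add_pair, args_list) is called the same way.
results = list(map(add_pair, args_list))
print(worker_count(), results)
```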
Example 3
Multiprocess handling, with each process saving its data separately (to avoid mixed-up output)
import multiprocessing as mp
from pathos import multiprocessing as pathos_mp
import gzip

example_data = range(100)

def process_point(point):
    # one output file per worker process, named after its PID
    output = "output-%d.gz" % mp.current_process().pid
    # "at" appends in text mode; under Python 3, "a+" is binary and rejects str
    with gzip.open(output, "at") as fout:
        fout.write('%d\n' % point**2)

pool = pathos_mp.Pool(8)
pool.map(process_point, example_data)
pool.close()
pool.join()
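The file-per-process idea in Example 3 can be tried in a single process first. The sketch below is an assumption-laden stand-in: it writes into a temporary directory (out_dir is hypothetical), calls process_point serially instead of through a pool, and reads the file back to confirm that text-mode appends to a gzip file round-trip.

```python
import gzip
import multiprocessing as mp
import os
import tempfile

out_dir = tempfile.mkdtemp()  # hypothetical output location


def process_point(point):
    # One file per process ID, so no two processes share a file.
    path = os.path.join(out_dir, "output-%d.gz" % mp.current_process().pid)
    with gzip.open(path, "at") as fout:  # "at" = append, text mode
        fout.write("%d\n" % (point ** 2))
    return path


# Called serially here, so every call lands in the same file;
# under a pool, each worker would append to its own file instead.
paths = {process_point(p) for p in range(5)}

with gzip.open(next(iter(paths)), "rt") as fin:
    squares = [int(line) for line in fin]
print(squares)
```

Because each worker appends to a file named after its own PID, rerunning the script in the same directory may append to leftover files if PIDs are reused, so clearing old output first is worth considering.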
3. Errors
4. Discussion
4.1 Error 1: Pool not running
square = lambda x:x*x
import pathos.pools as pp
pool = pp.ProcessPool()
results = pool.map(square, range(4))
pool.close()
pool.join()
print(results)
[0, 1, 4, 9]
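Using the pool again after that raises an error, even though a new ProcessPool() is constructed: pathos caches the underlying pool, so the new instance picks up the pool that was just closed.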
square = lambda x:x*x
import pathos.pools as pp
pool = pp.ProcessPool()
results = pool.map(square, range(4))
pool.close()
pool.join()
print(results)
Error message:
ValueError Traceback (most recent call last)
<ipython-input-5-81cfae6e96f9> in <module>()
3 import pathos.pools as pp
4 pool = pp.ProcessPool()
----> 5 results = pool.map(square, range(4))
6
7 pool.close()
/data/software/anaconda3/lib/python3.6/site-packages/pathos/multiprocessing.py in map(self, f, *args, **kwds)
135 AbstractWorkerPool._AbstractWorkerPool__map(self, f, *args, **kwds)
136 _pool = self._serve()
--> 137 return _pool.map(star(f), zip(*args)) # chunksize
138 map.__doc__ = AbstractWorkerPool.map.__doc__
139 def imap(self, f, *args, **kwds):
/data/software/anaconda3/lib/python3.6/site-packages/multiprocess/pool.py in map(self, func, iterable, chunksize)
264 in a list that is returned.
265 '''
--> 266 return self._map_async(func, iterable, mapstar, chunksize).get()
267
268 def starmap(self, func, iterable, chunksize=None):
/data/software/anaconda3/lib/python3.6/site-packages/multiprocess/pool.py in _map_async(self, func, iterable, mapper, chunksize, callback, error_callback)
372 '''
373 if self._state != RUN:
--> 374 raise ValueError("Pool not running")
375 if not hasattr(iterable, '__len__'):
376 iterable = list(iterable)
ValueError: Pool not running
Of course, the pool can be restarted. Remember to join and clear it afterwards, and it will work normally the next time.
pool.restart()
results=pool.map(square, range(4))
# the other option is to remove the old pool instance
pool.close()
pool.join()
pool.clear()
print(results)
[0, 1, 4, 9]
Now it works:
pool = pp.ProcessPool()
results = pool.map(square, range(4))
pool.close()
pool.join()
pool.clear()
print(results)
[0, 1, 4, 9]