【4】进程管理-10-多进程--multiprocessing

在利用Python进行系统管理的时候,特别是同时操作多个文件目录,或者远程控制多台主机,并行操作可以节约大量的时间。

一、基本原理

Unix/Linux操作系统提供了一个fork()系统调用,它非常特殊。普通的函数调用,调用一次,返回一次,但是fork()调用一次,返回两次,因为操作系统自动把当前进程(称为父进程)复制了一份(称为子进程),然后,分别在父进程和子进程内返回。

子进程永远返回0,而父进程返回子进程的ID。这样做的理由是,一个父进程可以fork出很多子进程,所以,父进程要记下每个子进程的ID,而子进程只需要调用getppid()就可以拿到父进程的ID。

Python的os模块封装了常见的系统调用,其中就包括fork,可以在Python程序中轻松创建子进程

# multiprocessing.py
import os

print 'Process (%s) start...' % os.getpid()
pid = os.fork()
if pid==0:
    print 'I am child process (%s) and my parent is %s.' % (os.getpid(), os.getppid())
else:
    print 'I (%s) just created a child process (%s).' % (os.getpid(), pid)

运行结果如下:

Process (876) start...
I (876) just created a child process (877).
I am child process (877) and my parent is 876.

由于Windows没有fork调用,上面的代码在Windows上无法运行。由于Mac系统是基于BSD(Unix的一种)内核,所以,在Mac下运行是没有问题的,推荐大家用Mac学Python!

有了fork调用,一个进程在接到新任务时就可以复制出一个子进程来处理新任务,常见的Apache服务器就是由父进程监听端口,每当有新的http请求时,就fork出子进程来处理新的http请求。

二、multiprocessing

multiprocessing包是Python中的多进程管理包。与threading.Thread类似,它可以利用multiprocessing.Process对象来创建一个进程。该进程可以运行在Python程序内部编写的函数。该Process对象与Thread对象的用法相同,也有start(), run(), join()的方法。此外multiprocessing包中也有Lock/Event/Semaphore/Condition类 (这些对象可以像多线程那样,通过参数传递给各个进程),用以同步进程,其用法与threading包中的同名类一致。所以,multiprocessing的很大一部份与threading使用同一套API,只不过换到了多进程的情境。

但在使用这些共享API的时候,我们要注意以下几点:

  • 在UNIX平台上,当某个进程终结之后,该进程需要被其父进程调用wait,否则进程成为僵尸进程(Zombie)。所以,有必要对每个Process对象调用join()方法 (实际上等同于wait)。对于多线程来说,由于只有一个进程,所以不存在此必要性。
  • multiprocessing提供了threading包中没有的IPC(比如Pipe和Queue),效率上更高。应优先考虑Pipe和Queue,避免使用Lock/Event/Semaphore/Condition等同步方式 (因为它们占据的不是用户进程的资源)。
  • 多进程应该避免共享资源。在多线程中,我们可以比较容易地共享资源,比如使用全局变量或者传递参数。在多进程情况下,由于每个进程有自己独立的内存空间,以上方法并不合适。此时我们可以通过共享内存和Manager的方法来共享资源。但这样做提高了程序的复杂度,并因为同步的需要而降低了程序的效率。

Process.PID中保存有PID,如果进程还没有start(),则PID为None。

2.1 创建一个进程 Process

multiprocessing模块提供了一个Process类来代表一个进程对象,下面的例子演示了启动一个子进程并等待其结束:

from multiprocessing import Process
import os

# 子进程要执行的代码
def run_proc(name):
    print 'Run child process %s (%s)...' % (name, os.getpid())

if __name__=='__main__':
    print 'Parent process %s.' % os.getpid()
    p = Process(target=run_proc, args=('test',))
    print 'Process will start.'
    p.start()
    p.join()
    print 'Process end.'

执行结果如下:

Parent process 928.
Process will start.
Run child process test (929)...
Process end.

创建子进程时,只需要传入一个执行函数和函数的参数,创建一个Process实例,用start()方法启动,这样创建进程比fork()还要简单。

join()方法可以等待子进程结束后再继续往下运行,通常用于进程间的同步。

2.2 创建多个进程

params_list = [(1,2), (2,3), (3,4)]
ps = []
for params in params_list:
    p = Process(target=foo, args=(1,2))
    p.daemon = True
    ps.append(p)
    p.start()
for p in ps:
    # 实际上,对ps的遍历也会被p.join()阻塞。但最终执行完毕的时机是一样的
    p.join()
print('多个进程执行完毕')

2.3 利用Pool管控多进程

当被操作对象数目不大时,可以直接利用multiprocessing中的Process动态成生多个进程,10几个还好,但如果是上百个,上千个目标,手动的去限制进程数量却又太过繁琐,这时候进程池Pool发挥作用的时候就到了。

Pool可以提供指定数量的进程,供用户调用,当有新的请求提交到pool中时,如果池还没有满,那么就会 创建一个新的进程用来执行该请求;但如果池中的进程数已经达到规定最大值,那么该请求就会等待,直到池中有进程结束,才会创建新的进程来它。这里有一个简单的例子:

from multiprocessing import Pool, cpu_count
# 创建进程池,最大进程数为cpu逻辑核心数
def foo(a,b):
    time.sleep(1)
    print(a+b)

pool = Pool(cpu_count())
results = []
for params in params_list:
     # 异步添加任务
     result = pool.apply_async(foo, params)

     results.append(result)
# 关闭进程池接收
pool.close()
# 阻塞主进程
pool.join()
print('多个进程执行完毕')
# 获取结果
for result in results:
    print(result.get())

对Pool对象调用join()方法会等待所有子进程执行完毕,调用join()之前必须先调用close(),调用close()之后就不能继续添加新的Process了。

由于Pool的默认大小是CPU的核数,如果你不幸拥有8核CPU,你要提交至少9个子进程才能看到上面的等待效果。

运行脚本后利用ps aux | grep pool.py查看进程情况,会发现最多只会有四个进程执行。pool.apply_async()用来向进程池提交目标请求,pool.join()是用来等待

apply_async是非阻塞;apply是阻塞的,维持执行的进程总数为processes,当一个进程执行完毕后会添加新的进程进去,跟串联运行没啥区别。所以现在已丢弃apply

2.4 利用Pool.imap()迭代管控多进程

这种方式利用了Python迭代特性。 在面对海量数据源和巨大计算量时,既利用了多核性能,又避免在内存中读入过多数据。

# 迭代从mongodb查询数据并处理,利用多进程提高计算效率,同时保证内存中仅存在有限几条数据

from multiprocessing import Pool, cpu_count
from pymongo import MongoClient

# 连接mongodb,获取集合testdb.test
connection = MongoClient()
db = connection.testdb
collection = db.test

# 数据处理任务
def deal_doc(doc):
    # 处理文档,如利用文档进行复杂计算
    result = 1
    return result

# 创建进程池,最大进程数为cpu逻辑核心数
pool = Pool(cpu_count())
# Pool.imap()接收可迭代对象为参数,迭代入进程池处理。
# collection.find() 返回mongodb查询游标,可迭代
iter = pool.imap(deal_doc,  collection.find())
# 迭代iter,获取计算结果。每迭代一次,利用一个线程,进行一次任务
for result in iter:
     print(result)

2.5 多个进程共享list

#!/usr/bin/python

from multiprocessing import Process, Manager

def worker(x, i, *args):
    sub_l = manager.list(x[i])
    sub_l.append(i)
    x[i] = sub_l


if __name__ == '__main__':
    manager = Manager()
    x = manager.list([[]]*5)
    print x
    p = []
    for i in range(5):
        p.append(Process(target=worker, args=(x, i)))
        p[i].start()

    for i in range(5):
        p[i].join()

    print x

输出结果:

[[0], [1], [2], [3], [4]]

2.6 进程间通信

Process之间肯定是需要通信的,操作系统提供了很多机制来实现进程间的通信。Python的multiprocessing模块包装了底层的机制,提供了Queue、Pipes等多种方式来交换数据。

我们以Queue为例,在父进程中创建两个子进程,一个往Queue里写数据,一个从Queue里读数据:

from multiprocessing import Process, Queue
import os, time, random

# 写数据进程执行的代码:
def write(q):
    for value in ['A', 'B', 'C']:
        print 'Put %s to queue...' % value
        q.put(value)
        time.sleep(random.random())

# 读数据进程执行的代码:
def read(q):
    while True:
        value = q.get(True)
        print 'Get %s from queue.' % value

if __name__=='__main__':
    # 父进程创建Queue,并传给各个子进程:
    q = Queue()
    pw = Process(target=write, args=(q,))
    pr = Process(target=read, args=(q,))
    # 启动子进程pw,写入:
    pw.start()
    # 启动子进程pr,读取:
    pr.start()
    # 等待pw结束:
    pw.join()
    # pr进程里是死循环,无法等待其结束,只能强行终止:
    pr.terminate()

运行结果如下:

Put A to queue...
Get A from queue.
Put B to queue...
Get B from queue.
Put C to queue...
Get C from queue.

在Unix/Linux下,multiprocessing模块封装了fork()调用,使我们不需要关注fork()的细节。由于Windows没有fork调用,因此,multiprocessing需要“模拟”出fork的效果,父进程所有Python对象都必须通过pickle序列化再传到子进程去,所有,如果multiprocessing在Windows下调用失败了,要先考虑是不是pickle失败了。

参考资料

个人公众号,比较懒,很少更新,可以在上面提问题,如果回复不及时,可发邮件给我: tiehan@sina.cn

Sam avatar
About Sam
专注生物信息 专注转化医学