【4】进程管理-11-多线程--threading

进程就是一个应用程序在处理机上的一次执行过程,它是一个动态的概念,而线程是进程中的一部分,进程包含多个线程在运行。threading通过对thread模块进行二次封装,提供了更方便的API来操作线程。这里将介绍如何通过多线程去加速你得Python程序的进程。

一、单线程

例子:

from time import ctime,sleep,time
def music():
	for i in range(2):
		print "I was listening to music. %s" %ctime()
		sleep(1)
def move():
	for i in range(2):
		print "I was at the movies! %s" %ctime()
		sleep(5)

if __name__ == '__main__':
	time_start =time()
	music()
	move()
	print "all over %s" %ctime()
	print time()-time_start

运行结果:

 I was listening to music. Sun Aug 9 12:44:52 2015
 I was listening to music. Sun Aug 9 12:44:53 2015
 I was at the movies! Sun Aug 9 12:44:54 2015
 I was at the movies! Sun Aug 9 12:44:59 2015
 all over Sun Aug 9 12:45:04 2015
 12.0174899101

其实,music()和move()更应该被看作是音乐和视频播放器,至于要播放什么歌曲和视频应该由我们使用时决定。所以,我们对上面代码做了改造:

#coding=utf-8
import threading
from time import ctime,sleep,time

def music(func):
	for i in range(2):
		print "I was listening to %s. %s" %(func,ctime())
		sleep(1)

def move(func):
	for i in range(2):
		print "I was at the %s! %s" %(func,ctime())
		sleep(5)

if __name__ == '__main__':
	time_start=time()
	music(u'爱情买卖')
	move(u'阿凡达')
	print "all over %s" %ctime()
	print time()- time_start

运行结果:

 I was listening to 爱情买卖. Sun Aug 9 12:59:45 2015
 I was listening to 爱情买卖. Sun Aug 9 12:59:46 2015
 I was at the 阿凡达! Sun Aug 9 12:59:47 2015
 I was at the 阿凡达! Sun Aug 9 12:59:52 2015
 all over Sun Aug 9 12:59:57 2015
 12.0150828362

对music()和move()进行了传参处理。体验中国经典歌曲和欧美大片文化。

二、多线程

Thread 是threading模块中最重要的类之一,可以使用它来创建线程。有两种方式来创建线程:

  • 创建线程要执行的函数,把这个函数传递进Thread对象里,让它来执行;
  • 继承Thread类,创建一个新的class,将要执行的代码 写到run函数里面。

方法一:(创建函数并且传入Thread 对象中:)

#coding=utf-8
import threading
from time import ctime,sleep,time
def music(func):
	for i in range(2):
		print "I was listening to %s. %s" %(func,ctime())
        
		sleep(1)
def move(func):
	for i in range(2):
		print "I was at the %s! %s" %(func,ctime())
		sleep(5)
threads = []
t1 = threading.Thread(target=music,args=(u'爱情买卖',))
threads.append(t1)
t2 = threading.Thread(target=move,args=(u'阿凡达',))
threads.append(t2)
if __name__ == '__main__':
	time_start=time()
	for t in threads:
		t.setDaemon(True)
		t.start()
	print "all over %s" %ctime()
	print time()- time_start

代码解析:

import threading #首先导入threading 模块,这是使用多线程的前提。
threads = []
t1 = threading.Thread(target=music,args=(u'爱情买卖',))
threads.append(t1)  

创建了threads数组,创建线程t1,使用threading.Thread()方法,在这个方法中调用music方法target=music,args方法对music进行传参。 把创建好的线程t1装到threads数组中。

接着以同样的方式创建线程t2,并把t2也装到threads数组。

for t in threads:
	t.setDaemon(True)
	t.start()

最后通过for循环遍历数组。(数组被装载了t1和t2两个线程)

setDaemon()

setDaemon(True)将线程声明为守护线程,必须在start() 方法调用之前设置,如果不设置为守护线程程序会被无限挂起。子线程启动后,父线程也继续执行下去,当父线程执行完最后一条语句print “all over %s” %ctime()后,没有等待子线程,直接就退出了,同时子线程也一同结束。

start() #开始线程活动。

运行结果:

 I was listening to 爱情买卖. Sun Aug 9 13:03:41 2015
 I was at the 阿凡达! Sun Aug 9 13:03:41 2015
 all over Sun Aug 9 13:03:41 2015
 0.000370979309082

从执行结果来看,子线程(muisc 、move )和主线程(print “all over %s” %ctime())都是同一时间启动,但由于主线程执行完结束,所以导致子线程也终止。

进行改进:

if __name__ == '__main__':
	time_start=time()
	for t in threads:
		t.setDaemon(True)
		t.start()
	 t.join()
	print "all over %s" %ctime()
	print time()- time_start

我们只对上面的程序加了个join()方法,用于等待线程终止。join()的作用是,在子线程完成运行之前,这个子线程的父线程将一直被阻塞。 注意: join()方法的位置是在for循环外的,也就是说必须等待for循环里的两个进程都结束后,才去执行主进程。如果放到for循环里面去了,这个多线程就没有作用了,不信,可以try。

运行结果:

 I was listening to 爱情买卖. Sun Aug 9 13:08:29 2015
 I was at the 阿凡达! Sun Aug 9 13:08:29 2015
 I was listening to 爱情买卖. Sun Aug 9 13:08:30 2015
 I was at the 阿凡达! Sun Aug 9 13:08:34 2015
 all over Sun Aug 9 13:08:39 2015
 10.0077209473

总耗时为10秒。从单线程时减少了2秒,我们可以把music的sleep()的时间调整为4秒。 单线程的时间增加了,但是双线程的总时间没有变化。

细心的你也会会发现,让第一个程序提早介绍的原因并不是join()没加入,而是t.setDaemon(True),如果第二个程序不加t.setDaemon(True),程序能完整运行下来,那么问题来了,这个setDaemon究竟在里面是什么作用?? 当没有存活的非守护进程时,整个python程序才会退出。

也就是说:如果主线程执行完以后,还有其他非守护线程,主线程是不会退出的,会被无限挂起;必须将线程声明为守护线程之后,如果队列中的数据运行完了,那么整个程序想什么时候退出就退出,不用等待。 setDaemon这个方法基本和join是相反的。当我们在程序运行中,执行一个主线程,如果主线程又创建一个子线程,主线程和子线程就分兵两路,分别运行,那么当主线程完成想退出时,会检验子线程是否完成。如果子线程未完成,则主线程会等待子线程完成后再退出。但是有时候我们需要的是,只要主线程完成了,不管子线程是否完成,都要和主线程一起退出,这时就可以用setDaemon方法啦

例二:

import threading,time
from time import sleep, ctime
def now() :
	return str(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime()))
def test(nloop, nsec):
	print 'start loop', nloop, 'at:', now()
	sleep(nsec)
	print 'loop', nloop, 'done at:', now()
def main():
	print 'starting at:', now()
	threadpool=[]
    
	for i in xrange(10):
        
		th = threading.Thread(target=test, args=(i, 2))
		threadpool.append(th)
	for th in threadpool:
		th.start()
	#for th in threadpool:
		#threading.Thread.join(th)
		th.join()
	print 'all Done at:', now()
if __name__ == '__main__':
	 main()

运行结果:

tanqianshan@promote:~/python$ python test_thread.py
starting at: 2015-08-09 13:46:29
start loop 0 at: 2015-08-09 13:46:29
 start loopstart loop 2  1 at:at: 2015-08-09 13:46:29
 2015-08-09 13:46:29
 start loop 3 at: 2015-08-09 13:46:29
start loop 4 at: 2015-08-09 13:46:29
start loop 5 at: 2015-08-09 13:46:29
 start loop 6 at: 2015-08-09 13:46:29
 start loop 7start loop 8 at: 2015-08-09 13:46:29
 at:start loop 9 2015-08-09 13:46:29
 at: 2015-08-09 13:46:29
looploop looploop 5looploop 6   20loop  done at:3 2015-08-09 13:46:31
  done at: 2015-08-09 13:46:31
loop8  done at:loop 1 loop 7   2015-08-09 13:46:31
done at: 2015-08-09 13:46:31
done at:9 2015-08-09 13:46:31
done at: 2015-08-09 13:46:31
 4 done at: 2015-08-09 13:46:31
 done at: 2015-08-09 13:46:31
 done at: 2015-08-09 13:46:31
done at: 2015-08-09 13:46:31
all Done at: 2015-08-09 13:46:31

问题:为什么输出的不完整呢?

方法二:

import threading ,time
from time import sleep, ctime
def now() :
	return str( time.strftime( '%Y-%m-%d %H:%M:%S' , time.localtime() ) )
class myThread (threading.Thread) :
	"""docstring for myThread"""
	def __init__(self, nloop, nsec) :
		super(myThread, self).__init__()
		self.nloop = nloop
		self.nsec = nsec
	def run(self):
		print 'start loop', self.nloop, 'at:', ctime()
		sleep(self.nsec)
		print 'loop', self.nloop, 'done at:', ctime()
def main():
	thpool=[]
	print 'starting at:',now()
	for i in xrange(10):
		thpool.append(myThread(i,2))
	for th in thpool:
		th.start()
	for th in thpool:
		th.join()
	print 'all Done at:', now()

if __name__ == '__main__':
	main()

运行结果:

tanqianshan@promote:~/python$ python test_thread.py
starting at: 2015-08-09 13:57:51
start loop 0start loop  at: Sun Aug  9 13:57:51 2015
start loop 21  at:at: Sun Aug  9 13:57:51 2015
 Sun Aug  9 13:57:51 2015
start loop 3 at: Sun Aug  9 13:57:51 2015
start loop 4 at:start loop  Sun Aug  9 13:57:51 2015
 start loop 56  at:at:start loop 7  Sun Aug  9 13:57:51 2015
 Sun Aug  9 13:57:51 2015
at: Sun Aug  9 13:57:51 2015
start loop 8 at: Sun Aug  9 13:57:51 2015
 start loop 9 at: Sun Aug  9 13:57:51 2015
loop 0 done at: Sun Aug  9 13:57:53 2015
loop loop 4loop loop 82 loop loop6  7looploop loop 5done at:done at: Sun Aug  9 13:57:53 2015
 3  done at:1  Sun Aug  9 13:57:53 2015
 done at:  Sun Aug  9 13:57:53 2015
 done at: Sun Aug  9 13:57:53 2015
 done at: 9 done at: Sun Aug  9 13:57:53 2015
done at: Sun Aug  9 13:57:53 2015
Sun Aug  9 13:57:53 2015
Sun Aug  9 13:57:53 2015
done at: Sun Aug  9 13:57:53 2015
all Done at: 2015-08-09 13:57:53

三、Lock

多线程和多进程最大的不同在于,多进程中,同一个变量,各自有一份拷贝存在于每个进程中,互不影响,而多线程中,所有变量都由所有线程共享,所以,任何一个变量都可以被任何一个线程修改,因此,线程之间共享数据最大的危险在于多个线程同时改一个变量,把内容给改乱了。

来看看多个线程同时操作一个变量怎么把内容给改乱了:

import time, threading

# 假定这是你的银行存款:
balance = 0

def change_it(n):
    # 先存后取,结果应该为0:
    global balance
    balance = balance + n
    balance = balance - n

def run_thread(n):
    for i in range(100000):
        change_it(n)

t1 = threading.Thread(target=run_thread, args=(5,))
t2 = threading.Thread(target=run_thread, args=(8,))
t1.start()
t2.start()
t1.join()
t2.join()
print balance

我们定义了一个共享变量balance,初始值为0,并且启动两个线程,先存后取,理论上结果应该为0,但是,由于线程的调度是由操作系统决定的,当t1、t2交替执行时,只要循环次数足够多,balance的结果就不一定是0了。

原因是因为高级语言的一条语句在CPU执行时是若干条语句,即使一个简单的计算:

balance = balance + n

也分两步:

  1. 计算balance + n,存入临时变量中;
  2. 将临时变量的值赋给balance。

也就是可以看成:

x = balance + n
balance = x

由于x是局部变量,两个线程各自都有自己的x,当代码正常执行时:

初始值 balance = 0

t1: x1 = balance + 5 # x1 = 0 + 5 = 5
t1: balance = x1     # balance = 5
t1: x1 = balance - 5 # x1 = 5 - 5 = 0
t1: balance = x1     # balance = 0

t2: x2 = balance + 8 # x2 = 0 + 8 = 8
t2: balance = x2     # balance = 8
t2: x2 = balance - 8 # x2 = 8 - 8 = 0
t2: balance = x2     # balance = 0

结果 balance = 0

但是t1和t2是交替运行的,如果操作系统以下面的顺序执行t1、t2:

初始值 balance = 0

t1: x1 = balance + 5  # x1 = 0 + 5 = 5

t2: x2 = balance + 8  # x2 = 0 + 8 = 8
t2: balance = x2      # balance = 8

t1: balance = x1      # balance = 5
t1: x1 = balance - 5  # x1 = 5 - 5 = 0
t1: balance = x1      # balance = 0

t2: x2 = balance - 5  # x2 = 0 - 5 = -5
t2: balance = x2      # balance = -5

结果 balance = -5

究其原因,是因为修改balance需要多条语句,而执行这几条语句时,线程可能中断,从而导致多个线程把同一个对象的内容改乱了。

两个线程同时一存一取,就可能导致余额不对,你肯定不希望你的银行存款莫名其妙地变成了负数,所以,我们必须确保一个线程在修改balance的时候,别的线程一定不能改。

如果我们要确保balance计算正确,就要给change_it()上一把锁,当某个线程开始执行change_it()时,我们说,该线程因为获得了锁,因此其他线程不能同时执行change_it(),只能等待,直到锁被释放后,获得该锁以后才能改。由于锁只有一个,无论多少线程,同一时刻最多只有一个线程持有该锁,所以,不会造成修改的冲突。创建一个锁就是通过threading.Lock()来实现:

balance = 0
lock = threading.Lock()

def run_thread(n):
    for i in range(100000):
        # 先要获取锁:
        lock.acquire()
        try:
            # 放心地改吧:
            change_it(n)
        finally:
            # 改完了一定要释放锁:
            lock.release()

当多个线程同时执行lock.acquire()时,只有一个线程能成功地获取锁,然后继续执行代码,其他线程就继续等待直到获得锁为止。

获得锁的线程用完后一定要释放锁,否则那些苦苦等待锁的线程将永远等待下去,成为死线程。所以我们用try…finally来确保锁一定会被释放。

锁的好处就是确保了某段关键代码只能由一个线程从头到尾完整地执行,坏处当然也很多,首先是阻止了多线程并发执行,包含锁的某段代码实际上只能以单线程模式执行,效率就大大地下降了。其次,由于可以存在多个锁,不同的线程持有不同的锁,并试图获取对方持有的锁时,可能会造成死锁,导致多个线程全部挂起,既不能执行,也无法结束,只能靠操作系统强制终止。

四、多核CPU

如果你不幸拥有一个多核CPU,你肯定在想,多核应该可以同时执行多个线程。

如果写一个死循环的话,会出现什么情况呢?

打开Mac OS X的Activity Monitor,或者Windows的Task Manager,都可以监控某个进程的CPU使用率。

我们可以监控到一个死循环线程会100%占用一个CPU。

如果有两个死循环线程,在多核CPU中,可以监控到会占用200%的CPU,也就是占用两个CPU核心。

要想把N核CPU的核心全部跑满,就必须启动N个死循环线程。

试试用Python写个死循环:

import threading, multiprocessing

def loop():
    x = 0
    while True:
        x = x ^ 1

for i in range(multiprocessing.cpu_count()):
    t = threading.Thread(target=loop)
    t.start()

启动与CPU核心数量相同的N个线程,在4核CPU上可以监控到CPU占用率仅有160%,也就是使用不到两核。

即使启动100个线程,使用率也就170%左右,仍然不到两核。

但是用C、C++或Java来改写相同的死循环,直接可以把全部核心跑满,4核就跑到400%,8核就跑到800%,为什么Python不行呢?

因为Python的线程虽然是真正的线程,但解释器执行代码时,有一个GIL锁:Global Interpreter Lock,任何Python线程执行前,必须先获得GIL锁,然后,每执行100条字节码,解释器就自动释放GIL锁,让别的线程有机会执行。这个GIL全局锁实际上把所有线程的执行代码都给上了锁,所以,多线程在Python中只能交替执行,即使100个线程跑在100核CPU上,也只能用到1个核。

GIL是Python解释器设计的历史遗留问题,通常我们用的解释器是官方实现的CPython,要真正利用多核,除非重写一个不带GIL的解释器。

所以,在Python中,可以使用多线程,但不要指望能有效利用多核。如果一定要通过多线程利用多核,那只能通过C扩展来实现,不过这样就失去了Python简单易用的特点。

不过,也不用过于担心,Python虽然不能利用多线程实现多核任务,但可以通过多进程实现多核任务。多个Python进程有各自独立的GIL锁,互不影响。

四、总结

  • 多线程编程,模型复杂,容易发生冲突,必须用锁加以隔离,同时,又要小心死锁的发生。
  • Python解释器由于设计时有GIL全局锁,导致了多线程无法利用多核。多线程的并发在Python中就是一个美丽的梦

参考资料

药企,独角兽,苏州。团队长期招人,感兴趣的都可以发邮件聊聊:tiehan@sina.cn
个人公众号,比较懒,很少更新,可以在上面提问题,如果回复不及时,可发邮件给我: tiehan@sina.cn