Z.S.K.'s Records

python并发编程二三事(二)

年前接到一个小活,有一阶段是需要处理某一文件夹下的CSV文本文件,起初文件量不大,用python写了个小脚本批量串行处理,后来想想估计后期还有这样的需求,如果文件量较大的话,再使用单线程时间就会比较长了,所以周末改写了个并发脚本,所用时间节省也是之前的1/2(限于笔记本多进程切换消耗资源,理论应该还更短),还是比较给力,下面就总结下python的并发那点事.
有这样一句话:”Python下多线程是鸡肋,推荐使用多进程“,但是为什么这么说呢?要了解多线程多进程之前,首先要了解python的GIL.

GIL:

GIL的全称是Global Interpreter Lock(全局解释器锁),先来看看官方的解释:

In CPython, the global interpreter lock, or GIL, is a mutex that prevents multiple native threads from executing Python bytecodes at once. This lock is necessary mainly because CPython’s memory management is not thread-safe. (However, since the GIL exists, other features have grown to depend on the guarantees that it enforces.)

从这段话,可以看出来几点比较重要的信息:

  1. GIL是一个防止多个线程同时执行python字节码的全局的解释器(从解释器的角度)锁(从互斥的角度)
  2. CPython的内存管理是非线程安全的

CPython非线程安全:

这里不深入讨论python虚拟机(解释器主循环)jvm的运行机制,简言之,线程安不安全主要是指多线程在访问同一段代码时会不会产生不确定的结果,如果多线程运行的结果与单线程的运行结果一致而且其它变量的值和预期的一样,则可以说是线程安全的,否之为非线程安全.
python本身就是用C写的,所以python解释器大多数默认为CPython解释器.为了利用多核,Python开始支持多线程的时候在考虑如何解决多线程之间数据完整性和状态同步的最简单方法–自然就是加锁.于是有了GIL这把超级大锁,而当越来越多的代码库开发者接受了这种设定后,他们开始大量依赖这种特性(即默认python内部对象是thread-safe的,无需在实现时考虑额外的内存锁和同步操作)
python代码的执行由python虚拟机来控制,即Python先把代码(.py文件)编译成字节码(字节码在Python虚拟机程序里对应的是PyCodeObject对象,.pyc文件是字节码在磁盘上的表现形式),交给字节码虚拟机,然后虚拟机一条一条执行字节码指令,从而完成程序的执行。同样地,虽然python解释器中可以运行多个线程,但在任意时刻,只有一个线程在解释器中运行。而对python虚拟机的访问由全局解释器锁来控制,正是由于GIL的存在,每个CPU在同一时间只能执行一个线程(在单核CPU下的多线程其实都只是并发,不是并行,并发和并行从宏观上来讲都是同时处理多路请求的概念.但并发和并行又有区别:并发通常指有多个任务需要同时进行,并行则是同一时刻有多个任务执行。用上课来举例就是,并发情况下是一个老师在同一时间段辅助不同的人功课。并行则是好几个老师分别同时辅助多个学生功课.)
python多线程的执行流程如下:

  1. 设置GIL
  2. 切换到一个线程执行
  3. 执行代码直到sleep或者是python虚拟机将其挂起
  4. 把线程设置为睡眠状态
  5. 释放GIL
  6. 循环以上步骤

可见,某个线程想要执行,必须先拿到GIL,我们可以把GIL看作是“通行证”,并且在一个python进程中GIL只有一个.拿不到通行证的线程就不允许进入CPU执行.
一般我们的代码无非就是处理两种工作:

  • CPU密集型代码(长时间占用CPU:各种循环处理、计数等等):
    在这种情况下,由于计算工作多,ticks计数很快就会达到阈值,然后触发GIL的释放与再竞争(多个线程来回切换当然是需要消耗资源的),所以python下的多线程对CPU密集型代码并不友好.
  • IO密集型代码(长时间请求IO:文件处理、网络爬虫等):
    多线程能够有效提升效率(单线程下有IO操作会进行IO等待,造成不必要的时间浪费,而开启多线程能在线程A等待时,自动切换到线程B,可以不浪费CPU的资源,从而能提升程序执行效率)。所以python的多线程对IO密集型代码比较友好.

线程颠簸:

关于多线程在多核CPU下为何执行效率不高的原因,大家可参考这篇文章UnderstandingGIL,写的非常详细.

python_concurrent7

从上图可以看出 多核情况下,CPU1上的thread1运行完之后释放GIL,而后引起操作系统调度(按照Python社区的想法,操作系统本身的线程调度已经非常成熟稳定了,没有必要自己搞一套,所以可以粗略看成python的调度是依靠操作系统),此时唤醒CPU2上的thread2,但GIL可能会马上又被CPU1拿到,导致CPU2上被唤醒后的线程thread2醒着等待到切换时间后因为没有GIL又进入待调度状态,这就是线程颠簸(thrashing),每次释放GIL锁,线程进行锁竞争、切换线程,上下文切换都会消耗资源,所以说CPU密集型多核CPU下thread效率很差.

Python3.x线程改进:

当然Python社区也在非常努力的不断改进GIL,甚至是尝试去除GIL,尤其很多的项目都依赖GIL,为了保证向前兼容,去除GIL还是困难的,所以出现了很多改善方案,先来比对一下python2.x与python3.x关于GIL的改变:在Python2.x里,GIL的释放逻辑是当前线程遇见IO操作或者ticks计数达到100(ticks可以看作是Python自身的一个计数器,专门做用于GIL,每次释放后归零,这个计数可以通过 sys.setcheckinterval 来调整),进行释放.
而在python3.x中,GIL不使用ticks计数,改为使用计时器(执行时间达到阈值后,当前线程释放GIL),这样对CPU密集型程序更加友好,但依然没有解决GIL导致的同一时间只能执行一个线程的问题,所以效率依然不尽如人意.
我们可以使用以下方案解决GIL的影响

  1. 使用多进程代替多线程

  2. 使用如JPython,Pypy等其它没有GIL的解释器

  3. 使用Python3.x(同样参考这篇文章UnderstandingGIL的第4、5章)

    1. GIL释放从基于ticks计数改成基于时间
    2. 避免最近一次CPU释放GIL后又得到
    3. 新增线程优先级,优先级低的线程会被迫让出GIL给优先级高的线程

而每个进程有各自独立的GIL,互不干扰,这样就可以真正意义上的并行执行,所以在python中,多进程的执行效率优于多线程(仅仅针对多核CPU而言)

线程/进程/协程:

这部分不在这里详细讨论,以后单独总结写成–python之线程/进程/协程及进程间通信.

线程:

  • thread
  • threading

进程:

  • subprocess
  • multiprocessing

协程:

  • yield
  • async
  • greenlet
  • gevent

线程池/进程池:

threadpool:

threadpool需要下载安装,关于threadpool的详细介绍可参考官网

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
import threadpool
def worker(fileone):
#do something
#这里为了简单只统计文件了行数
with open(fileone,'r') as f:
f.readlines()
totalline = len(f)
f.close()
print("totalline length:%d" %totalline)

if __name__ == '__main__':
files = os.listdir(".")
pool = threadpool.threadPool(4)
results = threadpool.makeRequests(worker, files)
#使用列表解析
[pool.putRequest(req) for req in results]
#等待线程都执行完
pool.wait()

因为脚本主要使用了进程池,所以这里重点介绍进程模块.

multiprocessing.Pool:

这里使用官方提供的代码做案例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
import multiporcessing
def worker(fileone):
#do something
#这里为了简单只统计文件了行数
with open(fileone,'r') as f:
f.readlines()
totalline = len(f)
f.close()
return fileone,totalline

if __name__ == '__main__':
files = os.listdir(".")
#windows下防止出现python运行错误
multiprocessing.freeze_support()
#设定进程数为机器的cup核数
cpus = multiprocessing.cpu_count()
#初始化进程池,进程池的进程个数最好等于cpu的核数,这样避免1个CPU同时运行两个任务带来任务切换,使效率减低
pool = multiprocessing.Pool(cpus)
results = []
for x in range(len(files)):
result = pool.apply_async(worker, (files[x], ))
#返回的是class list 对象,最好在关闭进程池后使用get()方法获取
#result.get()如果写在这里则会阻塞进程,需要进程等待运行结果返回
results.append(result)
pool.close()
#join()前必须close()
pool.join()
for result in results:
print(result.get())

这是个最简单逻辑的进程池,我们知道,当进程池中任务队列非空时,才会触发worker进程去工作,那么如何向进程池中的任务队列中添加任务呢?进程池类有两组关键方法来创建任务:分别是apply/apply_async和map/map_async进程池内的apply和map方法与python内建的两个同名方法类似,apply_async和map_async分别为它们的非阻塞版.

apply与apply_async:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
def apply_async(self, func, args=(), kwds={}, callback=None):
assert self._state == RUN
result = ApplyResult(self._cache, callback)
self._taskqueue.put(([(result._job, None, func, args, kwds)], None))
return result
#func表示执行此任务的方法
#args、kwds分别表func的位置参数和关键字参数
#callback表示一个单参数的方法,当有结果返回时,callback方法会被调用,参数即为任务执行后的结果

#同时ApplyResult有如下两个方法:
def get(self, timeout=None):
self.wait(timeout)
if not self._ready:
raise TimeoutError
if self._success:
return self._value
else:
raise self._value
def _set(self, i, obj):
self._success, self._value = obj
if self._callback and self._success:
self._callback(self._value)
self._cond.acquire()
try:
self._ready = True
self._cond.notify()
finally:
self._cond.release()
del self._cache[self._job]

通过源码可以看出这两者的区别就在当把任务加入进程池中后apply_async不需要等待进程执行,直接返回创建的ApplyResult对象,set()将运行结果保存在ApplyResult._value中,唤醒阻塞在条件变量上的get()方法。客户端通过调用get()方法,返回运行结果.这就是上面所说的apply_async()需要在join()后去get()结果.
而apply,但是它不返回ApplyResult,而是直接调用的是apply_async的get()方法得到worker进程运行的结果:

1
2
3
def apply(self, func, args=(), kwds={}):
assert self._state == RUN
return self.apply_async(func, args, kwds).get()

map与map_async:

以上的apply/apply_async方法,每次只能向进程池分配一个任务,那如果想一次分配多个任务到进程池中,可以使用map/map_async方法,
机制比较麻烦,以后有机会再研究,感兴趣的可参考这篇文章python进程池专题总结

concurrent.futures:

concurrent模块是python3.x才有的,python2.x使用的话需要先安装,详细介绍请看官方文档
concurrent模块是对线程池与进程池的再一次封装,且接口封装的非常好,代码几乎可任意在线程池/进程池中切换
concurrent.futures比较常用的方法则有submit()及map(),详细介绍请看官方文档
我们可以改写上述代码:

使用submit()版本:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
import concurrent.futures
def worker(fileone):
#do something
#这里为了简单只统计文件了行数
with open(fileone,'r') as f:
f.readlines()
totalline = len(f)
f.close()
return totalline

if __name__ == '__main__':
files = os.listdir(".")
#with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor: 则变成线程池
with concurrent.futures.ProcessPoolExecutor(max_workers=4) as executor:
files_list = {executor.submit(worker,fileone):fileone for fileone in files}
#as_completed返回一个iterator对象
for f in concourrent.futures.as_completed(files_list):
fileone = files_list[f]
try:
totalline=f.result()
except Exception as e:
print("%s generated an exception:%s" %(fileone,e))
else:
print("%s has totalline:%d" %(fileone,totalline))

使用map()版本:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import concurrent.futures
def worker(fileone):
#do something
#这里为了简单只统计文件了行数
with open(fileone,'r') as f:
f.readlines()
totalline = len(f)
f.close()
return totalline
if __name__ == '__main__':
files = os.listdir(".")
#with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor: 则变成线程池
with concurrent.futures.ProcessPoolExecutor(max_workers=4) as executor:
for fileone, totalline in zip(files,executor.map(worker,files)):
print("%s has totalline:%d" %(fileone,totalline))

可以看出使用map()更加的简洁.但是map()是以files列表元素的顺序返回的,而使用as_completed()则是乱序返回.
这里挑选部分文件来比对串行处理结果与用多进程处理结果时间:
使用串行处理所用时间:

python_concurrent3

python_concurrent1

python_concurrent2

使用进程池处理结果:

python_concurrent4

python_concurrent6

python_concurrent5

说明:由于是在自己的已经用了多年的笔记本上做的测试,使用进程池的时间受笔记本实际配置的限制理论上多进程使用的时间应该要更短点.
而且从上图也可以看出,每一个图是按照文件顺序依次执行,而图2则是同时进行.
在使用单进程的时候,通过观察windwos资源管理器我们可以看到,串行处理的时候只有一个python进程
而使用进程池则可以看到有4个python进程在活动(上述代码指定开启的进程数为笔记本的CPU核数)

进程通信:

  • memory shared(Value + Array)
  • Queue
  • multiprocessing.JoinableQueue
  • Pipe
  • multiprocessing.Manager()

更多的关于进程间通信以后也单独写到这篇文章–python之线程/进程/协程及进程间通信.

参考文章:

转载请注明出处https://izsk.me


 wechat
Scan Me To Read on Phone
I know you won't do this,but what if you did?